packet queue fix proposal

This commit is contained in:
wish
2025-02-18 03:12:09 +11:00
parent b3305d1185
commit d1dfc3fbb1
6 changed files with 34 additions and 31 deletions

View File

@@ -88,7 +88,7 @@ func updateRights(s *Session) {
Rights: s.courses, Rights: s.courses,
UnkSize: 0, UnkSize: 0,
} }
s.QueueSendMHF(update) s.QueueSendMHFNonBlocking(update)
} }
func handleMsgHead(s *Session, p mhfpacket.MHFPacket) {} func handleMsgHead(s *Session, p mhfpacket.MHFPacket) {}
@@ -192,7 +192,7 @@ func logoutPlayer(s *Session) {
for _, sess := range s.server.sessions { for _, sess := range s.server.sessions {
for rSlot := range stage.reservedClientSlots { for rSlot := range stage.reservedClientSlots {
if sess.charID == rSlot && sess.stage != nil && sess.stage.id[3:5] != "Qs" { if sess.charID == rSlot && sess.stage != nil && sess.stage.id[3:5] != "Qs" {
sess.QueueSendMHF(&mhfpacket.MsgSysStageDestruct{}) sess.QueueSendMHFNonBlocking(&mhfpacket.MsgSysStageDestruct{})
} }
} }
} }

View File

@@ -82,7 +82,7 @@ func sendServerChatMessage(s *Session, message string) {
RawDataPayload: bf.Data(), RawDataPayload: bf.Data(),
} }
s.QueueSendMHF(castedBin) s.QueueSendMHFNonBlocking(castedBin)
} }
func parseChatCommand(s *Session, command string) { func parseChatCommand(s *Session, command string) {
@@ -198,7 +198,7 @@ func parseChatCommand(s *Session, command string) {
temp.Build(deleteNotif, s.clientContext) temp.Build(deleteNotif, s.clientContext)
} }
deleteNotif.WriteUint16(uint16(network.MSG_SYS_END)) deleteNotif.WriteUint16(uint16(network.MSG_SYS_END))
s.QueueSend(deleteNotif.Data()) s.QueueSendNonBlocking(deleteNotif.Data())
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
reloadNotif := byteframe.NewByteFrame() reloadNotif := byteframe.NewByteFrame()
for _, session := range s.server.sessions { for _, session := range s.server.sessions {
@@ -233,7 +233,7 @@ func parseChatCommand(s *Session, command string) {
temp.Build(reloadNotif, s.clientContext) temp.Build(reloadNotif, s.clientContext)
} }
reloadNotif.WriteUint16(uint16(network.MSG_SYS_END)) reloadNotif.WriteUint16(uint16(network.MSG_SYS_END))
s.QueueSend(reloadNotif.Data()) s.QueueSendNonBlocking(reloadNotif.Data())
} else { } else {
sendDisabledCommandMessage(s, commands["Reload"]) sendDisabledCommandMessage(s, commands["Reload"])
} }
@@ -381,7 +381,7 @@ func parseChatCommand(s *Session, command string) {
payload.WriteInt16(int16(x)) // X payload.WriteInt16(int16(x)) // X
payload.WriteInt16(int16(y)) // Y payload.WriteInt16(int16(y)) // Y
payloadBytes := payload.Data() payloadBytes := payload.Data()
s.QueueSendMHF(&mhfpacket.MsgSysCastedBinary{ s.QueueSendMHFNonBlocking(&mhfpacket.MsgSysCastedBinary{
CharID: s.charID, CharID: s.charID,
MessageType: BinaryMessageTypeState, MessageType: BinaryMessageTypeState,
RawDataPayload: payloadBytes, RawDataPayload: payloadBytes,
@@ -539,7 +539,7 @@ func handleMsgSysCastBinary(s *Session, p mhfpacket.MHFPacket) {
char := s.server.FindSessionByCharID(targetID) char := s.server.FindSessionByCharID(targetID)
if char != nil { if char != nil {
char.QueueSendMHF(resp) char.QueueSendMHFNonBlocking(resp)
} }
} }
default: default:

View File

@@ -185,7 +185,7 @@ func SendMailNotification(s *Session, m *Mail, recipient *Session) {
castedBinary.Build(bf, s.clientContext) castedBinary.Build(bf, s.clientContext)
recipient.QueueSendMHF(castedBinary) recipient.QueueSendMHFNonBlocking(castedBinary)
} }
func getCharacterName(s *Session, charID uint32) string { func getCharacterName(s *Session, charID uint32) string {

View File

@@ -129,12 +129,12 @@ func (s *Session) notifyRavi() {
raviNotif.WriteUint16(0x0010) // End it. raviNotif.WriteUint16(0x0010) // End it.
if s.server.erupeConfig.GameplayOptions.LowLatencyRaviente { if s.server.erupeConfig.GameplayOptions.LowLatencyRaviente {
for session := range sema.clients { for session := range sema.clients {
session.QueueSend(raviNotif.Data()) session.QueueSendNonBlocking(raviNotif.Data())
} }
} else { } else {
for session := range sema.clients { for session := range sema.clients {
if session.charID == s.charID { if session.charID == s.charID {
session.QueueSend(raviNotif.Data()) session.QueueSendNonBlocking(raviNotif.Data())
} }
} }
} }

View File

@@ -59,7 +59,7 @@ func doStageTransfer(s *Session, ackHandle uint32, stageID string) {
s.Unlock() s.Unlock()
// Tell the client to cleanup its current stage objects. // Tell the client to cleanup its current stage objects.
s.QueueSendMHF(&mhfpacket.MsgSysCleanupObject{}) s.QueueSendMHFNonBlocking(&mhfpacket.MsgSysCleanupObject{})
// Confirm the stage entry. // Confirm the stage entry.
doAckSimpleSucceed(s, ackHandle, []byte{0x00, 0x00, 0x00, 0x00}) doAckSimpleSucceed(s, ackHandle, []byte{0x00, 0x00, 0x00, 0x00})
@@ -112,9 +112,8 @@ func doStageTransfer(s *Session, ackHandle uint32, stageID string) {
s.stage.RUnlock() s.stage.RUnlock()
} }
newNotif.WriteUint16(0x0010) // End it.
if len(newNotif.Data()) > 2 { if len(newNotif.Data()) > 2 {
s.QueueSend(newNotif.Data()) s.QueueSendNonBlocking(newNotif.Data())
} }
} }
@@ -238,7 +237,7 @@ func handleMsgSysUnlockStage(s *Session, p mhfpacket.MHFPacket) {
for charID := range s.reservationStage.reservedClientSlots { for charID := range s.reservationStage.reservedClientSlots {
session := s.server.FindSessionByCharID(charID) session := s.server.FindSessionByCharID(charID)
if session != nil { if session != nil {
session.QueueSendMHF(&mhfpacket.MsgSysStageDestruct{}) session.QueueSendMHFNonBlocking(&mhfpacket.MsgSysStageDestruct{})
} }
} }

View File

@@ -104,22 +104,9 @@ func (s *Session) Start() {
// QueueSend queues a packet (raw []byte) to be sent. // QueueSend queues a packet (raw []byte) to be sent.
func (s *Session) QueueSend(data []byte) { func (s *Session) QueueSend(data []byte) {
s.logMessage(binary.BigEndian.Uint16(data[0:2]), data, "Server", s.Name) s.logMessage(binary.BigEndian.Uint16(data[0:2]), data, "Server", s.Name)
select { err := s.cryptConn.SendPacket(append(data, []byte{0x00, 0x10}...))
case s.sendPackets <- packet{data, false}: if err != nil {
// Enqueued data s.logger.Warn("Failed to send packet")
default:
s.logger.Warn("Packet queue too full, flushing!")
var tempPackets []packet
for len(s.sendPackets) > 0 {
tempPacket := <-s.sendPackets
if !tempPacket.nonBlocking {
tempPackets = append(tempPackets, tempPacket)
}
}
for _, tempPacket := range tempPackets {
s.sendPackets <- tempPacket
}
s.sendPackets <- packet{data, false}
} }
} }
@@ -146,6 +133,19 @@ func (s *Session) QueueSendMHF(pkt mhfpacket.MHFPacket) {
s.QueueSend(bf.Data()) s.QueueSend(bf.Data())
} }
// QueueSendMHFNonBlocking queues a MHFPacket to be sent, dropping the packet entirely if the queue is full.
func (s *Session) QueueSendMHFNonBlocking(pkt mhfpacket.MHFPacket) {
// Make the header
bf := byteframe.NewByteFrame()
bf.WriteUint16(uint16(pkt.Opcode()))
// Build the packet onto the byteframe.
pkt.Build(bf, s.clientContext)
// Queue it.
s.QueueSendNonBlocking(bf.Data())
}
// QueueAck is a helper function to queue an MSG_SYS_ACK with the given ack handle and data. // QueueAck is a helper function to queue an MSG_SYS_ACK with the given ack handle and data.
func (s *Session) QueueAck(ackHandle uint32, data []byte) { func (s *Session) QueueAck(ackHandle uint32, data []byte) {
bf := byteframe.NewByteFrame() bf := byteframe.NewByteFrame()
@@ -158,12 +158,16 @@ func (s *Session) QueueAck(ackHandle uint32, data []byte) {
func (s *Session) sendLoop() { func (s *Session) sendLoop() {
var pkt packet var pkt packet
for { for {
var buf []byte
if s.closed { if s.closed {
return return
} }
for len(s.sendPackets) > 0 { for len(s.sendPackets) > 0 {
pkt = <-s.sendPackets pkt = <-s.sendPackets
err := s.cryptConn.SendPacket(append(pkt.data, []byte{0x00, 0x10}...)) buf = append(buf, pkt.data...)
}
if len(buf) > 0 {
err := s.cryptConn.SendPacket(append(buf, []byte{0x00, 0x10}...))
if err != nil { if err != nil {
s.logger.Warn("Failed to send packet") s.logger.Warn("Failed to send packet")
} }