diff --git a/server/channelserver/handlers.go b/server/channelserver/handlers.go index c0d141f15..da357700d 100644 --- a/server/channelserver/handlers.go +++ b/server/channelserver/handlers.go @@ -88,7 +88,7 @@ func updateRights(s *Session) { Rights: s.courses, UnkSize: 0, } - s.QueueSendMHF(update) + s.QueueSendMHFNonBlocking(update) } func handleMsgHead(s *Session, p mhfpacket.MHFPacket) {} @@ -192,7 +192,7 @@ func logoutPlayer(s *Session) { for _, sess := range s.server.sessions { for rSlot := range stage.reservedClientSlots { if sess.charID == rSlot && sess.stage != nil && sess.stage.id[3:5] != "Qs" { - sess.QueueSendMHF(&mhfpacket.MsgSysStageDestruct{}) + sess.QueueSendMHFNonBlocking(&mhfpacket.MsgSysStageDestruct{}) } } } diff --git a/server/channelserver/handlers_cast_binary.go b/server/channelserver/handlers_cast_binary.go index e43ff3b1d..a3f2ecfb6 100644 --- a/server/channelserver/handlers_cast_binary.go +++ b/server/channelserver/handlers_cast_binary.go @@ -82,7 +82,7 @@ func sendServerChatMessage(s *Session, message string) { RawDataPayload: bf.Data(), } - s.QueueSendMHF(castedBin) + s.QueueSendMHFNonBlocking(castedBin) } func parseChatCommand(s *Session, command string) { @@ -198,7 +198,7 @@ func parseChatCommand(s *Session, command string) { temp.Build(deleteNotif, s.clientContext) } deleteNotif.WriteUint16(uint16(network.MSG_SYS_END)) - s.QueueSend(deleteNotif.Data()) + s.QueueSendNonBlocking(deleteNotif.Data()) time.Sleep(500 * time.Millisecond) reloadNotif := byteframe.NewByteFrame() for _, session := range s.server.sessions { @@ -233,7 +233,7 @@ func parseChatCommand(s *Session, command string) { temp.Build(reloadNotif, s.clientContext) } reloadNotif.WriteUint16(uint16(network.MSG_SYS_END)) - s.QueueSend(reloadNotif.Data()) + s.QueueSendNonBlocking(reloadNotif.Data()) } else { sendDisabledCommandMessage(s, commands["Reload"]) } @@ -381,7 +381,7 @@ func parseChatCommand(s *Session, command string) { payload.WriteInt16(int16(x)) // X payload.WriteInt16(int16(y)) // Y payloadBytes := payload.Data() - s.QueueSendMHF(&mhfpacket.MsgSysCastedBinary{ + s.QueueSendMHFNonBlocking(&mhfpacket.MsgSysCastedBinary{ CharID: s.charID, MessageType: BinaryMessageTypeState, RawDataPayload: payloadBytes, @@ -539,7 +539,7 @@ func handleMsgSysCastBinary(s *Session, p mhfpacket.MHFPacket) { char := s.server.FindSessionByCharID(targetID) if char != nil { - char.QueueSendMHF(resp) + char.QueueSendMHFNonBlocking(resp) } } default: diff --git a/server/channelserver/handlers_mail.go b/server/channelserver/handlers_mail.go index a596d927b..41f721a1e 100644 --- a/server/channelserver/handlers_mail.go +++ b/server/channelserver/handlers_mail.go @@ -185,7 +185,7 @@ func SendMailNotification(s *Session, m *Mail, recipient *Session) { castedBinary.Build(bf, s.clientContext) - recipient.QueueSendMHF(castedBinary) + recipient.QueueSendMHFNonBlocking(castedBinary) } func getCharacterName(s *Session, charID uint32) string { diff --git a/server/channelserver/handlers_register.go b/server/channelserver/handlers_register.go index 3d47c3633..895e1c096 100644 --- a/server/channelserver/handlers_register.go +++ b/server/channelserver/handlers_register.go @@ -129,12 +129,12 @@ func (s *Session) notifyRavi() { raviNotif.WriteUint16(0x0010) // End it. if s.server.erupeConfig.GameplayOptions.LowLatencyRaviente { for session := range sema.clients { - session.QueueSend(raviNotif.Data()) + session.QueueSendNonBlocking(raviNotif.Data()) } } else { for session := range sema.clients { if session.charID == s.charID { - session.QueueSend(raviNotif.Data()) + session.QueueSendNonBlocking(raviNotif.Data()) } } } diff --git a/server/channelserver/handlers_stage.go b/server/channelserver/handlers_stage.go index 8a1abbc35..d0468fb8c 100644 --- a/server/channelserver/handlers_stage.go +++ b/server/channelserver/handlers_stage.go @@ -59,7 +59,7 @@ func doStageTransfer(s *Session, ackHandle uint32, stageID string) { s.Unlock() // Tell the client to cleanup its current stage objects. - s.QueueSendMHF(&mhfpacket.MsgSysCleanupObject{}) + s.QueueSendMHFNonBlocking(&mhfpacket.MsgSysCleanupObject{}) // Confirm the stage entry. doAckSimpleSucceed(s, ackHandle, []byte{0x00, 0x00, 0x00, 0x00}) @@ -112,9 +112,8 @@ func doStageTransfer(s *Session, ackHandle uint32, stageID string) { s.stage.RUnlock() } - newNotif.WriteUint16(0x0010) // End it. if len(newNotif.Data()) > 2 { - s.QueueSend(newNotif.Data()) + s.QueueSendNonBlocking(newNotif.Data()) } } @@ -238,7 +237,7 @@ func handleMsgSysUnlockStage(s *Session, p mhfpacket.MHFPacket) { for charID := range s.reservationStage.reservedClientSlots { session := s.server.FindSessionByCharID(charID) if session != nil { - session.QueueSendMHF(&mhfpacket.MsgSysStageDestruct{}) + session.QueueSendMHFNonBlocking(&mhfpacket.MsgSysStageDestruct{}) } } diff --git a/server/channelserver/sys_session.go b/server/channelserver/sys_session.go index a13d0d4ce..93bb359b1 100644 --- a/server/channelserver/sys_session.go +++ b/server/channelserver/sys_session.go @@ -104,22 +104,9 @@ func (s *Session) Start() { // QueueSend queues a packet (raw []byte) to be sent. func (s *Session) QueueSend(data []byte) { s.logMessage(binary.BigEndian.Uint16(data[0:2]), data, "Server", s.Name) - select { - case s.sendPackets <- packet{data, false}: - // Enqueued data - default: - s.logger.Warn("Packet queue too full, flushing!") - var tempPackets []packet - for len(s.sendPackets) > 0 { - tempPacket := <-s.sendPackets - if !tempPacket.nonBlocking { - tempPackets = append(tempPackets, tempPacket) - } - } - for _, tempPacket := range tempPackets { - s.sendPackets <- tempPacket - } - s.sendPackets <- packet{data, false} + err := s.cryptConn.SendPacket(append(data, []byte{0x00, 0x10}...)) + if err != nil { + s.logger.Warn("Failed to send packet") } } @@ -146,6 +133,19 @@ func (s *Session) QueueSendMHF(pkt mhfpacket.MHFPacket) { s.QueueSend(bf.Data()) } +// QueueSendMHFNonBlocking queues a MHFPacket to be sent, dropping the packet entirely if the queue is full. +func (s *Session) QueueSendMHFNonBlocking(pkt mhfpacket.MHFPacket) { + // Make the header + bf := byteframe.NewByteFrame() + bf.WriteUint16(uint16(pkt.Opcode())) + + // Build the packet onto the byteframe. + pkt.Build(bf, s.clientContext) + + // Queue it. + s.QueueSendNonBlocking(bf.Data()) +} + // QueueAck is a helper function to queue an MSG_SYS_ACK with the given ack handle and data. func (s *Session) QueueAck(ackHandle uint32, data []byte) { bf := byteframe.NewByteFrame() @@ -158,12 +158,16 @@ func (s *Session) QueueAck(ackHandle uint32, data []byte) { func (s *Session) sendLoop() { var pkt packet for { + var buf []byte if s.closed { return } for len(s.sendPackets) > 0 { pkt = <-s.sendPackets - err := s.cryptConn.SendPacket(append(pkt.data, []byte{0x00, 0x10}...)) + buf = append(buf, pkt.data...) + } + if len(buf) > 0 { + err := s.cryptConn.SendPacket(append(buf, []byte{0x00, 0x10}...)) if err != nil { s.logger.Warn("Failed to send packet") }