diff --git a/server/channelserver/handlers_cast_binary.go b/server/channelserver/handlers_cast_binary.go index ecdf09d79..cf51b6188 100644 --- a/server/channelserver/handlers_cast_binary.go +++ b/server/channelserver/handlers_cast_binary.go @@ -4,7 +4,6 @@ import ( "crypto/rand" "encoding/hex" _config "erupe-ce/config" - "erupe-ce/network" "erupe-ce/network/binpacket" "erupe-ce/network/mhfpacket" "erupe-ce/utils/byteframe" @@ -181,41 +180,33 @@ func parseChatCommand(s *Session, command string) { if commands["Reload"].Enabled || s.isOp() { sendServerChatMessage(s, s.server.i18n.commands.reload) var temp mhfpacket.MHFPacket - deleteNotif := byteframe.NewByteFrame() for _, object := range s.stage.objects { if object.ownerCharID == s.charID { continue } temp = &mhfpacket.MsgSysDeleteObject{ObjID: object.id} - deleteNotif.WriteUint16(uint16(temp.Opcode())) - temp.Build(deleteNotif) + s.QueueSendMHF(temp) } for _, session := range s.server.sessions { if s == session { continue } temp = &mhfpacket.MsgSysDeleteUser{CharID: session.charID} - deleteNotif.WriteUint16(uint16(temp.Opcode())) - temp.Build(deleteNotif) + s.QueueSendMHF(temp) } - deleteNotif.WriteUint16(uint16(network.MSG_SYS_END)) - s.QueueSend(deleteNotif.Data()) time.Sleep(500 * time.Millisecond) - reloadNotif := byteframe.NewByteFrame() for _, session := range s.server.sessions { if s == session { continue } temp = &mhfpacket.MsgSysInsertUser{CharID: session.charID} - reloadNotif.WriteUint16(uint16(temp.Opcode())) - temp.Build(reloadNotif) + s.QueueSendMHF(temp) for i := 0; i < 3; i++ { temp = &mhfpacket.MsgSysNotifyUserBinary{ CharID: session.charID, BinaryType: uint8(i + 1), } - reloadNotif.WriteUint16(uint16(temp.Opcode())) - temp.Build(reloadNotif) + s.QueueSendMHF(temp) } } for _, obj := range s.stage.objects { @@ -230,11 +221,8 @@ func parseChatCommand(s *Session, command string) { Unk0: 0, OwnerCharID: obj.ownerCharID, } - reloadNotif.WriteUint16(uint16(temp.Opcode())) - temp.Build(reloadNotif) + s.QueueSendMHF(temp) } - reloadNotif.WriteUint16(uint16(network.MSG_SYS_END)) - s.QueueSend(reloadNotif.Data()) } else { sendDisabledCommandMessage(s, commands["Reload"]) } diff --git a/server/channelserver/handlers_register.go b/server/channelserver/handlers_register.go index a7f1509f2..9e5d40aab 100644 --- a/server/channelserver/handlers_register.go +++ b/server/channelserver/handlers_register.go @@ -116,25 +116,17 @@ func (s *Session) notifyRavi() { return } var temp mhfpacket.MHFPacket - raviNotif := byteframe.NewByteFrame() - temp = &mhfpacket.MsgSysNotifyRegister{RegisterID: 0x40000} - raviNotif.WriteUint16(uint16(temp.Opcode())) - temp.Build(raviNotif) - temp = &mhfpacket.MsgSysNotifyRegister{RegisterID: 0x50000} - raviNotif.WriteUint16(uint16(temp.Opcode())) - temp.Build(raviNotif) - temp = &mhfpacket.MsgSysNotifyRegister{RegisterID: 0x60000} - raviNotif.WriteUint16(uint16(temp.Opcode())) - temp.Build(raviNotif) - raviNotif.WriteUint16(0x0010) // End it. - if s.server.erupeConfig.GameplayOptions.LowLatencyRaviente { - for session := range sema.clients { - session.QueueSend(raviNotif.Data()) - } - } else { - for session := range sema.clients { - if session.charID == s.charID { - session.QueueSend(raviNotif.Data()) + for i := 0; i < 3; i++ { + temp = &mhfpacket.MsgSysLoadRegister{RegisterID: uint32(0x40000 + i*0x10000)} + if s.server.erupeConfig.GameplayOptions.LowLatencyRaviente { + for session := range sema.clients { + session.QueueSendMHF(temp) + } + } else { + for session := range sema.clients { + if session.charID == s.charID { + session.QueueSendMHF(temp) + } } } } diff --git a/server/channelserver/handlers_stage.go b/server/channelserver/handlers_stage.go index 0813e7096..107ab2b85 100644 --- a/server/channelserver/handlers_stage.go +++ b/server/channelserver/handlers_stage.go @@ -66,7 +66,6 @@ func doStageTransfer(s *Session, ackHandle uint32, stageID string) { doAckSimpleSucceed(s, ackHandle, []byte{0x00, 0x00, 0x00, 0x00}) var temp mhfpacket.MHFPacket - newNotif := byteframe.NewByteFrame() // Cast existing user data to new user if !s.userEnteredStage { @@ -77,15 +76,13 @@ func doStageTransfer(s *Session, ackHandle uint32, stageID string) { continue } temp = &mhfpacket.MsgSysInsertUser{CharID: session.charID} - newNotif.WriteUint16(uint16(temp.Opcode())) - temp.Build(newNotif) + s.QueueSendMHF(temp) for i := 0; i < 3; i++ { temp = &mhfpacket.MsgSysNotifyUserBinary{ CharID: session.charID, BinaryType: uint8(i + 1), } - newNotif.WriteUint16(uint16(temp.Opcode())) - temp.Build(newNotif) + s.QueueSendMHF(temp) } } } @@ -94,7 +91,6 @@ func doStageTransfer(s *Session, ackHandle uint32, stageID string) { // Notify the client to duplicate the existing objects. s.logger.Info(fmt.Sprintf("Sending existing stage objects to %s", s.Name)) s.stage.RLock() - var temp mhfpacket.MHFPacket for _, obj := range s.stage.objects { if obj.ownerCharID == s.charID { continue @@ -107,16 +103,10 @@ func doStageTransfer(s *Session, ackHandle uint32, stageID string) { Unk0: 0, OwnerCharID: obj.ownerCharID, } - newNotif.WriteUint16(uint16(temp.Opcode())) - temp.Build(newNotif) + s.QueueSendMHF(temp) } s.stage.RUnlock() } - - newNotif.WriteUint16(0x0010) // End it. - if len(newNotif.Data()) > 2 { - s.QueueSend(newNotif.Data()) - } } func destructEmptyStages(s *Session) { diff --git a/server/channelserver/sys_channel_server.go b/server/channelserver/sys_channel_server.go index bf007811a..456962680 100644 --- a/server/channelserver/sys_channel_server.go +++ b/server/channelserver/sys_channel_server.go @@ -288,16 +288,7 @@ func (s *Server) BroadcastMHF(pkt mhfpacket.MHFPacket, ignoredSession *Session) if session == ignoredSession { continue } - - // Make the header - bf := byteframe.NewByteFrame() - bf.WriteUint16(uint16(pkt.Opcode())) - - // Build the packet onto the byteframe. - pkt.Build(bf) - - // Enqueue in a non-blocking way that drops the packet if the connections send buffer channel is full. - session.QueueSendNonBlocking(bf.Data()) + session.QueueSendMHF(pkt) } } diff --git a/server/channelserver/sys_semaphore.go b/server/channelserver/sys_semaphore.go index 56dabb277..a8d3f9405 100644 --- a/server/channelserver/sys_semaphore.go +++ b/server/channelserver/sys_semaphore.go @@ -2,8 +2,6 @@ package channelserver import ( "erupe-ce/network/mhfpacket" - "erupe-ce/utils/byteframe" - "sync" ) @@ -45,15 +43,6 @@ func (s *Semaphore) BroadcastMHF(pkt mhfpacket.MHFPacket, ignoredSession *Sessio if session == ignoredSession { continue } - - // Make the header - bf := byteframe.NewByteFrame() - bf.WriteUint16(uint16(pkt.Opcode())) - - // Build the packet onto the byteframe. - pkt.Build(bf) - - // Enqueue in a non-blocking way that drops the packet if the connections send buffer channel is full. - session.QueueSendNonBlocking(bf.Data()) + session.QueueSendMHF(pkt) } } diff --git a/server/channelserver/sys_session.go b/server/channelserver/sys_session.go index 48fa947de..aefd5f864 100644 --- a/server/channelserver/sys_session.go +++ b/server/channelserver/sys_session.go @@ -20,11 +20,6 @@ import ( "go.uber.org/zap" ) -type packet struct { - data []byte - nonBlocking bool -} - // Session holds state for the channel server connection. type Session struct { sync.Mutex @@ -32,7 +27,7 @@ type Session struct { server *Server rawConn net.Conn cryptConn *network.CryptConn - sendPackets chan packet + sendPackets chan mhfpacket.MHFPacket lastPacket time.Time objectIndex uint16 @@ -76,7 +71,7 @@ func NewSession(server *Server, conn net.Conn) *Session { server: server, rawConn: conn, cryptConn: network.NewCryptConn(conn), - sendPackets: make(chan packet, 20), + sendPackets: make(chan mhfpacket.MHFPacket, 20), lastPacket: time.Now(), sessionStart: gametime.TimeAdjusted().Unix(), stageMoveStack: stringstack.New(), @@ -96,73 +91,36 @@ func (s *Session) Start() { go s.recvLoop() } -// QueueSend queues a packet (raw []byte) to be sent. -func (s *Session) QueueSend(data []byte) { - s.logMessage(binary.BigEndian.Uint16(data[0:2]), data, "Server", s.Name) +// QueueSendMHF queues a MHFPacket to be sent. +func (s *Session) QueueSendMHF(pkt mhfpacket.MHFPacket) { select { - case s.sendPackets <- packet{data, false}: - // Enqueued data - default: - s.logger.Warn("Packet queue too full, flushing!") - var tempPackets []packet - for len(s.sendPackets) > 0 { - tempPacket := <-s.sendPackets - if !tempPacket.nonBlocking { - tempPackets = append(tempPackets, tempPacket) - } - } - for _, tempPacket := range tempPackets { - s.sendPackets <- tempPacket - } - s.sendPackets <- packet{data, false} - } -} - -// QueueSendNonBlocking queues a packet (raw []byte) to be sent, dropping the packet entirely if the queue is full. -func (s *Session) QueueSendNonBlocking(data []byte) { - select { - case s.sendPackets <- packet{data, true}: - s.logMessage(binary.BigEndian.Uint16(data[0:2]), data, "Server", s.Name) + case s.sendPackets <- pkt: default: s.logger.Warn("Packet queue too full, dropping!") } } -// QueueSendMHF queues a MHFPacket to be sent. -func (s *Session) QueueSendMHF(pkt mhfpacket.MHFPacket) { - // Make the header - bf := byteframe.NewByteFrame() - bf.WriteUint16(uint16(pkt.Opcode())) - - // Build the packet onto the byteframe. - pkt.Build(bf) - - // Queue it. - s.QueueSend(bf.Data()) -} - -// QueueAck is a helper function to queue an MSG_SYS_ACK with the given ack handle and data. -func (s *Session) QueueAck(ackHandle uint32, data []byte) { - bf := byteframe.NewByteFrame() - bf.WriteUint16(uint16(network.MSG_SYS_ACK)) - bf.WriteUint32(ackHandle) - bf.WriteBytes(data) - s.QueueSend(bf.Data()) -} - func (s *Session) sendLoop() { - var pkt packet + var pkt mhfpacket.MHFPacket var buffer []byte + end := &mhfpacket.MsgSysEnd{} for { if s.closed { return } for len(s.sendPackets) > 0 { pkt = <-s.sendPackets - buffer = append(buffer, pkt.data...) + bf := byteframe.NewByteFrame() + bf.WriteUint16(uint16(pkt.Opcode())) + pkt.Build(bf) + s.logMessage(uint16(pkt.Opcode()), bf.Data()[2:], "Server", s.Name) + buffer = append(buffer, bf.Data()...) } + bf := byteframe.NewByteFrame() + bf.WriteUint16(uint16(end.Opcode())) + buffer = append(buffer, bf.Data()...) if len(buffer) > 0 { - err := s.cryptConn.SendPacket(append(buffer, []byte{0x00, 0x10}...)) + err := s.cryptConn.SendPacket(buffer) if err != nil { s.logger.Warn("Failed to send packet") } diff --git a/server/channelserver/sys_stage.go b/server/channelserver/sys_stage.go index 7b4192b42..b6435eed2 100644 --- a/server/channelserver/sys_stage.go +++ b/server/channelserver/sys_stage.go @@ -4,7 +4,6 @@ import ( "sync" "erupe-ce/network/mhfpacket" - "erupe-ce/utils/byteframe" ) // Object holds infomation about a specific object. @@ -72,16 +71,7 @@ func (s *Stage) BroadcastMHF(pkt mhfpacket.MHFPacket, ignoredSession *Session) { if session == ignoredSession { continue } - - // Make the header - bf := byteframe.NewByteFrame() - bf.WriteUint16(uint16(pkt.Opcode())) - - // Build the packet onto the byteframe. - pkt.Build(bf) - - // Enqueue in a non-blocking way that drops the packet if the connections send buffer channel is full. - session.QueueSendNonBlocking(bf.Data()) + session.QueueSendMHF(pkt) } }