From db4120bb8522fafb767833b431a621fc08504c14 Mon Sep 17 00:00:00 2001 From: wish Date: Mon, 28 Oct 2024 23:48:49 +1100 Subject: [PATCH] reimplement lazy packets --- internal/service/mail.go | 6 +-- internal/system/stage.go | 4 +- server/channelserver/chat_commands.go | 12 +++--- server/channelserver/handlers.go | 6 +-- server/channelserver/handlers_cast_binary.go | 4 +- server/channelserver/handlers_guild.go | 2 +- server/channelserver/handlers_register.go | 4 +- server/channelserver/handlers_stage.go | 10 ++--- server/channelserver/sys_broadcast.go | 2 +- server/channelserver/sys_semaphore.go | 2 +- server/channelserver/sys_session.go | 41 +++++++++++++++----- 11 files changed, 57 insertions(+), 36 deletions(-) diff --git a/internal/service/mail.go b/internal/service/mail.go index d3a3aa681..7d384e4d6 100644 --- a/internal/service/mail.go +++ b/internal/service/mail.go @@ -192,10 +192,10 @@ func GetMailByID(ID int) (*Mail, error) { } type SessionMail interface { - QueueSendMHF(packet mhfpacket.MHFPacket) + QueueSendMHFLazy(packet mhfpacket.MHFPacket) } -func SendMailNotification(s SessionMail, m *Mail, recipient SessionMail) { +func SendMailNotification(m *Mail, recipient SessionMail) { bf := byteframe.NewByteFrame() notification := &binpacket.MsgBinMailNotify{ @@ -213,7 +213,7 @@ func SendMailNotification(s SessionMail, m *Mail, recipient SessionMail) { castedBinary.Build(bf) - recipient.QueueSendMHF(castedBinary) + recipient.QueueSendMHFLazy(castedBinary) } func getCharacterName(charID uint32) string { diff --git a/internal/system/stage.go b/internal/system/stage.go index 93e4e92ae..a30ab4c08 100644 --- a/internal/system/stage.go +++ b/internal/system/stage.go @@ -7,7 +7,7 @@ import ( ) type SessionStage interface { - QueueSendMHF(packet mhfpacket.MHFPacket) + QueueSendMHFLazy(packet mhfpacket.MHFPacket) GetCharID() uint32 GetName() string } @@ -77,7 +77,7 @@ func (s *Stage) BroadcastMHF(pkt mhfpacket.MHFPacket, ignoredSession SessionStag if session == ignoredSession { continue } - session.QueueSendMHF(pkt) + session.QueueSendMHFLazy(pkt) } } diff --git a/server/channelserver/chat_commands.go b/server/channelserver/chat_commands.go index 55dd03861..cdeb7fb0c 100644 --- a/server/channelserver/chat_commands.go +++ b/server/channelserver/chat_commands.go @@ -151,14 +151,14 @@ func reload(s *Session, _ []string) error { continue } temp = &mhfpacket.MsgSysDeleteObject{ObjID: object.Id} - s.QueueSendMHF(temp) + s.QueueSendMHFLazy(temp) } for _, session := range s.Server.sessions { if s == session { continue } temp = &mhfpacket.MsgSysDeleteUser{CharID: session.CharID} - s.QueueSendMHF(temp) + s.QueueSendMHFLazy(temp) } time.Sleep(500 * time.Millisecond) for _, session := range s.Server.sessions { @@ -166,13 +166,13 @@ func reload(s *Session, _ []string) error { continue } temp = &mhfpacket.MsgSysInsertUser{CharID: session.CharID} - s.QueueSendMHF(temp) + s.QueueSendMHFLazy(temp) for i := 0; i < 3; i++ { temp = &mhfpacket.MsgSysNotifyUserBinary{ CharID: session.CharID, BinaryType: uint8(i + 1), } - s.QueueSendMHF(temp) + s.QueueSendMHFLazy(temp) } } for _, obj := range s.stage.Objects { @@ -187,7 +187,7 @@ func reload(s *Session, _ []string) error { Unk0: 0, OwnerCharID: obj.OwnerCharID, } - s.QueueSendMHF(temp) + s.QueueSendMHFLazy(temp) } return nil } @@ -356,7 +356,7 @@ func teleport(s *Session, args []string) error { payload.WriteUint8(2) // SetState type(position == 2) payload.WriteInt16(int16(x)) payload.WriteInt16(int16(y)) - s.QueueSendMHF(&mhfpacket.MsgSysCastedBinary{ + s.QueueSendMHFLazy(&mhfpacket.MsgSysCastedBinary{ CharID: s.CharID, MessageType: constant.BinaryMessageTypeState, RawDataPayload: payload.Data(), diff --git a/server/channelserver/handlers.go b/server/channelserver/handlers.go index bb19b4707..9615f12b3 100644 --- a/server/channelserver/handlers.go +++ b/server/channelserver/handlers.go @@ -53,7 +53,7 @@ func updateRights(s *Session) { Rights: s.courses, UnkSize: 0, } - s.QueueSendMHF(update) + s.QueueSendMHFLazy(update) } func handleMsgHead(s *Session, db *sqlx.DB, p mhfpacket.MHFPacket) {} @@ -161,7 +161,7 @@ func logoutPlayer(s *Session) { for _, sess := range s.Server.sessions { for rSlot := range stage.ReservedClientSlots { if sess.CharID == rSlot && sess.stage != nil && sess.stage.Id[3:5] != "Qs" { - sess.QueueSendMHF(&mhfpacket.MsgSysStageDestruct{}) + sess.QueueSendMHFLazy(&mhfpacket.MsgSysStageDestruct{}) } } } @@ -246,7 +246,7 @@ func handleMsgSysTime(s *Session, db *sqlx.DB, p mhfpacket.MHFPacket) { GetRemoteTime: false, Timestamp: uint32(gametime.TimeAdjusted().Unix()), // JP timezone } - s.QueueSendMHF(resp) + s.QueueSendMHFLazy(resp) s.notifyRavi() } diff --git a/server/channelserver/handlers_cast_binary.go b/server/channelserver/handlers_cast_binary.go index f61593429..bef2f2262 100644 --- a/server/channelserver/handlers_cast_binary.go +++ b/server/channelserver/handlers_cast_binary.go @@ -61,7 +61,7 @@ func sendServerChatMessage(s *Session, message string) { RawDataPayload: bf.Data(), } - s.QueueSendMHF(castedBin) + s.QueueSendMHFLazy(castedBin) } func handleMsgSysCastBinary(s *Session, db *sqlx.DB, p mhfpacket.MHFPacket) { @@ -185,7 +185,7 @@ func handleMsgSysCastBinary(s *Session, db *sqlx.DB, p mhfpacket.MHFPacket) { char := s.Server.FindSessionByCharID(targetID) if char != nil { - char.QueueSendMHF(resp) + char.QueueSendMHFLazy(resp) } } default: diff --git a/server/channelserver/handlers_guild.go b/server/channelserver/handlers_guild.go index 82680b32c..7a75cb7f6 100644 --- a/server/channelserver/handlers_guild.go +++ b/server/channelserver/handlers_guild.go @@ -324,7 +324,7 @@ func HandleMsgMhfOperateGuildMember(s *Session, db *sqlx.DB, p mhfpacket.MHFPack for _, channel := range s.Server.Channels { for _, session := range channel.sessions { if session.CharID == pkt.CharID { - service.SendMailNotification(s, &mail, session) + service.SendMailNotification(&mail, session) } } } diff --git a/server/channelserver/handlers_register.go b/server/channelserver/handlers_register.go index 5650fd561..b4a8702bc 100644 --- a/server/channelserver/handlers_register.go +++ b/server/channelserver/handlers_register.go @@ -123,12 +123,12 @@ func (s *Session) notifyRavi() { temp = &mhfpacket.MsgSysLoadRegister{RegisterID: uint32(0x40000 + i*0x10000)} if config.GetConfig().GameplayOptions.LowLatencyRaviente { for session := range sema.clients { - session.QueueSendMHF(temp) + session.QueueSendMHFLazy(temp) } } else { for session := range sema.clients { if session.CharID == s.CharID { - session.QueueSendMHF(temp) + session.QueueSendMHFLazy(temp) } } } diff --git a/server/channelserver/handlers_stage.go b/server/channelserver/handlers_stage.go index 81e8f66f4..c2fc8535c 100644 --- a/server/channelserver/handlers_stage.go +++ b/server/channelserver/handlers_stage.go @@ -63,7 +63,7 @@ func doStageTransfer(s *Session, ackHandle uint32, stageID string) { s.Unlock() // Tell the client to cleanup its current stage objects. - s.QueueSendMHF(&mhfpacket.MsgSysCleanupObject{}) + s.QueueSendMHFLazy(&mhfpacket.MsgSysCleanupObject{}) // Confirm the stage entry. s.DoAckSimpleSucceed(ackHandle, []byte{0x00, 0x00, 0x00, 0x00}) @@ -79,13 +79,13 @@ func doStageTransfer(s *Session, ackHandle uint32, stageID string) { continue } temp = &mhfpacket.MsgSysInsertUser{CharID: session.CharID} - s.QueueSendMHF(temp) + s.QueueSendMHFLazy(temp) for i := 0; i < 3; i++ { temp = &mhfpacket.MsgSysNotifyUserBinary{ CharID: session.CharID, BinaryType: uint8(i + 1), } - s.QueueSendMHF(temp) + s.QueueSendMHFLazy(temp) } } } @@ -106,7 +106,7 @@ func doStageTransfer(s *Session, ackHandle uint32, stageID string) { Unk0: 0, OwnerCharID: obj.OwnerCharID, } - s.QueueSendMHF(temp) + s.QueueSendMHFLazy(temp) } s.stage.RUnlock() } @@ -232,7 +232,7 @@ func handleMsgSysUnlockStage(s *Session, db *sqlx.DB, p mhfpacket.MHFPacket) { for charID := range s.reservationStage.ReservedClientSlots { session := s.Server.FindSessionByCharID(charID) if session != nil { - session.QueueSendMHF(&mhfpacket.MsgSysStageDestruct{}) + session.QueueSendMHFLazy(&mhfpacket.MsgSysStageDestruct{}) } } diff --git a/server/channelserver/sys_broadcast.go b/server/channelserver/sys_broadcast.go index 518e5f691..f7a12f17f 100644 --- a/server/channelserver/sys_broadcast.go +++ b/server/channelserver/sys_broadcast.go @@ -19,7 +19,7 @@ func (server *ChannelServer) BroadcastMHF(pkt mhfpacket.MHFPacket, ignoredSessio if session == ignoredSession { continue } - session.QueueSendMHF(pkt) + session.QueueSendMHFLazy(pkt) } } diff --git a/server/channelserver/sys_semaphore.go b/server/channelserver/sys_semaphore.go index a8d3f9405..43a726a06 100644 --- a/server/channelserver/sys_semaphore.go +++ b/server/channelserver/sys_semaphore.go @@ -43,6 +43,6 @@ func (s *Semaphore) BroadcastMHF(pkt mhfpacket.MHFPacket, ignoredSession *Sessio if session == ignoredSession { continue } - session.QueueSendMHF(pkt) + session.QueueSendMHFLazy(pkt) } } diff --git a/server/channelserver/sys_session.go b/server/channelserver/sys_session.go index 4c3ec2864..d30662832 100644 --- a/server/channelserver/sys_session.go +++ b/server/channelserver/sys_session.go @@ -32,7 +32,7 @@ type Session struct { Server *ChannelServer rawConn net.Conn cryptConn *network.CryptConn - sendPackets chan mhfpacket.MHFPacket + sendPackets chan queuedMHFPacket lastPacket time.Time objectIndex uint16 @@ -69,6 +69,11 @@ type Session struct { ackStart map[uint32]time.Time } +type queuedMHFPacket struct { + Packet mhfpacket.MHFPacket + Lazy bool +} + // NewSession creates a new Session type. func NewSession(server *ChannelServer, conn net.Conn) *Session { s := &Session{ @@ -76,7 +81,7 @@ func NewSession(server *ChannelServer, conn net.Conn) *Session { Server: server, rawConn: conn, cryptConn: network.NewCryptConn(conn), - sendPackets: make(chan mhfpacket.MHFPacket, 20), + sendPackets: make(chan queuedMHFPacket, 20), lastPacket: time.Now(), sessionStart: gametime.TimeAdjusted().Unix(), stageMoveStack: stringstack.New(), @@ -99,28 +104,43 @@ func (s *Session) Start() { // QueueSendMHF queues a MHFPacket to be sent. func (s *Session) QueueSendMHF(pkt mhfpacket.MHFPacket) { select { - case s.sendPackets <- pkt: + case s.sendPackets <- queuedMHFPacket{Packet: pkt}: + default: + s.Logger.Warn("Packet queue too full, dropping!") + } +} + +func (s *Session) QueueSendMHFLazy(pkt mhfpacket.MHFPacket) { + qp := queuedMHFPacket{Packet: pkt, Lazy: true} + select { + case s.sendPackets <- qp: default: s.Logger.Warn("Packet queue too full, dropping!") } } func (s *Session) sendLoop() { - var pkt mhfpacket.MHFPacket + var qp queuedMHFPacket var buffer []byte + var lazybuffer []byte end := &mhfpacket.MsgSysEnd{} for { if s.closed { return } for len(s.sendPackets) > 0 { - pkt = <-s.sendPackets + qp = <-s.sendPackets bf := byteframe.NewByteFrame() - bf.WriteUint16(uint16(pkt.Opcode())) - pkt.Build(bf) - s.logMessage(uint16(pkt.Opcode()), bf.Data()[2:], "Server", s.Name) - buffer = append(buffer, bf.Data()...) + bf.WriteUint16(uint16(qp.Packet.Opcode())) + qp.Packet.Build(bf) + s.logMessage(uint16(qp.Packet.Opcode()), bf.Data()[2:], "Server", s.Name) + if qp.Lazy { + lazybuffer = append(lazybuffer, bf.Data()...) + } else { + buffer = append(buffer, bf.Data()...) + } } + buffer = append(buffer, lazybuffer...) bf := byteframe.NewByteFrame() bf.WriteUint16(uint16(end.Opcode())) buffer = append(buffer, bf.Data()...) @@ -130,6 +150,7 @@ func (s *Session) sendLoop() { s.Logger.Warn("Failed to send packet") } buffer = buffer[:0] + lazybuffer = lazybuffer[:0] } time.Sleep(time.Duration(config.GetConfig().LoopDelay) * time.Millisecond) } @@ -275,7 +296,7 @@ func (s *Session) sendMessage(message string) { MessageType: constant.BinaryMessageTypeChat, RawDataPayload: bf.Data(), } - s.QueueSendMHF(castedBin) + s.QueueSendMHFLazy(castedBin) } func (s *Session) SetObjectID() {