reimplement lazy packets

This commit is contained in:
wish
2024-10-28 23:48:49 +11:00
parent 67a5dc412c
commit db4120bb85
11 changed files with 57 additions and 36 deletions

View File

@@ -192,10 +192,10 @@ func GetMailByID(ID int) (*Mail, error) {
} }
type SessionMail interface { type SessionMail interface {
QueueSendMHF(packet mhfpacket.MHFPacket) QueueSendMHFLazy(packet mhfpacket.MHFPacket)
} }
func SendMailNotification(s SessionMail, m *Mail, recipient SessionMail) { func SendMailNotification(m *Mail, recipient SessionMail) {
bf := byteframe.NewByteFrame() bf := byteframe.NewByteFrame()
notification := &binpacket.MsgBinMailNotify{ notification := &binpacket.MsgBinMailNotify{
@@ -213,7 +213,7 @@ func SendMailNotification(s SessionMail, m *Mail, recipient SessionMail) {
castedBinary.Build(bf) castedBinary.Build(bf)
recipient.QueueSendMHF(castedBinary) recipient.QueueSendMHFLazy(castedBinary)
} }
func getCharacterName(charID uint32) string { func getCharacterName(charID uint32) string {

View File

@@ -7,7 +7,7 @@ import (
) )
type SessionStage interface { type SessionStage interface {
QueueSendMHF(packet mhfpacket.MHFPacket) QueueSendMHFLazy(packet mhfpacket.MHFPacket)
GetCharID() uint32 GetCharID() uint32
GetName() string GetName() string
} }
@@ -77,7 +77,7 @@ func (s *Stage) BroadcastMHF(pkt mhfpacket.MHFPacket, ignoredSession SessionStag
if session == ignoredSession { if session == ignoredSession {
continue continue
} }
session.QueueSendMHF(pkt) session.QueueSendMHFLazy(pkt)
} }
} }

View File

@@ -151,14 +151,14 @@ func reload(s *Session, _ []string) error {
continue continue
} }
temp = &mhfpacket.MsgSysDeleteObject{ObjID: object.Id} temp = &mhfpacket.MsgSysDeleteObject{ObjID: object.Id}
s.QueueSendMHF(temp) s.QueueSendMHFLazy(temp)
} }
for _, session := range s.Server.sessions { for _, session := range s.Server.sessions {
if s == session { if s == session {
continue continue
} }
temp = &mhfpacket.MsgSysDeleteUser{CharID: session.CharID} temp = &mhfpacket.MsgSysDeleteUser{CharID: session.CharID}
s.QueueSendMHF(temp) s.QueueSendMHFLazy(temp)
} }
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
for _, session := range s.Server.sessions { for _, session := range s.Server.sessions {
@@ -166,13 +166,13 @@ func reload(s *Session, _ []string) error {
continue continue
} }
temp = &mhfpacket.MsgSysInsertUser{CharID: session.CharID} temp = &mhfpacket.MsgSysInsertUser{CharID: session.CharID}
s.QueueSendMHF(temp) s.QueueSendMHFLazy(temp)
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
temp = &mhfpacket.MsgSysNotifyUserBinary{ temp = &mhfpacket.MsgSysNotifyUserBinary{
CharID: session.CharID, CharID: session.CharID,
BinaryType: uint8(i + 1), BinaryType: uint8(i + 1),
} }
s.QueueSendMHF(temp) s.QueueSendMHFLazy(temp)
} }
} }
for _, obj := range s.stage.Objects { for _, obj := range s.stage.Objects {
@@ -187,7 +187,7 @@ func reload(s *Session, _ []string) error {
Unk0: 0, Unk0: 0,
OwnerCharID: obj.OwnerCharID, OwnerCharID: obj.OwnerCharID,
} }
s.QueueSendMHF(temp) s.QueueSendMHFLazy(temp)
} }
return nil return nil
} }
@@ -356,7 +356,7 @@ func teleport(s *Session, args []string) error {
payload.WriteUint8(2) // SetState type(position == 2) payload.WriteUint8(2) // SetState type(position == 2)
payload.WriteInt16(int16(x)) payload.WriteInt16(int16(x))
payload.WriteInt16(int16(y)) payload.WriteInt16(int16(y))
s.QueueSendMHF(&mhfpacket.MsgSysCastedBinary{ s.QueueSendMHFLazy(&mhfpacket.MsgSysCastedBinary{
CharID: s.CharID, CharID: s.CharID,
MessageType: constant.BinaryMessageTypeState, MessageType: constant.BinaryMessageTypeState,
RawDataPayload: payload.Data(), RawDataPayload: payload.Data(),

View File

@@ -53,7 +53,7 @@ func updateRights(s *Session) {
Rights: s.courses, Rights: s.courses,
UnkSize: 0, UnkSize: 0,
} }
s.QueueSendMHF(update) s.QueueSendMHFLazy(update)
} }
func handleMsgHead(s *Session, db *sqlx.DB, p mhfpacket.MHFPacket) {} func handleMsgHead(s *Session, db *sqlx.DB, p mhfpacket.MHFPacket) {}
@@ -161,7 +161,7 @@ func logoutPlayer(s *Session) {
for _, sess := range s.Server.sessions { for _, sess := range s.Server.sessions {
for rSlot := range stage.ReservedClientSlots { for rSlot := range stage.ReservedClientSlots {
if sess.CharID == rSlot && sess.stage != nil && sess.stage.Id[3:5] != "Qs" { if sess.CharID == rSlot && sess.stage != nil && sess.stage.Id[3:5] != "Qs" {
sess.QueueSendMHF(&mhfpacket.MsgSysStageDestruct{}) sess.QueueSendMHFLazy(&mhfpacket.MsgSysStageDestruct{})
} }
} }
} }
@@ -246,7 +246,7 @@ func handleMsgSysTime(s *Session, db *sqlx.DB, p mhfpacket.MHFPacket) {
GetRemoteTime: false, GetRemoteTime: false,
Timestamp: uint32(gametime.TimeAdjusted().Unix()), // JP timezone Timestamp: uint32(gametime.TimeAdjusted().Unix()), // JP timezone
} }
s.QueueSendMHF(resp) s.QueueSendMHFLazy(resp)
s.notifyRavi() s.notifyRavi()
} }

View File

@@ -61,7 +61,7 @@ func sendServerChatMessage(s *Session, message string) {
RawDataPayload: bf.Data(), RawDataPayload: bf.Data(),
} }
s.QueueSendMHF(castedBin) s.QueueSendMHFLazy(castedBin)
} }
func handleMsgSysCastBinary(s *Session, db *sqlx.DB, p mhfpacket.MHFPacket) { func handleMsgSysCastBinary(s *Session, db *sqlx.DB, p mhfpacket.MHFPacket) {
@@ -185,7 +185,7 @@ func handleMsgSysCastBinary(s *Session, db *sqlx.DB, p mhfpacket.MHFPacket) {
char := s.Server.FindSessionByCharID(targetID) char := s.Server.FindSessionByCharID(targetID)
if char != nil { if char != nil {
char.QueueSendMHF(resp) char.QueueSendMHFLazy(resp)
} }
} }
default: default:

View File

@@ -324,7 +324,7 @@ func HandleMsgMhfOperateGuildMember(s *Session, db *sqlx.DB, p mhfpacket.MHFPack
for _, channel := range s.Server.Channels { for _, channel := range s.Server.Channels {
for _, session := range channel.sessions { for _, session := range channel.sessions {
if session.CharID == pkt.CharID { if session.CharID == pkt.CharID {
service.SendMailNotification(s, &mail, session) service.SendMailNotification(&mail, session)
} }
} }
} }

View File

@@ -123,12 +123,12 @@ func (s *Session) notifyRavi() {
temp = &mhfpacket.MsgSysLoadRegister{RegisterID: uint32(0x40000 + i*0x10000)} temp = &mhfpacket.MsgSysLoadRegister{RegisterID: uint32(0x40000 + i*0x10000)}
if config.GetConfig().GameplayOptions.LowLatencyRaviente { if config.GetConfig().GameplayOptions.LowLatencyRaviente {
for session := range sema.clients { for session := range sema.clients {
session.QueueSendMHF(temp) session.QueueSendMHFLazy(temp)
} }
} else { } else {
for session := range sema.clients { for session := range sema.clients {
if session.CharID == s.CharID { if session.CharID == s.CharID {
session.QueueSendMHF(temp) session.QueueSendMHFLazy(temp)
} }
} }
} }

View File

@@ -63,7 +63,7 @@ func doStageTransfer(s *Session, ackHandle uint32, stageID string) {
s.Unlock() s.Unlock()
// Tell the client to cleanup its current stage objects. // Tell the client to cleanup its current stage objects.
s.QueueSendMHF(&mhfpacket.MsgSysCleanupObject{}) s.QueueSendMHFLazy(&mhfpacket.MsgSysCleanupObject{})
// Confirm the stage entry. // Confirm the stage entry.
s.DoAckSimpleSucceed(ackHandle, []byte{0x00, 0x00, 0x00, 0x00}) s.DoAckSimpleSucceed(ackHandle, []byte{0x00, 0x00, 0x00, 0x00})
@@ -79,13 +79,13 @@ func doStageTransfer(s *Session, ackHandle uint32, stageID string) {
continue continue
} }
temp = &mhfpacket.MsgSysInsertUser{CharID: session.CharID} temp = &mhfpacket.MsgSysInsertUser{CharID: session.CharID}
s.QueueSendMHF(temp) s.QueueSendMHFLazy(temp)
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
temp = &mhfpacket.MsgSysNotifyUserBinary{ temp = &mhfpacket.MsgSysNotifyUserBinary{
CharID: session.CharID, CharID: session.CharID,
BinaryType: uint8(i + 1), BinaryType: uint8(i + 1),
} }
s.QueueSendMHF(temp) s.QueueSendMHFLazy(temp)
} }
} }
} }
@@ -106,7 +106,7 @@ func doStageTransfer(s *Session, ackHandle uint32, stageID string) {
Unk0: 0, Unk0: 0,
OwnerCharID: obj.OwnerCharID, OwnerCharID: obj.OwnerCharID,
} }
s.QueueSendMHF(temp) s.QueueSendMHFLazy(temp)
} }
s.stage.RUnlock() s.stage.RUnlock()
} }
@@ -232,7 +232,7 @@ func handleMsgSysUnlockStage(s *Session, db *sqlx.DB, p mhfpacket.MHFPacket) {
for charID := range s.reservationStage.ReservedClientSlots { for charID := range s.reservationStage.ReservedClientSlots {
session := s.Server.FindSessionByCharID(charID) session := s.Server.FindSessionByCharID(charID)
if session != nil { if session != nil {
session.QueueSendMHF(&mhfpacket.MsgSysStageDestruct{}) session.QueueSendMHFLazy(&mhfpacket.MsgSysStageDestruct{})
} }
} }

View File

@@ -19,7 +19,7 @@ func (server *ChannelServer) BroadcastMHF(pkt mhfpacket.MHFPacket, ignoredSessio
if session == ignoredSession { if session == ignoredSession {
continue continue
} }
session.QueueSendMHF(pkt) session.QueueSendMHFLazy(pkt)
} }
} }

View File

@@ -43,6 +43,6 @@ func (s *Semaphore) BroadcastMHF(pkt mhfpacket.MHFPacket, ignoredSession *Sessio
if session == ignoredSession { if session == ignoredSession {
continue continue
} }
session.QueueSendMHF(pkt) session.QueueSendMHFLazy(pkt)
} }
} }

View File

@@ -32,7 +32,7 @@ type Session struct {
Server *ChannelServer Server *ChannelServer
rawConn net.Conn rawConn net.Conn
cryptConn *network.CryptConn cryptConn *network.CryptConn
sendPackets chan mhfpacket.MHFPacket sendPackets chan queuedMHFPacket
lastPacket time.Time lastPacket time.Time
objectIndex uint16 objectIndex uint16
@@ -69,6 +69,11 @@ type Session struct {
ackStart map[uint32]time.Time ackStart map[uint32]time.Time
} }
type queuedMHFPacket struct {
Packet mhfpacket.MHFPacket
Lazy bool
}
// NewSession creates a new Session type. // NewSession creates a new Session type.
func NewSession(server *ChannelServer, conn net.Conn) *Session { func NewSession(server *ChannelServer, conn net.Conn) *Session {
s := &Session{ s := &Session{
@@ -76,7 +81,7 @@ func NewSession(server *ChannelServer, conn net.Conn) *Session {
Server: server, Server: server,
rawConn: conn, rawConn: conn,
cryptConn: network.NewCryptConn(conn), cryptConn: network.NewCryptConn(conn),
sendPackets: make(chan mhfpacket.MHFPacket, 20), sendPackets: make(chan queuedMHFPacket, 20),
lastPacket: time.Now(), lastPacket: time.Now(),
sessionStart: gametime.TimeAdjusted().Unix(), sessionStart: gametime.TimeAdjusted().Unix(),
stageMoveStack: stringstack.New(), stageMoveStack: stringstack.New(),
@@ -99,28 +104,43 @@ func (s *Session) Start() {
// QueueSendMHF queues a MHFPacket to be sent. // QueueSendMHF queues a MHFPacket to be sent.
func (s *Session) QueueSendMHF(pkt mhfpacket.MHFPacket) { func (s *Session) QueueSendMHF(pkt mhfpacket.MHFPacket) {
select { select {
case s.sendPackets <- pkt: case s.sendPackets <- queuedMHFPacket{Packet: pkt}:
default:
s.Logger.Warn("Packet queue too full, dropping!")
}
}
func (s *Session) QueueSendMHFLazy(pkt mhfpacket.MHFPacket) {
qp := queuedMHFPacket{Packet: pkt, Lazy: true}
select {
case s.sendPackets <- qp:
default: default:
s.Logger.Warn("Packet queue too full, dropping!") s.Logger.Warn("Packet queue too full, dropping!")
} }
} }
func (s *Session) sendLoop() { func (s *Session) sendLoop() {
var pkt mhfpacket.MHFPacket var qp queuedMHFPacket
var buffer []byte var buffer []byte
var lazybuffer []byte
end := &mhfpacket.MsgSysEnd{} end := &mhfpacket.MsgSysEnd{}
for { for {
if s.closed { if s.closed {
return return
} }
for len(s.sendPackets) > 0 { for len(s.sendPackets) > 0 {
pkt = <-s.sendPackets qp = <-s.sendPackets
bf := byteframe.NewByteFrame() bf := byteframe.NewByteFrame()
bf.WriteUint16(uint16(pkt.Opcode())) bf.WriteUint16(uint16(qp.Packet.Opcode()))
pkt.Build(bf) qp.Packet.Build(bf)
s.logMessage(uint16(pkt.Opcode()), bf.Data()[2:], "Server", s.Name) s.logMessage(uint16(qp.Packet.Opcode()), bf.Data()[2:], "Server", s.Name)
if qp.Lazy {
lazybuffer = append(lazybuffer, bf.Data()...)
} else {
buffer = append(buffer, bf.Data()...) buffer = append(buffer, bf.Data()...)
} }
}
buffer = append(buffer, lazybuffer...)
bf := byteframe.NewByteFrame() bf := byteframe.NewByteFrame()
bf.WriteUint16(uint16(end.Opcode())) bf.WriteUint16(uint16(end.Opcode()))
buffer = append(buffer, bf.Data()...) buffer = append(buffer, bf.Data()...)
@@ -130,6 +150,7 @@ func (s *Session) sendLoop() {
s.Logger.Warn("Failed to send packet") s.Logger.Warn("Failed to send packet")
} }
buffer = buffer[:0] buffer = buffer[:0]
lazybuffer = lazybuffer[:0]
} }
time.Sleep(time.Duration(config.GetConfig().LoopDelay) * time.Millisecond) time.Sleep(time.Duration(config.GetConfig().LoopDelay) * time.Millisecond)
} }
@@ -275,7 +296,7 @@ func (s *Session) sendMessage(message string) {
MessageType: constant.BinaryMessageTypeChat, MessageType: constant.BinaryMessageTypeChat,
RawDataPayload: bf.Data(), RawDataPayload: bf.Data(),
} }
s.QueueSendMHF(castedBin) s.QueueSendMHFLazy(castedBin)
} }
func (s *Session) SetObjectID() { func (s *Session) SetObjectID() {