Merge remote-tracking branch 'origin/main'

This commit is contained in:
wish
2025-03-10 11:38:23 +11:00
6 changed files with 34 additions and 31 deletions

View File

@@ -88,7 +88,7 @@ func updateRights(s *Session) {
Rights: s.courses,
UnkSize: 0,
}
s.QueueSendMHF(update)
s.QueueSendMHFNonBlocking(update)
}
func handleMsgHead(s *Session, p mhfpacket.MHFPacket) {}
@@ -192,7 +192,7 @@ func logoutPlayer(s *Session) {
for _, sess := range s.server.sessions {
for rSlot := range stage.reservedClientSlots {
if sess.charID == rSlot && sess.stage != nil && sess.stage.id[3:5] != "Qs" {
sess.QueueSendMHF(&mhfpacket.MsgSysStageDestruct{})
sess.QueueSendMHFNonBlocking(&mhfpacket.MsgSysStageDestruct{})
}
}
}

View File

@@ -82,7 +82,7 @@ func sendServerChatMessage(s *Session, message string) {
RawDataPayload: bf.Data(),
}
s.QueueSendMHF(castedBin)
s.QueueSendMHFNonBlocking(castedBin)
}
func parseChatCommand(s *Session, command string) {
@@ -198,7 +198,7 @@ func parseChatCommand(s *Session, command string) {
temp.Build(deleteNotif, s.clientContext)
}
deleteNotif.WriteUint16(uint16(network.MSG_SYS_END))
s.QueueSend(deleteNotif.Data())
s.QueueSendNonBlocking(deleteNotif.Data())
time.Sleep(500 * time.Millisecond)
reloadNotif := byteframe.NewByteFrame()
for _, session := range s.server.sessions {
@@ -233,7 +233,7 @@ func parseChatCommand(s *Session, command string) {
temp.Build(reloadNotif, s.clientContext)
}
reloadNotif.WriteUint16(uint16(network.MSG_SYS_END))
s.QueueSend(reloadNotif.Data())
s.QueueSendNonBlocking(reloadNotif.Data())
} else {
sendDisabledCommandMessage(s, commands["Reload"])
}
@@ -381,7 +381,7 @@ func parseChatCommand(s *Session, command string) {
payload.WriteInt16(int16(x)) // X
payload.WriteInt16(int16(y)) // Y
payloadBytes := payload.Data()
s.QueueSendMHF(&mhfpacket.MsgSysCastedBinary{
s.QueueSendMHFNonBlocking(&mhfpacket.MsgSysCastedBinary{
CharID: s.charID,
MessageType: BinaryMessageTypeState,
RawDataPayload: payloadBytes,
@@ -539,7 +539,7 @@ func handleMsgSysCastBinary(s *Session, p mhfpacket.MHFPacket) {
char := s.server.FindSessionByCharID(targetID)
if char != nil {
char.QueueSendMHF(resp)
char.QueueSendMHFNonBlocking(resp)
}
}
default:

View File

@@ -185,7 +185,7 @@ func SendMailNotification(s *Session, m *Mail, recipient *Session) {
castedBinary.Build(bf, s.clientContext)
recipient.QueueSendMHF(castedBinary)
recipient.QueueSendMHFNonBlocking(castedBinary)
}
func getCharacterName(s *Session, charID uint32) string {

View File

@@ -129,12 +129,12 @@ func (s *Session) notifyRavi() {
raviNotif.WriteUint16(0x0010) // End it.
if s.server.erupeConfig.GameplayOptions.LowLatencyRaviente {
for session := range sema.clients {
session.QueueSend(raviNotif.Data())
session.QueueSendNonBlocking(raviNotif.Data())
}
} else {
for session := range sema.clients {
if session.charID == s.charID {
session.QueueSend(raviNotif.Data())
session.QueueSendNonBlocking(raviNotif.Data())
}
}
}

View File

@@ -59,7 +59,7 @@ func doStageTransfer(s *Session, ackHandle uint32, stageID string) {
s.Unlock()
// Tell the client to cleanup its current stage objects.
s.QueueSendMHF(&mhfpacket.MsgSysCleanupObject{})
s.QueueSendMHFNonBlocking(&mhfpacket.MsgSysCleanupObject{})
// Confirm the stage entry.
doAckSimpleSucceed(s, ackHandle, []byte{0x00, 0x00, 0x00, 0x00})
@@ -112,9 +112,8 @@ func doStageTransfer(s *Session, ackHandle uint32, stageID string) {
s.stage.RUnlock()
}
newNotif.WriteUint16(0x0010) // End it.
if len(newNotif.Data()) > 2 {
s.QueueSend(newNotif.Data())
s.QueueSendNonBlocking(newNotif.Data())
}
}
@@ -238,7 +237,7 @@ func handleMsgSysUnlockStage(s *Session, p mhfpacket.MHFPacket) {
for charID := range s.reservationStage.reservedClientSlots {
session := s.server.FindSessionByCharID(charID)
if session != nil {
session.QueueSendMHF(&mhfpacket.MsgSysStageDestruct{})
session.QueueSendMHFNonBlocking(&mhfpacket.MsgSysStageDestruct{})
}
}

View File

@@ -104,22 +104,9 @@ func (s *Session) Start() {
// QueueSend queues a packet (raw []byte) to be sent.
func (s *Session) QueueSend(data []byte) {
s.logMessage(binary.BigEndian.Uint16(data[0:2]), data, "Server", s.Name)
select {
case s.sendPackets <- packet{data, false}:
// Enqueued data
default:
s.logger.Warn("Packet queue too full, flushing!")
var tempPackets []packet
for len(s.sendPackets) > 0 {
tempPacket := <-s.sendPackets
if !tempPacket.nonBlocking {
tempPackets = append(tempPackets, tempPacket)
}
}
for _, tempPacket := range tempPackets {
s.sendPackets <- tempPacket
}
s.sendPackets <- packet{data, false}
err := s.cryptConn.SendPacket(append(data, []byte{0x00, 0x10}...))
if err != nil {
s.logger.Warn("Failed to send packet")
}
}
@@ -146,6 +133,19 @@ func (s *Session) QueueSendMHF(pkt mhfpacket.MHFPacket) {
s.QueueSend(bf.Data())
}
// QueueSendMHFNonBlocking queues a MHFPacket to be sent, dropping the packet entirely if the queue is full.
func (s *Session) QueueSendMHFNonBlocking(pkt mhfpacket.MHFPacket) {
// Make the header
bf := byteframe.NewByteFrame()
bf.WriteUint16(uint16(pkt.Opcode()))
// Build the packet onto the byteframe.
pkt.Build(bf, s.clientContext)
// Queue it.
s.QueueSendNonBlocking(bf.Data())
}
// QueueAck is a helper function to queue an MSG_SYS_ACK with the given ack handle and data.
func (s *Session) QueueAck(ackHandle uint32, data []byte) {
bf := byteframe.NewByteFrame()
@@ -158,12 +158,16 @@ func (s *Session) QueueAck(ackHandle uint32, data []byte) {
func (s *Session) sendLoop() {
var pkt packet
for {
var buf []byte
if s.closed {
return
}
for len(s.sendPackets) > 0 {
pkt = <-s.sendPackets
err := s.cryptConn.SendPacket(append(pkt.data, []byte{0x00, 0x10}...))
buf = append(buf, pkt.data...)
}
if len(buf) > 0 {
err := s.cryptConn.SendPacket(append(buf, []byte{0x00, 0x10}...))
if err != nil {
s.logger.Warn("Failed to send packet")
}