refactor packet sending functions

This commit is contained in:
wish
2024-10-10 22:27:12 +11:00
parent 62fe5cf277
commit 830834c5b5
7 changed files with 38 additions and 140 deletions

View File

@@ -20,11 +20,6 @@ import (
"go.uber.org/zap"
)
type packet struct {
data []byte
nonBlocking bool
}
// Session holds state for the channel server connection.
type Session struct {
sync.Mutex
@@ -32,7 +27,7 @@ type Session struct {
server *Server
rawConn net.Conn
cryptConn *network.CryptConn
sendPackets chan packet
sendPackets chan mhfpacket.MHFPacket
lastPacket time.Time
objectIndex uint16
@@ -76,7 +71,7 @@ func NewSession(server *Server, conn net.Conn) *Session {
server: server,
rawConn: conn,
cryptConn: network.NewCryptConn(conn),
sendPackets: make(chan packet, 20),
sendPackets: make(chan mhfpacket.MHFPacket, 20),
lastPacket: time.Now(),
sessionStart: gametime.TimeAdjusted().Unix(),
stageMoveStack: stringstack.New(),
@@ -96,73 +91,36 @@ func (s *Session) Start() {
go s.recvLoop()
}
// QueueSend queues a packet (raw []byte) to be sent.
func (s *Session) QueueSend(data []byte) {
s.logMessage(binary.BigEndian.Uint16(data[0:2]), data, "Server", s.Name)
// QueueSendMHF queues a MHFPacket to be sent.
func (s *Session) QueueSendMHF(pkt mhfpacket.MHFPacket) {
select {
case s.sendPackets <- packet{data, false}:
// Enqueued data
default:
s.logger.Warn("Packet queue too full, flushing!")
var tempPackets []packet
for len(s.sendPackets) > 0 {
tempPacket := <-s.sendPackets
if !tempPacket.nonBlocking {
tempPackets = append(tempPackets, tempPacket)
}
}
for _, tempPacket := range tempPackets {
s.sendPackets <- tempPacket
}
s.sendPackets <- packet{data, false}
}
}
// QueueSendNonBlocking queues a packet (raw []byte) to be sent, dropping the packet entirely if the queue is full.
func (s *Session) QueueSendNonBlocking(data []byte) {
select {
case s.sendPackets <- packet{data, true}:
s.logMessage(binary.BigEndian.Uint16(data[0:2]), data, "Server", s.Name)
case s.sendPackets <- pkt:
default:
s.logger.Warn("Packet queue too full, dropping!")
}
}
// QueueSendMHF queues a MHFPacket to be sent.
func (s *Session) QueueSendMHF(pkt mhfpacket.MHFPacket) {
// Make the header
bf := byteframe.NewByteFrame()
bf.WriteUint16(uint16(pkt.Opcode()))
// Build the packet onto the byteframe.
pkt.Build(bf)
// Queue it.
s.QueueSend(bf.Data())
}
// QueueAck is a helper function to queue an MSG_SYS_ACK with the given ack handle and data.
func (s *Session) QueueAck(ackHandle uint32, data []byte) {
bf := byteframe.NewByteFrame()
bf.WriteUint16(uint16(network.MSG_SYS_ACK))
bf.WriteUint32(ackHandle)
bf.WriteBytes(data)
s.QueueSend(bf.Data())
}
func (s *Session) sendLoop() {
var pkt packet
var pkt mhfpacket.MHFPacket
var buffer []byte
end := &mhfpacket.MsgSysEnd{}
for {
if s.closed {
return
}
for len(s.sendPackets) > 0 {
pkt = <-s.sendPackets
buffer = append(buffer, pkt.data...)
bf := byteframe.NewByteFrame()
bf.WriteUint16(uint16(pkt.Opcode()))
pkt.Build(bf)
s.logMessage(uint16(pkt.Opcode()), bf.Data()[2:], "Server", s.Name)
buffer = append(buffer, bf.Data()...)
}
bf := byteframe.NewByteFrame()
bf.WriteUint16(uint16(end.Opcode()))
buffer = append(buffer, bf.Data()...)
if len(buffer) > 0 {
err := s.cryptConn.SendPacket(append(buffer, []byte{0x00, 0x10}...))
err := s.cryptConn.SendPacket(buffer)
if err != nil {
s.logger.Warn("Failed to send packet")
}