address packet queueing issues

This commit is contained in:
wish
2022-10-12 01:41:29 +11:00
parent f2e697f3e7
commit 0bbb90a927
2 changed files with 61 additions and 47 deletions

View File

@@ -182,11 +182,19 @@ func handleMsgSysLogout(s *Session, p mhfpacket.MHFPacket) {
}
func logoutPlayer(s *Session) {
s.server.Lock()
if _, exists := s.server.sessions[s.rawConn]; exists {
delete(s.server.sessions, s.rawConn)
s.rawConn.Close()
} else {
return // Prevent re-running logout logic on real logouts
}
s.rawConn.Close()
s.server.Unlock()
for _, stage := range s.server.stages {
for session := range stage.clients {
if session.charID == s.charID {
delete(stage.clients, session)
}
}
}
_, err := s.server.db.Exec("UPDATE sign_sessions SET server_id=NULL, char_id=NULL WHERE token=$1", s.token)
@@ -1794,9 +1802,7 @@ func handleMsgMhfGetLobbyCrowd(s *Session, p mhfpacket.MHFPacket) {
// It can be worried about later if we ever get to the point where there are
full servers to actually need to migrate people from and empty ones to migrate them to
pkt := p.(*mhfpacket.MsgMhfGetLobbyCrowd)
blankData := make([]byte, 0x320)
doAckBufSucceed(s, pkt.AckHandle, blankData)
doAckSimpleSucceed(s, pkt.AckHandle, []byte{0x00, 0x00, 0x00, 0x00})
doAckBufSucceed(s, pkt.AckHandle, make([]byte, 0x320))
}
func handleMsgMhfGetTrendWeapon(s *Session, p mhfpacket.MHFPacket) {

View File

@@ -1,11 +1,13 @@
package channelserver
import (
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"net"
"sync"
"time"
"erupe-ce/common/byteframe"
"erupe-ce/common/stringstack"
@@ -17,6 +19,11 @@ import (
"golang.org/x/text/encoding/japanese"
)
// packet wraps raw outgoing bytes together with their queueing policy,
// so the send queue can distinguish droppable traffic when it fills up.
type packet struct {
data []byte
// nonBlocking marks packets that were enqueued via the non-blocking
// path; when the queue overflows these are the ones discarded first.
nonBlocking bool
}
// Session holds state for the channel server connection.
type Session struct {
sync.Mutex
@@ -24,11 +31,10 @@ type Session struct {
server *Server
rawConn net.Conn
cryptConn *network.CryptConn
sendPackets chan []byte
sendPackets chan packet
clientContext *clientctx.ClientContext
userEnteredStage bool // If the user has entered a stage before
myseries MySeries
stageID string
stage *Stage
reservationStage *Stage // Required for the stateful MsgSysUnreserveStage packet.
@@ -55,18 +61,8 @@ type Session struct {
mailList []int
// For Debugging
Name string
}
type MySeries struct {
houseTier []byte
houseData []byte
bookshelfData []byte
galleryData []byte
toreData []byte
gardenData []byte
state uint8
password string
Name string
closed bool
}
// NewSession creates a new Session type.
@@ -76,15 +72,14 @@ func NewSession(server *Server, conn net.Conn) *Session {
server: server,
rawConn: conn,
cryptConn: network.NewCryptConn(conn),
sendPackets: make(chan []byte, 20),
sendPackets: make(chan packet, 20),
clientContext: &clientctx.ClientContext{
StrConv: &stringsupport.StringConverter{
Encoding: japanese.ShiftJIS,
},
},
userEnteredStage: false,
sessionStart: Time_Current_Adjusted().Unix(),
stageMoveStack: stringstack.New(),
sessionStart: Time_Current_Adjusted().Unix(),
stageMoveStack: stringstack.New(),
}
return s
}
@@ -102,19 +97,34 @@ func (s *Session) Start() {
// QueueSend queues a packet (raw []byte) to be sent.
func (s *Session) QueueSend(data []byte) {
bf := byteframe.NewByteFrameFromBytes(data[:2])
s.logMessage(bf.ReadUint16(), data, "Server", s.Name)
s.sendPackets <- data
s.logMessage(binary.BigEndian.Uint16(data[0:2]), data, "Server", s.Name)
select {
case s.sendPackets <- packet{data, false}:
// Enqueued data
default:
s.logger.Warn("Packet queue too full, flushing!")
var tempPackets []packet
for len(s.sendPackets) > 0 {
tempPacket := <-s.sendPackets
if !tempPacket.nonBlocking {
tempPackets = append(tempPackets, tempPacket)
}
}
for _, tempPacket := range tempPackets {
s.sendPackets <- tempPacket
}
s.sendPackets <- packet{data, false}
}
}
// QueueSendNonBlocking queues a packet (raw []byte) to be sent, dropping the packet entirely if the queue is full.
func (s *Session) QueueSendNonBlocking(data []byte) {
select {
case s.sendPackets <- data:
// Enqueued properly.
case s.sendPackets <- packet{data, true}:
// Enqueued data
default:
// Couldn't enqueue, likely something wrong with the connection.
s.logger.Warn("Dropped packet for session because of full send buffer, something is probably wrong")
s.logger.Warn("Packet queue too full, dropping!")
// Queue too full
}
}
@@ -142,29 +152,25 @@ func (s *Session) QueueAck(ackHandle uint32, data []byte) {
func (s *Session) sendLoop() {
for {
// TODO(Andoryuuta): Test making this into a buffered channel and grouping the packet together before sending.
rawPacket := <-s.sendPackets
if rawPacket == nil {
s.logger.Debug("Got nil from s.SendPackets, exiting send loop")
if s.closed {
return
}
// Make a copy of the data.
terminatedPacket := make([]byte, len(rawPacket))
copy(terminatedPacket, rawPacket)
// Append the MSG_SYS_END tailing opcode.
terminatedPacket = append(terminatedPacket, []byte{0x00, 0x10}...)
s.cryptConn.SendPacket(terminatedPacket)
pkt := <-s.sendPackets
err := s.cryptConn.SendPacket(append(pkt.data, []byte{0x00, 0x10}...))
if err != nil {
s.logger.Warn("Failed to send packet")
}
time.Sleep(10 * time.Millisecond)
}
}
func (s *Session) recvLoop() {
for {
if s.closed {
logoutPlayer(s)
return
}
pkt, err := s.cryptConn.ReadPacket()
if err == io.EOF {
s.logger.Info(fmt.Sprintf("[%s] Disconnected", s.Name))
logoutPlayer(s)
@@ -176,6 +182,7 @@ func (s *Session) recvLoop() {
return
}
s.handlePacketGroup(pkt)
time.Sleep(10 * time.Millisecond)
}
}
@@ -195,7 +202,8 @@ func (s *Session) handlePacketGroup(pktGroup []byte) {
s.logMessage(opcodeUint16, pktGroup, s.Name, "Server")
if opcode == network.MSG_SYS_LOGOUT {
s.rawConn.Close()
s.closed = true
return
}
// Get the packet parser and handler for this opcode.
mhfPkt := mhfpacket.FromOpcode(opcode)