Merge pull request #43 from ZeruLight/packet-fix

address packet queueing issues
This commit is contained in:
wish
2022-10-12 05:07:40 +11:00
committed by GitHub
2 changed files with 61 additions and 47 deletions

View File

@@ -182,11 +182,19 @@ func handleMsgSysLogout(s *Session, p mhfpacket.MHFPacket) {
} }
func logoutPlayer(s *Session) { func logoutPlayer(s *Session) {
s.server.Lock()
if _, exists := s.server.sessions[s.rawConn]; exists { if _, exists := s.server.sessions[s.rawConn]; exists {
delete(s.server.sessions, s.rawConn) delete(s.server.sessions, s.rawConn)
s.rawConn.Close() }
} else { s.rawConn.Close()
return // Prevent re-running logout logic on real logouts s.server.Unlock()
for _, stage := range s.server.stages {
for session := range stage.clients {
if session.charID == s.charID {
delete(stage.clients, session)
}
}
} }
_, err := s.server.db.Exec("UPDATE sign_sessions SET server_id=NULL, char_id=NULL WHERE token=$1", s.token) _, err := s.server.db.Exec("UPDATE sign_sessions SET server_id=NULL, char_id=NULL WHERE token=$1", s.token)
@@ -1794,9 +1802,7 @@ func handleMsgMhfGetLobbyCrowd(s *Session, p mhfpacket.MHFPacket) {
// It can be worried about later if we ever get to the point where there are // It can be worried about later if we ever get to the point where there are
// full servers to actually need to migrate people from and empty ones to // full servers to actually need to migrate people from and empty ones to
pkt := p.(*mhfpacket.MsgMhfGetLobbyCrowd) pkt := p.(*mhfpacket.MsgMhfGetLobbyCrowd)
blankData := make([]byte, 0x320) doAckBufSucceed(s, pkt.AckHandle, make([]byte, 0x320))
doAckBufSucceed(s, pkt.AckHandle, blankData)
doAckSimpleSucceed(s, pkt.AckHandle, []byte{0x00, 0x00, 0x00, 0x00})
} }
func handleMsgMhfGetTrendWeapon(s *Session, p mhfpacket.MHFPacket) { func handleMsgMhfGetTrendWeapon(s *Session, p mhfpacket.MHFPacket) {

View File

@@ -1,11 +1,13 @@
package channelserver package channelserver
import ( import (
"encoding/binary"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"io" "io"
"net" "net"
"sync" "sync"
"time"
"erupe-ce/common/byteframe" "erupe-ce/common/byteframe"
"erupe-ce/common/stringstack" "erupe-ce/common/stringstack"
@@ -17,6 +19,11 @@ import (
"golang.org/x/text/encoding/japanese" "golang.org/x/text/encoding/japanese"
) )
type packet struct {
data []byte
nonBlocking bool
}
// Session holds state for the channel server connection. // Session holds state for the channel server connection.
type Session struct { type Session struct {
sync.Mutex sync.Mutex
@@ -24,11 +31,10 @@ type Session struct {
server *Server server *Server
rawConn net.Conn rawConn net.Conn
cryptConn *network.CryptConn cryptConn *network.CryptConn
sendPackets chan []byte sendPackets chan packet
clientContext *clientctx.ClientContext clientContext *clientctx.ClientContext
userEnteredStage bool // If the user has entered a stage before userEnteredStage bool // If the user has entered a stage before
myseries MySeries
stageID string stageID string
stage *Stage stage *Stage
reservationStage *Stage // Required for the stateful MsgSysUnreserveStage packet. reservationStage *Stage // Required for the stateful MsgSysUnreserveStage packet.
@@ -55,18 +61,8 @@ type Session struct {
mailList []int mailList []int
// For Debuging // For Debuging
Name string Name string
} closed bool
type MySeries struct {
houseTier []byte
houseData []byte
bookshelfData []byte
galleryData []byte
toreData []byte
gardenData []byte
state uint8
password string
} }
// NewSession creates a new Session type. // NewSession creates a new Session type.
@@ -76,15 +72,14 @@ func NewSession(server *Server, conn net.Conn) *Session {
server: server, server: server,
rawConn: conn, rawConn: conn,
cryptConn: network.NewCryptConn(conn), cryptConn: network.NewCryptConn(conn),
sendPackets: make(chan []byte, 20), sendPackets: make(chan packet, 20),
clientContext: &clientctx.ClientContext{ clientContext: &clientctx.ClientContext{
StrConv: &stringsupport.StringConverter{ StrConv: &stringsupport.StringConverter{
Encoding: japanese.ShiftJIS, Encoding: japanese.ShiftJIS,
}, },
}, },
userEnteredStage: false, sessionStart: Time_Current_Adjusted().Unix(),
sessionStart: Time_Current_Adjusted().Unix(), stageMoveStack: stringstack.New(),
stageMoveStack: stringstack.New(),
} }
return s return s
} }
@@ -102,19 +97,34 @@ func (s *Session) Start() {
// QueueSend queues a packet (raw []byte) to be sent. // QueueSend queues a packet (raw []byte) to be sent.
func (s *Session) QueueSend(data []byte) { func (s *Session) QueueSend(data []byte) {
bf := byteframe.NewByteFrameFromBytes(data[:2]) s.logMessage(binary.BigEndian.Uint16(data[0:2]), data, "Server", s.Name)
s.logMessage(bf.ReadUint16(), data, "Server", s.Name) select {
s.sendPackets <- data case s.sendPackets <- packet{data, false}:
// Enqueued data
default:
s.logger.Warn("Packet queue too full, flushing!")
var tempPackets []packet
for len(s.sendPackets) > 0 {
tempPacket := <-s.sendPackets
if !tempPacket.nonBlocking {
tempPackets = append(tempPackets, tempPacket)
}
}
for _, tempPacket := range tempPackets {
s.sendPackets <- tempPacket
}
s.sendPackets <- packet{data, false}
}
} }
// QueueSendNonBlocking queues a packet (raw []byte) to be sent, dropping the packet entirely if the queue is full. // QueueSendNonBlocking queues a packet (raw []byte) to be sent, dropping the packet entirely if the queue is full.
func (s *Session) QueueSendNonBlocking(data []byte) { func (s *Session) QueueSendNonBlocking(data []byte) {
select { select {
case s.sendPackets <- data: case s.sendPackets <- packet{data, true}:
// Enqueued properly. // Enqueued data
default: default:
// Couldn't enqueue, likely something wrong with the connection. s.logger.Warn("Packet queue too full, dropping!")
s.logger.Warn("Dropped packet for session because of full send buffer, something is probably wrong") // Queue too full
} }
} }
@@ -142,29 +152,25 @@ func (s *Session) QueueAck(ackHandle uint32, data []byte) {
func (s *Session) sendLoop() { func (s *Session) sendLoop() {
for { for {
// TODO(Andoryuuta): Test making this into a buffered channel and grouping the packet together before sending. if s.closed {
rawPacket := <-s.sendPackets
if rawPacket == nil {
s.logger.Debug("Got nil from s.SendPackets, exiting send loop")
return return
} }
pkt := <-s.sendPackets
// Make a copy of the data. err := s.cryptConn.SendPacket(append(pkt.data, []byte{0x00, 0x10}...))
terminatedPacket := make([]byte, len(rawPacket)) if err != nil {
copy(terminatedPacket, rawPacket) s.logger.Warn("Failed to send packet")
}
// Append the MSG_SYS_END tailing opcode. time.Sleep(10 * time.Millisecond)
terminatedPacket = append(terminatedPacket, []byte{0x00, 0x10}...)
s.cryptConn.SendPacket(terminatedPacket)
} }
} }
func (s *Session) recvLoop() { func (s *Session) recvLoop() {
for { for {
if s.closed {
logoutPlayer(s)
return
}
pkt, err := s.cryptConn.ReadPacket() pkt, err := s.cryptConn.ReadPacket()
if err == io.EOF { if err == io.EOF {
s.logger.Info(fmt.Sprintf("[%s] Disconnected", s.Name)) s.logger.Info(fmt.Sprintf("[%s] Disconnected", s.Name))
logoutPlayer(s) logoutPlayer(s)
@@ -176,6 +182,7 @@ func (s *Session) recvLoop() {
return return
} }
s.handlePacketGroup(pkt) s.handlePacketGroup(pkt)
time.Sleep(10 * time.Millisecond)
} }
} }
@@ -195,7 +202,8 @@ func (s *Session) handlePacketGroup(pktGroup []byte) {
s.logMessage(opcodeUint16, pktGroup, s.Name, "Server") s.logMessage(opcodeUint16, pktGroup, s.Name, "Server")
if opcode == network.MSG_SYS_LOGOUT { if opcode == network.MSG_SYS_LOGOUT {
s.rawConn.Close() s.closed = true
return
} }
// Get the packet parser and handler for this opcode. // Get the packet parser and handler for this opcode.
mhfPkt := mhfpacket.FromOpcode(opcode) mhfPkt := mhfpacket.FromOpcode(opcode)