diff --git a/server/channelserver/handlers.go b/server/channelserver/handlers.go index bda3eb352..b0c4d8d7d 100644 --- a/server/channelserver/handlers.go +++ b/server/channelserver/handlers.go @@ -182,11 +182,19 @@ func handleMsgSysLogout(s *Session, p mhfpacket.MHFPacket) { } func logoutPlayer(s *Session) { + s.server.Lock() if _, exists := s.server.sessions[s.rawConn]; exists { delete(s.server.sessions, s.rawConn) - s.rawConn.Close() - } else { - return // Prevent re-running logout logic on real logouts + } + s.rawConn.Close() + s.server.Unlock() + + for _, stage := range s.server.stages { + for session := range stage.clients { + if session.charID == s.charID { + delete(stage.clients, session) + } + } } _, err := s.server.db.Exec("UPDATE sign_sessions SET server_id=NULL, char_id=NULL WHERE token=$1", s.token) @@ -1794,9 +1802,7 @@ func handleMsgMhfGetLobbyCrowd(s *Session, p mhfpacket.MHFPacket) { // It can be worried about later if we ever get to the point where there are // full servers to actually need to migrate people from and empty ones to pkt := p.(*mhfpacket.MsgMhfGetLobbyCrowd) - blankData := make([]byte, 0x320) - doAckBufSucceed(s, pkt.AckHandle, blankData) - doAckSimpleSucceed(s, pkt.AckHandle, []byte{0x00, 0x00, 0x00, 0x00}) + doAckBufSucceed(s, pkt.AckHandle, make([]byte, 0x320)) } func handleMsgMhfGetTrendWeapon(s *Session, p mhfpacket.MHFPacket) { diff --git a/server/channelserver/sys_session.go b/server/channelserver/sys_session.go index ec122a521..90430f4da 100644 --- a/server/channelserver/sys_session.go +++ b/server/channelserver/sys_session.go @@ -1,11 +1,13 @@ package channelserver import ( + "encoding/binary" "encoding/hex" "fmt" "io" "net" "sync" + "time" "erupe-ce/common/byteframe" "erupe-ce/common/stringstack" @@ -17,6 +19,11 @@ import ( "golang.org/x/text/encoding/japanese" ) +type packet struct { + data []byte + nonBlocking bool +} + // Session holds state for the channel server connection. type Session struct { sync.Mutex @@ -24,11 +31,10 @@ type Session struct { server *Server rawConn net.Conn cryptConn *network.CryptConn - sendPackets chan []byte + sendPackets chan packet clientContext *clientctx.ClientContext userEnteredStage bool // If the user has entered a stage before - myseries MySeries stageID string stage *Stage reservationStage *Stage // Required for the stateful MsgSysUnreserveStage packet. @@ -55,18 +61,8 @@ type Session struct { mailList []int // For Debuging - Name string -} - -type MySeries struct { - houseTier []byte - houseData []byte - bookshelfData []byte - galleryData []byte - toreData []byte - gardenData []byte - state uint8 - password string + Name string + closed bool } // NewSession creates a new Session type. @@ -76,15 +72,14 @@ func NewSession(server *Server, conn net.Conn) *Session { server: server, rawConn: conn, cryptConn: network.NewCryptConn(conn), - sendPackets: make(chan []byte, 20), + sendPackets: make(chan packet, 20), clientContext: &clientctx.ClientContext{ StrConv: &stringsupport.StringConverter{ Encoding: japanese.ShiftJIS, }, }, - userEnteredStage: false, - sessionStart: Time_Current_Adjusted().Unix(), - stageMoveStack: stringstack.New(), + sessionStart: Time_Current_Adjusted().Unix(), + stageMoveStack: stringstack.New(), } return s } @@ -102,19 +97,34 @@ func (s *Session) Start() { // QueueSend queues a packet (raw []byte) to be sent. func (s *Session) QueueSend(data []byte) { - bf := byteframe.NewByteFrameFromBytes(data[:2]) - s.logMessage(bf.ReadUint16(), data, "Server", s.Name) - s.sendPackets <- data + s.logMessage(binary.BigEndian.Uint16(data[0:2]), data, "Server", s.Name) + select { + case s.sendPackets <- packet{data, false}: + // Enqueued data + default: + s.logger.Warn("Packet queue too full, flushing!") + var tempPackets []packet + for len(s.sendPackets) > 0 { + tempPacket := <-s.sendPackets + if !tempPacket.nonBlocking { + tempPackets = append(tempPackets, tempPacket) + } + } + for _, tempPacket := range tempPackets { + s.sendPackets <- tempPacket + } + s.sendPackets <- packet{data, false} + } } // QueueSendNonBlocking queues a packet (raw []byte) to be sent, dropping the packet entirely if the queue is full. func (s *Session) QueueSendNonBlocking(data []byte) { select { - case s.sendPackets <- data: - // Enqueued properly. + case s.sendPackets <- packet{data, true}: + // Enqueued data default: - // Couldn't enqueue, likely something wrong with the connection. - s.logger.Warn("Dropped packet for session because of full send buffer, something is probably wrong") + s.logger.Warn("Packet queue too full, dropping!") + // Queue too full } } @@ -142,29 +152,25 @@ func (s *Session) QueueAck(ackHandle uint32, data []byte) { func (s *Session) sendLoop() { for { - // TODO(Andoryuuta): Test making this into a buffered channel and grouping the packet together before sending. - rawPacket := <-s.sendPackets - - if rawPacket == nil { - s.logger.Debug("Got nil from s.SendPackets, exiting send loop") + if s.closed { return } - - // Make a copy of the data. - terminatedPacket := make([]byte, len(rawPacket)) - copy(terminatedPacket, rawPacket) - - // Append the MSG_SYS_END tailing opcode. - terminatedPacket = append(terminatedPacket, []byte{0x00, 0x10}...) - - s.cryptConn.SendPacket(terminatedPacket) + pkt := <-s.sendPackets + err := s.cryptConn.SendPacket(append(pkt.data, []byte{0x00, 0x10}...)) + if err != nil { + s.logger.Warn("Failed to send packet") + } + time.Sleep(10 * time.Millisecond) } } func (s *Session) recvLoop() { for { + if s.closed { + logoutPlayer(s) + return + } pkt, err := s.cryptConn.ReadPacket() - if err == io.EOF { s.logger.Info(fmt.Sprintf("[%s] Disconnected", s.Name)) logoutPlayer(s) @@ -176,6 +182,7 @@ func (s *Session) recvLoop() { return } s.handlePacketGroup(pkt) + time.Sleep(10 * time.Millisecond) } } @@ -195,7 +202,8 @@ func (s *Session) handlePacketGroup(pktGroup []byte) { s.logMessage(opcodeUint16, pktGroup, s.Name, "Server") if opcode == network.MSG_SYS_LOGOUT { - s.rawConn.Close() + s.closed = true + return } // Get the packet parser and handler for this opcode. mhfPkt := mhfpacket.FromOpcode(opcode)