package channelserver

import (
	"encoding/binary"
	"encoding/hex"
	"erupe-ce/common/mhfcourse"
	"fmt"
	"io"
	"net"
	"sync"
	"sync/atomic"
	"time"

	"erupe-ce/common/byteframe"
	"erupe-ce/common/stringstack"
	"erupe-ce/network"
	"erupe-ce/network/clientctx"
	"erupe-ce/network/mhfpacket"

	"go.uber.org/zap"
)

// packet is a single entry in a session's outbound queue.
type packet struct {
	// data is the raw payload; the send loop appends the packet terminator
	// before handing it to the encrypted connection.
	data []byte
	// nonBlocking is set by the queueing helpers when the packet is enqueued.
	nonBlocking bool
}

// Session holds state for the channel server connection.
type Session struct {
	sync.Mutex
	logger        *zap.Logger              // Logger named after the remote address.
	server        *Server                  // Owning channel server.
	rawConn       net.Conn                 // Underlying network connection.
	cryptConn     network.Conn             // Wraps rawConn; used for all packet reads and writes.
	sendPackets   chan packet              // Outbound queue drained by the send loop.
	clientContext *clientctx.ClientContext // Passed to every packet Parse/Build call.
	lastPacket    time.Time                // Updated each time an inbound packet group is handled.

	objectID    uint16 // Allocated from the server when the session is created; upper 16 bits of generated object IDs.
	objectIndex uint16 // Incremented for every generated object ID; lower 16 bits.
	loaded      bool

	stage            *Stage
	reservationStage *Stage // Required for the stateful MsgSysUnreserveStage packet.
	stagePass        string // Temporary storage
	prevGuildID      uint32 // Stores the last GuildID used in InfoGuild
	charID           uint32
	logKey           []byte
	sessionStart     int64 // Unix timestamp (from TimeAdjusted) taken when the session was created.
	courses          []mhfcourse.Course
	token            string
	kqf              []byte
	kqfOverride      bool

	playtime     uint32
	playtimeTime time.Time

	semaphore     *Semaphore // Required for the stateful MsgSysUnreserveStage packet.
	semaphoreMode bool       // Selects which semaphoreID entry GetSemaphoreID reports.
	semaphoreID   []uint16   // Two entries; index 1 is used when semaphoreMode is set, index 0 otherwise.

	// A stack containing the stage movement history (push on enter/move, pop on back)
	stageMoveStack *stringstack.StringStack

	// Accumulated index used for identifying mail for a client
	// I'm not certain why this is used, but since the client is sending it
	// I want to rely on it for now as it might be important later.
	mailAccIndex uint8

	// Contains the mail list that maps accumulated indexes to mail IDs
	mailList []int

	// For debugging
	Name string

	// closed is set once the client sends a logout packet; both the send and
	// receive loops poll it to know when to stop.
	closed atomic.Bool

	// ackStart records when an inbound packet carrying a given ack handle was
	// received; it is read back when logging to report ack latency.
	ackStart map[uint32]time.Time
}

// NewSession creates a new Session for the given server and connection.
func NewSession(server *Server, conn net.Conn) *Session { s := &Session{ logger: server.logger.Named(conn.RemoteAddr().String()), server: server, rawConn: conn, cryptConn: network.NewCryptConn(conn), sendPackets: make(chan packet, 20), clientContext: &clientctx.ClientContext{}, lastPacket: time.Now(), objectID: server.getObjectId(), sessionStart: TimeAdjusted().Unix(), stageMoveStack: stringstack.New(), ackStart: make(map[uint32]time.Time), semaphoreID: make([]uint16, 2), } return s } // Start starts the session packet send and recv loop(s). func (s *Session) Start() { s.logger.Debug("New connection", zap.String("RemoteAddr", s.rawConn.RemoteAddr().String())) // Unlike the sign and entrance server, // the client DOES NOT initalize the channel connection with 8 NULL bytes. go s.sendLoop() go s.recvLoop() } // QueueSend queues a packet (raw []byte) to be sent. func (s *Session) QueueSend(data []byte) { if len(data) >= 2 { s.logMessage(binary.BigEndian.Uint16(data[0:2]), data, "Server", s.Name) } s.sendPackets <- packet{data, true} } // QueueSendNonBlocking queues a packet (raw []byte) to be sent, dropping the packet entirely if the queue is full. func (s *Session) QueueSendNonBlocking(data []byte) { select { case s.sendPackets <- packet{data, true}: if len(data) >= 2 { s.logMessage(binary.BigEndian.Uint16(data[0:2]), data, "Server", s.Name) } default: s.logger.Warn("Packet queue too full, dropping!") } } // QueueSendMHF queues a MHFPacket to be sent. func (s *Session) QueueSendMHF(pkt mhfpacket.MHFPacket) { // Make the header bf := byteframe.NewByteFrame() bf.WriteUint16(uint16(pkt.Opcode())) // Build the packet onto the byteframe. _ = pkt.Build(bf, s.clientContext) // Queue it. s.QueueSend(bf.Data()) } // QueueSendMHFNonBlocking queues a MHFPacket to be sent, dropping the packet entirely if the queue is full. 
func (s *Session) QueueSendMHFNonBlocking(pkt mhfpacket.MHFPacket) { // Make the header bf := byteframe.NewByteFrame() bf.WriteUint16(uint16(pkt.Opcode())) // Build the packet onto the byteframe. _ = pkt.Build(bf, s.clientContext) // Queue it. s.QueueSendNonBlocking(bf.Data()) } // QueueAck is a helper function to queue an MSG_SYS_ACK with the given ack handle and data. func (s *Session) QueueAck(ackHandle uint32, data []byte) { bf := byteframe.NewByteFrame() bf.WriteUint16(uint16(network.MSG_SYS_ACK)) bf.WriteUint32(ackHandle) bf.WriteBytes(data) s.QueueSend(bf.Data()) } func (s *Session) sendLoop() { for { if s.closed.Load() { return } // Send each packet individually with its own terminator for len(s.sendPackets) > 0 { pkt := <-s.sendPackets err := s.cryptConn.SendPacket(append(pkt.data, []byte{0x00, 0x10}...)) if err != nil { s.logger.Warn("Failed to send packet", zap.Error(err)) } } time.Sleep(time.Duration(s.server.erupeConfig.LoopDelay) * time.Millisecond) } } func (s *Session) recvLoop() { for { if s.closed.Load() { // Graceful disconnect - client sent logout packet s.logger.Info("Session closed gracefully", zap.Uint32("charID", s.charID), zap.String("name", s.Name), zap.String("disconnect_type", "graceful"), ) logoutPlayer(s) return } pkt, err := s.cryptConn.ReadPacket() if err == io.EOF { // Connection lost - client disconnected without logout packet sessionDuration := time.Duration(0) if s.sessionStart > 0 { sessionDuration = time.Since(time.Unix(s.sessionStart, 0)) } s.logger.Info("Connection lost (EOF)", zap.Uint32("charID", s.charID), zap.String("name", s.Name), zap.String("disconnect_type", "connection_lost"), zap.Duration("session_duration", sessionDuration), ) logoutPlayer(s) return } else if err != nil { // Connection error - network issue or malformed packet s.logger.Warn("Connection error, exiting recv loop", zap.Error(err), zap.Uint32("charID", s.charID), zap.String("name", s.Name), zap.String("disconnect_type", "error"), ) logoutPlayer(s) 
return } s.handlePacketGroup(pkt) time.Sleep(time.Duration(s.server.erupeConfig.LoopDelay) * time.Millisecond) } } func (s *Session) handlePacketGroup(pktGroup []byte) { s.lastPacket = time.Now() bf := byteframe.NewByteFrameFromBytes(pktGroup) opcodeUint16 := bf.ReadUint16() if len(bf.Data()) >= 6 { s.ackStart[bf.ReadUint32()] = time.Now() _, _ = bf.Seek(2, io.SeekStart) } opcode := network.PacketID(opcodeUint16) // This shouldn't be needed, but it's better to recover and let the connection die than to panic the server. defer func() { if r := recover(); r != nil { s.logger.Error("Recovered from panic", zap.String("name", s.Name), zap.Any("panic", r)) } }() s.logMessage(opcodeUint16, pktGroup, s.Name, "Server") if opcode == network.MSG_SYS_LOGOUT { s.closed.Store(true) return } // Get the packet parser and handler for this opcode. mhfPkt := mhfpacket.FromOpcode(opcode) if mhfPkt == nil { s.logger.Warn("Got opcode which we don't know how to parse, can't parse anymore for this group") return } // Parse the packet. err := mhfPkt.Parse(bf, s.clientContext) if err != nil { s.logger.Warn("Packet not implemented", zap.String("name", s.Name), zap.Stringer("opcode", opcode), ) return } // Handle the packet. handlerTable[opcode](s, mhfPkt) // If there is more data on the stream that the .Parse method didn't read, then read another packet off it. 
remainingData := bf.DataFromCurrent() if len(remainingData) >= 2 { s.handlePacketGroup(remainingData) } } func ignored(opcode network.PacketID) bool { ignoreList := []network.PacketID{ network.MSG_SYS_END, network.MSG_SYS_PING, network.MSG_SYS_NOP, network.MSG_SYS_TIME, network.MSG_SYS_EXTEND_THRESHOLD, network.MSG_SYS_POSITION_OBJECT, // network.MSG_MHF_SAVEDATA, // Temporarily enabled for debugging save issues } set := make(map[network.PacketID]struct{}, len(ignoreList)) for _, s := range ignoreList { set[s] = struct{}{} } _, r := set[opcode] return r } func (s *Session) logMessage(opcode uint16, data []byte, sender string, recipient string) { if sender == "Server" && !s.server.erupeConfig.DebugOptions.LogOutboundMessages { return } else if sender != "Server" && !s.server.erupeConfig.DebugOptions.LogInboundMessages { return } opcodePID := network.PacketID(opcode) if ignored(opcodePID) { return } var ackHandle uint32 if len(data) >= 6 { ackHandle = binary.BigEndian.Uint32(data[2:6]) } fields := []zap.Field{ zap.String("sender", sender), zap.String("recipient", recipient), zap.Uint16("opcode_dec", opcode), zap.String("opcode_hex", fmt.Sprintf("0x%04X", opcode)), zap.Stringer("opcode_name", opcodePID), zap.Int("data_bytes", len(data)), } if t, ok := s.ackStart[ackHandle]; ok { fields = append(fields, zap.Duration("ack_latency", time.Since(t))) } if s.server.erupeConfig.DebugOptions.LogMessageData { if len(data) <= s.server.erupeConfig.DebugOptions.MaxHexdumpLength { fields = append(fields, zap.String("data", hex.Dump(data))) } } s.logger.Debug("Packet", fields...) } func (s *Session) getObjectId() uint32 { s.objectIndex++ return uint32(s.objectID)<<16 | uint32(s.objectIndex) } // GetSemaphoreID returns the semaphore ID held by the session, varying by semaphore mode. 
func (s *Session) GetSemaphoreID() uint32 { if s.semaphoreMode { return 0x000E0000 + uint32(s.semaphoreID[1]) } else { return 0x000F0000 + uint32(s.semaphoreID[0]) } } func (s *Session) isOp() bool { var op bool err := s.server.db.QueryRow(`SELECT op FROM users u WHERE u.id=(SELECT c.user_id FROM characters c WHERE c.id=$1)`, s.charID).Scan(&op) if err == nil && op { return true } return false }