mirror of
https://github.com/Mezeporta/Erupe.git
synced 2026-03-22 07:32:32 +01:00
Silently ignored DB errors in handlers could cause data loss (frontier point transactions completing without DB writes), reward duplication (stamp exchange granting items on failed UPDATE), and crashes (tower mission page=0 causing index-out-of-bounds). House access state defaulting to 0 on DB failure also bypassed all access controls. HIGH risk fixes: - frontier point buy/sell now fails with ACK on DB error - stamp exchange/stampcard abort on failed UPDATE - guild meal INSERT returns fail ACK instead of orphaned ID 0 - mercenary/airou creation aborts on failed sequence nextval MEDIUM risk fixes: - tower mission page clamped to >= 1 preventing array underflow - tower RP donation returns early on failed guild state read - house state defaults to 2 (password-protected) on DB failure - playtime read failure logged instead of silently resetting RP Also cache userID on Session at login time, eliminating ~25 redundant subqueries of the form WHERE u.id=(SELECT c.user_id FROM characters c WHERE c.id=$1) across shop, gacha, command, and distitem handlers.
362 lines
10 KiB
Go
362 lines
10 KiB
Go
package channelserver
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"encoding/hex"
|
|
"erupe-ce/common/mhfcourse"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"erupe-ce/common/byteframe"
|
|
"erupe-ce/common/stringstack"
|
|
"erupe-ce/network"
|
|
"erupe-ce/network/clientctx"
|
|
"erupe-ce/network/mhfpacket"
|
|
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type packet struct {
|
|
data []byte
|
|
nonBlocking bool
|
|
}
|
|
|
|
// Session holds state for the channel server connection.
|
|
type Session struct {
|
|
sync.Mutex
|
|
logger *zap.Logger
|
|
server *Server
|
|
rawConn net.Conn
|
|
cryptConn network.Conn
|
|
sendPackets chan packet
|
|
clientContext *clientctx.ClientContext
|
|
lastPacket time.Time
|
|
|
|
objectID uint16
|
|
objectIndex uint16
|
|
loaded bool
|
|
|
|
stage *Stage
|
|
reservationStage *Stage // Required for the stateful MsgSysUnreserveStage packet.
|
|
stagePass string // Temporary storage
|
|
prevGuildID uint32 // Stores the last GuildID used in InfoGuild
|
|
charID uint32
|
|
userID uint32
|
|
logKey []byte
|
|
sessionStart int64
|
|
courses []mhfcourse.Course
|
|
token string
|
|
kqf []byte
|
|
kqfOverride bool
|
|
|
|
playtime uint32
|
|
playtimeTime time.Time
|
|
|
|
semaphore *Semaphore // Required for the stateful MsgSysUnreserveStage packet.
|
|
semaphoreMode bool
|
|
semaphoreID []uint16
|
|
|
|
// A stack containing the stage movement history (push on enter/move, pop on back)
|
|
stageMoveStack *stringstack.StringStack
|
|
|
|
// Accumulated index used for identifying mail for a client
|
|
// I'm not certain why this is used, but since the client is sending it
|
|
// I want to rely on it for now as it might be important later.
|
|
mailAccIndex uint8
|
|
// Contains the mail list that maps accumulated indexes to mail IDs
|
|
mailList []int
|
|
|
|
// For Debuging
|
|
Name string
|
|
closed atomic.Bool
|
|
ackStart map[uint32]time.Time
|
|
}
|
|
|
|
// NewSession creates a new Session type.
|
|
func NewSession(server *Server, conn net.Conn) *Session {
|
|
s := &Session{
|
|
logger: server.logger.Named(conn.RemoteAddr().String()),
|
|
server: server,
|
|
rawConn: conn,
|
|
cryptConn: network.NewCryptConn(conn, server.erupeConfig.RealClientMode, server.logger.Named(conn.RemoteAddr().String())),
|
|
sendPackets: make(chan packet, 20),
|
|
clientContext: &clientctx.ClientContext{RealClientMode: server.erupeConfig.RealClientMode},
|
|
lastPacket: time.Now(),
|
|
objectID: server.getObjectId(),
|
|
sessionStart: TimeAdjusted().Unix(),
|
|
stageMoveStack: stringstack.New(),
|
|
ackStart: make(map[uint32]time.Time),
|
|
semaphoreID: make([]uint16, 2),
|
|
}
|
|
return s
|
|
}
|
|
|
|
// Start starts the session packet send and recv loop(s).
|
|
func (s *Session) Start() {
|
|
s.logger.Debug("New connection", zap.String("RemoteAddr", s.rawConn.RemoteAddr().String()))
|
|
// Unlike the sign and entrance server,
|
|
// the client DOES NOT initalize the channel connection with 8 NULL bytes.
|
|
go s.sendLoop()
|
|
go s.recvLoop()
|
|
}
|
|
|
|
// QueueSend queues a packet (raw []byte) to be sent.
|
|
func (s *Session) QueueSend(data []byte) {
|
|
if len(data) >= 2 {
|
|
s.logMessage(binary.BigEndian.Uint16(data[0:2]), data, "Server", s.Name)
|
|
}
|
|
s.sendPackets <- packet{data, true}
|
|
}
|
|
|
|
// QueueSendNonBlocking queues a packet (raw []byte) to be sent, dropping the packet entirely if the queue is full.
|
|
func (s *Session) QueueSendNonBlocking(data []byte) {
|
|
select {
|
|
case s.sendPackets <- packet{data, true}:
|
|
if len(data) >= 2 {
|
|
s.logMessage(binary.BigEndian.Uint16(data[0:2]), data, "Server", s.Name)
|
|
}
|
|
default:
|
|
s.logger.Warn("Packet queue too full, dropping!")
|
|
}
|
|
}
|
|
|
|
// QueueSendMHF queues a MHFPacket to be sent.
|
|
func (s *Session) QueueSendMHF(pkt mhfpacket.MHFPacket) {
|
|
// Make the header
|
|
bf := byteframe.NewByteFrame()
|
|
bf.WriteUint16(uint16(pkt.Opcode()))
|
|
|
|
// Build the packet onto the byteframe.
|
|
_ = pkt.Build(bf, s.clientContext)
|
|
|
|
// Queue it.
|
|
s.QueueSend(bf.Data())
|
|
}
|
|
|
|
// QueueSendMHFNonBlocking queues a MHFPacket to be sent, dropping the packet entirely if the queue is full.
|
|
func (s *Session) QueueSendMHFNonBlocking(pkt mhfpacket.MHFPacket) {
|
|
// Make the header
|
|
bf := byteframe.NewByteFrame()
|
|
bf.WriteUint16(uint16(pkt.Opcode()))
|
|
|
|
// Build the packet onto the byteframe.
|
|
_ = pkt.Build(bf, s.clientContext)
|
|
|
|
// Queue it.
|
|
s.QueueSendNonBlocking(bf.Data())
|
|
}
|
|
|
|
// QueueAck is a helper function to queue an MSG_SYS_ACK with the given ack handle and data.
|
|
func (s *Session) QueueAck(ackHandle uint32, data []byte) {
|
|
bf := byteframe.NewByteFrame()
|
|
bf.WriteUint16(uint16(network.MSG_SYS_ACK))
|
|
bf.WriteUint32(ackHandle)
|
|
bf.WriteBytes(data)
|
|
s.QueueSend(bf.Data())
|
|
}
|
|
|
|
func (s *Session) sendLoop() {
|
|
for {
|
|
if s.closed.Load() {
|
|
return
|
|
}
|
|
// Send each packet individually with its own terminator
|
|
for len(s.sendPackets) > 0 {
|
|
pkt := <-s.sendPackets
|
|
err := s.cryptConn.SendPacket(append(pkt.data, []byte{0x00, 0x10}...))
|
|
if err != nil {
|
|
s.logger.Warn("Failed to send packet", zap.Error(err))
|
|
}
|
|
}
|
|
time.Sleep(time.Duration(s.server.erupeConfig.LoopDelay) * time.Millisecond)
|
|
}
|
|
}
|
|
|
|
func (s *Session) recvLoop() {
|
|
for {
|
|
if s.closed.Load() {
|
|
// Graceful disconnect - client sent logout packet
|
|
s.logger.Info("Session closed gracefully",
|
|
zap.Uint32("charID", s.charID),
|
|
zap.String("name", s.Name),
|
|
zap.String("disconnect_type", "graceful"),
|
|
)
|
|
logoutPlayer(s)
|
|
return
|
|
}
|
|
pkt, err := s.cryptConn.ReadPacket()
|
|
if err == io.EOF {
|
|
// Connection lost - client disconnected without logout packet
|
|
sessionDuration := time.Duration(0)
|
|
if s.sessionStart > 0 {
|
|
sessionDuration = time.Since(time.Unix(s.sessionStart, 0))
|
|
}
|
|
s.logger.Info("Connection lost (EOF)",
|
|
zap.Uint32("charID", s.charID),
|
|
zap.String("name", s.Name),
|
|
zap.String("disconnect_type", "connection_lost"),
|
|
zap.Duration("session_duration", sessionDuration),
|
|
)
|
|
logoutPlayer(s)
|
|
return
|
|
} else if err != nil {
|
|
// Connection error - network issue or malformed packet
|
|
s.logger.Warn("Connection error, exiting recv loop",
|
|
zap.Error(err),
|
|
zap.Uint32("charID", s.charID),
|
|
zap.String("name", s.Name),
|
|
zap.String("disconnect_type", "error"),
|
|
)
|
|
logoutPlayer(s)
|
|
return
|
|
}
|
|
s.handlePacketGroup(pkt)
|
|
time.Sleep(time.Duration(s.server.erupeConfig.LoopDelay) * time.Millisecond)
|
|
}
|
|
}
|
|
|
|
func (s *Session) handlePacketGroup(pktGroup []byte) {
|
|
s.lastPacket = time.Now()
|
|
bf := byteframe.NewByteFrameFromBytes(pktGroup)
|
|
opcodeUint16 := bf.ReadUint16()
|
|
if len(bf.Data()) >= 6 {
|
|
s.ackStart[bf.ReadUint32()] = time.Now()
|
|
_, _ = bf.Seek(2, io.SeekStart)
|
|
}
|
|
opcode := network.PacketID(opcodeUint16)
|
|
|
|
// This shouldn't be needed, but it's better to recover and let the connection die than to panic the server.
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
s.logger.Error("Recovered from panic", zap.String("name", s.Name), zap.Any("panic", r))
|
|
}
|
|
}()
|
|
|
|
s.logMessage(opcodeUint16, pktGroup, s.Name, "Server")
|
|
|
|
if opcode == network.MSG_SYS_LOGOUT {
|
|
s.closed.Store(true)
|
|
return
|
|
}
|
|
// Get the packet parser and handler for this opcode.
|
|
mhfPkt := mhfpacket.FromOpcode(opcode)
|
|
if mhfPkt == nil {
|
|
s.logger.Warn("Got opcode which we don't know how to parse, can't parse anymore for this group")
|
|
return
|
|
}
|
|
// Parse the packet.
|
|
err := mhfPkt.Parse(bf, s.clientContext)
|
|
if err != nil {
|
|
s.logger.Warn("Packet not implemented",
|
|
zap.String("name", s.Name),
|
|
zap.Stringer("opcode", opcode),
|
|
)
|
|
return
|
|
}
|
|
if bf.Err() != nil {
|
|
s.logger.Warn("Malformed packet (read overflow during parse)",
|
|
zap.String("name", s.Name),
|
|
zap.Stringer("opcode", opcode),
|
|
zap.Error(bf.Err()),
|
|
)
|
|
return
|
|
}
|
|
// Handle the packet.
|
|
handler, ok := s.server.handlerTable[opcode]
|
|
if !ok {
|
|
s.logger.Warn("No handler for opcode", zap.Stringer("opcode", opcode))
|
|
return
|
|
}
|
|
handler(s, mhfPkt)
|
|
// If there is more data on the stream that the .Parse method didn't read, then read another packet off it.
|
|
remainingData := bf.DataFromCurrent()
|
|
if len(remainingData) >= 2 {
|
|
s.handlePacketGroup(remainingData)
|
|
}
|
|
}
|
|
|
|
func ignored(opcode network.PacketID) bool {
|
|
ignoreList := []network.PacketID{
|
|
network.MSG_SYS_END,
|
|
network.MSG_SYS_PING,
|
|
network.MSG_SYS_NOP,
|
|
network.MSG_SYS_TIME,
|
|
network.MSG_SYS_EXTEND_THRESHOLD,
|
|
network.MSG_SYS_POSITION_OBJECT,
|
|
// network.MSG_MHF_SAVEDATA, // Temporarily enabled for debugging save issues
|
|
}
|
|
set := make(map[network.PacketID]struct{}, len(ignoreList))
|
|
for _, s := range ignoreList {
|
|
set[s] = struct{}{}
|
|
}
|
|
_, r := set[opcode]
|
|
return r
|
|
}
|
|
|
|
func (s *Session) logMessage(opcode uint16, data []byte, sender string, recipient string) {
|
|
if sender == "Server" && !s.server.erupeConfig.DebugOptions.LogOutboundMessages {
|
|
return
|
|
} else if sender != "Server" && !s.server.erupeConfig.DebugOptions.LogInboundMessages {
|
|
return
|
|
}
|
|
|
|
opcodePID := network.PacketID(opcode)
|
|
if ignored(opcodePID) {
|
|
return
|
|
}
|
|
var ackHandle uint32
|
|
if len(data) >= 6 {
|
|
ackHandle = binary.BigEndian.Uint32(data[2:6])
|
|
}
|
|
fields := []zap.Field{
|
|
zap.String("sender", sender),
|
|
zap.String("recipient", recipient),
|
|
zap.Uint16("opcode_dec", opcode),
|
|
zap.String("opcode_hex", fmt.Sprintf("0x%04X", opcode)),
|
|
zap.Stringer("opcode_name", opcodePID),
|
|
zap.Int("data_bytes", len(data)),
|
|
}
|
|
if t, ok := s.ackStart[ackHandle]; ok {
|
|
fields = append(fields, zap.Duration("ack_latency", time.Since(t)))
|
|
}
|
|
if s.server.erupeConfig.DebugOptions.LogMessageData {
|
|
if len(data) <= s.server.erupeConfig.DebugOptions.MaxHexdumpLength {
|
|
fields = append(fields, zap.String("data", hex.Dump(data)))
|
|
}
|
|
}
|
|
s.logger.Debug("Packet", fields...)
|
|
}
|
|
|
|
func (s *Session) getObjectId() uint32 {
|
|
s.objectIndex++
|
|
return uint32(s.objectID)<<16 | uint32(s.objectIndex)
|
|
}
|
|
|
|
// Semaphore ID base values
|
|
const (
|
|
semaphoreBaseDefault = uint32(0x000F0000)
|
|
semaphoreBaseAlt = uint32(0x000E0000)
|
|
)
|
|
|
|
// GetSemaphoreID returns the semaphore ID held by the session, varying by semaphore mode.
|
|
func (s *Session) GetSemaphoreID() uint32 {
|
|
if s.semaphoreMode {
|
|
return semaphoreBaseAlt + uint32(s.semaphoreID[1])
|
|
} else {
|
|
return semaphoreBaseDefault + uint32(s.semaphoreID[0])
|
|
}
|
|
}
|
|
|
|
func (s *Session) isOp() bool {
|
|
var op bool
|
|
err := s.server.db.QueryRow(`SELECT op FROM users WHERE id=$1`, s.userID).Scan(&op)
|
|
if err == nil && op {
|
|
return true
|
|
}
|
|
return false
|
|
}
|