Files
Erupe/server/channelserver/sys_session.go
Houmgaor d456bd23e0 fix(channelserver): handle ignored DB errors and cache userID on session
Silently ignored DB errors in handlers could cause data loss (frontier
point transactions completing without DB writes), reward duplication
(stamp exchange granting items on failed UPDATE), and crashes (tower
mission page=0 causing index-out-of-bounds). House access state
defaulting to 0 on DB failure also bypassed all access controls.

HIGH risk fixes:
- frontier point buy/sell now fails with ACK on DB error
- stamp exchange/stampcard abort on failed UPDATE
- guild meal INSERT returns fail ACK instead of orphaned ID 0
- mercenary/airou creation aborts on failed sequence nextval

MEDIUM risk fixes:
- tower mission page clamped to >= 1 preventing array underflow
- tower RP donation returns early on failed guild state read
- house state defaults to 2 (password-protected) on DB failure
- playtime read failure logged instead of silently resetting RP

Also cache userID on Session at login time, eliminating ~25 redundant
subqueries of the form WHERE u.id=(SELECT c.user_id FROM characters
c WHERE c.id=$1) across shop, gacha, command, and distitem handlers.
2026-02-20 21:06:16 +01:00

362 lines
10 KiB
Go

package channelserver
import (
"encoding/binary"
"encoding/hex"
"erupe-ce/common/mhfcourse"
"fmt"
"io"
"net"
"sync"
"sync/atomic"
"time"
"erupe-ce/common/byteframe"
"erupe-ce/common/stringstack"
"erupe-ce/network"
"erupe-ce/network/clientctx"
"erupe-ce/network/mhfpacket"
"go.uber.org/zap"
)
// packet is one entry in a Session's outbound queue (Session.sendPackets).
// NOTE: QueueSend constructs this positionally, so field order matters.
type packet struct {
// data is the raw payload, starting with the big-endian uint16 opcode.
// sendLoop appends the 0x00 0x10 terminator when transmitting it.
data []byte
// nonBlocking is set to true by both QueueSend and QueueSendNonBlocking;
// nothing in this file reads it back.
nonBlocking bool
}
// Session holds the state of a single client connection to the channel server.
// It is created by NewSession and serviced by the sendLoop and recvLoop
// goroutines launched from Start.
type Session struct {
// Embedded lock. None of the methods in this file take it; presumably it is
// used by packet handlers elsewhere — verify before relying on it.
sync.Mutex
// logger is the server logger named after the connection's remote address.
logger *zap.Logger
// server is the owning channel server (config, handler table, database).
server *Server
// rawConn is the underlying connection handed to NewSession.
rawConn net.Conn
// cryptConn wraps rawConn; all packet reads and writes go through it.
cryptConn network.Conn
// sendPackets is the outbound queue (capacity 20) drained by sendLoop.
sendPackets chan packet
// clientContext is passed to every packet Parse/Build call.
clientContext *clientctx.ClientContext
// lastPacket is the time the most recent inbound packet group was handled.
lastPacket time.Time
// objectID is obtained from the server at creation and forms the upper
// 16 bits of the IDs returned by getObjectId.
objectID uint16
// objectIndex is the per-session counter forming the lower 16 bits.
objectIndex uint16
loaded bool
stage *Stage
reservationStage *Stage // Required for the stateful MsgSysUnreserveStage packet.
stagePass string // Temporary storage
prevGuildID uint32 // Stores the last GuildID used in InfoGuild
charID uint32
// userID is the account ID; isOp uses it to look up the users table.
userID uint32
logKey []byte
// sessionStart is the Unix time (seconds, from TimeAdjusted) at creation.
sessionStart int64
courses []mhfcourse.Course
token string
kqf []byte
kqfOverride bool
playtime uint32
playtimeTime time.Time
semaphore *Semaphore // Required for the stateful MsgSysUnreserveStage packet.
// semaphoreMode selects which semaphoreID slot and base GetSemaphoreID uses.
semaphoreMode bool
// semaphoreID has two slots: index 0 is read when semaphoreMode is false,
// index 1 when it is true.
semaphoreID []uint16
// A stack containing the stage movement history (push on enter/move, pop on back)
stageMoveStack *stringstack.StringStack
// Accumulated index used for identifying mail for a client
// I'm not certain why this is used, but since the client is sending it
// I want to rely on it for now as it might be important later.
mailAccIndex uint8
// Contains the mail list that maps accumulated indexes to mail IDs
mailList []int
// For debugging
Name string
// closed is set once a MSG_SYS_LOGOUT packet is handled; both loops poll it.
closed atomic.Bool
// ackStart records when each inbound ack handle was first seen, so that
// logMessage can report the latency of the matching outbound packet.
ackStart map[uint32]time.Time
}
// NewSession builds the Session for a freshly accepted connection on the given
// channel server. It only initialises state; the send and receive loops are
// not running until Start is called.
func NewSession(server *Server, conn net.Conn) *Session {
	remote := conn.RemoteAddr().String()
	realClientMode := server.erupeConfig.RealClientMode

	sess := new(Session)
	sess.logger = server.logger.Named(remote)
	sess.server = server
	sess.rawConn = conn
	sess.cryptConn = network.NewCryptConn(conn, realClientMode, server.logger.Named(remote))
	// Outbound queue holds at most 20 packets before senders block or drop.
	sess.sendPackets = make(chan packet, 20)
	sess.clientContext = &clientctx.ClientContext{RealClientMode: realClientMode}
	sess.lastPacket = time.Now()
	sess.objectID = server.getObjectId()
	sess.sessionStart = TimeAdjusted().Unix()
	sess.stageMoveStack = stringstack.New()
	sess.ackStart = make(map[uint32]time.Time)
	// Two slots: one per semaphore mode (see GetSemaphoreID).
	sess.semaphoreID = make([]uint16, 2)
	return sess
}
// Start logs the new connection and launches the session's two background
// goroutines: the outbound send loop and the inbound receive loop.
func (s *Session) Start() {
	remote := s.rawConn.RemoteAddr().String()
	s.logger.Debug("New connection", zap.String("RemoteAddr", remote))

	// Unlike the sign and entrance server,
	// the client DOES NOT initialize the channel connection with 8 NULL bytes.
	for _, loop := range []func(){s.sendLoop, s.recvLoop} {
		go loop()
	}
}
// QueueSend enqueues a raw packet for transmission, blocking while the
// outbound queue is full. Payloads of at least two bytes are handed to the
// packet logger first, with their leading big-endian uint16 as the opcode.
func (s *Session) QueueSend(data []byte) {
	const opcodeSize = 2
	if len(data) >= opcodeSize {
		opcode := binary.BigEndian.Uint16(data[:opcodeSize])
		s.logMessage(opcode, data, "Server", s.Name)
	}
	s.sendPackets <- packet{data: data, nonBlocking: true}
}
// QueueSendNonBlocking tries to enqueue a raw packet without blocking. When
// the outbound queue is full the packet is discarded with a warning; only
// packets that were actually queued are passed to the packet logger.
func (s *Session) QueueSendNonBlocking(data []byte) {
	queued := packet{data: data, nonBlocking: true}
	select {
	case s.sendPackets <- queued:
	default:
		s.logger.Warn("Packet queue too full, dropping!")
		return
	}

	// Too short to carry an opcode: nothing to log.
	if len(data) < 2 {
		return
	}
	s.logMessage(binary.BigEndian.Uint16(data[:2]), data, "Server", s.Name)
}
// QueueSendMHF serializes pkt — a big-endian uint16 opcode followed by the
// body written by pkt.Build — and queues the result with QueueSend, which
// blocks while the outbound queue is full.
//
// A Build error is logged rather than silently discarded. The packet is still
// queued afterwards so that the bytes on the wire are the same as before this
// error was surfaced.
func (s *Session) QueueSendMHF(pkt mhfpacket.MHFPacket) {
	// Make the header.
	bf := byteframe.NewByteFrame()
	bf.WriteUint16(uint16(pkt.Opcode()))

	// Build the packet onto the byteframe.
	if err := pkt.Build(bf, s.clientContext); err != nil {
		s.logger.Warn("Failed to build packet",
			zap.Uint16("opcode", uint16(pkt.Opcode())),
			zap.Error(err),
		)
	}

	// Queue it.
	s.QueueSend(bf.Data())
}
// QueueSendMHFNonBlocking serializes pkt — a big-endian uint16 opcode followed
// by the body written by pkt.Build — and hands it to QueueSendNonBlocking,
// which drops the packet entirely if the outbound queue is full.
//
// A Build error is logged rather than silently discarded. The packet is still
// offered to the queue afterwards so that the bytes on the wire are the same
// as before this error was surfaced.
func (s *Session) QueueSendMHFNonBlocking(pkt mhfpacket.MHFPacket) {
	// Make the header.
	bf := byteframe.NewByteFrame()
	bf.WriteUint16(uint16(pkt.Opcode()))

	// Build the packet onto the byteframe.
	if err := pkt.Build(bf, s.clientContext); err != nil {
		s.logger.Warn("Failed to build packet",
			zap.Uint16("opcode", uint16(pkt.Opcode())),
			zap.Error(err),
		)
	}

	// Queue it.
	s.QueueSendNonBlocking(bf.Data())
}
// QueueAck queues an MSG_SYS_ACK reply: the ack opcode, then the ack handle
// being answered, then the caller-supplied payload. It goes through QueueSend,
// so it blocks while the outbound queue is full.
func (s *Session) QueueAck(ackHandle uint32, data []byte) {
	ack := byteframe.NewByteFrame()
	ack.WriteUint16(uint16(network.MSG_SYS_ACK)) // opcode
	ack.WriteUint32(ackHandle)                   // handle being acknowledged
	ack.WriteBytes(data)                         // response payload
	s.QueueSend(ack.Data())
}
// sendLoop drains the outbound queue until the session is marked closed.
// Each pass sends every packet currently queued, then sleeps for the
// configured LoopDelay (milliseconds). Send failures are logged and the loop
// carries on; packets still queued when the session closes are not sent.
func (s *Session) sendLoop() {
	for {
		if s.closed.Load() {
			return
		}
		// Send each packet individually with its own terminator.
		for len(s.sendPackets) > 0 {
			pkt := <-s.sendPackets

			// Append the terminator to a fresh copy. Appending straight onto
			// pkt.data would write into any spare capacity of the caller's
			// backing array, which the caller may still own or share.
			framed := make([]byte, 0, len(pkt.data)+2)
			framed = append(framed, pkt.data...)
			framed = append(framed, 0x00, 0x10)

			if err := s.cryptConn.SendPacket(framed); err != nil {
				s.logger.Warn("Failed to send packet", zap.Error(err))
			}
		}
		time.Sleep(time.Duration(s.server.erupeConfig.LoopDelay) * time.Millisecond)
	}
}
// recvLoop reads packet groups from the connection and dispatches them until
// the session ends. It ends in one of three ways, each of which logs the
// reason and calls logoutPlayer:
//   - the closed flag was set (graceful logout),
//   - the read returned io.EOF (connection lost),
//   - the read returned any other error.
//
// Between successful reads it sleeps for the configured LoopDelay (ms).
func (s *Session) recvLoop() {
	for {
		if s.closed.Load() {
			// Graceful disconnect - client sent logout packet
			s.logger.Info("Session closed gracefully",
				zap.Uint32("charID", s.charID),
				zap.String("name", s.Name),
				zap.String("disconnect_type", "graceful"),
			)
			logoutPlayer(s)
			return
		}

		pkt, err := s.cryptConn.ReadPacket()
		switch {
		case err == io.EOF:
			// Connection lost - client disconnected without logout packet
			var sessionDuration time.Duration
			if s.sessionStart > 0 {
				sessionDuration = time.Since(time.Unix(s.sessionStart, 0))
			}
			s.logger.Info("Connection lost (EOF)",
				zap.Uint32("charID", s.charID),
				zap.String("name", s.Name),
				zap.String("disconnect_type", "connection_lost"),
				zap.Duration("session_duration", sessionDuration),
			)
			logoutPlayer(s)
			return

		case err != nil:
			// Connection error - network issue or malformed packet
			s.logger.Warn("Connection error, exiting recv loop",
				zap.Error(err),
				zap.Uint32("charID", s.charID),
				zap.String("name", s.Name),
				zap.String("disconnect_type", "error"),
			)
			logoutPlayer(s)
			return
		}

		s.handlePacketGroup(pkt)
		time.Sleep(time.Duration(s.server.erupeConfig.LoopDelay) * time.Millisecond)
	}
}
// handlePacketGroup processes one decrypted packet group. It reads the leading
// uint16 opcode, parses the matching packet, runs its handler, and then
// recurses on whatever bytes the parser left unread (if at least two remain).
//
// Processing of the group stops, with a log entry, when the opcode is unknown,
// parsing fails or over-reads, or no handler is registered. MSG_SYS_LOGOUT
// marks the session closed and stops processing.
func (s *Session) handlePacketGroup(pktGroup []byte) {
	// Installed first so that a panic anywhere below — including the header
	// reads — is contained here. This shouldn't be needed, but it's better to
	// recover and let the connection die than to panic the server.
	defer func() {
		if r := recover(); r != nil {
			s.logger.Error("Recovered from panic", zap.String("name", s.Name), zap.Any("panic", r))
		}
	}()

	s.lastPacket = time.Now()
	bf := byteframe.NewByteFrameFromBytes(pktGroup)
	opcodeUint16 := bf.ReadUint16()
	if len(bf.Data()) >= 6 {
		// Record when the uint32 following the opcode (treated as the ack
		// handle) was seen, for latency logging, then rewind to just after
		// the opcode so the packet parser sees those bytes again.
		s.ackStart[bf.ReadUint32()] = time.Now()
		_, _ = bf.Seek(2, io.SeekStart)
	}
	opcode := network.PacketID(opcodeUint16)

	s.logMessage(opcodeUint16, pktGroup, s.Name, "Server")

	if opcode == network.MSG_SYS_LOGOUT {
		s.closed.Store(true)
		return
	}

	// Get the packet parser and handler for this opcode.
	mhfPkt := mhfpacket.FromOpcode(opcode)
	if mhfPkt == nil {
		s.logger.Warn("Got opcode which we don't know how to parse, can't parse anymore for this group")
		return
	}

	// Parse the packet.
	if err := mhfPkt.Parse(bf, s.clientContext); err != nil {
		s.logger.Warn("Packet not implemented",
			zap.String("name", s.Name),
			zap.Stringer("opcode", opcode),
			zap.Error(err), // keep the parser's reason instead of discarding it
		)
		return
	}
	if bf.Err() != nil {
		s.logger.Warn("Malformed packet (read overflow during parse)",
			zap.String("name", s.Name),
			zap.Stringer("opcode", opcode),
			zap.Error(bf.Err()),
		)
		return
	}

	// Handle the packet.
	handler, ok := s.server.handlerTable[opcode]
	if !ok {
		s.logger.Warn("No handler for opcode", zap.Stringer("opcode", opcode))
		return
	}
	handler(s, mhfPkt)

	// If there is more data on the stream that the .Parse method didn't read, then read another packet off it.
	remainingData := bf.DataFromCurrent()
	if len(remainingData) >= 2 {
		s.handlePacketGroup(remainingData)
	}
}
// ignored reports whether packets with the given opcode are left out of the
// packet log (see logMessage).
//
// A switch over the constant opcodes is used so that the check allocates
// nothing; it is evaluated for every packet that reaches the logger.
func ignored(opcode network.PacketID) bool {
	switch opcode {
	case network.MSG_SYS_END,
		network.MSG_SYS_PING,
		network.MSG_SYS_NOP,
		network.MSG_SYS_TIME,
		network.MSG_SYS_EXTEND_THRESHOLD,
		network.MSG_SYS_POSITION_OBJECT:
		// network.MSG_MHF_SAVEDATA is intentionally not listed:
		// temporarily enabled for debugging save issues.
		return true
	default:
		return false
	}
}
// logMessage writes one debug-level "Packet" log entry describing a packet.
//
// The sender "Server" marks an outbound packet; anything else is inbound, and
// each direction has its own enable flag in the debug options. Opcodes for
// which ignored returns true are never logged. When bytes 2..6 of data match
// an ack handle recorded in s.ackStart, the elapsed time since it was recorded
// is added as ack_latency. A hex dump of data is attached only if message-data
// logging is on and data is no longer than MaxHexdumpLength.
func (s *Session) logMessage(opcode uint16, data []byte, sender string, recipient string) {
	debug := s.server.erupeConfig.DebugOptions

	// Pick the enable flag for this packet's direction.
	enabled := debug.LogInboundMessages
	if sender == "Server" {
		enabled = debug.LogOutboundMessages
	}
	if !enabled {
		return
	}

	opcodePID := network.PacketID(opcode)
	if ignored(opcodePID) {
		return
	}

	// Packets shorter than six bytes carry no handle; the lookup key stays 0.
	var ackHandle uint32
	if len(data) >= 6 {
		ackHandle = binary.BigEndian.Uint32(data[2:6])
	}

	fields := make([]zap.Field, 0, 8)
	fields = append(fields,
		zap.String("sender", sender),
		zap.String("recipient", recipient),
		zap.Uint16("opcode_dec", opcode),
		zap.String("opcode_hex", fmt.Sprintf("0x%04X", opcode)),
		zap.Stringer("opcode_name", opcodePID),
		zap.Int("data_bytes", len(data)),
	)
	if seenAt, found := s.ackStart[ackHandle]; found {
		fields = append(fields, zap.Duration("ack_latency", time.Since(seenAt)))
	}
	if debug.LogMessageData && len(data) <= debug.MaxHexdumpLength {
		fields = append(fields, zap.String("data", hex.Dump(data)))
	}

	s.logger.Debug("Packet", fields...)
}
// getObjectId advances the session's object counter and returns a new object
// ID: the session's objectID in the upper 16 bits and the updated counter in
// the lower 16 bits.
func (s *Session) getObjectId() uint32 {
	s.objectIndex++
	upper := uint32(s.objectID) << 16
	lower := uint32(s.objectIndex)
	return upper | lower
}
// Semaphore ID base values. GetSemaphoreID adds one of the session's
// semaphoreID slots to one of these bases.
const (
// semaphoreBaseDefault is the base used when Session.semaphoreMode is false.
semaphoreBaseDefault = uint32(0x000F0000)
// semaphoreBaseAlt is the base used when Session.semaphoreMode is true.
semaphoreBaseAlt = uint32(0x000E0000)
)
// GetSemaphoreID returns the semaphore ID currently held by the session.
// With semaphoreMode off it is semaphoreBaseDefault plus slot 0 of
// semaphoreID; with it on, semaphoreBaseAlt plus slot 1.
func (s *Session) GetSemaphoreID() uint32 {
	base, slot := semaphoreBaseDefault, 0
	if s.semaphoreMode {
		base, slot = semaphoreBaseAlt, 1
	}
	return base + uint32(s.semaphoreID[slot])
}
// isOp reports whether the session's user has the op flag set in the users
// table. Any query or scan failure (including no matching row) is treated as
// "not an operator".
func (s *Session) isOp() bool {
	var op bool
	row := s.server.db.QueryRow(`SELECT op FROM users WHERE id=$1`, s.userID)
	if err := row.Scan(&op); err != nil {
		return false
	}
	return op
}