mirror of
https://github.com/Mezeporta/Erupe.git
synced 2026-03-27 01:53:19 +01:00
feat(channelserver): decouple channel servers for independent operation (#33)
Enable multiple Erupe instances to share a single PostgreSQL database without destroying each other's state, fix existing data races in cross-channel access, and lay groundwork for future distributed channel server deployments. Phase 1 — DB safety: - Scope DELETE FROM servers/sign_sessions to this instance's server IDs - Fix ci++ bug where failed channel start shifted subsequent IDs Phase 2 — Fix data races in cross-channel access: - Lock sessions map in FindSessionByCharID and DisconnectUser - Lock stagesLock in handleMsgSysLockGlobalSema - Snapshot sessions/stages under lock in TransitMessage types 1-4 - Lock channel when finding mail notification targets Phase 3 — ChannelRegistry interface: - Define ChannelRegistry interface with 7 cross-channel operations - Implement LocalChannelRegistry with proper locking - Add SessionSnapshot/StageSnapshot immutable copy types - Delegate WorldcastMHF, FindSessionByCharID, DisconnectUser to Registry - Migrate LockGlobalSema and guild mail handlers to use Registry - Add comprehensive tests including concurrent access Phase 4 — Per-channel enable/disable: - Add Enabled *bool to EntranceChannelInfo (nil defaults to true) - Skip disabled channels in startup loop, preserving ID stability - Add IsEnabled() helper with backward-compatible default - Update config.example.json with Enabled field
This commit is contained in:
@@ -399,11 +399,17 @@ func handleMsgSysEcho(s *Session, p mhfpacket.MHFPacket) {}
|
||||
func handleMsgSysLockGlobalSema(s *Session, p mhfpacket.MHFPacket) {
|
||||
pkt := p.(*mhfpacket.MsgSysLockGlobalSema)
|
||||
var sgid string
|
||||
for _, channel := range s.server.Channels {
|
||||
for id := range channel.stages {
|
||||
if strings.HasSuffix(id, pkt.UserIDString) {
|
||||
sgid = channel.GlobalID
|
||||
if s.server.Registry != nil {
|
||||
sgid = s.server.Registry.FindChannelForStage(pkt.UserIDString)
|
||||
} else {
|
||||
for _, channel := range s.server.Channels {
|
||||
channel.stagesLock.RLock()
|
||||
for id := range channel.stages {
|
||||
if strings.HasSuffix(id, pkt.UserIDString) {
|
||||
sgid = channel.GlobalID
|
||||
}
|
||||
}
|
||||
channel.stagesLock.RUnlock()
|
||||
}
|
||||
}
|
||||
bf := byteframe.NewByteFrame()
|
||||
@@ -468,7 +474,23 @@ func handleMsgMhfTransitMessage(s *Session, p mhfpacket.MHFPacket) {
|
||||
resp.WriteUint16(0)
|
||||
switch pkt.SearchType {
|
||||
case 1, 2, 3: // usersearchidx, usersearchname, lobbysearchname
|
||||
// Snapshot matching sessions under lock, then build response outside locks.
|
||||
type sessionResult struct {
|
||||
charID uint32
|
||||
name []byte
|
||||
stageID []byte
|
||||
ip net.IP
|
||||
port uint16
|
||||
userBin3 []byte
|
||||
}
|
||||
var results []sessionResult
|
||||
|
||||
for _, c := range s.server.Channels {
|
||||
if count == maxResults {
|
||||
break
|
||||
}
|
||||
c.Lock()
|
||||
c.userBinaryPartsLock.RLock()
|
||||
for _, session := range c.sessions {
|
||||
if count == maxResults {
|
||||
break
|
||||
@@ -483,31 +505,45 @@ func handleMsgMhfTransitMessage(s *Session, p mhfpacket.MHFPacket) {
|
||||
continue
|
||||
}
|
||||
count++
|
||||
sessionName := stringsupport.UTF8ToSJIS(session.Name)
|
||||
sessionStage := stringsupport.UTF8ToSJIS(session.stage.id)
|
||||
if !local {
|
||||
resp.WriteUint32(binary.LittleEndian.Uint32(net.ParseIP(c.IP).To4()))
|
||||
} else {
|
||||
resp.WriteUint32(0x0100007F)
|
||||
}
|
||||
resp.WriteUint16(c.Port)
|
||||
resp.WriteUint32(session.charID)
|
||||
resp.WriteUint8(uint8(len(sessionStage) + 1))
|
||||
resp.WriteUint8(uint8(len(sessionName) + 1))
|
||||
resp.WriteUint16(uint16(len(c.userBinaryParts[userBinaryPartID{charID: session.charID, index: 3}])))
|
||||
|
||||
// TODO: This case might be <=G2
|
||||
if _config.ErupeConfig.RealClientMode <= _config.G1 {
|
||||
resp.WriteBytes(make([]byte, 8))
|
||||
} else {
|
||||
resp.WriteBytes(make([]byte, 40))
|
||||
}
|
||||
resp.WriteBytes(make([]byte, 8))
|
||||
|
||||
resp.WriteNullTerminatedBytes(sessionStage)
|
||||
resp.WriteNullTerminatedBytes(sessionName)
|
||||
resp.WriteBytes(c.userBinaryParts[userBinaryPartID{session.charID, 3}])
|
||||
ub3 := c.userBinaryParts[userBinaryPartID{charID: session.charID, index: 3}]
|
||||
ub3Copy := make([]byte, len(ub3))
|
||||
copy(ub3Copy, ub3)
|
||||
results = append(results, sessionResult{
|
||||
charID: session.charID,
|
||||
name: stringsupport.UTF8ToSJIS(session.Name),
|
||||
stageID: stringsupport.UTF8ToSJIS(session.stage.id),
|
||||
ip: net.ParseIP(c.IP).To4(),
|
||||
port: c.Port,
|
||||
userBin3: ub3Copy,
|
||||
})
|
||||
}
|
||||
c.userBinaryPartsLock.RUnlock()
|
||||
c.Unlock()
|
||||
}
|
||||
|
||||
for _, r := range results {
|
||||
if !local {
|
||||
resp.WriteUint32(binary.LittleEndian.Uint32(r.ip))
|
||||
} else {
|
||||
resp.WriteUint32(0x0100007F)
|
||||
}
|
||||
resp.WriteUint16(r.port)
|
||||
resp.WriteUint32(r.charID)
|
||||
resp.WriteUint8(uint8(len(r.stageID) + 1))
|
||||
resp.WriteUint8(uint8(len(r.name) + 1))
|
||||
resp.WriteUint16(uint16(len(r.userBin3)))
|
||||
|
||||
// TODO: This case might be <=G2
|
||||
if _config.ErupeConfig.RealClientMode <= _config.G1 {
|
||||
resp.WriteBytes(make([]byte, 8))
|
||||
} else {
|
||||
resp.WriteBytes(make([]byte, 40))
|
||||
}
|
||||
resp.WriteBytes(make([]byte, 8))
|
||||
|
||||
resp.WriteNullTerminatedBytes(r.stageID)
|
||||
resp.WriteNullTerminatedBytes(r.name)
|
||||
resp.WriteBytes(r.userBin3)
|
||||
}
|
||||
case 4: // lobbysearch
|
||||
type FindPartyParams struct {
|
||||
@@ -594,12 +630,31 @@ func handleMsgMhfTransitMessage(s *Session, p mhfpacket.MHFPacket) {
|
||||
}
|
||||
}
|
||||
}
|
||||
// Snapshot matching stages under lock, then build response outside locks.
|
||||
type stageResult struct {
|
||||
ip net.IP
|
||||
port uint16
|
||||
clientCount int
|
||||
reserved int
|
||||
maxPlayers uint16
|
||||
stageID string
|
||||
stageData []int16
|
||||
rawBinData0 []byte
|
||||
rawBinData1 []byte
|
||||
}
|
||||
var stageResults []stageResult
|
||||
|
||||
for _, c := range s.server.Channels {
|
||||
if count == maxResults {
|
||||
break
|
||||
}
|
||||
c.stagesLock.RLock()
|
||||
for _, stage := range c.stages {
|
||||
if count == maxResults {
|
||||
break
|
||||
}
|
||||
if strings.HasPrefix(stage.id, findPartyParams.StagePrefix) {
|
||||
stage.RLock()
|
||||
sb3 := byteframe.NewByteFrameFromBytes(stage.rawBinaryData[stageBinaryKey{1, 3}])
|
||||
_, _ = sb3.Seek(4, 0)
|
||||
|
||||
@@ -621,6 +676,7 @@ func handleMsgMhfTransitMessage(s *Session, p mhfpacket.MHFPacket) {
|
||||
|
||||
if findPartyParams.RankRestriction >= 0 {
|
||||
if stageData[0] > findPartyParams.RankRestriction {
|
||||
stage.RUnlock()
|
||||
continue
|
||||
}
|
||||
}
|
||||
@@ -634,47 +690,72 @@ func handleMsgMhfTransitMessage(s *Session, p mhfpacket.MHFPacket) {
|
||||
}
|
||||
}
|
||||
if !hasTarget {
|
||||
stage.RUnlock()
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Copy binary data under lock
|
||||
bin0 := stage.rawBinaryData[stageBinaryKey{1, 0}]
|
||||
bin0Copy := make([]byte, len(bin0))
|
||||
copy(bin0Copy, bin0)
|
||||
bin1 := stage.rawBinaryData[stageBinaryKey{1, 1}]
|
||||
bin1Copy := make([]byte, len(bin1))
|
||||
copy(bin1Copy, bin1)
|
||||
|
||||
count++
|
||||
if !local {
|
||||
resp.WriteUint32(binary.LittleEndian.Uint32(net.ParseIP(c.IP).To4()))
|
||||
} else {
|
||||
resp.WriteUint32(0x0100007F)
|
||||
}
|
||||
resp.WriteUint16(c.Port)
|
||||
|
||||
resp.WriteUint16(0) // Static?
|
||||
resp.WriteUint16(0) // Unk, [0 1 2]
|
||||
resp.WriteUint16(uint16(len(stage.clients) + len(stage.reservedClientSlots)))
|
||||
resp.WriteUint16(stage.maxPlayers)
|
||||
// TODO: Retail returned the number of clients in quests, not workshop/my series
|
||||
resp.WriteUint16(uint16(len(stage.reservedClientSlots)))
|
||||
|
||||
resp.WriteUint8(0) // Static?
|
||||
resp.WriteUint8(uint8(stage.maxPlayers))
|
||||
resp.WriteUint8(1) // Static?
|
||||
resp.WriteUint8(uint8(len(stage.id) + 1))
|
||||
resp.WriteUint8(uint8(len(stage.rawBinaryData[stageBinaryKey{1, 0}])))
|
||||
resp.WriteUint8(uint8(len(stage.rawBinaryData[stageBinaryKey{1, 1}])))
|
||||
|
||||
for i := range stageData {
|
||||
if _config.ErupeConfig.RealClientMode >= _config.Z1 {
|
||||
resp.WriteInt16(stageData[i])
|
||||
} else {
|
||||
resp.WriteInt8(int8(stageData[i]))
|
||||
}
|
||||
}
|
||||
resp.WriteUint8(0) // Unk
|
||||
resp.WriteUint8(0) // Unk
|
||||
|
||||
resp.WriteNullTerminatedBytes([]byte(stage.id))
|
||||
resp.WriteBytes(stage.rawBinaryData[stageBinaryKey{1, 0}])
|
||||
resp.WriteBytes(stage.rawBinaryData[stageBinaryKey{1, 1}])
|
||||
stageResults = append(stageResults, stageResult{
|
||||
ip: net.ParseIP(c.IP).To4(),
|
||||
port: c.Port,
|
||||
clientCount: len(stage.clients) + len(stage.reservedClientSlots),
|
||||
reserved: len(stage.reservedClientSlots),
|
||||
maxPlayers: stage.maxPlayers,
|
||||
stageID: stage.id,
|
||||
stageData: stageData,
|
||||
rawBinData0: bin0Copy,
|
||||
rawBinData1: bin1Copy,
|
||||
})
|
||||
stage.RUnlock()
|
||||
}
|
||||
}
|
||||
c.stagesLock.RUnlock()
|
||||
}
|
||||
|
||||
for _, sr := range stageResults {
|
||||
if !local {
|
||||
resp.WriteUint32(binary.LittleEndian.Uint32(sr.ip))
|
||||
} else {
|
||||
resp.WriteUint32(0x0100007F)
|
||||
}
|
||||
resp.WriteUint16(sr.port)
|
||||
|
||||
resp.WriteUint16(0) // Static?
|
||||
resp.WriteUint16(0) // Unk, [0 1 2]
|
||||
resp.WriteUint16(uint16(sr.clientCount))
|
||||
resp.WriteUint16(sr.maxPlayers)
|
||||
// TODO: Retail returned the number of clients in quests, not workshop/my series
|
||||
resp.WriteUint16(uint16(sr.reserved))
|
||||
|
||||
resp.WriteUint8(0) // Static?
|
||||
resp.WriteUint8(uint8(sr.maxPlayers))
|
||||
resp.WriteUint8(1) // Static?
|
||||
resp.WriteUint8(uint8(len(sr.stageID) + 1))
|
||||
resp.WriteUint8(uint8(len(sr.rawBinData0)))
|
||||
resp.WriteUint8(uint8(len(sr.rawBinData1)))
|
||||
|
||||
for i := range sr.stageData {
|
||||
if _config.ErupeConfig.RealClientMode >= _config.Z1 {
|
||||
resp.WriteInt16(sr.stageData[i])
|
||||
} else {
|
||||
resp.WriteInt8(int8(sr.stageData[i]))
|
||||
}
|
||||
}
|
||||
resp.WriteUint8(0) // Unk
|
||||
resp.WriteUint8(0) // Unk
|
||||
|
||||
resp.WriteNullTerminatedBytes([]byte(sr.stageID))
|
||||
resp.WriteBytes(sr.rawBinData0)
|
||||
resp.WriteBytes(sr.rawBinData1)
|
||||
}
|
||||
}
|
||||
_, _ = resp.Seek(0, io.SeekStart)
|
||||
|
||||
Reference in New Issue
Block a user