mirror of
https://github.com/Mezeporta/Erupe.git
synced 2026-03-22 07:32:32 +01:00
Enable multiple Erupe instances to share a single PostgreSQL database without destroying each other's state, fix existing data races in cross-channel access, and lay groundwork for future distributed channel server deployments. Phase 1 — DB safety: - Scope DELETE FROM servers/sign_sessions to this instance's server IDs - Fix ci++ bug where failed channel start shifted subsequent IDs Phase 2 — Fix data races in cross-channel access: - Lock sessions map in FindSessionByCharID and DisconnectUser - Lock stagesLock in handleMsgSysLockGlobalSema - Snapshot sessions/stages under lock in TransitMessage types 1-4 - Lock channel when finding mail notification targets Phase 3 — ChannelRegistry interface: - Define ChannelRegistry interface with 7 cross-channel operations - Implement LocalChannelRegistry with proper locking - Add SessionSnapshot/StageSnapshot immutable copy types - Delegate WorldcastMHF, FindSessionByCharID, DisconnectUser to Registry - Migrate LockGlobalSema and guild mail handlers to use Registry - Add comprehensive tests including concurrent access Phase 4 — Per-channel enable/disable: - Add Enabled *bool to EntranceChannelInfo (nil defaults to true) - Skip disabled channels in startup loop, preserving ID stability - Add IsEnabled() helper with backward-compatible default - Update config.example.json with Enabled field
157 lines
3.8 KiB
Go
157 lines
3.8 KiB
Go
package channelserver
|
|
|
|
import (
|
|
"erupe-ce/network/mhfpacket"
|
|
"net"
|
|
"strings"
|
|
)
|
|
|
|
// LocalChannelRegistry is the in-process ChannelRegistry backed by []*Server.
// All cross-channel operations iterate this slice and take each channel's
// own locks as needed; the registry itself holds no lock of its own.
type LocalChannelRegistry struct {
	// channels is the set of in-process channel servers this registry spans.
	// NOTE(review): assumed to be set once at construction and never mutated
	// afterwards (no writer is visible in this file) — confirm before adding
	// dynamic channel add/remove.
	channels []*Server
}
|
|
|
|
// NewLocalChannelRegistry creates a LocalChannelRegistry wrapping the given channels.
|
|
func NewLocalChannelRegistry(channels []*Server) *LocalChannelRegistry {
|
|
return &LocalChannelRegistry{channels: channels}
|
|
}
|
|
|
|
func (r *LocalChannelRegistry) Worldcast(pkt mhfpacket.MHFPacket, ignoredSession *Session, ignoredChannel *Server) {
|
|
for _, c := range r.channels {
|
|
if c == ignoredChannel {
|
|
continue
|
|
}
|
|
c.BroadcastMHF(pkt, ignoredSession)
|
|
}
|
|
}
|
|
|
|
func (r *LocalChannelRegistry) FindSessionByCharID(charID uint32) *Session {
|
|
for _, c := range r.channels {
|
|
c.Lock()
|
|
for _, session := range c.sessions {
|
|
if session.charID == charID {
|
|
c.Unlock()
|
|
return session
|
|
}
|
|
}
|
|
c.Unlock()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *LocalChannelRegistry) DisconnectUser(cids []uint32) {
|
|
for _, c := range r.channels {
|
|
c.Lock()
|
|
for _, session := range c.sessions {
|
|
for _, cid := range cids {
|
|
if session.charID == cid {
|
|
_ = session.rawConn.Close()
|
|
break
|
|
}
|
|
}
|
|
}
|
|
c.Unlock()
|
|
}
|
|
}
|
|
|
|
func (r *LocalChannelRegistry) FindChannelForStage(stageSuffix string) string {
|
|
for _, channel := range r.channels {
|
|
channel.stagesLock.RLock()
|
|
for id := range channel.stages {
|
|
if strings.HasSuffix(id, stageSuffix) {
|
|
gid := channel.GlobalID
|
|
channel.stagesLock.RUnlock()
|
|
return gid
|
|
}
|
|
}
|
|
channel.stagesLock.RUnlock()
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func (r *LocalChannelRegistry) SearchSessions(predicate func(SessionSnapshot) bool, max int) []SessionSnapshot {
|
|
var results []SessionSnapshot
|
|
for _, c := range r.channels {
|
|
if len(results) >= max {
|
|
break
|
|
}
|
|
c.Lock()
|
|
c.userBinaryPartsLock.RLock()
|
|
for _, session := range c.sessions {
|
|
if len(results) >= max {
|
|
break
|
|
}
|
|
snap := SessionSnapshot{
|
|
CharID: session.charID,
|
|
Name: session.Name,
|
|
ServerIP: net.ParseIP(c.IP).To4(),
|
|
ServerPort: c.Port,
|
|
}
|
|
if session.stage != nil {
|
|
snap.StageID = session.stage.id
|
|
}
|
|
ub3 := c.userBinaryParts[userBinaryPartID{charID: session.charID, index: 3}]
|
|
if len(ub3) > 0 {
|
|
snap.UserBinary3 = make([]byte, len(ub3))
|
|
copy(snap.UserBinary3, ub3)
|
|
}
|
|
if predicate(snap) {
|
|
results = append(results, snap)
|
|
}
|
|
}
|
|
c.userBinaryPartsLock.RUnlock()
|
|
c.Unlock()
|
|
}
|
|
return results
|
|
}
|
|
|
|
func (r *LocalChannelRegistry) SearchStages(stagePrefix string, max int) []StageSnapshot {
|
|
var results []StageSnapshot
|
|
for _, c := range r.channels {
|
|
if len(results) >= max {
|
|
break
|
|
}
|
|
c.stagesLock.RLock()
|
|
for _, stage := range c.stages {
|
|
if len(results) >= max {
|
|
break
|
|
}
|
|
if !strings.HasPrefix(stage.id, stagePrefix) {
|
|
continue
|
|
}
|
|
stage.RLock()
|
|
bin0 := stage.rawBinaryData[stageBinaryKey{1, 0}]
|
|
bin0Copy := make([]byte, len(bin0))
|
|
copy(bin0Copy, bin0)
|
|
bin1 := stage.rawBinaryData[stageBinaryKey{1, 1}]
|
|
bin1Copy := make([]byte, len(bin1))
|
|
copy(bin1Copy, bin1)
|
|
bin3 := stage.rawBinaryData[stageBinaryKey{1, 3}]
|
|
bin3Copy := make([]byte, len(bin3))
|
|
copy(bin3Copy, bin3)
|
|
|
|
results = append(results, StageSnapshot{
|
|
ServerIP: net.ParseIP(c.IP).To4(),
|
|
ServerPort: c.Port,
|
|
StageID: stage.id,
|
|
ClientCount: len(stage.clients) + len(stage.reservedClientSlots),
|
|
Reserved: len(stage.reservedClientSlots),
|
|
MaxPlayers: stage.maxPlayers,
|
|
RawBinData0: bin0Copy,
|
|
RawBinData1: bin1Copy,
|
|
RawBinData3: bin3Copy,
|
|
})
|
|
stage.RUnlock()
|
|
}
|
|
c.stagesLock.RUnlock()
|
|
}
|
|
return results
|
|
}
|
|
|
|
func (r *LocalChannelRegistry) NotifyMailToCharID(charID uint32, sender *Session, mail *Mail) {
|
|
session := r.FindSessionByCharID(charID)
|
|
if session != nil {
|
|
SendMailNotification(sender, mail, session)
|
|
}
|
|
}
|