mirror of
https://github.com/Mezeporta/Erupe.git
synced 2026-03-22 07:32:32 +01:00
fix(channelserver): consolidate stages map locking to prevent data race
The stages map was protected by two incompatible locks: the embedded Server.Mutex and Server.stagesLock (RWMutex). Since these are separate mutexes they don't exclude each other, and many handlers accessed the map with no lock at all. Route all stages map access through stagesLock: read-only lookups use RLock, writes (create/delete) use Lock. Per-stage field mutations continue to use each stage's own RWMutex. Restructure handleMsgSysUnlockStage to avoid holding stagesLock nested inside a stage RLock, preventing potential deadlock with destructEmptyStages.
This commit is contained in:
@@ -292,11 +292,20 @@ func logoutPlayer(s *Session) {
|
|||||||
_ = s.rawConn.Close()
|
_ = s.rawConn.Close()
|
||||||
s.server.Unlock()
|
s.server.Unlock()
|
||||||
|
|
||||||
// Stage cleanup
|
// Stage cleanup — snapshot sessions first under server mutex, then iterate stages under stagesLock
|
||||||
|
s.server.Lock()
|
||||||
|
sessionSnapshot := make([]*Session, 0, len(s.server.sessions))
|
||||||
|
for _, sess := range s.server.sessions {
|
||||||
|
sessionSnapshot = append(sessionSnapshot, sess)
|
||||||
|
}
|
||||||
|
s.server.Unlock()
|
||||||
|
|
||||||
|
s.server.stagesLock.RLock()
|
||||||
for _, stage := range s.server.stages {
|
for _, stage := range s.server.stages {
|
||||||
// Tell sessions registered to disconnecting players quest to unregister
|
stage.Lock()
|
||||||
|
// Tell sessions registered to disconnecting player's quest to unregister
|
||||||
if stage.host != nil && stage.host.charID == s.charID {
|
if stage.host != nil && stage.host.charID == s.charID {
|
||||||
for _, sess := range s.server.sessions {
|
for _, sess := range sessionSnapshot {
|
||||||
for rSlot := range stage.reservedClientSlots {
|
for rSlot := range stage.reservedClientSlots {
|
||||||
if sess.charID == rSlot && sess.stage != nil && sess.stage.id[3:5] != "Qs" {
|
if sess.charID == rSlot && sess.stage != nil && sess.stage.id[3:5] != "Qs" {
|
||||||
sess.QueueSendMHFNonBlocking(&mhfpacket.MsgSysStageDestruct{})
|
sess.QueueSendMHFNonBlocking(&mhfpacket.MsgSysStageDestruct{})
|
||||||
@@ -309,7 +318,9 @@ func logoutPlayer(s *Session) {
|
|||||||
delete(stage.clients, session)
|
delete(stage.clients, session)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
stage.Unlock()
|
||||||
}
|
}
|
||||||
|
s.server.stagesLock.RUnlock()
|
||||||
|
|
||||||
// Update sign sessions and server player count
|
// Update sign sessions and server player count
|
||||||
if s.server.db != nil {
|
if s.server.db != nil {
|
||||||
@@ -339,11 +350,13 @@ func logoutPlayer(s *Session) {
|
|||||||
CharID: s.charID,
|
CharID: s.charID,
|
||||||
}, s)
|
}, s)
|
||||||
|
|
||||||
s.server.Lock()
|
s.server.stagesLock.RLock()
|
||||||
for _, stage := range s.server.stages {
|
for _, stage := range s.server.stages {
|
||||||
|
stage.Lock()
|
||||||
delete(stage.reservedClientSlots, s.charID)
|
delete(stage.reservedClientSlots, s.charID)
|
||||||
|
stage.Unlock()
|
||||||
}
|
}
|
||||||
s.server.Unlock()
|
s.server.stagesLock.RUnlock()
|
||||||
|
|
||||||
removeSessionFromSemaphore(s)
|
removeSessionFromSemaphore(s)
|
||||||
removeSessionFromStage(s)
|
removeSessionFromStage(s)
|
||||||
|
|||||||
@@ -14,8 +14,8 @@ import (
|
|||||||
|
|
||||||
func handleMsgSysCreateStage(s *Session, p mhfpacket.MHFPacket) {
|
func handleMsgSysCreateStage(s *Session, p mhfpacket.MHFPacket) {
|
||||||
pkt := p.(*mhfpacket.MsgSysCreateStage)
|
pkt := p.(*mhfpacket.MsgSysCreateStage)
|
||||||
s.server.Lock()
|
s.server.stagesLock.Lock()
|
||||||
defer s.server.Unlock()
|
defer s.server.stagesLock.Unlock()
|
||||||
if _, exists := s.server.stages[pkt.StageID]; exists {
|
if _, exists := s.server.stages[pkt.StageID]; exists {
|
||||||
doAckSimpleFail(s, pkt.AckHandle, []byte{0x00, 0x00, 0x00, 0x00})
|
doAckSimpleFail(s, pkt.AckHandle, []byte{0x00, 0x00, 0x00, 0x00})
|
||||||
} else {
|
} else {
|
||||||
@@ -30,24 +30,20 @@ func handleMsgSysCreateStage(s *Session, p mhfpacket.MHFPacket) {
|
|||||||
func handleMsgSysStageDestruct(s *Session, p mhfpacket.MHFPacket) {}
|
func handleMsgSysStageDestruct(s *Session, p mhfpacket.MHFPacket) {}
|
||||||
|
|
||||||
func doStageTransfer(s *Session, ackHandle uint32, stageID string) {
|
func doStageTransfer(s *Session, ackHandle uint32, stageID string) {
|
||||||
s.server.Lock()
|
s.server.stagesLock.Lock()
|
||||||
stage, exists := s.server.stages[stageID]
|
stage, exists := s.server.stages[stageID]
|
||||||
s.server.Unlock()
|
if !exists {
|
||||||
|
|
||||||
if exists {
|
|
||||||
stage.Lock()
|
|
||||||
stage.clients[s] = s.charID
|
|
||||||
stage.Unlock()
|
|
||||||
} else { // Create new stage object
|
|
||||||
s.server.Lock()
|
|
||||||
s.server.stages[stageID] = NewStage(stageID)
|
s.server.stages[stageID] = NewStage(stageID)
|
||||||
stage = s.server.stages[stageID]
|
stage = s.server.stages[stageID]
|
||||||
s.server.Unlock()
|
|
||||||
stage.Lock()
|
|
||||||
stage.host = s
|
|
||||||
stage.clients[s] = s.charID
|
|
||||||
stage.Unlock()
|
|
||||||
}
|
}
|
||||||
|
s.server.stagesLock.Unlock()
|
||||||
|
|
||||||
|
stage.Lock()
|
||||||
|
if !exists {
|
||||||
|
stage.host = s
|
||||||
|
}
|
||||||
|
stage.clients[s] = s.charID
|
||||||
|
stage.Unlock()
|
||||||
|
|
||||||
// Ensure this session no longer belongs to reservations.
|
// Ensure this session no longer belongs to reservations.
|
||||||
if s.stage != nil {
|
if s.stage != nil {
|
||||||
@@ -55,8 +51,11 @@ func doStageTransfer(s *Session, ackHandle uint32, stageID string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Save our new stage ID and pointer to the new stage itself.
|
// Save our new stage ID and pointer to the new stage itself.
|
||||||
|
s.server.stagesLock.RLock()
|
||||||
|
newStage := s.server.stages[stageID]
|
||||||
|
s.server.stagesLock.RUnlock()
|
||||||
s.Lock()
|
s.Lock()
|
||||||
s.stage = s.server.stages[stageID]
|
s.stage = newStage
|
||||||
s.Unlock()
|
s.Unlock()
|
||||||
|
|
||||||
// Tell the client to cleanup its current stage objects.
|
// Tell the client to cleanup its current stage objects.
|
||||||
@@ -141,8 +140,8 @@ func doStageTransfer(s *Session, ackHandle uint32, stageID string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func destructEmptyStages(s *Session) {
|
func destructEmptyStages(s *Session) {
|
||||||
s.server.Lock()
|
s.server.stagesLock.Lock()
|
||||||
defer s.server.Unlock()
|
defer s.server.stagesLock.Unlock()
|
||||||
for _, stage := range s.server.stages {
|
for _, stage := range s.server.stages {
|
||||||
// Destroy empty Quest/My series/Guild stages.
|
// Destroy empty Quest/My series/Guild stages.
|
||||||
if stage.id[3:5] == "Qs" || stage.id[3:5] == "Ms" || stage.id[3:5] == "Gs" || stage.id[3:5] == "Ls" {
|
if stage.id[3:5] == "Qs" || stage.id[3:5] == "Ms" || stage.id[3:5] == "Gs" || stage.id[3:5] == "Ls" {
|
||||||
@@ -195,9 +194,9 @@ func removeSessionFromStage(s *Session) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func isStageFull(s *Session, StageID string) bool {
|
func isStageFull(s *Session, StageID string) bool {
|
||||||
s.server.Lock()
|
s.server.stagesLock.RLock()
|
||||||
stage, exists := s.server.stages[StageID]
|
stage, exists := s.server.stages[StageID]
|
||||||
s.server.Unlock()
|
s.server.stagesLock.RUnlock()
|
||||||
|
|
||||||
if exists {
|
if exists {
|
||||||
// Lock stage to safely check client counts
|
// Lock stage to safely check client counts
|
||||||
@@ -256,9 +255,20 @@ func handleMsgSysBackStage(s *Session, p mhfpacket.MHFPacket) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
delete(s.stage.reservedClientSlots, s.charID)
|
if s.stage != nil {
|
||||||
|
s.stage.Lock()
|
||||||
|
delete(s.stage.reservedClientSlots, s.charID)
|
||||||
|
s.stage.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
delete(s.server.stages[backStage].reservedClientSlots, s.charID)
|
s.server.stagesLock.RLock()
|
||||||
|
backStagePtr, exists := s.server.stages[backStage]
|
||||||
|
s.server.stagesLock.RUnlock()
|
||||||
|
if exists {
|
||||||
|
backStagePtr.Lock()
|
||||||
|
delete(backStagePtr.reservedClientSlots, s.charID)
|
||||||
|
backStagePtr.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
doStageTransfer(s, pkt.AckHandle, backStage)
|
doStageTransfer(s, pkt.AckHandle, backStage)
|
||||||
}
|
}
|
||||||
@@ -278,7 +288,10 @@ func handleMsgSysLeaveStage(s *Session, p mhfpacket.MHFPacket) {}
|
|||||||
|
|
||||||
func handleMsgSysLockStage(s *Session, p mhfpacket.MHFPacket) {
|
func handleMsgSysLockStage(s *Session, p mhfpacket.MHFPacket) {
|
||||||
pkt := p.(*mhfpacket.MsgSysLockStage)
|
pkt := p.(*mhfpacket.MsgSysLockStage)
|
||||||
if stage, exists := s.server.stages[pkt.StageID]; exists {
|
s.server.stagesLock.RLock()
|
||||||
|
stage, exists := s.server.stages[pkt.StageID]
|
||||||
|
s.server.stagesLock.RUnlock()
|
||||||
|
if exists {
|
||||||
stage.Lock()
|
stage.Lock()
|
||||||
stage.locked = true
|
stage.locked = true
|
||||||
stage.Unlock()
|
stage.Unlock()
|
||||||
@@ -288,17 +301,26 @@ func handleMsgSysLockStage(s *Session, p mhfpacket.MHFPacket) {
|
|||||||
|
|
||||||
func handleMsgSysUnlockStage(s *Session, p mhfpacket.MHFPacket) {
|
func handleMsgSysUnlockStage(s *Session, p mhfpacket.MHFPacket) {
|
||||||
if s.reservationStage != nil {
|
if s.reservationStage != nil {
|
||||||
|
// Read reserved client slots under stage RLock
|
||||||
s.reservationStage.RLock()
|
s.reservationStage.RLock()
|
||||||
defer s.reservationStage.RUnlock()
|
var charIDs []uint32
|
||||||
|
|
||||||
for charID := range s.reservationStage.reservedClientSlots {
|
for charID := range s.reservationStage.reservedClientSlots {
|
||||||
|
charIDs = append(charIDs, charID)
|
||||||
|
}
|
||||||
|
stageID := s.reservationStage.id
|
||||||
|
s.reservationStage.RUnlock()
|
||||||
|
|
||||||
|
for _, charID := range charIDs {
|
||||||
session := s.server.FindSessionByCharID(charID)
|
session := s.server.FindSessionByCharID(charID)
|
||||||
if session != nil {
|
if session != nil {
|
||||||
session.QueueSendMHFNonBlocking(&mhfpacket.MsgSysStageDestruct{})
|
session.QueueSendMHFNonBlocking(&mhfpacket.MsgSysStageDestruct{})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
delete(s.server.stages, s.reservationStage.id)
|
// Delete from stages map under stagesLock (not nested inside stage RLock)
|
||||||
|
s.server.stagesLock.Lock()
|
||||||
|
delete(s.server.stages, stageID)
|
||||||
|
s.server.stagesLock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
destructEmptyStages(s)
|
destructEmptyStages(s)
|
||||||
@@ -306,7 +328,10 @@ func handleMsgSysUnlockStage(s *Session, p mhfpacket.MHFPacket) {
|
|||||||
|
|
||||||
func handleMsgSysReserveStage(s *Session, p mhfpacket.MHFPacket) {
|
func handleMsgSysReserveStage(s *Session, p mhfpacket.MHFPacket) {
|
||||||
pkt := p.(*mhfpacket.MsgSysReserveStage)
|
pkt := p.(*mhfpacket.MsgSysReserveStage)
|
||||||
if stage, exists := s.server.stages[pkt.StageID]; exists {
|
s.server.stagesLock.RLock()
|
||||||
|
stage, exists := s.server.stages[pkt.StageID]
|
||||||
|
s.server.stagesLock.RUnlock()
|
||||||
|
if exists {
|
||||||
stage.Lock()
|
stage.Lock()
|
||||||
defer stage.Unlock()
|
defer stage.Unlock()
|
||||||
if _, exists := stage.reservedClientSlots[s.charID]; exists {
|
if _, exists := stage.reservedClientSlots[s.charID]; exists {
|
||||||
@@ -377,7 +402,10 @@ func handleMsgSysSetStagePass(s *Session, p mhfpacket.MHFPacket) {
|
|||||||
|
|
||||||
func handleMsgSysSetStageBinary(s *Session, p mhfpacket.MHFPacket) {
|
func handleMsgSysSetStageBinary(s *Session, p mhfpacket.MHFPacket) {
|
||||||
pkt := p.(*mhfpacket.MsgSysSetStageBinary)
|
pkt := p.(*mhfpacket.MsgSysSetStageBinary)
|
||||||
if stage, exists := s.server.stages[pkt.StageID]; exists {
|
s.server.stagesLock.RLock()
|
||||||
|
stage, exists := s.server.stages[pkt.StageID]
|
||||||
|
s.server.stagesLock.RUnlock()
|
||||||
|
if exists {
|
||||||
stage.Lock()
|
stage.Lock()
|
||||||
stage.rawBinaryData[stageBinaryKey{pkt.BinaryType0, pkt.BinaryType1}] = pkt.RawDataPayload
|
stage.rawBinaryData[stageBinaryKey{pkt.BinaryType0, pkt.BinaryType1}] = pkt.RawDataPayload
|
||||||
stage.Unlock()
|
stage.Unlock()
|
||||||
@@ -388,7 +416,10 @@ func handleMsgSysSetStageBinary(s *Session, p mhfpacket.MHFPacket) {
|
|||||||
|
|
||||||
func handleMsgSysGetStageBinary(s *Session, p mhfpacket.MHFPacket) {
|
func handleMsgSysGetStageBinary(s *Session, p mhfpacket.MHFPacket) {
|
||||||
pkt := p.(*mhfpacket.MsgSysGetStageBinary)
|
pkt := p.(*mhfpacket.MsgSysGetStageBinary)
|
||||||
if stage, exists := s.server.stages[pkt.StageID]; exists {
|
s.server.stagesLock.RLock()
|
||||||
|
stage, exists := s.server.stages[pkt.StageID]
|
||||||
|
s.server.stagesLock.RUnlock()
|
||||||
|
if exists {
|
||||||
stage.Lock()
|
stage.Lock()
|
||||||
if binaryData, exists := stage.rawBinaryData[stageBinaryKey{pkt.BinaryType0, pkt.BinaryType1}]; exists {
|
if binaryData, exists := stage.rawBinaryData[stageBinaryKey{pkt.BinaryType0, pkt.BinaryType1}]; exists {
|
||||||
doAckBufSucceed(s, pkt.AckHandle, binaryData)
|
doAckBufSucceed(s, pkt.AckHandle, binaryData)
|
||||||
@@ -412,7 +443,10 @@ func handleMsgSysGetStageBinary(s *Session, p mhfpacket.MHFPacket) {
|
|||||||
|
|
||||||
func handleMsgSysWaitStageBinary(s *Session, p mhfpacket.MHFPacket) {
|
func handleMsgSysWaitStageBinary(s *Session, p mhfpacket.MHFPacket) {
|
||||||
pkt := p.(*mhfpacket.MsgSysWaitStageBinary)
|
pkt := p.(*mhfpacket.MsgSysWaitStageBinary)
|
||||||
if stage, exists := s.server.stages[pkt.StageID]; exists {
|
s.server.stagesLock.RLock()
|
||||||
|
stage, exists := s.server.stages[pkt.StageID]
|
||||||
|
s.server.stagesLock.RUnlock()
|
||||||
|
if exists {
|
||||||
if pkt.BinaryType0 == 1 && pkt.BinaryType1 == 12 {
|
if pkt.BinaryType0 == 1 && pkt.BinaryType1 == 12 {
|
||||||
// This might contain the hunter count, or max player count?
|
// This might contain the hunter count, or max player count?
|
||||||
doAckBufSucceed(s, pkt.AckHandle, []byte{0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00})
|
doAckBufSucceed(s, pkt.AckHandle, []byte{0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00})
|
||||||
|
|||||||
Reference in New Issue
Block a user