From 1c32be98cc58923539a2fd60b7c186fbdbf39afa Mon Sep 17 00:00:00 2001 From: Houmgaor Date: Sun, 19 Oct 2025 23:49:23 +0200 Subject: [PATCH] fix(session): race condition. --- server/channelserver/integration_test.go | 33 ++++++++++-------------- server/channelserver/sys_session.go | 9 ++++--- server/channelserver/sys_session_test.go | 13 +++++----- 3 files changed, 24 insertions(+), 31 deletions(-) diff --git a/server/channelserver/integration_test.go b/server/channelserver/integration_test.go index c40102d48..f1bd5a12e 100644 --- a/server/channelserver/integration_test.go +++ b/server/channelserver/integration_test.go @@ -44,8 +44,7 @@ func IntegrationTest_PacketQueueFlow(t *testing.T) { s := &Session{ sendPackets: make(chan packet, 100), - closed: false, - server: &Server{ + server: &Server{ erupeConfig: &_config.Config{ DebugOptions: _config.DebugOptions{ LogOutboundMessages: false, @@ -84,7 +83,7 @@ func IntegrationTest_PacketQueueFlow(t *testing.T) { } done: - s.closed = true + s.closed.Store(true) time.Sleep(50 * time.Millisecond) sentPackets := mock.GetSentPackets() @@ -119,7 +118,6 @@ func IntegrationTest_ConcurrentQueueing(t *testing.T) { s := &Session{ sendPackets: make(chan packet, 200), - closed: false, server: &Server{ erupeConfig: &_config.Config{ DebugOptions: _config.DebugOptions{ @@ -176,7 +174,7 @@ func IntegrationTest_ConcurrentQueueing(t *testing.T) { } done: - s.closed = true + s.closed.Store(true) time.Sleep(50 * time.Millisecond) sentPackets := mock.GetSentPackets() @@ -218,7 +216,6 @@ func IntegrationTest_AckPacketFlow(t *testing.T) { s := &Session{ sendPackets: make(chan packet, 100), - closed: false, server: &Server{ erupeConfig: &_config.Config{ DebugOptions: _config.DebugOptions{ @@ -241,7 +238,7 @@ func IntegrationTest_AckPacketFlow(t *testing.T) { // Wait for ACKs to be sent time.Sleep(200 * time.Millisecond) - s.closed = true + s.closed.Store(true) time.Sleep(50 * time.Millisecond) sentPackets := mock.GetSentPackets() @@ -284,7 +281,6 @@ func IntegrationTest_MixedPacketTypes(t *testing.T) { s := &Session{ sendPackets: make(chan packet, 100), - closed: false, server: &Server{ erupeConfig: &_config.Config{ DebugOptions: _config.DebugOptions{ @@ -312,7 +308,7 @@ func IntegrationTest_MixedPacketTypes(t *testing.T) { // Wait for all packets time.Sleep(200 * time.Millisecond) - s.closed = true + s.closed.Store(true) time.Sleep(50 * time.Millisecond) sentPackets := mock.GetSentPackets() @@ -341,7 +337,6 @@ func IntegrationTest_PacketOrderPreservation(t *testing.T) { s := &Session{ sendPackets: make(chan packet, 100), - closed: false, server: &Server{ erupeConfig: &_config.Config{ DebugOptions: _config.DebugOptions{ @@ -363,7 +358,7 @@ func IntegrationTest_PacketOrderPreservation(t *testing.T) { // Wait for packets time.Sleep(300 * time.Millisecond) - s.closed = true + s.closed.Store(true) time.Sleep(50 * time.Millisecond) sentPackets := mock.GetSentPackets() @@ -399,7 +394,6 @@ func IntegrationTest_QueueBackpressure(t *testing.T) { // Small queue to test backpressure s := &Session{ sendPackets: make(chan packet, 5), - closed: false, server: &Server{ erupeConfig: &_config.Config{ DebugOptions: _config.DebugOptions{ @@ -430,7 +424,7 @@ func IntegrationTest_QueueBackpressure(t *testing.T) { // Wait for processing time.Sleep(1 * time.Second) - s.closed = true + s.closed.Store(true) time.Sleep(50 * time.Millisecond) // Some packets should have been sent @@ -507,7 +501,7 @@ func IntegrationTest_GuildEnumerationFlow(t *testing.T) { } done: - s.closed = true + s.closed.Store(true) time.Sleep(50 * time.Millisecond) sentPackets := mock.GetSentPackets() @@ -578,7 +572,7 @@ func IntegrationTest_ConcurrentClientAccess(t *testing.T) { } time.Sleep(100 * time.Millisecond) - s.closed = true + s.closed.Store(true) time.Sleep(50 * time.Millisecond) sentCount := mock.PacketCount() @@ -635,8 +629,7 @@ func IntegrationTest_ClientVersionCompatibility(t *testing.T) { mock := &MockCryptConn{sentPackets: make([][]byte, 0)} s := &Session{ sendPackets: make(chan packet, 100), - closed: false, - server: &Server{ + server: &Server{ erupeConfig: _config.ErupeConfig, }, } @@ -649,7 +642,7 @@ func IntegrationTest_ClientVersionCompatibility(t *testing.T) { s.QueueSend(testData) time.Sleep(100 * time.Millisecond) - s.closed = true + s.closed.Store(true) time.Sleep(50 * time.Millisecond) sentCount := mock.PacketCount() @@ -685,7 +678,7 @@ func IntegrationTest_PacketPrioritization(t *testing.T) { } time.Sleep(200 * time.Millisecond) - s.closed = true + s.closed.Store(true) time.Sleep(50 * time.Millisecond) sentPackets := mock.GetSentPackets() @@ -741,7 +734,7 @@ func IntegrationTest_DataIntegrityUnderLoad(t *testing.T) { } done: - s.closed = true + s.closed.Store(true) time.Sleep(50 * time.Millisecond) sentPackets := mock.GetSentPackets() diff --git a/server/channelserver/sys_session.go b/server/channelserver/sys_session.go index 4b5858e14..ac69131b0 100644 --- a/server/channelserver/sys_session.go +++ b/server/channelserver/sys_session.go @@ -9,6 +9,7 @@ import ( "io" "net" "sync" + "sync/atomic" "time" "erupe-ce/common/byteframe" @@ -69,7 +70,7 @@ type Session struct { // For Debuging Name string - closed bool + closed atomic.Bool ackStart map[uint32]time.Time } @@ -154,7 +155,7 @@ func (s *Session) QueueAck(ackHandle uint32, data []byte) { func (s *Session) sendLoop() { for { - if s.closed { + if s.closed.Load() { return } // Send each packet individually with its own terminator @@ -171,7 +172,7 @@ func (s *Session) sendLoop() { func (s *Session) recvLoop() { for { - if s.closed { + if s.closed.Load() { logoutPlayer(s) return } @@ -211,7 +212,7 @@ func (s *Session) handlePacketGroup(pktGroup []byte) { s.logMessage(opcodeUint16, pktGroup, s.Name, "Server") if opcode == network.MSG_SYS_LOGOUT { - s.closed = true + s.closed.Store(true) return } // Get the packet parser and handler for this opcode. diff --git a/server/channelserver/sys_session_test.go b/server/channelserver/sys_session_test.go index 3bb6775e8..4510d2641 100644 --- a/server/channelserver/sys_session_test.go +++ b/server/channelserver/sys_session_test.go @@ -55,7 +55,6 @@ func createTestSession(mock network.Conn) *Session { s := &Session{ logger: logger, sendPackets: make(chan packet, 20), - closed: false, cryptConn: mock, server: &Server{ erupeConfig: &_config.Config{ @@ -115,7 +114,7 @@ func TestPacketQueueIndividualSending(t *testing.T) { time.Sleep(100 * time.Millisecond) // Stop the session - s.closed = true + s.closed.Store(true) time.Sleep(50 * time.Millisecond) // Verify packet count @@ -162,7 +161,7 @@ func TestPacketQueueNoConcatenation(t *testing.T) { s.sendPackets <- packet{packet3, true} time.Sleep(100 * time.Millisecond) - s.closed = true + s.closed.Store(true) time.Sleep(50 * time.Millisecond) sentPackets := mock.GetSentPackets() @@ -220,7 +219,7 @@ func TestQueueSendUsesQueue(t *testing.T) { t.Errorf("expected 1 packet sent after sendLoop, got %d", mock.PacketCount()) } - s.closed = true + s.closed.Store(true) } // TestPacketTerminatorFormat verifies the exact terminator format @@ -234,7 +233,7 @@ func TestPacketTerminatorFormat(t *testing.T) { s.sendPackets <- packet{testData, true} time.Sleep(100 * time.Millisecond) - s.closed = true + s.closed.Store(true) time.Sleep(50 * time.Millisecond) sentPackets := mock.GetSentPackets() @@ -294,7 +293,7 @@ func TestQueueSendNonBlockingDropsOnFull(t *testing.T) { t.Errorf("expected 2 packets in queue, got %d", len(s.sendPackets)) } - s.closed = true + s.closed.Store(true) } // TestPacketQueueAckFormat verifies ACK packet format @@ -310,7 +309,7 @@ func TestPacketQueueAckFormat(t *testing.T) { s.QueueAck(ackHandle, ackData) time.Sleep(100 * time.Millisecond) - s.closed = true + s.closed.Store(true) time.Sleep(50 * time.Millisecond) sentPackets := mock.GetSentPackets()