fix(session): race condition.

This commit is contained in:
Houmgaor
2025-10-19 23:49:23 +02:00
parent 4908731773
commit 1c32be98cc
3 changed files with 24 additions and 31 deletions

View File

@@ -44,8 +44,7 @@ func IntegrationTest_PacketQueueFlow(t *testing.T) {
s := &Session{
sendPackets: make(chan packet, 100),
closed: false,
server: &Server{
server: &Server{
erupeConfig: &_config.Config{
DebugOptions: _config.DebugOptions{
LogOutboundMessages: false,
@@ -84,7 +83,7 @@ func IntegrationTest_PacketQueueFlow(t *testing.T) {
}
done:
s.closed = true
s.closed.Store(true)
time.Sleep(50 * time.Millisecond)
sentPackets := mock.GetSentPackets()
@@ -119,7 +118,6 @@ func IntegrationTest_ConcurrentQueueing(t *testing.T) {
s := &Session{
sendPackets: make(chan packet, 200),
closed: false,
server: &Server{
erupeConfig: &_config.Config{
DebugOptions: _config.DebugOptions{
@@ -176,7 +174,7 @@ func IntegrationTest_ConcurrentQueueing(t *testing.T) {
}
done:
s.closed = true
s.closed.Store(true)
time.Sleep(50 * time.Millisecond)
sentPackets := mock.GetSentPackets()
@@ -218,7 +216,6 @@ func IntegrationTest_AckPacketFlow(t *testing.T) {
s := &Session{
sendPackets: make(chan packet, 100),
closed: false,
server: &Server{
erupeConfig: &_config.Config{
DebugOptions: _config.DebugOptions{
@@ -241,7 +238,7 @@ func IntegrationTest_AckPacketFlow(t *testing.T) {
// Wait for ACKs to be sent
time.Sleep(200 * time.Millisecond)
s.closed = true
s.closed.Store(true)
time.Sleep(50 * time.Millisecond)
sentPackets := mock.GetSentPackets()
@@ -284,7 +281,6 @@ func IntegrationTest_MixedPacketTypes(t *testing.T) {
s := &Session{
sendPackets: make(chan packet, 100),
closed: false,
server: &Server{
erupeConfig: &_config.Config{
DebugOptions: _config.DebugOptions{
@@ -312,7 +308,7 @@ func IntegrationTest_MixedPacketTypes(t *testing.T) {
// Wait for all packets
time.Sleep(200 * time.Millisecond)
s.closed = true
s.closed.Store(true)
time.Sleep(50 * time.Millisecond)
sentPackets := mock.GetSentPackets()
@@ -341,7 +337,6 @@ func IntegrationTest_PacketOrderPreservation(t *testing.T) {
s := &Session{
sendPackets: make(chan packet, 100),
closed: false,
server: &Server{
erupeConfig: &_config.Config{
DebugOptions: _config.DebugOptions{
@@ -363,7 +358,7 @@ func IntegrationTest_PacketOrderPreservation(t *testing.T) {
// Wait for packets
time.Sleep(300 * time.Millisecond)
s.closed = true
s.closed.Store(true)
time.Sleep(50 * time.Millisecond)
sentPackets := mock.GetSentPackets()
@@ -399,7 +394,6 @@ func IntegrationTest_QueueBackpressure(t *testing.T) {
// Small queue to test backpressure
s := &Session{
sendPackets: make(chan packet, 5),
closed: false,
server: &Server{
erupeConfig: &_config.Config{
DebugOptions: _config.DebugOptions{
@@ -430,7 +424,7 @@ func IntegrationTest_QueueBackpressure(t *testing.T) {
// Wait for processing
time.Sleep(1 * time.Second)
s.closed = true
s.closed.Store(true)
time.Sleep(50 * time.Millisecond)
// Some packets should have been sent
@@ -507,7 +501,7 @@ func IntegrationTest_GuildEnumerationFlow(t *testing.T) {
}
done:
s.closed = true
s.closed.Store(true)
time.Sleep(50 * time.Millisecond)
sentPackets := mock.GetSentPackets()
@@ -578,7 +572,7 @@ func IntegrationTest_ConcurrentClientAccess(t *testing.T) {
}
time.Sleep(100 * time.Millisecond)
s.closed = true
s.closed.Store(true)
time.Sleep(50 * time.Millisecond)
sentCount := mock.PacketCount()
@@ -635,8 +629,7 @@ func IntegrationTest_ClientVersionCompatibility(t *testing.T) {
mock := &MockCryptConn{sentPackets: make([][]byte, 0)}
s := &Session{
sendPackets: make(chan packet, 100),
closed: false,
server: &Server{
server: &Server{
erupeConfig: _config.ErupeConfig,
},
}
@@ -649,7 +642,7 @@ func IntegrationTest_ClientVersionCompatibility(t *testing.T) {
s.QueueSend(testData)
time.Sleep(100 * time.Millisecond)
s.closed = true
s.closed.Store(true)
time.Sleep(50 * time.Millisecond)
sentCount := mock.PacketCount()
@@ -685,7 +678,7 @@ func IntegrationTest_PacketPrioritization(t *testing.T) {
}
time.Sleep(200 * time.Millisecond)
s.closed = true
s.closed.Store(true)
time.Sleep(50 * time.Millisecond)
sentPackets := mock.GetSentPackets()
@@ -741,7 +734,7 @@ func IntegrationTest_DataIntegrityUnderLoad(t *testing.T) {
}
done:
s.closed = true
s.closed.Store(true)
time.Sleep(50 * time.Millisecond)
sentPackets := mock.GetSentPackets()

View File

@@ -9,6 +9,7 @@ import (
"io"
"net"
"sync"
"sync/atomic"
"time"
"erupe-ce/common/byteframe"
@@ -69,7 +70,7 @@ type Session struct {
// For Debuging
Name string
closed bool
closed atomic.Bool
ackStart map[uint32]time.Time
}
@@ -154,7 +155,7 @@ func (s *Session) QueueAck(ackHandle uint32, data []byte) {
func (s *Session) sendLoop() {
for {
if s.closed {
if s.closed.Load() {
return
}
// Send each packet individually with its own terminator
@@ -171,7 +172,7 @@ func (s *Session) sendLoop() {
func (s *Session) recvLoop() {
for {
if s.closed {
if s.closed.Load() {
logoutPlayer(s)
return
}
@@ -211,7 +212,7 @@ func (s *Session) handlePacketGroup(pktGroup []byte) {
s.logMessage(opcodeUint16, pktGroup, s.Name, "Server")
if opcode == network.MSG_SYS_LOGOUT {
s.closed = true
s.closed.Store(true)
return
}
// Get the packet parser and handler for this opcode.

View File

@@ -55,7 +55,6 @@ func createTestSession(mock network.Conn) *Session {
s := &Session{
logger: logger,
sendPackets: make(chan packet, 20),
closed: false,
cryptConn: mock,
server: &Server{
erupeConfig: &_config.Config{
@@ -115,7 +114,7 @@ func TestPacketQueueIndividualSending(t *testing.T) {
time.Sleep(100 * time.Millisecond)
// Stop the session
s.closed = true
s.closed.Store(true)
time.Sleep(50 * time.Millisecond)
// Verify packet count
@@ -162,7 +161,7 @@ func TestPacketQueueNoConcatenation(t *testing.T) {
s.sendPackets <- packet{packet3, true}
time.Sleep(100 * time.Millisecond)
s.closed = true
s.closed.Store(true)
time.Sleep(50 * time.Millisecond)
sentPackets := mock.GetSentPackets()
@@ -220,7 +219,7 @@ func TestQueueSendUsesQueue(t *testing.T) {
t.Errorf("expected 1 packet sent after sendLoop, got %d", mock.PacketCount())
}
s.closed = true
s.closed.Store(true)
}
// TestPacketTerminatorFormat verifies the exact terminator format
@@ -234,7 +233,7 @@ func TestPacketTerminatorFormat(t *testing.T) {
s.sendPackets <- packet{testData, true}
time.Sleep(100 * time.Millisecond)
s.closed = true
s.closed.Store(true)
time.Sleep(50 * time.Millisecond)
sentPackets := mock.GetSentPackets()
@@ -294,7 +293,7 @@ func TestQueueSendNonBlockingDropsOnFull(t *testing.T) {
t.Errorf("expected 2 packets in queue, got %d", len(s.sendPackets))
}
s.closed = true
s.closed.Store(true)
}
// TestPacketQueueAckFormat verifies ACK packet format
@@ -310,7 +309,7 @@ func TestPacketQueueAckFormat(t *testing.T) {
s.QueueAck(ackHandle, ackData)
time.Sleep(100 * time.Millisecond)
s.closed = true
s.closed.Store(true)
time.Sleep(50 * time.Millisecond)
sentPackets := mock.GetSentPackets()