using System.Buffers.Binary;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
namespace KianaBH.KcpSharp.Base;
/// <summary>
/// Multiplex many channels or conversations over the same transport.
/// </summary>
/// <remarks>
/// Incoming packets are routed to a registered conversation by the conversation ID read from the start of
/// the packet; outgoing packets are forwarded to the underlying transport.
/// </remarks>
/// <typeparam name="T">The state of the channel.</typeparam>
public sealed class KcpMultiplexConnection : IKcpTransport, IKcpConversation, IKcpMultiplexConnection
{
// NOTE(review): no generic arguments are visible on this declaration (nor a type parameter on the class),
// yet the dictionary is looked up by a long ID and its values are deconstructed as (Conversation, State),
// and T is used as the state type below — confirm the generic arguments against the real file.
private readonly ConcurrentDictionary _conversations = new();
// Optional callback invoked with a conversation's user state whenever its entry is removed.
private readonly Action? _disposeAction;
// The transport that outgoing packets are forwarded to.
private readonly IKcpTransport _transport;
// Set by Dispose; once true, registration and Contains throw ObjectDisposedException.
private bool _disposed;
// Set by SetTransportClosed and Dispose; once true, packets are dropped in both directions.
private bool _transportClosed;
/// <summary>
/// Construct a multiplexed connection over a transport.
/// </summary>
/// <param name="transport">The underlying transport.</param>
/// <exception cref="ArgumentNullException"><paramref name="transport"/> is null.</exception>
public KcpMultiplexConnection(IKcpTransport transport)
{
_transport = transport ?? throw new ArgumentNullException(nameof(transport));
_disposeAction = null;
}
/// <summary>
/// Construct a multiplexed connection over a transport.
/// </summary>
/// <param name="transport">The underlying transport.</param>
/// <param name="disposeAction">The action to invoke when state object is removed.</param>
/// <exception cref="ArgumentNullException"><paramref name="transport"/> is null.</exception>
public KcpMultiplexConnection(IKcpTransport transport, Action? disposeAction)
{
_transport = transport ?? throw new ArgumentNullException(nameof(transport));
_disposeAction = disposeAction;
}
/// <summary>
/// Process a newly received packet from the transport.
/// </summary>
/// <param name="packet">The content of the packet with conversation ID.</param>
/// <param name="cancellationToken">A token to cancel this operation.</param>
/// <returns>
/// A <see cref="ValueTask"/> that completes when the packet is handled by the corresponding channel or
/// conversation.
/// </returns>
/// <remarks>
/// The packet is dropped and a completed <see cref="ValueTask"/> is returned when the buffer is shorter than
/// <c>KcpGlobalVars.CONVID_LENGTH</c>, when the transport is closed or this instance is disposed, or when no
/// conversation is registered for the ID carried by the packet.
/// </remarks>
public ValueTask InputPakcetAsync(UdpReceiveResult packet, CancellationToken cancellationToken = default)
{
ReadOnlySpan span = packet.Buffer.AsSpan();
if (span.Length < KcpGlobalVars.CONVID_LENGTH) return default;
if (_transportClosed || _disposed) return default;
// The conversation ID is the big-endian 64-bit integer at the start of the buffer.
// NOTE(review): ReadInt64BigEndian needs at least 8 bytes, so the length guard above is only sufficient
// if KcpGlobalVars.CONVID_LENGTH is >= 8 — confirm.
var id = BinaryPrimitives.ReadInt64BigEndian(span);
if (_conversations.TryGetValue(id, out var value))
return value.Conversation.InputPakcetAsync(packet, cancellationToken);
return default;
}
/// <summary>
/// Mark the transport as closed and propagate that to every conversation currently registered.
/// </summary>
public void SetTransportClosed()
{
_transportClosed = true;
foreach (var (conversation, _) in _conversations.Values) conversation.SetTransportClosed();
}
/// <summary>
/// Dispose this instance: mark it closed and disposed, then remove and dispose every registered
/// conversation, invoking the dispose action (if any) with each conversation's state.
/// </summary>
/// <remarks>Calling this method again after the instance is disposed does nothing.</remarks>
public void Dispose()
{
if (_disposed) return;
// Flags are set before the dictionary is drained so that RegisterConversation can observe the disposal.
_transportClosed = true;
_disposed = true;
// The key enumeration is repeated until the dictionary is empty — presumably to catch entries that are
// added concurrently while a previous pass is running (TODO confirm intent).
while (!_conversations.IsEmpty)
foreach (var id in _conversations.Keys)
if (_conversations.TryRemove(id, out var value))
{
value.Conversation.Dispose();
if (_disposeAction is not null) _disposeAction.Invoke(value.State);
}
}
/// <summary>
/// Determine whether the multiplex connection contains a conversation with the specified id.
/// </summary>
/// <param name="id">The conversation ID.</param>
/// <returns>True if the multiplex connection contains the specified conversation. Otherwise false.</returns>
/// <exception cref="ObjectDisposedException">The current instance is disposed.</exception>
public bool Contains(long id)
{
CheckDispose();
return _conversations.ContainsKey(id);
}
/// <summary>
/// Create a raw channel with the specified conversation ID.
/// </summary>
/// <param name="id">The conversation ID.</param>
/// <param name="remoteEndpoint">The remote endpoint.</param>
/// <param name="options">The options of the <see cref="KcpRawChannel"/>.</param>
/// <returns>The raw channel created.</returns>
/// <exception cref="ObjectDisposedException">The current instance is disposed.</exception>
/// <exception cref="InvalidOperationException">Another channel or conversation with the same ID was already registered.</exception>
public KcpRawChannel CreateRawChannel(long id, IPEndPoint remoteEndpoint, KcpRawChannelOptions? options = null)
{
KcpRawChannel? channel = new(remoteEndpoint, this, id, options);
try
{
// Registered with the default user state.
RegisterConversation(channel, id, default);
if (_transportClosed) channel.SetTransportClosed();
// Hand the channel to the caller and null the local, so the finally block only disposes the channel
// when an exception prevented it from being returned.
return Interlocked.Exchange(ref channel, null)!;
}
finally
{
if (channel is not null) channel.Dispose();
}
}
/// <summary>
/// Create a raw channel with the specified conversation ID.
/// </summary>
/// <param name="id">The conversation ID.</param>
/// <param name="remoteEndpoint">The remote endpoint.</param>
/// <param name="state">The user state of this channel.</param>
/// <param name="options">The options of the <see cref="KcpRawChannel"/>.</param>
/// <returns>The raw channel created.</returns>
/// <exception cref="ObjectDisposedException">The current instance is disposed.</exception>
/// <exception cref="InvalidOperationException">Another channel or conversation with the same ID was already registered.</exception>
public KcpRawChannel CreateRawChannel(long id, IPEndPoint remoteEndpoint, T state,
KcpRawChannelOptions? options = null)
{
KcpRawChannel? channel = new(remoteEndpoint, this, id, options);
try
{
RegisterConversation(channel, id, state);
if (_transportClosed) channel.SetTransportClosed();
// Hand the channel to the caller and null the local, so the finally block only disposes the channel
// when an exception prevented it from being returned.
return Interlocked.Exchange(ref channel, null)!;
}
finally
{
if (channel is not null) channel.Dispose();
}
}
/// <summary>
/// Create a conversation with the specified conversation ID.
/// </summary>
/// <param name="id">The conversation ID.</param>
/// <param name="remoteEndpoint">The remote endpoint.</param>
/// <param name="options">The options of the <see cref="KcpConversation"/>.</param>
/// <returns>The KCP conversation created.</returns>
/// <exception cref="ObjectDisposedException">The current instance is disposed.</exception>
/// <exception cref="InvalidOperationException">Another channel or conversation with the same ID was already registered.</exception>
public KcpConversation CreateConversation(long id, IPEndPoint remoteEndpoint,
KcpConversationOptions? options = null)
{
KcpConversation? conversation = new(remoteEndpoint, this, id, options);
try
{
// Registered with the default user state.
RegisterConversation(conversation, id, default);
if (_transportClosed) conversation.SetTransportClosed();
// Hand the conversation to the caller and null the local, so the finally block only disposes it
// when an exception prevented it from being returned.
return Interlocked.Exchange(ref conversation, null)!;
}
finally
{
if (conversation is not null) conversation.Dispose();
}
}
/// <summary>
/// Create a conversation with the specified conversation ID.
/// </summary>
/// <param name="id">The conversation ID.</param>
/// <param name="remoteEndpoint">The remote endpoint.</param>
/// <param name="state">The user state of this conversation.</param>
/// <param name="options">The options of the <see cref="KcpConversation"/>.</param>
/// <returns>The KCP conversation created.</returns>
/// <exception cref="ObjectDisposedException">The current instance is disposed.</exception>
/// <exception cref="InvalidOperationException">Another channel or conversation with the same ID was already registered.</exception>
public KcpConversation CreateConversation(long id, IPEndPoint remoteEndpoint, T state,
KcpConversationOptions? options = null)
{
KcpConversation? conversation = new(remoteEndpoint, this, id, options);
try
{
RegisterConversation(conversation, id, state);
if (_transportClosed) conversation.SetTransportClosed();
// Hand the conversation to the caller and null the local, so the finally block only disposes it
// when an exception prevented it from being returned.
return Interlocked.Exchange(ref conversation, null)!;
}
finally
{
if (conversation is not null) conversation.Dispose();
}
}
/// <summary>
/// Register a conversation or channel with the specified conversation ID and the default user state.
/// </summary>
/// <param name="conversation">The conversation or channel to register.</param>
/// <param name="id">The conversation ID.</param>
/// <exception cref="ArgumentNullException"><paramref name="conversation"/> is not provided.</exception>
/// <exception cref="ObjectDisposedException">The current instance is disposed.</exception>
/// <exception cref="InvalidOperationException">Another channel or conversation with the same ID was already registered.</exception>
public void RegisterConversation(IKcpConversation conversation, long id)
{
RegisterConversation(conversation, id, default);
}
/// <summary>
/// Register a conversation or channel with the specified conversation ID and user state.
/// </summary>
/// <param name="conversation">The conversation or channel to register.</param>
/// <param name="id">The conversation ID.</param>
/// <param name="state">The user state</param>
/// <exception cref="ArgumentNullException"><paramref name="conversation"/> is not provided.</exception>
/// <exception cref="ObjectDisposedException">The current instance is disposed.</exception>
/// <exception cref="InvalidOperationException">Another channel or conversation with the same ID was already registered.</exception>
public void RegisterConversation(IKcpConversation conversation, long id, T? state)
{
if (conversation is null) throw new ArgumentNullException(nameof(conversation));
CheckDispose();
// GetOrAdd returns the already-stored entry when the ID is taken, so getting back a different
// conversation instance means the ID is owned by someone else.
var (addedConversation, _) = _conversations.GetOrAdd(id, (conversation, state));
if (!ReferenceEquals(addedConversation, conversation))
throw new InvalidOperationException("Duplicated conversation.");
// Re-check after the insert: if Dispose ran in the meantime, undo the registration, hand the state to
// the dispose action, and report the disposal. The conversation itself is not disposed here.
if (_disposed)
{
if (_conversations.TryRemove(id, out var value) && _disposeAction is not null)
_disposeAction.Invoke(value.State);
ThrowObjectDisposedException();
}
}
/// <summary>
/// Unregister a conversation or channel with the specified conversation ID.
/// </summary>
/// <param name="id">The conversation ID.</param>
/// <returns>
/// The conversation unregistered. Returns null when the conversation with the specified ID is not found,
/// or when the transport is closed or this instance is disposed.
/// </returns>
public IKcpConversation? UnregisterConversation(long id)
{
return UnregisterConversation(id, out _);
}
/// <summary>
/// Unregister a conversation or channel with the specified conversation ID.
/// </summary>
/// <param name="id">The conversation ID.</param>
/// <param name="state">The user state. Set to the default value when nothing is unregistered.</param>
/// <returns>
/// The conversation unregistered. Returns null when the conversation with the specified ID is not found,
/// or when the transport is closed or this instance is disposed.
/// </returns>
public IKcpConversation? UnregisterConversation(long id, out T? state)
{
// Nothing is removed once the transport is closed or the instance is disposed.
if (!_transportClosed && !_disposed && _conversations.TryRemove(id, out var value))
{
// The removed conversation is told its transport is closed (it is not disposed here), and the
// dispose action, if any, receives its state.
value.Conversation.SetTransportClosed();
state = value.State;
if (_disposeAction is not null) _disposeAction.Invoke(state);
return value.Conversation;
}
state = default;
return default;
}
/// <summary>
/// Forward a packet to the underlying transport.
/// </summary>
/// <param name="packet">The packet to send.</param>
/// <param name="remoteEndpoint">The remote endpoint passed through to the underlying transport.</param>
/// <param name="cancellationToken">A token passed through to the underlying transport.</param>
/// <returns>
/// The <see cref="ValueTask"/> returned by the underlying transport, or a completed one when the packet is
/// dropped because the transport is closed or this instance is disposed.
/// </returns>
public ValueTask SendPacketAsync(Memory packet, IPEndPoint remoteEndpoint,
CancellationToken cancellationToken = default)
{
if (_transportClosed || _disposed) return default;
return _transport.SendPacketAsync(packet, remoteEndpoint, cancellationToken);
}
/// <summary>
/// Throw <see cref="ObjectDisposedException"/> if this instance has been disposed.
/// </summary>
private void CheckDispose()
{
if (_disposed) ThrowObjectDisposedException();
}
/// <summary>
/// Throw an <see cref="ObjectDisposedException"/> naming this type.
/// </summary>
private static void ThrowObjectDisposedException()
{
throw new ObjectDisposedException(nameof(KcpMultiplexConnection));
}
}