mirror of
https://github.com/MikuLeaks/KianaBH3.git
synced 2025-12-13 13:24:34 +01:00
400 lines
17 KiB
C#
400 lines
17 KiB
C#
using System.Buffers.Binary;
|
|
using System.Net;
|
|
using System.Net.Sockets;
|
|
|
|
namespace KianaBH.KcpSharp.Base;
|
|
|
|
/// <summary>
|
|
/// An unreliable channel with a conversation ID.
|
|
/// </summary>
|
|
public sealed class KcpRawChannel : IKcpConversation, IKcpExceptionProducer<KcpRawChannel>
|
|
{
|
|
private readonly IKcpBufferPool _bufferPool;
|
|
private readonly ulong? _id;
|
|
private readonly int _mtu;
|
|
private readonly int _postBufferSize;
|
|
private readonly int _preBufferSize;
|
|
private readonly KcpRawReceiveQueue _receiveQueue;
|
|
private readonly IPEndPoint _remoteEndPoint;
|
|
private readonly AsyncAutoResetEvent<int> _sendNotification;
|
|
private readonly KcpRawSendOperation _sendOperation;
|
|
private readonly IKcpTransport _transport;
|
|
|
|
private Func<Exception, KcpRawChannel, object?, bool>? _exceptionHandler;
|
|
private object? _exceptionHandlerState;
|
|
|
|
private CancellationTokenSource? _sendLoopCts;
|
|
|
|
/// <summary>
|
|
/// Construct a unreliable channel with a conversation ID.
|
|
/// </summary>
|
|
/// <param name="remoteEndPoint">The remote Endpoint</param>
|
|
/// <param name="transport">The underlying transport.</param>
|
|
/// <param name="options">The options of the <see cref="KcpRawChannel" />.</param>
|
|
public KcpRawChannel(IPEndPoint remoteEndPoint, IKcpTransport transport, KcpRawChannelOptions? options)
|
|
: this(remoteEndPoint, transport, null, options)
|
|
{
|
|
}
|
|
|
|
/// <summary>
|
|
/// Construct a unreliable channel with a conversation ID.
|
|
/// </summary>
|
|
/// <param name="remoteEndPoint">The remote Endpoint</param>
|
|
/// <param name="transport">The underlying transport.</param>
|
|
/// <param name="conversationId">The conversation ID.</param>
|
|
/// <param name="options">The options of the <see cref="KcpRawChannel" />.</param>
|
|
public KcpRawChannel(IPEndPoint remoteEndPoint, IKcpTransport transport, long conversationId,
|
|
KcpRawChannelOptions? options)
|
|
: this(remoteEndPoint, transport, (ulong)conversationId, options)
|
|
{
|
|
}
|
|
|
|
private KcpRawChannel(IPEndPoint remoteEndPoint, IKcpTransport transport, ulong? conversationId,
|
|
KcpRawChannelOptions? options)
|
|
{
|
|
_bufferPool = options?.BufferPool ?? DefaultArrayPoolBufferAllocator.Default;
|
|
_remoteEndPoint = remoteEndPoint;
|
|
_transport = transport;
|
|
_id = conversationId;
|
|
|
|
if (options is null)
|
|
_mtu = KcpConversationOptions.MtuDefaultValue;
|
|
else if (options.Mtu < 50)
|
|
throw new ArgumentException("MTU must be at least 50.", nameof(options));
|
|
else
|
|
_mtu = options.Mtu;
|
|
|
|
_preBufferSize = options?.PreBufferSize ?? 0;
|
|
_postBufferSize = options?.PostBufferSize ?? 0;
|
|
if (_preBufferSize < 0)
|
|
throw new ArgumentException("PreBufferSize must be a non-negative integer.", nameof(options));
|
|
if (_postBufferSize < 0)
|
|
throw new ArgumentException("PostBufferSize must be a non-negative integer.", nameof(options));
|
|
if ((uint)(_preBufferSize + _postBufferSize) >= (uint)_mtu)
|
|
throw new ArgumentException("The sum of PreBufferSize and PostBufferSize must be less than MTU.",
|
|
nameof(options));
|
|
if (conversationId.HasValue && (uint)(_preBufferSize + _postBufferSize) >= (uint)(_mtu - 4))
|
|
throw new ArgumentException(
|
|
"The sum of PreBufferSize and PostBufferSize is too large. There is not enough space in the packet for the conversation ID.",
|
|
nameof(options));
|
|
|
|
var queueSize = options?.ReceiveQueueSize ?? 32;
|
|
if (queueSize < 1) throw new ArgumentException("QueueSize must be a positive integer.", nameof(options));
|
|
|
|
_sendLoopCts = new CancellationTokenSource();
|
|
_sendNotification = new AsyncAutoResetEvent<int>();
|
|
_receiveQueue = new KcpRawReceiveQueue(_bufferPool, queueSize);
|
|
_sendOperation = new KcpRawSendOperation(_sendNotification);
|
|
|
|
RunSendLoop();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Get the ID of the current conversation.
|
|
/// </summary>
|
|
public long? ConversationId => (long?)_id;
|
|
|
|
/// <summary>
|
|
/// Get whether the transport is marked as closed.
|
|
/// </summary>
|
|
public bool TransportClosed => _sendLoopCts is null;
|
|
|
|
/// <inheritdoc />
|
|
public ValueTask InputPakcetAsync(UdpReceiveResult packet, CancellationToken cancellationToken = default)
|
|
{
|
|
ReadOnlySpan<byte> span = packet.Buffer.AsSpan();
|
|
var overhead = _id.HasValue ? KcpGlobalVars.CONVID_LENGTH : 0;
|
|
if (span.Length < overhead || span.Length > _mtu) return default;
|
|
if (_id.HasValue)
|
|
{
|
|
if (BinaryPrimitives.ReadUInt64BigEndian(span) != _id.GetValueOrDefault()) return default;
|
|
span = span.Slice(8);
|
|
}
|
|
|
|
_receiveQueue.Enqueue(span);
|
|
return default;
|
|
}
|
|
|
|
|
|
/// <inheritdoc />
|
|
public void SetTransportClosed()
|
|
{
|
|
var cts = Interlocked.Exchange(ref _sendLoopCts, null);
|
|
if (cts is not null)
|
|
{
|
|
cts.Cancel();
|
|
cts.Dispose();
|
|
}
|
|
|
|
_receiveQueue.SetTransportClosed();
|
|
_sendOperation.SetTransportClosed();
|
|
_sendNotification.Set(0);
|
|
}
|
|
|
|
/// <inheritdoc />
|
|
public void Dispose()
|
|
{
|
|
SetTransportClosed();
|
|
_receiveQueue.Dispose();
|
|
_sendOperation.Dispose();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Set the handler to invoke when exception is thrown during flushing packets to the transport. Return true in the
|
|
/// handler to ignore the error and continue running. Return false in the handler to abort the operation and mark the
|
|
/// transport as closed.
|
|
/// </summary>
|
|
/// <param name="handler">The exception handler.</param>
|
|
/// <param name="state">The state object to pass into the exception handler.</param>
|
|
public void SetExceptionHandler(Func<Exception, KcpRawChannel, object?, bool> handler, object? state)
|
|
{
|
|
if (handler is null) throw new ArgumentNullException(nameof(handler));
|
|
|
|
_exceptionHandler = handler;
|
|
_exceptionHandlerState = state;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Send message to the underlying transport.
|
|
/// </summary>
|
|
/// <param name="buffer">The content of the message</param>
|
|
/// <param name="cancellationToken">The token to cancel this operation.</param>
|
|
/// <exception cref="ArgumentException">The size of the message is larger than mtu, thus it can not be sent.</exception>
|
|
/// <exception cref="OperationCanceledException">
|
|
/// The <paramref name="cancellationToken" /> is fired before send operation
|
|
/// is completed.
|
|
/// </exception>
|
|
/// <exception cref="InvalidOperationException">The send operation is initiated concurrently.</exception>
|
|
/// <exception cref="ObjectDisposedException">The <see cref="KcpConversation" /> instance is disposed.</exception>
|
|
/// <returns>
|
|
/// A <see cref="ValueTask{Boolean}" /> that completes when the entire message is put into the queue. The result
|
|
/// of the task is false when the transport is closed.
|
|
/// </returns>
|
|
public ValueTask<bool> SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
|
|
{
|
|
return _sendOperation.SendAsync(buffer, cancellationToken);
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
/// Cancel the current send operation or flush operation.
|
|
/// </summary>
|
|
/// <returns>True if the current operation is canceled. False if there is no active send operation.</returns>
|
|
public bool CancelPendingSend()
|
|
{
|
|
return _sendOperation.CancelPendingOperation(null, default);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Cancel the current send operation or flush operation.
|
|
/// </summary>
|
|
/// <param name="innerException">
|
|
/// The inner exception of the <see cref="OperationCanceledException" /> thrown by the
|
|
/// <see cref="SendAsync(ReadOnlyMemory{byte}, CancellationToken)" /> method.
|
|
/// </param>
|
|
/// <param name="cancellationToken">
|
|
/// The <see cref="CancellationToken" /> in the <see cref="OperationCanceledException" />
|
|
/// thrown by the <see cref="SendAsync(ReadOnlyMemory{byte}, CancellationToken)" /> method.
|
|
/// </param>
|
|
/// <returns>True if the current operation is canceled. False if there is no active send operation.</returns>
|
|
public bool CancelPendingSend(Exception? innerException, CancellationToken cancellationToken)
|
|
{
|
|
return _sendOperation.CancelPendingOperation(innerException, cancellationToken);
|
|
}
|
|
|
|
|
|
private async void RunSendLoop()
|
|
{
|
|
var cancellationToken = _sendLoopCts?.Token ?? new CancellationToken(true);
|
|
var sendOperation = _sendOperation;
|
|
var ev = _sendNotification;
|
|
var mss = _mtu - _preBufferSize - _postBufferSize;
|
|
if (_id.HasValue) mss -= 8;
|
|
|
|
try
|
|
{
|
|
while (!cancellationToken.IsCancellationRequested)
|
|
{
|
|
var payloadSize = await ev.WaitAsync().ConfigureAwait(false);
|
|
if (cancellationToken.IsCancellationRequested) break;
|
|
|
|
if (payloadSize < 0 || payloadSize > mss)
|
|
{
|
|
_ = sendOperation.TryConsume(default, out _);
|
|
continue;
|
|
}
|
|
|
|
var overhead = _preBufferSize + _postBufferSize;
|
|
if (_id.HasValue) overhead += 8;
|
|
{
|
|
using var owner = _bufferPool.Rent(new KcpBufferPoolRentOptions(payloadSize + overhead, true));
|
|
var memory = owner.Memory;
|
|
|
|
// Fill the buffer
|
|
if (_preBufferSize != 0)
|
|
{
|
|
memory.Span.Slice(0, _preBufferSize).Clear();
|
|
memory = memory.Slice(_preBufferSize);
|
|
}
|
|
|
|
if (_id.HasValue)
|
|
{
|
|
BinaryPrimitives.WriteUInt64LittleEndian(memory.Span, _id.GetValueOrDefault());
|
|
memory = memory.Slice(8);
|
|
}
|
|
|
|
if (!sendOperation.TryConsume(memory, out var bytesWritten)) continue;
|
|
payloadSize = Math.Min(payloadSize, bytesWritten);
|
|
memory = memory.Slice(payloadSize);
|
|
if (_postBufferSize != 0) memory.Span.Slice(0, _postBufferSize).Clear();
|
|
|
|
// Send the buffer
|
|
try
|
|
{
|
|
await _transport.SendPacketAsync(owner.Memory.Slice(0, payloadSize + overhead), _remoteEndPoint,
|
|
cancellationToken).ConfigureAwait(false);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
if (!HandleFlushException(ex)) break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
catch (OperationCanceledException)
|
|
{
|
|
// Do nothing
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
HandleFlushException(ex);
|
|
}
|
|
}
|
|
|
|
|
|
private bool HandleFlushException(Exception ex)
|
|
{
|
|
var handler = _exceptionHandler;
|
|
var state = _exceptionHandlerState;
|
|
var result = false;
|
|
if (handler is not null)
|
|
try
|
|
{
|
|
result = handler.Invoke(ex, this, state);
|
|
}
|
|
catch
|
|
{
|
|
result = false;
|
|
}
|
|
|
|
if (!result) SetTransportClosed();
|
|
return result;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Get the size of the next available message in the receive queue.
|
|
/// </summary>
|
|
/// <param name="result">The transport state and the size of the next available message.</param>
|
|
/// <exception cref="InvalidOperationException">The receive or peek operation is initiated concurrently.</exception>
|
|
/// <returns>
|
|
/// True if the receive queue contains at least one message. False if the receive queue is empty or the transport
|
|
/// is closed.
|
|
/// </returns>
|
|
public bool TryPeek(out KcpConversationReceiveResult result)
|
|
{
|
|
return _receiveQueue.TryPeek(out result);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Remove the next available message in the receive queue and copy its content into <paramref name="buffer" />.
|
|
/// </summary>
|
|
/// <param name="buffer">The buffer to receive message.</param>
|
|
/// <param name="result">The transport state and the count of bytes moved into <paramref name="buffer" />.</param>
|
|
/// <exception cref="ArgumentException">
|
|
/// The size of the next available message is larger than the size of
|
|
/// <paramref name="buffer" />.
|
|
/// </exception>
|
|
/// <exception cref="InvalidOperationException">The receive or peek operation is initiated concurrently.</exception>
|
|
/// <returns>
|
|
/// True if the next available message is moved into <paramref name="buffer" />. False if the receive queue is
|
|
/// empty or the transport is closed.
|
|
/// </returns>
|
|
public bool TryReceive(Span<byte> buffer, out KcpConversationReceiveResult result)
|
|
{
|
|
return _receiveQueue.TryReceive(buffer, out result);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Wait until the receive queue contains at least one message.
|
|
/// </summary>
|
|
/// <param name="cancellationToken">The token to cancel this operation.</param>
|
|
/// <exception cref="OperationCanceledException">
|
|
/// The <paramref name="cancellationToken" /> is fired before receive
|
|
/// operation is completed.
|
|
/// </exception>
|
|
/// <exception cref="InvalidOperationException">The receive or peek operation is initiated concurrently.</exception>
|
|
/// <returns>
|
|
/// A <see cref="ValueTask{KcpConversationReceiveResult}" /> that completes when the receive queue contains at
|
|
/// least one full message, or at least one byte in stream mode. Its result contains the transport state and the size
|
|
/// of the available message.
|
|
/// </returns>
|
|
public ValueTask<KcpConversationReceiveResult> WaitToReceiveAsync(CancellationToken cancellationToken)
|
|
{
|
|
return _receiveQueue.WaitToReceiveAsync(cancellationToken);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Wait for the next full message to arrive if the receive queue is empty. Remove the next available message in the
|
|
/// receive queue and copy its content into <paramref name="buffer" />.
|
|
/// </summary>
|
|
/// <param name="buffer">The buffer to receive message.</param>
|
|
/// <param name="cancellationToken">The token to cancel this operation.</param>
|
|
/// <exception cref="ArgumentException">
|
|
/// The size of the next available message is larger than the size of
|
|
/// <paramref name="buffer" />.
|
|
/// </exception>
|
|
/// <exception cref="OperationCanceledException">
|
|
/// The <paramref name="cancellationToken" /> is fired before send operation
|
|
/// is completed.
|
|
/// </exception>
|
|
/// <exception cref="InvalidOperationException">The receive or peek operation is initiated concurrently.</exception>
|
|
/// <returns>
|
|
/// A <see cref="ValueTask{KcpConversationReceiveResult}" /> that completes when a message is moved into
|
|
/// <paramref name="buffer" /> or the transport is closed. Its result contains the transport state and the count of
|
|
/// bytes written into <paramref name="buffer" />.
|
|
/// </returns>
|
|
public ValueTask<KcpConversationReceiveResult> ReceiveAsync(Memory<byte> buffer,
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
return _receiveQueue.ReceiveAsync(buffer, cancellationToken);
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
/// Cancel the current receive operation.
|
|
/// </summary>
|
|
/// <returns>True if the current operation is canceled. False if there is no active send operation.</returns>
|
|
public bool CancelPendingReceive()
|
|
{
|
|
return _receiveQueue.CancelPendingOperation(null, default);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Cancel the current send operation or flush operation.
|
|
/// </summary>
|
|
/// <param name="innerException">
|
|
/// The inner exception of the <see cref="OperationCanceledException" /> thrown by the
|
|
/// <see cref="ReceiveAsync(Memory{byte}, CancellationToken)" /> method or
|
|
/// <see cref="WaitToReceiveAsync(CancellationToken)" /> method.
|
|
/// </param>
|
|
/// <param name="cancellationToken">
|
|
/// The <see cref="CancellationToken" /> in the <see cref="OperationCanceledException" />
|
|
/// thrown by the <see cref="ReceiveAsync(Memory{byte}, CancellationToken)" /> method or
|
|
/// <see cref="WaitToReceiveAsync(CancellationToken)" /> method.
|
|
/// </param>
|
|
/// <returns>True if the current operation is canceled. False if there is no active send operation.</returns>
|
|
public bool CancelPendingReceive(Exception? innerException, CancellationToken cancellationToken)
|
|
{
|
|
return _receiveQueue.CancelPendingOperation(innerException, cancellationToken);
|
|
}
|
|
} |