using System.Buffers.Binary;
using System.Net;
using System.Net.Sockets;

namespace KianaBH.KcpSharp.Base;

/// <summary>
///     An unreliable channel with an optional conversation ID.
/// </summary>
public sealed class KcpRawChannel : IKcpConversation, IKcpExceptionProducer<KcpRawChannel>
{
    /// <summary>
    ///     Number of bytes the conversation ID occupies at the start of a packet.
    ///     The ID is serialized as an unsigned 64-bit integer, so this must stay in
    ///     sync with the <see cref="BinaryPrimitives" /> UInt64 calls below.
    /// </summary>
    private const int ConversationIdSize = sizeof(ulong);

    private readonly IKcpBufferPool _bufferPool;
    private readonly ulong? _id;
    private readonly int _mtu;
    private readonly int _postBufferSize;
    private readonly int _preBufferSize;
    private readonly KcpRawReceiveQueue _receiveQueue;
    private readonly IPEndPoint _remoteEndPoint;
    private readonly AsyncAutoResetEvent<int> _sendNotification;
    private readonly KcpRawSendOperation _sendOperation;
    private readonly IKcpTransport _transport;

    private Func<Exception, KcpRawChannel, object?, bool>? _exceptionHandler;
    private object? _exceptionHandlerState;

    // Non-null while the send loop is allowed to run; swapped to null when the transport is closed.
    private CancellationTokenSource? _sendLoopCts;

    /// <summary>
    ///     Construct an unreliable channel without a conversation ID.
    /// </summary>
    /// <param name="remoteEndPoint">The remote endpoint.</param>
    /// <param name="transport">The underlying transport.</param>
    /// <param name="options">The options of the <see cref="KcpRawChannel" />.</param>
    public KcpRawChannel(IPEndPoint remoteEndPoint, IKcpTransport transport, KcpRawChannelOptions? options)
        : this(remoteEndPoint, transport, null, options)
    {
    }

    /// <summary>
    ///     Construct an unreliable channel with a conversation ID.
    /// </summary>
    /// <param name="remoteEndPoint">The remote endpoint.</param>
    /// <param name="transport">The underlying transport.</param>
    /// <param name="conversationId">The conversation ID.</param>
    /// <param name="options">The options of the <see cref="KcpRawChannel" />.</param>
    public KcpRawChannel(IPEndPoint remoteEndPoint, IKcpTransport transport, long conversationId,
        KcpRawChannelOptions? options)
        : this(remoteEndPoint, transport, (ulong)conversationId, options)
    {
    }

    private KcpRawChannel(IPEndPoint remoteEndPoint, IKcpTransport transport, ulong? conversationId,
        KcpRawChannelOptions? options)
    {
        _bufferPool = options?.BufferPool ?? DefaultArrayPoolBufferAllocator.Default;
        _remoteEndPoint = remoteEndPoint;
        _transport = transport;
        _id = conversationId;

        if (options is null)
        {
            _mtu = KcpConversationOptions.MtuDefaultValue;
        }
        else if (options.Mtu < 50)
        {
            throw new ArgumentException("MTU must be at least 50.", nameof(options));
        }
        else
        {
            _mtu = options.Mtu;
        }

        _preBufferSize = options?.PreBufferSize ?? 0;
        _postBufferSize = options?.PostBufferSize ?? 0;
        if (_preBufferSize < 0)
        {
            throw new ArgumentException("PreBufferSize must be a non-negative integer.", nameof(options));
        }

        if (_postBufferSize < 0)
        {
            throw new ArgumentException("PostBufferSize must be a non-negative integer.", nameof(options));
        }

        // The uint casts make an int overflow of the sum show up as a huge value that fails the check.
        if ((uint)(_preBufferSize + _postBufferSize) >= (uint)_mtu)
        {
            throw new ArgumentException("The sum of PreBufferSize and PostBufferSize must be less than MTU.",
                nameof(options));
        }

        // Reserve room for the full 8-byte conversation ID so at least one payload byte fits in a packet.
        if (conversationId.HasValue &&
            (uint)(_preBufferSize + _postBufferSize) >= (uint)(_mtu - ConversationIdSize))
        {
            throw new ArgumentException(
                "The sum of PreBufferSize and PostBufferSize is too large. There is not enough space in the packet for the conversation ID.",
                nameof(options));
        }

        var queueSize = options?.ReceiveQueueSize ?? 32;
        if (queueSize < 1)
        {
            throw new ArgumentException("QueueSize must be a positive integer.", nameof(options));
        }

        _sendLoopCts = new CancellationTokenSource();
        _sendNotification = new AsyncAutoResetEvent<int>();
        _receiveQueue = new KcpRawReceiveQueue(_bufferPool, queueSize);
        _sendOperation = new KcpRawSendOperation(_sendNotification);

        RunSendLoop();
    }

    /// <summary>
    ///     Get the ID of the current conversation.
    /// </summary>
    public long? ConversationId => (long?)_id;

    /// <summary>
    ///     Get whether the transport is marked as closed.
    /// </summary>
    public bool TransportClosed => _sendLoopCts is null;

    /// <inheritdoc />
    public ValueTask InputPakcetAsync(UdpReceiveResult packet, CancellationToken cancellationToken = default)
    {
        ReadOnlySpan<byte> span = packet.Buffer.AsSpan();

        // Silently drop packets that are too short to carry the ID or larger than the MTU.
        var overhead = _id.HasValue ? ConversationIdSize : 0;
        if (span.Length < overhead || span.Length > _mtu)
        {
            return default;
        }

        if (_id.HasValue)
        {
            // Packets addressed to a different conversation are ignored.
            if (BinaryPrimitives.ReadUInt64BigEndian(span) != _id.GetValueOrDefault())
            {
                return default;
            }

            span = span.Slice(ConversationIdSize);
        }

        _receiveQueue.Enqueue(span);
        return default;
    }

    /// <inheritdoc />
    public void SetTransportClosed()
    {
        // The exchange guarantees the token source is cancelled and disposed at most once.
        var cts = Interlocked.Exchange(ref _sendLoopCts, null);
        if (cts is not null)
        {
            cts.Cancel();
            cts.Dispose();
        }

        _receiveQueue.SetTransportClosed();
        _sendOperation.SetTransportClosed();

        // Wake the send loop so it can observe the cancellation.
        _sendNotification.Set(0);
    }

    /// <inheritdoc />
    public void Dispose()
    {
        SetTransportClosed();
        _receiveQueue.Dispose();
        _sendOperation.Dispose();
    }

    /// <summary>
    ///     Set the handler to invoke when exception is thrown during flushing packets to the transport. Return true in the
    ///     handler to ignore the error and continue running. Return false in the handler to abort the operation and mark the
    ///     transport as closed.
    /// </summary>
    /// <param name="handler">The exception handler.</param>
    /// <param name="state">The state object to pass into the exception handler.</param>
    /// <exception cref="ArgumentNullException"><paramref name="handler" /> is null.</exception>
    public void SetExceptionHandler(Func<Exception, KcpRawChannel, object?, bool> handler, object? state)
    {
        if (handler is null)
        {
            throw new ArgumentNullException(nameof(handler));
        }

        _exceptionHandler = handler;
        _exceptionHandlerState = state;
    }

    /// <summary>
    ///     Send message to the underlying transport.
    /// </summary>
    /// <param name="buffer">The content of the message.</param>
    /// <param name="cancellationToken">The token to cancel this operation.</param>
    /// <exception cref="ArgumentException">The size of the message is larger than mtu, thus it can not be sent.</exception>
    /// <exception cref="OperationCanceledException">
    ///     The <paramref name="cancellationToken" /> is fired before send operation
    ///     is completed.
    /// </exception>
    /// <exception cref="InvalidOperationException">The send operation is initiated concurrently.</exception>
    /// <exception cref="ObjectDisposedException">The <see cref="KcpRawChannel" /> instance is disposed.</exception>
    /// <returns>
    ///     A <see cref="ValueTask{Boolean}" /> that completes when the entire message is put into the queue. The result
    ///     of the task is false when the transport is closed.
    /// </returns>
    public ValueTask<bool> SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
    {
        return _sendOperation.SendAsync(buffer, cancellationToken);
    }

    /// <summary>
    ///     Cancel the current send operation or flush operation.
    /// </summary>
    /// <returns>True if the current operation is canceled. False if there is no active send operation.</returns>
    public bool CancelPendingSend()
    {
        return _sendOperation.CancelPendingOperation(null, default);
    }

    /// <summary>
    ///     Cancel the current send operation or flush operation.
    /// </summary>
    /// <param name="innerException">
    ///     The inner exception of the <see cref="OperationCanceledException" /> thrown by the
    ///     <see cref="SendAsync(ReadOnlyMemory{byte}, CancellationToken)" /> method.
    /// </param>
    /// <param name="cancellationToken">
    ///     The <see cref="OperationCanceledException.CancellationToken" /> in the
    ///     <see cref="OperationCanceledException" /> thrown by the
    ///     <see cref="SendAsync(ReadOnlyMemory{byte}, CancellationToken)" /> method.
    /// </param>
    /// <returns>True if the current operation is canceled. False if there is no active send operation.</returns>
    public bool CancelPendingSend(Exception? innerException, CancellationToken cancellationToken)
    {
        return _sendOperation.CancelPendingOperation(innerException, cancellationToken);
    }

    /// <summary>
    ///     Background loop started by the constructor. It waits for a send notification, frames the pending
    ///     message (pre-buffer, optional conversation ID, payload, post-buffer) and hands it to the transport.
    ///     Exceptions raised inside the loop are routed to <see cref="HandleFlushException" />.
    /// </summary>
    private async void RunSendLoop()
    {
        // If the channel was already closed, start with a cancelled token so the loop exits immediately.
        var cancellationToken = _sendLoopCts?.Token ?? new CancellationToken(true);
        var sendOperation = _sendOperation;
        var ev = _sendNotification;

        // Bytes of every packet that are not payload; invariant for the lifetime of the channel.
        var overhead = _preBufferSize + _postBufferSize;
        if (_id.HasValue)
        {
            overhead += ConversationIdSize;
        }

        var mss = _mtu - overhead;

        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                var payloadSize = await ev.WaitAsync().ConfigureAwait(false);
                if (cancellationToken.IsCancellationRequested)
                {
                    break;
                }

                if (payloadSize < 0 || payloadSize > mss)
                {
                    // The message cannot fit into a packet: consume it with an empty buffer and move on.
                    _ = sendOperation.TryConsume(default, out _);
                    continue;
                }

                using var owner = _bufferPool.Rent(new KcpBufferPoolRentOptions(payloadSize + overhead, true));
                var memory = owner.Memory;

                // Fill the buffer
                if (_preBufferSize != 0)
                {
                    memory.Span.Slice(0, _preBufferSize).Clear();
                    memory = memory.Slice(_preBufferSize);
                }

                if (_id.HasValue)
                {
                    // Big-endian, matching how InputPakcetAsync parses the ID of incoming packets.
                    BinaryPrimitives.WriteUInt64BigEndian(memory.Span, _id.GetValueOrDefault());
                    memory = memory.Slice(ConversationIdSize);
                }

                if (!sendOperation.TryConsume(memory, out var bytesWritten))
                {
                    continue;
                }

                payloadSize = Math.Min(payloadSize, bytesWritten);
                memory = memory.Slice(payloadSize);
                if (_postBufferSize != 0)
                {
                    memory.Span.Slice(0, _postBufferSize).Clear();
                }

                // Send the buffer
                try
                {
                    await _transport
                        .SendPacketAsync(owner.Memory.Slice(0, payloadSize + overhead), _remoteEndPoint,
                            cancellationToken)
                        .ConfigureAwait(false);
                }
                catch (Exception ex)
                {
                    if (!HandleFlushException(ex))
                    {
                        break;
                    }
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Do nothing
        }
        catch (Exception ex)
        {
            HandleFlushException(ex);
        }
    }

    /// <summary>
    ///     Pass <paramref name="ex" /> to the registered exception handler, if any. The transport is marked as
    ///     closed when there is no handler, the handler returns false, or the handler itself throws.
    /// </summary>
    /// <param name="ex">The exception caught by the send loop.</param>
    /// <returns>True if the handler asked to ignore the error and keep running; otherwise false.</returns>
    private bool HandleFlushException(Exception ex)
    {
        var handler = _exceptionHandler;
        var state = _exceptionHandlerState;
        var result = false;
        if (handler is not null)
        {
            try
            {
                result = handler.Invoke(ex, this, state);
            }
            catch
            {
                // A throwing handler is treated the same as one that returned false.
                result = false;
            }
        }

        if (!result)
        {
            SetTransportClosed();
        }

        return result;
    }

    /// <summary>
    ///     Get the size of the next available message in the receive queue.
    /// </summary>
    /// <param name="result">The transport state and the size of the next available message.</param>
    /// <exception cref="InvalidOperationException">The receive or peek operation is initiated concurrently.</exception>
    /// <returns>
    ///     True if the receive queue contains at least one message. False if the receive queue is empty or the transport
    ///     is closed.
    /// </returns>
    public bool TryPeek(out KcpConversationReceiveResult result)
    {
        return _receiveQueue.TryPeek(out result);
    }

    /// <summary>
    ///     Remove the next available message in the receive queue and copy its content into <paramref name="buffer" />.
    /// </summary>
    /// <param name="buffer">The buffer to receive message.</param>
    /// <param name="result">The transport state and the count of bytes moved into <paramref name="buffer" />.</param>
    /// <exception cref="ArgumentException">
    ///     The size of the next available message is larger than the size of
    ///     <paramref name="buffer" />.
    /// </exception>
    /// <exception cref="InvalidOperationException">The receive or peek operation is initiated concurrently.</exception>
    /// <returns>
    ///     True if the next available message is moved into <paramref name="buffer" />. False if the receive queue is
    ///     empty or the transport is closed.
    /// </returns>
    public bool TryReceive(Span<byte> buffer, out KcpConversationReceiveResult result)
    {
        return _receiveQueue.TryReceive(buffer, out result);
    }

    /// <summary>
    ///     Wait until the receive queue contains at least one message.
    /// </summary>
    /// <param name="cancellationToken">The token to cancel this operation.</param>
    /// <exception cref="OperationCanceledException">
    ///     The <paramref name="cancellationToken" /> is fired before receive
    ///     operation is completed.
    /// </exception>
    /// <exception cref="InvalidOperationException">The receive or peek operation is initiated concurrently.</exception>
    /// <returns>
    ///     A <see cref="ValueTask{KcpConversationReceiveResult}" /> that completes when the receive queue contains at
    ///     least one message. Its result contains the transport state and the size of the available message.
    /// </returns>
    public ValueTask<KcpConversationReceiveResult> WaitToReceiveAsync(CancellationToken cancellationToken)
    {
        return _receiveQueue.WaitToReceiveAsync(cancellationToken);
    }

    /// <summary>
    ///     Wait for the next full message to arrive if the receive queue is empty. Remove the next available message in the
    ///     receive queue and copy its content into <paramref name="buffer" />.
    /// </summary>
    /// <param name="buffer">The buffer to receive message.</param>
    /// <param name="cancellationToken">The token to cancel this operation.</param>
    /// <exception cref="ArgumentException">
    ///     The size of the next available message is larger than the size of
    ///     <paramref name="buffer" />.
    /// </exception>
    /// <exception cref="OperationCanceledException">
    ///     The <paramref name="cancellationToken" /> is fired before receive operation
    ///     is completed.
    /// </exception>
    /// <exception cref="InvalidOperationException">The receive or peek operation is initiated concurrently.</exception>
    /// <returns>
    ///     A <see cref="ValueTask{KcpConversationReceiveResult}" /> that completes when a message is moved into
    ///     <paramref name="buffer" /> or the transport is closed. Its result contains the transport state and the count of
    ///     bytes written into <paramref name="buffer" />.
    /// </returns>
    public ValueTask<KcpConversationReceiveResult> ReceiveAsync(Memory<byte> buffer,
        CancellationToken cancellationToken = default)
    {
        return _receiveQueue.ReceiveAsync(buffer, cancellationToken);
    }

    /// <summary>
    ///     Cancel the current receive operation.
    /// </summary>
    /// <returns>True if the current operation is canceled. False if there is no active receive operation.</returns>
    public bool CancelPendingReceive()
    {
        return _receiveQueue.CancelPendingOperation(null, default);
    }

    /// <summary>
    ///     Cancel the current receive operation.
    /// </summary>
    /// <param name="innerException">
    ///     The inner exception of the <see cref="OperationCanceledException" /> thrown by the
    ///     <see cref="ReceiveAsync(Memory{byte}, CancellationToken)" /> method or
    ///     <see cref="WaitToReceiveAsync(CancellationToken)" /> method.
    /// </param>
    /// <param name="cancellationToken">
    ///     The <see cref="OperationCanceledException.CancellationToken" /> in the
    ///     <see cref="OperationCanceledException" /> thrown by the
    ///     <see cref="ReceiveAsync(Memory{byte}, CancellationToken)" /> method or
    ///     <see cref="WaitToReceiveAsync(CancellationToken)" /> method.
    /// </param>
    /// <returns>True if the current operation is canceled. False if there is no active receive operation.</returns>
    public bool CancelPendingReceive(Exception? innerException, CancellationToken cancellationToken)
    {
        return _receiveQueue.CancelPendingOperation(innerException, cancellationToken);
    }
}