#if NEED_LINKEDLIST_SHIM
// NOTE(review): the element type of these aliases was lost in SOURCE; it is restored from the way the
// list nodes are used in this class (KcpSendReceiveBufferItem). Confirm the namespace qualification.
using LinkedListOfBufferItem = KcpSharp.NetstandardShim.LinkedList<KianaBH.KcpSharp.Base.KcpSendReceiveBufferItem>;
using LinkedListNodeOfBufferItem = KcpSharp.NetstandardShim.LinkedListNode<KianaBH.KcpSharp.Base.KcpSendReceiveBufferItem>;
#else
using LinkedListOfBufferItem = System.Collections.Generic.LinkedList<KianaBH.KcpSharp.Base.KcpSendReceiveBufferItem>;
using LinkedListNodeOfBufferItem = System.Collections.Generic.LinkedListNode<KianaBH.KcpSharp.Base.KcpSendReceiveBufferItem>;
#endif
using System.Buffers.Binary;
using System.Net;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using KianaBH.Util;

namespace KianaBH.KcpSharp.Base;

/// <summary>
/// A reliable channel over an unreliable transport implemented in KCP protocol.
/// </summary>
public sealed partial class KcpConversation : IKcpConversation, IKcpExceptionProducer
{
    // Collaborators and packet sizing.
    private readonly IKcpBufferPool _bufferPool;
    private readonly IKcpTransport _transport;
    private readonly IPEndPoint _remoteEndPoint;
    private readonly ulong? _id;
    private readonly int _mtu;
    private readonly int _mss;
    private readonly int _preBufferSize;
    private readonly int _postBufferSize;

    // Sequence numbers, RTT estimation and window state.
    private uint _snd_una;
    private uint _snd_nxt;
    private uint _rcv_nxt;
    private uint _ssthresh;
    private int _rx_rttval;
    private int _rx_srtt;
    private uint _rx_rto;
    private readonly uint _rx_minrto;
    private readonly uint _snd_wnd;
    private readonly uint _rcv_wnd;
    private uint _rmt_wnd;
    private uint _cwnd;
    private KcpProbeType _probe;
    private SpinLock _cwndUpdateLock;
    private readonly uint _interval;
    private uint _ts_flush;
    private readonly bool _nodelay;
    private uint _ts_probe;
    private uint _probe_wait;
    private uint _incr;

    // Queues and buffers.
    private readonly KcpSendReceiveQueueItemCache _queueItemCache;
    private readonly KcpSendQueue _sendQueue;
    private readonly KcpReceiveQueue _receiveQueue;
    private readonly LinkedListOfBufferItem _sndBuf = new();
    private readonly LinkedListOfBufferItem _rcvBuf = new();
    private KcpSendReceiveBufferItemCache _cache = KcpSendReceiveBufferItemCache.Create();
    private readonly KcpAcknowledgeList _ackList;

    private readonly int _fastresend;
    private readonly int _fastlimit;
    private readonly bool _nocwnd;

    // Keep-alive and receive-window notification.
    private readonly bool _keepAliveEnabled;
    private readonly uint _keepAliveInterval;
    private readonly uint _keepAliveGracePeriod;
    private uint _lastReceiveTick;
    private uint _lastSendTick;
    private readonly KcpReceiveWindowNotificationOptions? _receiveWindowNotificationOptions;
    private uint _ts_rcv_notify;
    private uint _ts_rcv_notify_wait;

    private KcpConversationUpdateActivation? _updateActivation;
    private CancellationTokenSource? _updateLoopCts;
    private bool _disposed;

    // Invoked as handler(exception, this, state); see HandleFlushException.
    private Func<Exception, KcpConversation, object?, bool>? _exceptionHandler;
    private object? _exceptionHandlerState;

    private const uint IKCP_RTO_MAX = 60000;
    private const int IKCP_THRESH_MIN = 2;
    private const uint IKCP_PROBE_INIT = 7000; // 7 secs to probe window size
    private const uint IKCP_PROBE_LIMIT = 120000; // up to 120 secs to probe window

    /// <summary>
    /// Construct a reliable channel using KCP protocol.
    /// </summary>
    /// <param name="remoteEndpoint">The remote endpoint</param>
    /// <param name="transport">The underlying transport.</param>
    /// <param name="options">The options of the <see cref="KcpConversation"/>.</param>
    public KcpConversation(IPEndPoint remoteEndpoint, IKcpTransport transport, KcpConversationOptions? options)
        : this(remoteEndpoint, transport, null, options)
    {
    }

    /// <summary>
    /// Construct a reliable channel using KCP protocol.
    /// </summary>
    /// <param name="remoteEndpoint">The remote endpoint</param>
    /// <param name="transport">The underlying transport.</param>
    /// <param name="conversationId">The conversation ID.</param>
    /// <param name="options">The options of the <see cref="KcpConversation"/>.</param>
    public KcpConversation(IPEndPoint remoteEndpoint, IKcpTransport transport, long conversationId,
        KcpConversationOptions? options)
        : this(remoteEndpoint, transport, (ulong)conversationId, options)
    {
    }

    // Shared constructor: validates the options, derives MTU/MSS and window sizes, creates the queues
    // and starts the background update loop.
    private KcpConversation(IPEndPoint remoteEndpoint, IKcpTransport transport, ulong? conversationId,
        KcpConversationOptions? options)
    {
        _bufferPool = options?.BufferPool ?? DefaultArrayPoolBufferAllocator.Default;
        // A conversation without a transport can never send anything; fail fast instead of at first flush.
        _transport = transport ?? throw new ArgumentNullException(nameof(transport));
        _remoteEndPoint = remoteEndpoint;
        _id = conversationId;

        if (options is null)
            _mtu = KcpConversationOptions.MtuDefaultValue;
        else if (options.Mtu < 50)
            throw new ArgumentException("MTU must be at least 50.", nameof(options));
        else
            _mtu = options.Mtu;

        _preBufferSize = options?.PreBufferSize ?? 0;
        _postBufferSize = options?.PostBufferSize ?? 0;
        if (_preBufferSize < 0)
            throw new ArgumentException("PreBufferSize must be a non-negative integer.", nameof(options));
        if (_postBufferSize < 0)
            throw new ArgumentException("PostBufferSize must be a non-negative integer.", nameof(options));
        if ((uint)(_preBufferSize + _postBufferSize) >= (uint)(_mtu - KcpGlobalVars.HEADER_LENGTH_WITHOUT_CONVID))
            throw new ArgumentException(
                "The sum of PreBufferSize and PostBufferSize is too large. There is not enough space in the packet for the KCP header.",
                nameof(options));
        if (conversationId.HasValue &&
            (uint)(_preBufferSize + _postBufferSize) >= (uint)(_mtu - KcpGlobalVars.HEADER_LENGTH_WITH_CONVID))
            throw new ArgumentException(
                "The sum of PreBufferSize and PostBufferSize is too large. There is not enough space in the packet for the KCP header.",
                nameof(options));

        // Payload budget per packet: MTU minus the KCP header and the reserved pre/post regions.
        _mss = conversationId.HasValue
            ? _mtu - KcpGlobalVars.HEADER_LENGTH_WITH_CONVID
            : _mtu - KcpGlobalVars.HEADER_LENGTH_WITHOUT_CONVID;
        _mss = _mss - _preBufferSize - _postBufferSize;

        _ssthresh = 2;

        _nodelay = options is not null && options.NoDelay;
        // The initial RTO is 200 in both modes. Previously it stayed 0 in no-delay mode, which made every
        // unacknowledged segment eligible for retransmission on each flush until the first RTT sample.
        _rx_rto = 200;
        _rx_minrto = _nodelay ? 30u : 100u;

        _snd_wnd = options is null || options.SendWindow <= 0
            ? KcpConversationOptions.SendWindowDefaultValue
            : (uint)options.SendWindow;
        _rcv_wnd = options is null || options.ReceiveWindow <= 0
            ? KcpConversationOptions.ReceiveWindowDefaultValue
            : (uint)options.ReceiveWindow;
        _rmt_wnd = options is null || options.RemoteReceiveWindow <= 0
            ? KcpConversationOptions.RemoteReceiveWindowDefaultValue
            : (uint)options.RemoteReceiveWindow;
        _rcv_nxt = 0;

        _interval = options is null || options.UpdateInterval < 10
            ? KcpConversationOptions.UpdateIntervalDefaultValue
            : (uint)options.UpdateInterval;

        _fastresend = options is null ? 0 : options.FastResend;
        _fastlimit = 5;
        _nocwnd = options is not null && options.DisableCongestionControl;
        StreamMode = options is not null && options.StreamMode;

        _updateActivation = new KcpConversationUpdateActivation((int)_interval);
        _queueItemCache = new KcpSendReceiveQueueItemCache();
        _sendQueue = new KcpSendQueue(_bufferPool, _updateActivation, StreamMode,
            options is null || options.SendQueueSize <= 0
                ? KcpConversationOptions.SendQueueSizeDefaultValue
                : options.SendQueueSize,
            _mss, _queueItemCache);
        _receiveQueue = new KcpReceiveQueue(StreamMode,
            options is null || options.ReceiveQueueSize <= 0
                ? KcpConversationOptions.ReceiveQueueSizeDefaultValue
                : options.ReceiveQueueSize,
            _queueItemCache);
        _ackList = new KcpAcknowledgeList(_sendQueue, (int)_snd_wnd);

        _updateLoopCts = new CancellationTokenSource();

        _ts_flush = GetTimestamp();

        _lastSendTick = _ts_flush;
        _lastReceiveTick = _ts_flush;
        var keepAliveOptions = options?.KeepAliveOptions;
        if (keepAliveOptions is not null)
        {
            _keepAliveEnabled = true;
            _keepAliveInterval = (uint)keepAliveOptions.SendInterval;
            _keepAliveGracePeriod = (uint)keepAliveOptions.GracePeriod;
        }

        _receiveWindowNotificationOptions = options?.ReceiveWindowNotificationOptions;
        if (_receiveWindowNotificationOptions is not null)
        {
            _ts_rcv_notify_wait = 0;
            _ts_rcv_notify = _ts_flush + (uint)_receiveWindowNotificationOptions.InitialInterval;
        }

        RunUpdateOnActivation();
    }

    /// <summary>
    /// Set the handler to invoke when exception is thrown during flushing packets to the transport. Return true in the
    /// handler to ignore the error and continue running. Return false in the handler to abort the operation and mark the
    /// transport as closed.
    /// </summary>
    /// <param name="handler">The exception handler.</param>
    /// <param name="state">The state object to pass into the exception handler.</param>
    public void SetExceptionHandler(Func<Exception, KcpConversation, object?, bool> handler, object? state)
    {
        if (handler is null) throw new ArgumentNullException(nameof(handler));

        _exceptionHandler = handler;
        _exceptionHandlerState = state;
    }

    /// <summary>
    /// Get the ID of the current conversation.
    /// </summary>
    public long? ConversationId => (long?)_id;

    /// <summary>
    /// Get whether the transport is marked as closed.
    /// </summary>
    public bool TransportClosed { get; private set; }

    /// <summary>
    /// Get whether the conversation is in stream mode.
    /// </summary>
    public bool StreamMode { get; }

    /// <summary>
    /// Get the available byte count and available segment count in the send queue.
    /// </summary>
    /// <param name="byteCount">The available byte count in the send queue.</param>
    /// <param name="segmentCount">The available segment count in the send queue.</param>
    /// <returns>True if the transport is not closed. Otherwise false.</returns>
    public bool TryGetSendQueueAvailableSpace(out int byteCount, out int segmentCount)
    {
        return _sendQueue.TryGetAvailableSpace(out byteCount, out segmentCount);
    }

    /// <summary>
    /// Try to put message into the send queue.
    /// </summary>
    /// <param name="buffer">The content of the message.</param>
    /// <returns>
    /// True if the message is put into the send queue. False if the message is too large to fit in the send queue, or
    /// the transport is closed.
    /// </returns>
    /// <remarks>
    /// An exception is thrown when the size of the message is larger than 256 * mtu, thus it can not be correctly
    /// fragmented and sent (never in stream mode), or when a send or flush operation is initiated concurrently.
    /// </remarks>
    public bool TrySend(ReadOnlySpan<byte> buffer)
    {
        return _sendQueue.TrySend(buffer, false, out _);
    }

    /// <summary>
    /// Try to put message into the send queue.
    /// </summary>
    /// <param name="buffer">The content of the message.</param>
    /// <param name="allowPartialSend">
    /// Whether partial sending is allowed in stream mode. This must not be true in non-stream mode.
    /// </param>
    /// <param name="bytesWritten">
    /// The number of bytes put into the send queue. This is always the same as the size of the
    /// <paramref name="buffer"/> unless <paramref name="allowPartialSend"/> is set to true.
    /// </param>
    /// <returns>
    /// True if the message is put into the send queue. False if the message is too large to fit in the send queue, or
    /// the transport is closed.
    /// </returns>
    /// <remarks>An exception is thrown when <paramref name="allowPartialSend"/> is set to true in non-stream mode.</remarks>
/// <remarks>
    /// An exception is also thrown when the size of the message is larger than 256 * mtu, thus it can not be
    /// correctly fragmented and sent (never in stream mode), or when a send or flush operation is initiated
    /// concurrently.
    /// </remarks>
    public bool TrySend(ReadOnlySpan<byte> buffer, bool allowPartialSend, out int bytesWritten)
        => _sendQueue.TrySend(buffer, allowPartialSend, out bytesWritten);

    /// <summary>
    /// Wait until the send queue contains at least <paramref name="minimumBytes"/> bytes of free space, and also
    /// <paramref name="minimumSegments"/> available segments.
    /// </summary>
    /// <param name="minimumBytes">The number of bytes in the available space.</param>
    /// <param name="minimumSegments">The count of segments in the available space.</param>
    /// <param name="cancellationToken">The token to cancel this operation.</param>
    /// <remarks>
    /// An exception is thrown when <paramref name="minimumBytes"/> or <paramref name="minimumSegments"/> is larger
    /// than the total space of the send queue, or when <paramref name="cancellationToken"/> is fired (or the pending
    /// send is canceled through <c>CancelPendingSend</c>) before this operation is completed.
    /// </remarks>
    /// <returns>
    /// A task that completes when there is enough space in the send queue. The result of the task is false when the
    /// transport is closed.
    /// </returns>
    public ValueTask<bool> WaitForSendQueueAvailableSpaceAsync(int minimumBytes, int minimumSegments,
        CancellationToken cancellationToken = default)
        => _sendQueue.WaitForAvailableSpaceAsync(minimumBytes, minimumSegments, cancellationToken);

    /// <summary>
    /// Put message into the send queue.
    /// </summary>
    /// <param name="buffer">The content of the message.</param>
    /// <param name="cancellationToken">The token to cancel this operation.</param>
    /// <remarks>
    /// An exception is thrown when the size of the message is larger than 256 * mtu, thus it can not be correctly
    /// fragmented and sent (never in stream mode); when <paramref name="cancellationToken"/> is fired (or the pending
    /// send is canceled through <c>CancelPendingSend</c>) before the send operation is completed; or when a send or
    /// flush operation is initiated concurrently.
    /// </remarks>
    /// <returns>
    /// A task that completes when the entire message is put into the queue. The result of the task is false when the
    /// transport is closed.
    /// </returns>
/// <remarks>Delegates to the send queue of this conversation.</remarks>
    public ValueTask<bool> SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
        => _sendQueue.SendAsync(buffer, cancellationToken);

    // Stream-oriented write used inside the assembly; forwards to the send queue.
    internal ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
        => _sendQueue.WriteAsync(buffer, cancellationToken);

    /// <summary>
    /// Cancel the current send operation or flush operation.
    /// </summary>
    /// <returns>True if the current operation is canceled. False if there is no active send operation.</returns>
    public bool CancelPendingSend()
        => _sendQueue.CancelPendingOperation(null, default);

    /// <summary>
    /// Cancel the current send operation or flush operation.
    /// </summary>
    /// <param name="innerException">
    /// The inner exception of the exception thrown by the canceled send or flush method.
    /// </param>
    /// <param name="cancellationToken">
    /// The cancellation token carried by the exception thrown by the canceled send or flush method.
    /// </param>
    /// <returns>True if the current operation is canceled. False if there is no active send operation.</returns>
    public bool CancelPendingSend(Exception? innerException, CancellationToken cancellationToken)
        => _sendQueue.CancelPendingOperation(innerException, cancellationToken);

    /// <summary>
    /// Gets the count of bytes not yet sent to the remote host or not acknowledged by the remote host.
    /// </summary>
    public long UnflushedBytes => _sendQueue.GetUnflushedBytes();

    /// <summary>
    /// Wait until all messages are sent and acknowledged by the remote host, as well as all the acknowledgements are sent.
    /// </summary>
    /// <param name="cancellationToken">The token to cancel this operation.</param>
    /// <remarks>
    /// An exception is thrown when <paramref name="cancellationToken"/> is fired (or the pending send is canceled
    /// through <c>CancelPendingSend</c>) before this operation is completed; when a send or flush operation is
    /// initiated concurrently; or when the instance is disposed.
    /// </remarks>
    /// <returns>
    /// A task that completes when the all messages are sent and acknowledged. The result of the task is false when
    /// the transport is closed.
    /// </returns>
/// <remarks>Delegates to the send queue of this conversation.</remarks>
    public ValueTask<bool> FlushAsync(CancellationToken cancellationToken = default)
    {
        return _sendQueue.FlushAsync(cancellationToken);
    }

    // Stream-oriented flush used inside the assembly; forwards to the send queue.
    internal ValueTask FlushForStreamAsync(CancellationToken cancellationToken)
    {
        return _sendQueue.FlushForStreamAsync(cancellationToken);
    }

    // Builds and sends one round of outgoing packets: pending ACKs, new and retransmitted data segments,
    // window probe / window size commands and, when enabled, a keep-alive. Afterwards the congestion
    // window is adjusted according to what happened during this round.
#if !NET6_0_OR_GREATER
    private ValueTask FlushCoreAsync(CancellationToken cancellationToken)
        => new ValueTask(FlushCore2Async(cancellationToken));

    private async Task FlushCore2Async(CancellationToken cancellationToken)
#else
    private ValueTask FlushCoreAsync(CancellationToken cancellationToken)
    {
        s_currentObject = this;
        return FlushCore2Async(cancellationToken);
    }

    [AsyncMethodBuilder(typeof(KcpFlushAsyncMethodBuilder))]
    private async ValueTask FlushCore2Async(CancellationToken cancellationToken)
#endif
    {
        var preBufferSize = _preBufferSize;
        var postBufferSize = _postBufferSize;
        int packetHeaderSize = _id.HasValue
            ? KcpGlobalVars.HEADER_LENGTH_WITH_CONVID
            : KcpGlobalVars.HEADER_LENGTH_WITHOUT_CONVID;
        var sizeLimitBeforePostBuffer = _mtu - _postBufferSize;
        var anyPacketSent = false;

        var windowSize = (ushort)GetUnusedReceiveWindow();
        var unacknowledged = _rcv_nxt;

        using var bufferOwner =
            _bufferPool.Rent(new KcpBufferPoolRentOptions(_mtu + (_mtu - preBufferSize - postBufferSize), true));
        var buffer = bufferOwner.Memory;
        var size = preBufferSize;
        buffer.Span.Slice(0, size).Clear();

        // flush acknowledges
        {
            var index = 0;
            while (_ackList.TryGetAt(index++, out var serialNumber, out var timestamp))
            {
                // The next header would not fit: send what has been accumulated and start a new packet.
                if (size + packetHeaderSize > sizeLimitBeforePostBuffer)
                {
                    buffer.Span.Slice(size, postBufferSize).Clear();
                    await _transport
                        .SendPacketAsync(buffer.Slice(0, size + postBufferSize), _remoteEndPoint, cancellationToken)
                        .ConfigureAwait(false);
                    _lastSendTick = GetTimestamp();
                    size = preBufferSize;
                    buffer.Span.Slice(0, size).Clear();
                    anyPacketSent = true;
                }

                KcpPacketHeader header = new(KcpCommand.Ack, 0, windowSize, timestamp, serialNumber, unacknowledged);
                header.EncodeHeader(_id, 0, buffer.Span.Slice(size), out var bytesWritten);
                size += bytesWritten;
            }
        }

        var current = GetTimestamp();

        // calculate window size
        var cwnd = Math.Min(_snd_wnd, _rmt_wnd);
        if (!_nocwnd) cwnd = Math.Min(_cwnd, cwnd);

        // move data from snd_queue to snd_buf
        while (TimeDiff(_snd_nxt, _snd_una + cwnd) < 0)
        {
            if (!_sendQueue.TryDequeue(out var data, out var fragment)) break;

            lock (_sndBuf)
            {
                if (TransportClosed)
                {
                    data.Release();
                    return;
                }

                // Interlocked.Increment returns the incremented value, so "- 1" yields the serial number
                // reserved for this segment.
                _sndBuf.AddLast(CreateSendBufferItem(in data, fragment, current, windowSize,
                    (uint)Interlocked.Increment(ref Unsafe.As<uint, int>(ref _snd_nxt)) - 1, unacknowledged, _rx_rto));
            }
        }

        // calculate resent
        var resent = _fastresend > 0 ? (uint)_fastresend : 0xffffffff;
        var rtomin = !_nodelay ? _rx_rto >> 3 : 0;

        // flush data segments
        var lost = false;
        var change = false;
        var segmentNode = _sndBuf.First;
        while (segmentNode is not null && !TransportClosed)
        {
            var nextSegmentNode = segmentNode.Next;

            var needsend = false;
            var stats = segmentNode.ValueRef.Stats;

            if (segmentNode.ValueRef.Stats.TransmitCount == 0)
            {
                // first transmission
                needsend = true;
                segmentNode.ValueRef.Stats = new KcpSendSegmentStats(
                    current + segmentNode.ValueRef.Stats.Rto + rtomin, _rx_rto, stats.FastAck,
                    stats.TransmitCount + 1);
            }
            else if (TimeDiff(current, stats.ResendTimestamp) >= 0)
            {
                // retransmission timeout expired
                needsend = true;
                var rto = stats.Rto;
                if (!_nodelay)
                {
                    rto += Math.Max(stats.Rto, _rx_rto);
                }
                else
                {
                    var step = rto; //_nodelay < 2 ? segment.rto : _rx_rto;
                    rto += step / 2;
                }

                segmentNode.ValueRef.Stats =
                    new KcpSendSegmentStats(current + rto, rto, stats.FastAck, stats.TransmitCount + 1);
                lost = true;
            }
            else if (stats.FastAck > resent)
            {
                // fast retransmission
                if (stats.TransmitCount <= _fastlimit || _fastlimit == 0)
                {
                    needsend = true;
                    segmentNode.ValueRef.Stats =
                        new KcpSendSegmentStats(current, stats.Rto, 0, stats.TransmitCount + 1);
                    change = true;
                }
            }

            if (needsend)
            {
                var header = DeplicateHeader(ref segmentNode.ValueRef.Segment, current, windowSize, unacknowledged);

                var need = packetHeaderSize + segmentNode.ValueRef.Data.Length;
                if (size + need > sizeLimitBeforePostBuffer)
                {
                    buffer.Span.Slice(size, postBufferSize).Clear();
                    await _transport
                        .SendPacketAsync(buffer.Slice(0, size + postBufferSize), _remoteEndPoint, cancellationToken)
                        .ConfigureAwait(false);
                    _lastSendTick = GetTimestamp();
                    size = preBufferSize;
                    buffer.Span.Slice(0, size).Clear();
                    anyPacketSent = true;
                }

                lock (segmentNode)
                {
                    var data = segmentNode.ValueRef.Data;
                    if (!TransportClosed)
                    {
                        header.EncodeHeader(_id, data.Length, buffer.Span.Slice(size), out var bytesWritten);
                        size += bytesWritten;

                        if (data.Length > 0)
                        {
                            data.DataRegion.CopyTo(buffer.Slice(size));
                            size += data.Length;
                        }
                    }
                }
            }

            segmentNode = nextSegmentNode;
        }

        _ackList.Clear();

        // probe window size (if remote window size equals zero)
        if (_rmt_wnd == 0)
        {
            if (_probe_wait == 0)
            {
                _probe_wait = IKCP_PROBE_INIT;
                _ts_probe = current + _probe_wait;
            }
            else
            {
                if (TimeDiff(current, _ts_probe) >= 0)
                {
                    if (_probe_wait < IKCP_PROBE_INIT) _probe_wait = IKCP_PROBE_INIT;
                    _probe_wait += _probe_wait / 2;
                    if (_probe_wait > IKCP_PROBE_LIMIT) _probe_wait = IKCP_PROBE_LIMIT;
                    _ts_probe = current + _probe_wait;
                    _probe |= KcpProbeType.AskSend;
                }
            }
        }
        else
        {
            _ts_probe = 0;
            _probe_wait = 0;
        }

        // flush window probing command
        if ((_probe & KcpProbeType.AskSend) != 0)
        {
            if (size + packetHeaderSize > sizeLimitBeforePostBuffer)
            {
                buffer.Span.Slice(size, postBufferSize).Clear();
                await _transport
                    .SendPacketAsync(buffer.Slice(0, size + postBufferSize), _remoteEndPoint, cancellationToken)
                    .ConfigureAwait(false);
                _lastSendTick = GetTimestamp();
                size = preBufferSize;
                buffer.Span.Slice(0, size).Clear();
                anyPacketSent = true;
            }

            KcpPacketHeader header = new(KcpCommand.WindowProbe, 0, windowSize, 0, 0, unacknowledged);
            header.EncodeHeader(_id, 0, buffer.Span.Slice(size), out var bytesWritten);
            size += bytesWritten;
        }

        // flush window probing response
        if (!anyPacketSent && ShouldSendWindowSize(current))
        {
            if (size + packetHeaderSize > sizeLimitBeforePostBuffer)
            {
                buffer.Span.Slice(size, postBufferSize).Clear();
                await _transport
                    .SendPacketAsync(buffer.Slice(0, size + postBufferSize), _remoteEndPoint, cancellationToken)
                    .ConfigureAwait(false);
                _lastSendTick = GetTimestamp();
                size = preBufferSize;
                buffer.Span.Slice(0, size).Clear();
            }

            KcpPacketHeader header = new(KcpCommand.WindowSize, 0, windowSize, 0, 0, unacknowledged);
            header.EncodeHeader(_id, 0, buffer.Span.Slice(size), out var bytesWritten);
            size += bytesWritten;
        }

        _probe = KcpProbeType.None;

        // flush remaining segments
        if (size > preBufferSize)
        {
            buffer.Span.Slice(size, postBufferSize).Clear();
            try
            {
                await _transport
                    .SendPacketAsync(buffer.Slice(0, size + postBufferSize), _remoteEndPoint, cancellationToken)
                    .ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                // Best effort: a failure of the final send is logged and the flush continues.
                new Logger("KcpServer").Error("transport send error", ex);
            }

            _lastSendTick = GetTimestamp();
        }

        // update window
        var lockTaken = false;
        try
        {
            _cwndUpdateLock.Enter(ref lockTaken);

            var updatedCwnd = _cwnd;
            var incr = _incr;

            // update sshthresh
            if (change)
            {
                var inflight = _snd_nxt - _snd_una;
                _ssthresh = Math.Max(inflight / 2, IKCP_THRESH_MIN);
                updatedCwnd = _ssthresh + resent;
                incr = updatedCwnd * (uint)_mss;
            }

            if (lost)
            {
                _ssthresh = Math.Max(cwnd / 2, IKCP_THRESH_MIN);
                updatedCwnd = 1;
                incr = (uint)_mss;
            }

            if (updatedCwnd < 1)
            {
                updatedCwnd = 1;
                incr = (uint)_mss;
            }

            _cwnd = updatedCwnd;
            _incr = incr;
        }
        finally
        {
            if (lockTaken) _cwndUpdateLock.Exit();
        }

        // send keep-alive
        if (_keepAliveEnabled && TimeDiff(GetTimestamp(), _lastSendTick) > _keepAliveInterval)
        {
            // Lay the packet out like every other packet of this flush: reserved (zeroed) pre-buffer,
            // KCP header, reserved (zeroed) post-buffer. Previously the header was written at offset 0
            // and the reserved regions were omitted.
            buffer.Span.Slice(0, preBufferSize).Clear();
            KcpPacketHeader header = new(KcpCommand.WindowSize, 0, windowSize, 0, 0, unacknowledged);
            header.EncodeHeader(_id, 0, buffer.Span.Slice(preBufferSize), out var bytesWritten);
            var keepAliveSize = preBufferSize + bytesWritten;
            buffer.Span.Slice(keepAliveSize, postBufferSize).Clear();
            await _transport
                .SendPacketAsync(buffer.Slice(0, keepAliveSize + postBufferSize), _remoteEndPoint, cancellationToken)
                .ConfigureAwait(false);
            _lastSendTick = GetTimestamp();
        }
    }

    // Decides whether a window-size command should be sent now: always when the peer asked for it
    // (AskTell), otherwise on the receive-window notification schedule, whose wait grows by half each
    // time up to the configured maximum.
    private bool ShouldSendWindowSize(uint current)
    {
        if ((_probe & KcpProbeType.AskTell) != 0) return true;

        var options = _receiveWindowNotificationOptions;
        if (options is null) return false;

        if (TimeDiff(current, _ts_rcv_notify) < 0) return false;

        var initial = (uint)options.InitialInterval;
        var maximum = (uint)options.MaximumInterval;
        if (_ts_rcv_notify_wait < initial)
            _ts_rcv_notify_wait = initial;
        else if (_ts_rcv_notify_wait >= maximum)
            _ts_rcv_notify_wait = maximum;
        else
            _ts_rcv_notify_wait = Math.Min(maximum, _ts_rcv_notify_wait + _ts_rcv_notify_wait / 2);

        _ts_rcv_notify = current + _ts_rcv_notify_wait;

        return true;
    }

    // Wraps dequeued data into a send-buffer node carrying a Push header and fresh transmit statistics.
    private LinkedListNodeOfBufferItem CreateSendBufferItem(in KcpBuffer data, byte fragment, uint current,
        ushort windowSize, uint serialNumber, uint unacknowledged, uint rto)
    {
        KcpSendReceiveBufferItem newseg = new()
        {
            Data = data,
            Segment = new KcpPacketHeader(KcpCommand.Push, fragment, windowSize, current, serialNumber, unacknowledged),
            Stats = new KcpSendSegmentStats(current, rto, 0, 0)
        };
        return _cache.Allocate(in newseg);
    }

    // Copies command, fragment and serial number from the stored header and refreshes the timestamp,
    // window size and unacknowledged fields for the current flush.
    private static KcpPacketHeader DeplicateHeader(ref KcpPacketHeader header, uint timestamp, ushort windowSize,
        uint unacknowledged)
    {
        return new KcpPacketHeader(header.Command, header.Fragment, windowSize, timestamp, header.SerialNumber,
            unacknowledged);
    }

    // Receive window minus the number of items already waiting in the receive queue (never negative).
    private uint GetUnusedReceiveWindow()
    {
        var count = (uint)_receiveQueue.GetQueueSize();
        if (count < _rcv_wnd) return _rcv_wnd - count;

        return 0;
    }

    // Background loop: waits for activations (incoming packet or timer), feeds packets into SetInput and
    // runs an update/flush when needed.
    private async void RunUpdateOnActivation()
    {
        var cancellationToken = _updateLoopCts?.Token ??
new CancellationToken(true);
        var activation = _updateActivation;
        if (activation is null) return;

        while (!cancellationToken.IsCancellationRequested)
        {
            var update = false;
            using (var notification = await activation.WaitAsync(CancellationToken.None).ConfigureAwait(false))
            {
                if (TransportClosed) break;

                var packet = notification.Packet;
                if (!packet.IsEmpty)
                    try
                    {
                        update = SetInput(packet.Span);
                    }
                    catch (Exception ex)
                    {
                        new Logger("KcpServer").Error("Update error", ex);
                    }

                if (TransportClosed) break;

                update |= notification.TimerNotification;
            }

            try
            {
                if (update) await UpdateCoreAsync(cancellationToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                break;
            }
            catch (Exception ex)
            {
                if (!HandleFlushException(ex)) break;
            }

            // Nothing received from the peer within the grace period: treat the transport as closed.
            if (_keepAliveEnabled && TimeDiff(GetTimestamp(), _lastReceiveTick) > _keepAliveGracePeriod)
                SetTransportClosed();
        }
    }

    // Runs a flush when the scheduled flush time has been reached (or always in no-delay mode) and
    // schedules the next one; otherwise completes synchronously without doing anything.
    private ValueTask UpdateCoreAsync(CancellationToken cancellationToken)
    {
        var current = GetTimestamp();
        long slap = TimeDiff(current, _ts_flush);
        // The schedule drifted by more than 10 seconds either way: resynchronise it to now.
        if (slap > 10000 || slap < -10000)
        {
            _ts_flush = current;
            slap = 0;
        }

        if (slap >= 0 || _nodelay)
        {
            _ts_flush += _interval;
            if (TimeDiff(current, _ts_flush) >= 0) _ts_flush = current + _interval;

            return FlushCoreAsync(cancellationToken);
        }

        return default;
    }

    // Gives the registered handler a chance to ignore a flush error. Without a handler, when the handler
    // returns false, or when the handler itself throws, the transport is marked as closed.
    private bool HandleFlushException(Exception ex)
    {
        var handler = _exceptionHandler;
        var state = _exceptionHandlerState;
        var result = false;
        if (handler is not null)
            try
            {
                result = handler.Invoke(ex, this, state);
            }
            catch
            {
                result = false;
            }

        if (!result) SetTransportClosed();

        return result;
    }

    /// <inheritdoc />
    public ValueTask InputPakcetAsync(UdpReceiveResult packet, CancellationToken cancellationToken = default)
    {
        if (cancellationToken.IsCancellationRequested)
            return new ValueTask(Task.FromCanceled(cancellationToken));

        int packetHeaderSize = _id.HasValue
            ? KcpGlobalVars.HEADER_LENGTH_WITH_CONVID
            : KcpGlobalVars.HEADER_LENGTH_WITHOUT_CONVID;
        // Silently drop packets that cannot even hold one header.
        if (packet.Buffer.Length < packetHeaderSize) return default;

        ReadOnlySpan<byte> packetSpan = packet.Buffer.AsSpan();
        if (_id.HasValue)
        {
            // Drop packets addressed to a different conversation.
            var conversationId = BinaryPrimitives.ReadUInt64BigEndian(packet.Buffer.AsSpan());
            if (conversationId != _id.GetValueOrDefault()) return default;

            packetSpan = packetSpan.Slice(8);
        }

        var length = BinaryPrimitives.ReadUInt32LittleEndian(packetSpan.Slice(16));
        if (length > (uint)(packetSpan.Length - 20)) // implicitly checked for (int)length < 0
            return default;

        var activation = _updateActivation;
        if (activation is null) return default;

        return activation.InputPacketAsync(packet.Buffer.AsMemory(), cancellationToken);
    }

    // Parses every KCP segment contained in the packet and applies it to the conversation state.
    // Returns true when the send buffer, the receive buffer or snd_una changed.
    private bool SetInput(ReadOnlySpan<byte> packet)
    {
        var current = GetTimestamp();
        var packetHeaderSize = _id.HasValue ? 28 : 20;

        var prev_una = _snd_una;
        uint maxack = 0, latest_ts = 0;
        var flag = false;
        var mutated = false;

        while (true)
        {
            if (packet.Length < packetHeaderSize) break;

            if (_id.HasValue)
            {
                if (BinaryPrimitives.ReadUInt64BigEndian(packet) != _id.GetValueOrDefault()) return mutated;

                packet = packet.Slice(8);
            }

            var header = KcpPacketHeader.Parse(packet);
            var length = BinaryPrimitives.ReadInt32LittleEndian(packet.Slice(16));

            packet = packet.Slice(20);
            // The unsigned comparison also rejects negative lengths.
            if ((uint)length > (uint)packet.Length) return mutated;

            if (header.Command != KcpCommand.Push &&
                header.Command != KcpCommand.Ack &&
                header.Command != KcpCommand.WindowProbe &&
                header.Command != KcpCommand.WindowSize)
                return mutated;

            _lastReceiveTick = current;
            _rmt_wnd = header.WindowSize;
            mutated = HandleUnacknowledged(header.Unacknowledged) | mutated;
            mutated = UpdateSendUnacknowledged() | mutated;

            if (header.Command == KcpCommand.Ack)
            {
                var rtt = TimeDiff(current, header.Timestamp);
                if (rtt >= 0) UpdateRto(rtt);

                mutated = HandleAck(header.SerialNumber) | mutated;
                mutated = UpdateSendUnacknowledged() | mutated;

                if (!flag)
                {
                    flag = true;
                    maxack = header.SerialNumber;
                    latest_ts = header.Timestamp;
                }
                else
                {
                    // Track the highest acknowledged serial number seen in this packet. The previous
                    // comparison used _snd_nxt, which is always ahead of maxack, so the last ACK in the
                    // packet was kept instead of the highest one.
                    if (TimeDiff(header.SerialNumber, maxack) > 0)
                    {
#if !IKCP_FASTACK_CONSERVE
                        maxack = header.SerialNumber;
                        latest_ts = header.Timestamp;
#else
                        if (TimeDiff(header.Timestamp, latest_ts) > 0)
                        {
                            maxack = header.SerialNumber;
                            latest_ts = header.Timestamp;
                        }
#endif
                    }
                }
            }
            else if (header.Command == KcpCommand.Push)
            {
                if (TimeDiff(header.SerialNumber, _rcv_nxt + _rcv_wnd) < 0)
                {
                    AckPush(header.SerialNumber, header.Timestamp);
                    if (TimeDiff(header.SerialNumber, _rcv_nxt) >= 0)
                        mutated = HandleData(header, packet.Slice(0, length)) | mutated;

                    if (_receiveWindowNotificationOptions is not null)
                        if (_ts_rcv_notify_wait != 0)
                        {
                            // Data arrived: restart the window notification schedule.
                            _ts_rcv_notify_wait = 0;
                            _ts_rcv_notify = current + (uint)_receiveWindowNotificationOptions.InitialInterval;
                        }
                }
            }
            else if (header.Command == KcpCommand.WindowProbe)
            {
                _probe |= KcpProbeType.AskTell;
            }
            else if (header.Command == KcpCommand.WindowSize)
            {
                // do nothing
            }
            else
            {
                return mutated;
            }

            packet = packet.Slice(length);
        }

        if (flag) HandleFastAck(maxack, latest_ts);

        // snd_una advanced: grow the congestion window.
        if (TimeDiff(_snd_una, prev_una) > 0)
        {
            var lockTaken = false;
            try
            {
                _cwndUpdateLock.Enter(ref lockTaken);

                var cwnd = _cwnd;
                var incr = _incr;

                if (cwnd < _rmt_wnd)
                {
                    var mss = (uint)_mss;
                    if (cwnd < _ssthresh)
                    {
                        cwnd++;
                        incr += mss;
                    }
                    else
                    {
                        if (incr < mss) incr = mss;

                        incr += mss * mss / incr + mss / 16;
                        cwnd = (incr + mss - 1) / (mss > 0 ? mss : 1);
                    }

                    if (cwnd > _rmt_wnd)
                    {
                        cwnd = _rmt_wnd;
                        incr = _rmt_wnd * mss;
                    }
                }

                _cwnd = cwnd;
                _incr = incr;
            }
            finally
            {
                if (lockTaken) _cwndUpdateLock.Exit();
            }
        }

        return mutated;
    }

    // Removes from the front of the send buffer every segment whose serial number is below una and
    // releases its data. Returns true when at least one segment was removed.
    private bool HandleUnacknowledged(uint una)
    {
        var mutated = false;
        lock (_sndBuf)
        {
            var node = _sndBuf.First;
            while (node is not null)
            {
                var next = node.Next;

                if (TimeDiff(una, node.ValueRef.Segment.SerialNumber) > 0)
                {
                    _sndBuf.Remove(node);
                    ref var dataRef = ref node.ValueRef.Data;
                    _sendQueue.SubtractUnflushedBytes(dataRef.Length);
                    dataRef.Release();
                    dataRef = default;
                    _cache.Return(node);
                    mutated = true;
                }
                else
                {
                    break;
                }

                node = next;
            }
        }

        return mutated;
    }

    // Sets snd_una to the serial number of the first segment in the send buffer, or to snd_nxt when the
    // buffer is empty. Returns true when the value changed.
    private bool UpdateSendUnacknowledged()
    {
        lock (_sndBuf)
        {
            var first = _sndBuf.First;
            var snd_una = first is null ? _snd_nxt : first.ValueRef.Segment.SerialNumber;
            var old_snd_una = (uint)Interlocked.Exchange(ref Unsafe.As<uint, int>(ref _snd_una), (int)snd_una);
            return snd_una != old_snd_una;
        }
    }

    // Folds a new RTT sample into the smoothed RTT and its variation, then derives the retransmission
    // timeout, clamped to [_rx_minrto, IKCP_RTO_MAX].
    private void UpdateRto(int rtt)
    {
        if (_rx_srtt == 0)
        {
            _rx_srtt = rtt;
            _rx_rttval = rtt / 2;
        }
        else
        {
            var delta = rtt - _rx_srtt;
            if (delta < 0) delta = -delta;

            _rx_rttval = (3 * _rx_rttval + delta) / 4;
            _rx_srtt = (7 * _rx_srtt + rtt) / 8;
            if (_rx_srtt < 1) _rx_srtt = 1;
        }

        var rto = _rx_srtt + Math.Max((int)_interval, 4 * _rx_rttval);
#if NEED_MATH_SHIM
        _rx_rto = Math.Min(Math.Max((uint)rto, _rx_minrto), IKCP_RTO_MAX);
#else
        _rx_rto = Math.Clamp((uint)rto, _rx_minrto, IKCP_RTO_MAX);
#endif
    }

    // Removes the segment with the given serial number from the send buffer, if it is still there.
    // Returns true when a segment was removed.
    private bool HandleAck(uint serialNumber)
    {
        if (TimeDiff(serialNumber, _snd_una) < 0 || TimeDiff(serialNumber, _snd_nxt) >= 0) return false;

        lock (_sndBuf)
        {
            var node = _sndBuf.First;
            while (node is not null)
            {
                var next = node.Next;

                if (serialNumber == node.ValueRef.Segment.SerialNumber)
                {
                    _sndBuf.Remove(node);
                    ref var dataRef = ref node.ValueRef.Data;
                    _sendQueue.SubtractUnflushedBytes(dataRef.Length);
                    dataRef.Release();
                    dataRef = default;
                    _cache.Return(node);
                    return true;
                }

                // The buffer is ordered by serial number; passing the target means it is not present.
                if (TimeDiff(serialNumber, node.ValueRef.Segment.SerialNumber) < 0) return false;

                node =
next;
            }
        }

        return false;
    }

    // Stores a received data segment in the receive buffer (ordered by serial number, duplicates are
    // dropped) and then moves every in-order segment into the receive queue while it has room.
    // Returns true when the receive buffer or the receive queue changed.
    private bool HandleData(KcpPacketHeader header, ReadOnlySpan<byte> data)
    {
        var serialNumber = header.SerialNumber;
        if (TimeDiff(serialNumber, _rcv_nxt + _rcv_wnd) >= 0 || TimeDiff(serialNumber, _rcv_nxt) < 0) return false;

        var mutated = false;
        var repeat = false;
        LinkedListNodeOfBufferItem? node;
        lock (_rcvBuf)
        {
            if (TransportClosed) return false;

            // Walk backwards to find either a duplicate or the node after which to insert.
            node = _rcvBuf.Last;
            while (node is not null)
            {
                var nodeSerialNumber = node.ValueRef.Segment.SerialNumber;
                if (serialNumber == nodeSerialNumber)
                {
                    repeat = true;
                    break;
                }

                if (TimeDiff(serialNumber, nodeSerialNumber) > 0) break;

                node = node.Previous;
            }

            if (!repeat)
            {
                var buffer = _bufferPool.Rent(new KcpBufferPoolRentOptions(data.Length, false));
                KcpSendReceiveBufferItem item = new()
                {
                    Data = KcpBuffer.CreateFromSpan(buffer, data),
                    Segment = header
                };
                if (node is null)
                    _rcvBuf.AddFirst(_cache.Allocate(in item));
                else
                    _rcvBuf.AddAfter(node, _cache.Allocate(in item));

                mutated = true;
            }

            // move available data from rcv_buf -> rcv_queue
            node = _rcvBuf.First;
            while (node is not null)
            {
                var next = node.Next;

                if (node.ValueRef.Segment.SerialNumber == _rcv_nxt && _receiveQueue.GetQueueSize() < _rcv_wnd)
                {
                    _rcvBuf.Remove(node);
                    _receiveQueue.Enqueue(node.ValueRef.Data, node.ValueRef.Segment.Fragment);
                    node.ValueRef.Data = default;
                    _cache.Return(node);
                    _rcv_nxt++;
                    mutated = true;
                }
                else
                {
                    break;
                }

                node = next;
            }
        }

        return mutated;
    }

    // Records that a Push with the given serial number and timestamp has to be acknowledged.
    private void AckPush(uint serialNumber, uint timestamp)
    {
        _ackList.Add(serialNumber, timestamp);
    }

    // Increments the fast-ack counter of every send-buffer segment that precedes the given serial number.
    private void HandleFastAck(uint serialNumber, uint timestamp)
    {
        if (TimeDiff(serialNumber, _snd_una) < 0 || TimeDiff(serialNumber, _snd_nxt) >= 0) return;

        lock (_sndBuf)
        {
            var node = _sndBuf.First;
            while (node is not null)
            {
                var next = node.Next;
                if (TimeDiff(serialNumber, node.ValueRef.Segment.SerialNumber) < 0) break;

                if (serialNumber != node.ValueRef.Segment.SerialNumber)
                {
                    ref var stats = ref node.ValueRef.Stats;
#if !IKCP_FASTACK_CONSERVE
                    stats = new KcpSendSegmentStats(stats.ResendTimestamp, stats.Rto, stats.FastAck + 1,
                        stats.TransmitCount);
#else
                    if (TimeDiff(timestamp, node.ValueRef.Segment.Timestamp) >= 0)
                    {
                        stats = new KcpSendSegmentStats(stats.ResendTimestamp, stats.Rto, stats.FastAck + 1,
                            stats.TransmitCount);
                    }
#endif
                }

                node = next;
            }
        }
    }

    // Current tick count as an unsigned 32-bit value; wraps around, so compare only through TimeDiff.
    private static uint GetTimestamp()
    {
        return (uint)Environment.TickCount;
    }

    // Signed difference between two wrapping 32-bit values: positive when "later" is ahead of "earlier".
    private static int TimeDiff(uint later, uint earlier)
    {
        return (int)(later - earlier);
    }

    /// <summary>
    /// Get the size of the next available message in the receive queue.
    /// </summary>
    /// <param name="result">The transport state and the size of the next available message.</param>
    /// <remarks>An exception is thrown when the receive or peek operation is initiated concurrently.</remarks>
    /// <returns>
    /// True if the receive queue contains at least one message. False if the receive queue is empty or the transport
    /// is closed.
    /// </returns>
    public bool TryPeek(out KcpConversationReceiveResult result)
    {
        return _receiveQueue.TryPeek(out result);
    }

    /// <summary>
    /// Remove the next available message in the receive queue and copy its content into
    /// <paramref name="buffer"/>. When in stream mode, move as many bytes as possible into
    /// <paramref name="buffer"/>.
    /// </summary>
    /// <param name="buffer">The buffer to receive message.</param>
    /// <param name="result">The transport state and the count of bytes moved into <paramref name="buffer"/>.</param>
    /// <remarks>
    /// An exception is thrown when the size of the next available message is larger than the size of
    /// <paramref name="buffer"/> (never in stream mode), or when the receive or peek operation is initiated
    /// concurrently.
    /// </remarks>
    /// <returns>
    /// True if the next available message is moved into <paramref name="buffer"/>. False if the receive queue is
    /// empty or the transport is closed.
    /// </returns>
    public bool TryReceive(Span<byte> buffer, out KcpConversationReceiveResult result)
    {
        return _receiveQueue.TryReceive(buffer, out result);
    }

    /// <summary>
    /// Wait until the receive queue contains at least one full message, or at least one byte in stream mode.
    /// </summary>
    /// <param name="cancellationToken">The token to cancel this operation.</param>
    /// <remarks>
    /// An exception is thrown when <paramref name="cancellationToken"/> is fired before the receive operation is
    /// completed, or when the receive or peek operation is initiated concurrently.
    /// </remarks>
/// /// A that completes when the receive queue contains at /// least one full message, or at least one byte in stream mode. Its result contains the transport state and the size /// of the available message. /// public ValueTask WaitToReceiveAsync(CancellationToken cancellationToken = default) { return _receiveQueue.WaitToReceiveAsync(cancellationToken); } /// /// Wait until the receive queue contains at leat bytes. /// /// The minimum bytes in the receive queue. /// The token to cancel this operation. /// is a negative integer. /// /// The is fired before receive /// operation is completed. /// /// The receive or peek operation is initiated concurrently. /// /// A that completes when the receive queue contains at least /// bytes. The result of the task is false when the transport is closed. /// public ValueTask WaitForReceiveQueueAvailableDataAsync(int minimumBytes, CancellationToken cancellationToken = default) { return _receiveQueue.WaitForAvailableDataAsync(minimumBytes, 0, cancellationToken); } /// /// Wait until the receive queue contains at leat bytes, and also /// segments. /// /// The minimum bytes in the receive queue. /// The minimum segments in the receive queue /// The token to cancel this operation. /// /// Any od and /// is a negative integer. /// /// /// The is fired before receive /// operation is completed. /// /// The receive or peek operation is initiated concurrently. /// /// A that completes when the receive queue contains at least /// bytes. The result of the task is false when the transport is closed. /// public ValueTask WaitForReceiveQueueAvailableDataAsync(int minimumBytes, int minimumSegments, CancellationToken cancellationToken = default) { return _receiveQueue.WaitForAvailableDataAsync(minimumBytes, minimumSegments, cancellationToken); } /// /// Wait for the next full message to arrive if the receive queue is empty. Remove the next available message in the /// receive queue and copy its content into . 
When in stream mode, move as many bytes as /// possible into . /// /// The buffer to receive message. /// The token to cancel this operation. /// /// The size of the next available message is larger than the size of /// . This exception is never thrown in stream mode. /// /// /// The is fired before send operation /// is completed. /// /// The receive or peek operation is initiated concurrently. /// /// A that completes when a full message is moved into /// or the transport is closed. Its result contains the transport state and the count of /// bytes written into . /// public ValueTask ReceiveAsync(Memory buffer, CancellationToken cancellationToken = default) { return _receiveQueue.ReceiveAsync(buffer, cancellationToken); } internal ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken) { return _receiveQueue.ReadAsync(buffer, cancellationToken); } /// /// Cancel the current receive operation. /// /// True if the current operation is canceled. False if there is no active send operation. public bool CancelPendingReceive() { return _receiveQueue.CancelPendingOperation(null, default); } /// /// Cancel the current receive operation. /// /// /// The inner exception of the thrown by the /// method or /// method. /// /// /// The in the /// thrown by the method or /// method. /// /// True if the current operation is canceled. False if there is no active send operation. public bool CancelPendingReceive(Exception? 
innerException, CancellationToken cancellationToken) { return _receiveQueue.CancelPendingOperation(innerException, cancellationToken); } /// public void SetTransportClosed() { TransportClosed = true; Interlocked.Exchange(ref _updateActivation, null)?.Dispose(); var updateLoopCts = Interlocked.Exchange(ref _updateLoopCts, null); if (updateLoopCts is not null) { updateLoopCts.Cancel(); updateLoopCts.Dispose(); } _sendQueue.SetTransportClosed(); _receiveQueue.SetTransportClosed(); lock (_sndBuf) { var node = _sndBuf.First; var next = node?.Next; while (node is not null) { lock (node) { node.ValueRef.Data.Release(); node.ValueRef = default; } _sndBuf.Remove(node); node = next; next = node?.Next; } } lock (_rcvBuf) { var node = _rcvBuf.First; while (node is not null) { node.ValueRef.Data.Release(); node = node.Next; } _rcvBuf.Clear(); } _queueItemCache.Clear(); } /// public void Dispose() { var disposed = _disposed; _disposed = true; SetTransportClosed(); if (!disposed) { _sendQueue.Dispose(); _receiveQueue.Dispose(); } } }