diff options
author | vnugent <public@vaughnnugent.com> | 2024-07-28 19:15:04 -0400 |
---|---|---|
committer | vnugent <public@vaughnnugent.com> | 2024-07-28 19:15:04 -0400 |
commit | 7be5d6648e633ba46a270ca5784de6f4a5a4e0a9 (patch) | |
tree | a6b53de82f12e6778ede2b3974073a6c3b51aace /lib/Net.Transport.SimpleTCP/src | |
parent | 1b590c2517fef110564943ed8a10edd11fa758b0 (diff) |
Squashed commit of the following:
commit a4dacd2909426bf628c1eee1253cc5c8a01e2691
Author: vnugent <public@vaughnnugent.com>
Date: Sat Jul 27 22:41:04 2024 -0400
package updates
commit f836e09981866f5c9f2ae46990d11b186a7d12bb
Author: vnugent <public@vaughnnugent.com>
Date: Wed Jul 24 19:15:54 2024 -0400
chore: Remove argon2 docs & optional tcp resuse
commit b9b892ab2143b0ab92e4dcf0a8b043c5c6c17271
Author: vnugent <public@vaughnnugent.com>
Date: Sun Jul 21 20:57:01 2024 -0400
fix spelling Enqueue and deprecate mispelled version
commit 21ffa816f18be4b765ad740ed5d93346ec3b1fda
Author: vnugent <public@vaughnnugent.com>
Date: Sat Jul 20 19:44:31 2024 -0400
static arugment list parsing functions
commit 85cd6793818a3edd0a963bb4829a960ee6b0e022
Author: vnugent <public@vaughnnugent.com>
Date: Mon Jul 15 18:58:06 2024 -0400
chore: Just some minor checks and adjustments
commit abfb5761ee381b7e1e5342a5525ceca8c8fd81dd
Author: vnugent <public@vaughnnugent.com>
Date: Thu Jul 4 23:57:37 2024 -0400
analyzer pass
commit 4a96dbb924f2b5bf80293e4054f221efe67151dd
Author: vnugent <public@vaughnnugent.com>
Date: Thu Jul 4 22:45:28 2024 -0400
package updates
commit 38ad7d923fa8d9e463d4aaa8e35f021086a03f2d
Author: vnugent <public@vaughnnugent.com>
Date: Thu Jul 4 16:20:48 2024 -0400
mimalloc merge upstream upgrades
commit 981ba286e4793de95bf65e6588313411344c4d53
Author: vnugent <public@vaughnnugent.com>
Date: Thu Jul 4 16:04:03 2024 -0400
refactor: Refactor extensions with perf updates
commit 6b8c67888731f7dd210acdb2b1160cdbdbe30d47
Author: vnugent <public@vaughnnugent.com>
Date: Fri Jun 28 15:48:22 2024 -0400
refactor: Update service stack to reflect new loading patterns
commit 12391e9a207b60b41a074600fc2373ad3eb1c3ab
Author: vnugent <public@vaughnnugent.com>
Date: Wed Jun 26 21:01:15 2024 -0400
feat(server): Server arch update, Memory struct access
commit 92e182ceaf843f8d859d38faa8b2c0ff53207ff6
Author: vnugent <public@vaughnnugent.com>
Date: Fri Jun 21 16:02:34 2024 -0400
feat: Multi transport listeners
commit ee3620b8168a42c8e571e853c751ad5999a9b907
Author: vnugent <public@vaughnnugent.com>
Date: Tue Jun 18 21:17:28 2024 -0400
feat: Add file path caching support
commit ff0926be56fc6eafdce36411847d73bf4ce9f183
Author: vnugent <public@vaughnnugent.com>
Date: Sun Jun 16 13:08:31 2024 -0400
feat: Allow multiple plugin loading directories
commit 07ddf6738d32127926d07b1366e56d2a2308b53b
Author: vnugent <public@vaughnnugent.com>
Date: Sun Jun 16 01:12:07 2024 -0400
perf: Absolutely yuge perf boosts
commit ff15c05a9c3e632c39f3889820fb7d889342b452
Author: vnugent <public@vaughnnugent.com>
Date: Fri Jun 14 14:16:24 2024 -0400
fix: Improper request buffer property assignment
commit 7d2987f1d4048c30808a85798e32c99747f6cfe3
Author: vnugent <public@vaughnnugent.com>
Date: Thu Jun 13 21:57:34 2024 -0400
perf: Async pre-buffer to avoid sync buffer
commit 75c1d0cbf9a5a7856c544671a45f1b4312ffe7ce
Author: vnugent <public@vaughnnugent.com>
Date: Tue Jun 11 22:11:45 2024 -0400
feat: Add a default site adapater and interceptors
commit a7c739b7db9a17622cee751fe0e8a10e4b84b48b
Author: vnugent <public@vaughnnugent.com>
Date: Sun Jun 9 13:05:12 2024 -0400
chore: Package updated
commit b4b506a4b6c7c1e90b5b0980e4cfe0460e7546a2
Author: vnugent <public@vaughnnugent.com>
Date: Sat Jun 8 21:54:52 2024 -0400
some minor touchups
commit 2160510fcc22a8574b0090fd91ca29072f45ab59
Author: vnugent <public@vaughnnugent.com>
Date: Fri May 31 15:12:20 2024 -0400
refactor: Immutable tcp listeners
commit 51cb4eb93e4f1b4c47d35b105e72af1fe771abcc
Author: vnugent <public@vaughnnugent.com>
Date: Thu May 30 17:31:16 2024 -0400
refactor: minor non-breaking changes to VNEncoding
commit 768ddc1eb949266d693f96c67d734e881bd59374
Merge: 9a835fe 1b590c2
Author: vnugent <public@vaughnnugent.com>
Date: Wed May 22 17:50:57 2024 -0400
Merge branch 'main' into develop
commit 9a835fe12c9586ab8dd44d7c96fef4a2d6017e4b
Author: vnugent <public@vaughnnugent.com>
Date: Fri May 17 18:27:03 2024 -0400
chore: Update mimmaloc v2.1.6, update fPIC & cleanup
commit 3b7004b88acfc7f7baa3a8857a5a2f7cf3dd560e
Author: vnugent <public@vaughnnugent.com>
Date: Fri May 17 16:03:28 2024 -0400
feat: Added ReadFileDataAsync function
commit 9a964795757bf0da4dd7fcab15ad304f4ea3fdf1
Author: vnugent <public@vaughnnugent.com>
Date: Wed May 15 21:57:39 2024 -0400
refactor: Harden some argon2 password hashing
commit 4035c838c1508af0aa7e767a97431a692958ce1c
Author: vnugent <public@vaughnnugent.com>
Date: Sun May 12 16:55:32 2024 -0400
perf: Utils + http perf mods
commit f4f0d4f74250257991c57bfae74c4852c7e1ae46
Author: vnugent <public@vaughnnugent.com>
Date: Thu May 2 15:22:53 2024 -0400
feat: Buff middleware handlers
|
| Added implicit support for middleware post processing of files before the filehandler closes the connection. Also cleaned up some project file stuff
commit f0b7dca107659df1d7d4631fdbd2aae9d716d053
Merge: 8c4a45e 107b058
Author: vnugent <public@vaughnnugent.com>
Date: Sat Apr 20 12:24:05 2024 -0400
Merge branch 'main' into develop
commit 8c4a45e384accf92b1b6d748530e8d46f7de40d6
Author: vnugent <public@vaughnnugent.com>
Date: Sat Apr 20 11:10:30 2024 -0400
refactor: Overhaul C libraries and fix builds
commit 42ff77080d10b0fc9fecbbc46141e8e23a1d066a
Author: vnugent <public@vaughnnugent.com>
Date: Sat Apr 20 00:45:57 2024 -0400
fix!: Middlware array, multiple cookie set, and cookie check
commit 97e82b9d66f387f9e6d21d88ddc7a8ab8693149c
Merge: 4ca5791 e07537a
Author: vnugent <public@vaughnnugent.com>
Date: Tue Apr 2 13:34:22 2024 -0400
Merge branch 'main' into develop
commit 4ca5791ed67b9834bdbd010206b30373e4705e9b
Author: vnugent <public@vaughnnugent.com>
Date: Tue Apr 2 13:32:12 2024 -0400
fix: Missed ! on null pointer check
commit 9b4036377c52200c6488c98180d69a0e63321f97
Author: vnugent <public@vaughnnugent.com>
Date: Tue Apr 2 13:22:29 2024 -0400
fix: Fix _In_ macro for compression public api
commit 53a7b4b5c5b67b4a4e06e1d9098cac4bcd6afd7c
Merge: 448a93b 21130c8
Author: vnugent <public@vaughnnugent.com>
Date: Sun Mar 31 17:01:15 2024 -0400
Merge branch 'main' into develop
commit 448a93bb1d18d032087afe2476ffccb98634a54c
Author: vnugent <public@vaughnnugent.com>
Date: Sun Mar 31 16:56:51 2024 -0400
ci: fix third-party dir cleanup
commit 9afed1427472da1ea13079f98dbe27339e55ee7d
Author: vnugent <public@vaughnnugent.com>
Date: Sun Mar 31 16:43:15 2024 -0400
perf: Deprecate unsafememoryhandle span extensions
commit 3ff90da4f02af47ea6d233fdd4445337ebe36452
Author: vnugent <public@vaughnnugent.com>
Date: Sat Mar 30 21:36:18 2024 -0400
refactor: Updates, advanced tracing, http optimizations
commit 8d6b79b5ae309b36f265ba81529bcef8bfcd7414
Merge: 6c1667b 5585915
Author: vnugent <public@vaughnnugent.com>
Date: Sun Mar 24 21:01:31 2024 -0400
Merge branch 'main' into develop
commit 6c1667be23597513537f8190e2f55d65eb9b7c7a
Author: vnugent <public@vaughnnugent.com>
Date: Fri Mar 22 12:01:53 2024 -0400
refactor: Overhauled native library loading and lazy init
commit ebf688f2f974295beabf7b5def7e6f6f150551d0
Author: vnugent <public@vaughnnugent.com>
Date: Wed Mar 20 22:16:17 2024 -0400
refactor: Update compression header files and macros + Ci build
commit 9c7b564911080ccd5cbbb9851a0757b05e1e9047
Author: vnugent <public@vaughnnugent.com>
Date: Tue Mar 19 21:54:49 2024 -0400
refactor: JWK overhaul & add length getter to FileUpload
commit 6d8c3444e09561e5957491b3cc1ae858e0abdd14
Author: vnugent <public@vaughnnugent.com>
Date: Mon Mar 18 16:13:20 2024 -0400
feat: Add FNV1a software checksum and basic correction tests
commit 00d182088cecefc08ca80b1faee9bed3f215f40b
Author: vnugent <public@vaughnnugent.com>
Date: Fri Mar 15 01:05:27 2024 -0400
chore: #6 Use utils filewatcher instead of built-in
commit d513c10d9895c6693519ef1d459c6a5a76929436
Author: vnugent <public@vaughnnugent.com>
Date: Sun Mar 10 21:58:14 2024 -0400
source tree project location updated
Diffstat (limited to 'lib/Net.Transport.SimpleTCP/src')
-rw-r--r-- | lib/Net.Transport.SimpleTCP/src/AwaitableAsyncServerSocket.cs | 24 | ||||
-rw-r--r-- | lib/Net.Transport.SimpleTCP/src/ITcpListner.cs | 80 | ||||
-rw-r--r-- | lib/Net.Transport.SimpleTCP/src/ITransportInterface.cs | 13 | ||||
-rw-r--r-- | lib/Net.Transport.SimpleTCP/src/ReusableNetworkStream.cs | 33 | ||||
-rw-r--r-- | lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs | 58 | ||||
-rw-r--r-- | lib/Net.Transport.SimpleTCP/src/TCPConfig.cs | 29 | ||||
-rw-r--r-- | lib/Net.Transport.SimpleTCP/src/TcpListenerNode.cs | 291 | ||||
-rw-r--r-- | lib/Net.Transport.SimpleTCP/src/TcpServer.cs | 263 |
8 files changed, 494 insertions, 297 deletions
diff --git a/lib/Net.Transport.SimpleTCP/src/AwaitableAsyncServerSocket.cs b/lib/Net.Transport.SimpleTCP/src/AwaitableAsyncServerSocket.cs index e830107..1dbaa67 100644 --- a/lib/Net.Transport.SimpleTCP/src/AwaitableAsyncServerSocket.cs +++ b/lib/Net.Transport.SimpleTCP/src/AwaitableAsyncServerSocket.cs @@ -47,19 +47,21 @@ namespace VNLib.Net.Transport.Tcp private Socket? _socket; public readonly SocketPipeLineWorker SocketWorker; + private readonly bool _reuseSocket; private readonly AwaitableValueSocketEventArgs _recvArgs = new(); private readonly AwaitableValueSocketEventArgs _allArgs = new(); private Task _sendTask = Task.CompletedTask; private Task _recvTask = Task.CompletedTask; - public AwaitableAsyncServerSocket(PipeOptions options) : base() + public AwaitableAsyncServerSocket(bool reuseSocket, PipeOptions options) : base() { + _reuseSocket = reuseSocket && IsWindows; //Reuse only available on Windows SocketWorker = new(options); //Set reuse flags now - _recvArgs.DisconnectReuseSocket = IsWindows; - _allArgs.DisconnectReuseSocket = IsWindows; + _recvArgs.DisconnectReuseSocket = _reuseSocket; + _allArgs.DisconnectReuseSocket = _reuseSocket; } @@ -75,11 +77,11 @@ namespace VNLib.Net.Transport.Tcp //get buffer from the pipe to write initial accept data to Memory<byte> buffer = SocketWorker.GetMemory(recvBuffSize); _allArgs.SetBuffer(buffer); - - //Also on windows we can reuse the previous socket if its set - _allArgs.AcceptSocket = _socket; } + //Reuse socket if it's set + _allArgs.AcceptSocket = _socket; + //Begin the accept SocketError error = await _allArgs.AcceptAsync(serverSocket); @@ -129,12 +131,12 @@ namespace VNLib.Net.Transport.Tcp await _recvTask.ConfigureAwait(false); /* - * Sockets are reused as much as possible on Windows. If the socket + * Sockets can be reused as much as possible on Windows. If the socket * failes to disconnect cleanly, the release function won't clean it up * so it needs to be cleaned up here so at least our args instance * can be reused. */ - if(IsWindows && error != SocketError.Success) + if(_reuseSocket && error != SocketError.Success) { _socket.Dispose(); _socket = null; @@ -168,7 +170,7 @@ namespace VNLib.Net.Transport.Tcp void IReusable.Prepare() { - Debug.Assert(_socket == null || IsWindows, "Exepcted stale socket to be NULL on non-Windows platform"); + Debug.Assert(_socket == null || _reuseSocket, "Exepcted stale socket to be NULL on non-Windows platform"); _allArgs.Prepare(); _recvArgs.Prepare(); @@ -184,8 +186,8 @@ namespace VNLib.Net.Transport.Tcp _allArgs.Release(); _recvArgs.Release(); - //if the socket is still 'connected' (or not windows), dispose it and clear the accept socket - if (_socket?.Connected == true || !IsWindows) + //if the socket is still 'connected' (or no reuse), dispose it and clear the accept socket + if (_socket?.Connected == true || !_reuseSocket) { _socket?.Dispose(); _socket = null; diff --git a/lib/Net.Transport.SimpleTCP/src/ITcpListner.cs b/lib/Net.Transport.SimpleTCP/src/ITcpListner.cs new file mode 100644 index 0000000..10bfde0 --- /dev/null +++ b/lib/Net.Transport.SimpleTCP/src/ITcpListner.cs @@ -0,0 +1,80 @@ +/* +* Copyright (c) 2024 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Transport.SimpleTCP +* File: ITcpListner.cs +* +* ITcpListner.cs is part of VNLib.Net.Transport.SimpleTCP which is part of +* the larger VNLib collection of libraries and utilities. +* +* VNLib.Net.Transport.SimpleTCP is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 2 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Transport.SimpleTCP is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Utils.Memory.Caching; + +namespace VNLib.Net.Transport.Tcp +{ + /// <summary> + /// An immutable TCP listening instance that has been configured to accept incoming + /// connections from a <see cref="TcpServer"/> instance + /// </summary> + public interface ITcpListner : ICacheHolder + { + /// <summary> + /// Accepts a connection and returns the connection descriptor. + /// </summary> + /// <param name="cancellation">A token to cancel the operation</param> + /// <returns>The connection descriptor</returns> + /// <remarks> + /// NOTE: You must always call the <see cref="CloseConnectionAsync"/> and + /// destroy all references to it when you are done. You must also dispose the stream returned + /// from the <see cref="ITcpConnectionDescriptor.GetStream"/> method. + /// </remarks> + /// <exception cref="InvalidOperationException"></exception> + ValueTask<ITcpConnectionDescriptor> AcceptConnectionAsync(CancellationToken cancellation); + + /// <summary> + /// Cleanly closes an existing TCP connection obtained from <see cref="AcceptConnectionAsync(CancellationToken)"/> + /// and returns the instance to the pool for reuse. + /// <para> + /// If you set <paramref name="reuse"/> to true, the server will attempt to reuse the descriptor instance, you + /// must ensure that all previous references to the descriptor are destroyed. If the value is false, resources + /// are freed and the instance is disposed. + /// </para> + /// </summary> + /// <param name="descriptor">The existing descriptor to close</param> + /// <param name="reuse">A value that indicates if the server can safley reuse the descriptor instance</param> + /// <returns>A task that represents the closing operations</returns> + /// <exception cref="ArgumentNullException"></exception> + ValueTask CloseConnectionAsync(ITcpConnectionDescriptor descriptor, bool reuse); + + /// <summary> + /// Stops the listener loop and attempts to cleanup all resources, + /// you should consider waiting for <see cref="WaitForExitAsync"/> + /// before disposing the listener. + /// </summary> + void Close(); + + /// <summary> + /// Waits for all listening threads to exit before completing the task + /// </summary> + /// <returns>A task that completes when all listening threads exit</returns> + Task WaitForExitAsync(); + } +}
\ No newline at end of file diff --git a/lib/Net.Transport.SimpleTCP/src/ITransportInterface.cs b/lib/Net.Transport.SimpleTCP/src/ITransportInterface.cs index 5d16c69..b5afef1 100644 --- a/lib/Net.Transport.SimpleTCP/src/ITransportInterface.cs +++ b/lib/Net.Transport.SimpleTCP/src/ITransportInterface.cs @@ -24,6 +24,7 @@ using System; +using System.Buffers; using System.Threading; using System.Threading.Tasks; @@ -68,5 +69,17 @@ namespace VNLib.Net.Transport.Tcp /// <returns>The number of bytes received</returns> int Recv(Span<byte> buffer, int timeout); + /// <summary> + /// Gets as transport buffer writer for more effecient writes + /// </summary> + IBufferWriter<byte> SendBuffer { get; } + + /// <summary> + /// Flushes the send buffer + /// </summary> + /// <param name="timeout"></param> + /// <param name="cancellation"></param> + /// <returns>A task that completes when pending write data has been sent</returns> + ValueTask FlushSendAsync(int timeout, CancellationToken cancellation); } }
\ No newline at end of file diff --git a/lib/Net.Transport.SimpleTCP/src/ReusableNetworkStream.cs b/lib/Net.Transport.SimpleTCP/src/ReusableNetworkStream.cs index 00b5d7c..1867748 100644 --- a/lib/Net.Transport.SimpleTCP/src/ReusableNetworkStream.cs +++ b/lib/Net.Transport.SimpleTCP/src/ReusableNetworkStream.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Transport.SimpleTCP @@ -33,6 +33,7 @@ using System; +using System.Buffers; using System.IO; using System.Threading; using System.Threading.Tasks; @@ -45,7 +46,7 @@ namespace VNLib.Net.Transport.Tcp /// <summary> /// A reusable stream that marshals data between the socket pipeline and the application /// </summary> - internal sealed class ReusableNetworkStream : Stream + internal sealed class ReusableNetworkStream : Stream, IBufferWriter<byte> { #region stream basics public override bool CanRead => true; @@ -83,18 +84,31 @@ namespace VNLib.Net.Transport.Tcp //Timer used to cancel pipeline recv timeouts private readonly ITransportInterface Transport; - - internal ReusableNetworkStream(ITransportInterface transport) - { - Transport = transport; - } + + internal ReusableNetworkStream(ITransportInterface transport) => Transport = transport; ///<inheritdoc/> public override void Close() { } ///<inheritdoc/> - public override Task FlushAsync(CancellationToken cancellationToken) => Task.CompletedTask; + public override Task FlushAsync(CancellationToken cancellationToken) + => Transport.FlushSendAsync(_sendTimeoutMs, cancellationToken).AsTask(); + + + /* + * Expose the buffer writer interface on the stream + * for more efficient publishing + */ + + ///<inheritdoc/> + public void Advance(int count) => Transport.SendBuffer.Advance(count); + + ///<inheritdoc/> + public Memory<byte> GetMemory(int sizeHint = 0) => Transport.SendBuffer.GetMemory(sizeHint); + + ///<inheritdoc/> + public Span<byte> GetSpan(int sizeHint = 0) => Transport.SendBuffer.GetSpan(sizeHint); ///<inheritdoc/> public override void Flush() @@ -136,5 +150,6 @@ namespace VNLib.Net.Transport.Tcp */ public override ValueTask DisposeAsync() => ValueTask.CompletedTask; + } -}
\ No newline at end of file +} diff --git a/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs b/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs index cb3486f..27d1e27 100644 --- a/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs +++ b/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs @@ -234,6 +234,7 @@ namespace VNLib.Net.Transport.Tcp private FlushResult _recvFlushRes; + public async Task RecvDoWorkAsync<TIO>(TIO sock, int bytesTransferred, int recvBufferSize) where TIO : ISocketIo @@ -335,6 +336,28 @@ namespace VNLib.Net.Transport.Tcp await RecvPipe.Reader.CompleteAsync(); } + ///<inheritdoc/> + public IBufferWriter<byte> SendBuffer => SendPipe.Writer; + + ///<inheritdoc/> + public ValueTask FlushSendAsync(int timeout, CancellationToken cancellation) + { + //See if timer is required + if (timeout < 1) + { + NoOpTimerWrapper noOpTimer = default; + + //no timer + return SendWithTimerInternalAsync(in noOpTimer, cancellation); + } + else + { + TpTimerWrapper sendTimer = new(SendTimer, timeout); + + //Pass new send timer to send method + return SendWithTimerInternalAsync(in sendTimer, cancellation); + } + } private static async Task AwaitFlushTask<TTimer>(ValueTask<FlushResult> valueTask, TTimer timer) where TTimer : INetTimer @@ -350,6 +373,14 @@ namespace VNLib.Net.Transport.Tcp } } + private ValueTask SendAsync(ReadOnlySpan<byte> data, int timeout, CancellationToken cancellation) + { + //Publish send data to send pipe + CopyAndPublishDataOnSendPipe(data, _sysSocketBufferSize, SendPipe.Writer); + + return FlushSendAsync(timeout, cancellation); + } + private ValueTask SendWithTimerInternalAsync<TTimer>(in TTimer timer, CancellationToken cancellation) where TTimer : INetTimer { @@ -380,33 +411,11 @@ namespace VNLib.Net.Transport.Tcp return new(AwaitFlushTask(result, timer)); } } - catch + catch(Exception ex) { //Stop timer on exception timer.Stop(); - throw; - } - } - - private ValueTask SendAsync(ReadOnlySpan<byte> data, int timeout, CancellationToken cancellation) - { - //Publish send data to send pipe - CopyAndPublishDataOnSendPipe(data, _sysSocketBufferSize, SendPipe.Writer); - - //See if timer is required - if (timeout < 1) - { - NoOpTimerWrapper noOpTimer = default; - - //no timer - return SendWithTimerInternalAsync(in noOpTimer, cancellation); - } - else - { - TpTimerWrapper sendTimer = new(SendTimer, timeout); - - //Pass new send timer to send method - return SendWithTimerInternalAsync(in sendTimer, cancellation); + return ValueTask.FromException(ex); } } @@ -542,7 +551,6 @@ namespace VNLib.Net.Transport.Tcp RecvTimer.Stop(); } } - private static class ThrowHelpers { diff --git a/lib/Net.Transport.SimpleTCP/src/TCPConfig.cs b/lib/Net.Transport.SimpleTCP/src/TCPConfig.cs index 1dbf6e4..886e79d 100644 --- a/lib/Net.Transport.SimpleTCP/src/TCPConfig.cs +++ b/lib/Net.Transport.SimpleTCP/src/TCPConfig.cs @@ -39,40 +39,45 @@ namespace VNLib.Net.Transport.Tcp /// <summary> /// The <see cref="IPEndPoint"/> the listening socket will bind to /// </summary> - public readonly IPEndPoint LocalEndPoint { get; init; } + public required readonly IPEndPoint LocalEndPoint { get; init; } /// <summary> /// The log provider used to write logging information to /// </summary> - public readonly ILogProvider Log { get; init; } + public required readonly ILogProvider Log { get; init; } /// <summary> /// If TCP keepalive is enabled, the amount of time the connection is considered alive before another probe message is sent /// </summary> - public readonly int TcpKeepAliveTime { get; init; } + public required readonly int TcpKeepAliveTime { get; init; } /// <summary> /// If TCP keepalive is enabled, the amount of time the connection will wait for a keepalive message /// </summary> - public readonly int KeepaliveInterval { get; init; } - /// <summary> - /// Enables TCP keepalive - /// </summary> - public readonly bool TcpKeepalive { get; init; } + public required readonly int KeepaliveInterval { get; init; } /// <summary> /// The maximum number of waiting WSA asynchronous socket accept operations /// </summary> - public readonly uint AcceptThreads { get; init; } + public required readonly uint AcceptThreads { get; init; } /// <summary> /// The maximum size (in bytes) the transport will buffer in /// the receiving pipeline. /// </summary> - public readonly int MaxRecvBufferData { get; init; } + public required readonly int MaxRecvBufferData { get; init; } + /// <summary> + /// The maximum number of allowed socket connections to this server + /// </summary> + public required readonly long MaxConnections { get; init; } /// <summary> /// The listener socket backlog count /// </summary> - public readonly int BackLog { get; init; } + public required readonly int BackLog { get; init; } + /// <summary> + /// Reuse the TCP socket descriptor after a socket has been closed. + /// Only available on Windows platforms. + /// </summary> + public required readonly bool ReuseSocket { get; init; } /// <summary> /// The <see cref="MemoryPool{T}"/> to allocate transport buffers from /// </summary> - public readonly MemoryPool<byte> BufferPool { get; init; } + public required readonly MemoryPool<byte> BufferPool { get; init; } /// <summary> /// <para> /// The maxium number of event objects that will be cached diff --git a/lib/Net.Transport.SimpleTCP/src/TcpListenerNode.cs b/lib/Net.Transport.SimpleTCP/src/TcpListenerNode.cs new file mode 100644 index 0000000..665dd42 --- /dev/null +++ b/lib/Net.Transport.SimpleTCP/src/TcpListenerNode.cs @@ -0,0 +1,291 @@ +/* +* Copyright (c) 2024 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Transport.SimpleTCP +* File: TcpListenerNode.cs +* +* TcpListenerNode.cs is part of VNLib.Net.Transport.SimpleTCP which is part +* of the larger VNLib collection of libraries and utilities. +* +* VNLib.Net.Transport.SimpleTCP is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 2 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Transport.SimpleTCP is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Net; +using System.Threading; +using System.Diagnostics; +using System.Net.Sockets; +using System.Threading.Tasks; +using System.Runtime.CompilerServices; + +using System.IO.Pipelines; + +using VNLib.Utils.Async; +using VNLib.Utils.Logging; +using VNLib.Utils.Memory.Caching; + +namespace VNLib.Net.Transport.Tcp +{ + internal sealed class TcpListenerNode : ITcpListner + { + public readonly TCPConfig Config; + public readonly Socket ServerSocket; + public readonly ObjectRental<AwaitableAsyncServerSocket> SockAsyncArgPool; + public readonly AsyncQueue<ITcpConnectionDescriptor> WaitingSockets; + + private readonly int _recvBufferSize; + private readonly int _sendBufferSize; + + public bool IsCancelled; + private Task _onExitTask; + + public TcpListenerNode(in TCPConfig config, Socket serverSocket, PipeOptions pipeOptions) + { + Config = config; + ServerSocket = serverSocket; + + //Cache socket buffer sizes to avoid system calls + _recvBufferSize = ServerSocket.ReceiveBufferSize; + _sendBufferSize = ServerSocket.SendBufferSize; + + //Arguments constructor + [MethodImpl(MethodImplOptions.AggressiveInlining)] + AwaitableAsyncServerSocket ArgsConstructor() => new(Config.ReuseSocket, pipeOptions); + + SockAsyncArgPool = ObjectRental.CreateReusable(ArgsConstructor, config.CacheQuota); + + //Init waiting socket queue, always multi-threaded + WaitingSockets = new(singleWriter: false, singleReader: false); + + _onExitTask = Task.CompletedTask; + } + + internal void StartWorkers() + { + Task[] acceptWorkers = new Task[Config.AcceptThreads]; + + //Start listening for connections + for (int i = 0; i < Config.AcceptThreads; i++) + { + acceptWorkers[i] = Task.Run(ExecAcceptAsync); + } + + _onExitTask = Task.WhenAll(acceptWorkers); + } + + ///<inheritdoc/> + public void Close() + { + IsCancelled = true; + + /* + * Disposing the server socket will cause all accept + * operations to fail and allow accept threads to exit + */ + ServerSocket.Dispose(); + } + + ///<inheritdoc/> + public void CacheClear() => SockAsyncArgPool.CacheClear(); + + ///<inheritdoc/> + public void CacheHardClear() => SockAsyncArgPool.CacheHardClear(); + + ///<inheritdoc/> + public Task WaitForExitAsync() => _onExitTask; + + + public ValueTask<ITcpConnectionDescriptor> AcceptConnectionAsync(CancellationToken cancellation) + { + //Try get args from queue + if (WaitingSockets.TryDequeue(out ITcpConnectionDescriptor? args)) + { + return ValueTask.FromResult(args); + } + + return WaitingSockets!.DequeueAsync(cancellation); + } + + + public async ValueTask CloseConnectionAsync(ITcpConnectionDescriptor descriptor, bool reuse) + { + ArgumentNullException.ThrowIfNull(descriptor); + + //Recover args + AwaitableAsyncServerSocket args = (AwaitableAsyncServerSocket)descriptor; + + PrintConnectionInfo(args, SocketAsyncOperation.Disconnect); + + /* + * Technically a user can mess this counter up by continually calling + * this function even if the connection is already closed. + */ + OnClientDisconnected(); + + //Close the socket and cleanup resources + SocketError err = await args.CloseConnectionAsync(); + + if (err != SocketError.Success) + { + Config.Log.Verbose("Socket disconnect failed with error code {ec}.", err); + } + + //Can only reuse if the server is still listening + reuse &= !IsCancelled; + + //See if we can reuse the args + if (reuse) + { + //Return to pool + SockAsyncArgPool.Return(args); + } + else + { + //Dispose + args.Dispose(); + } + } + + internal async Task ExecAcceptAsync() + { + Debug.Assert(!IsCancelled, "Expected a valid canceled flag instance"); + + OnAcceptThreadStart(); + + try + { + do + { + //Rent new args + AwaitableAsyncServerSocket acceptArgs = SockAsyncArgPool.Rent(); + + //Accept new connection + SocketError err = await acceptArgs.AcceptAsync(ServerSocket, _recvBufferSize, _sendBufferSize); + + //Check canceled flag before proceeding + if (IsCancelled) + { + //dispose and bail + acceptArgs.Dispose(); + Config.Log.Verbose("Accept thread aborted for {socket}", Config.LocalEndPoint); + } + else if (err == SocketError.Success) + { + bool maxConsReached = _connectedClients > Config.MaxConnections; + + //Add to waiting queue + if (maxConsReached || !WaitingSockets!.TryEnqueue(acceptArgs)) + { + /* + * If max connections are reached or the queue is overflowing, + * connections must be dropped + */ + + _ = await acceptArgs.CloseConnectionAsync(); + + /* + * Writing to log will likely compound resource exhaustion, but the user must be informed + * connections are being dropped. + */ + Config.Log.Warn("Socket {e} disconnected because the waiting queue is overflowing", acceptArgs.GetHashCode()); + + //Re-eqnue + SockAsyncArgPool.Return(acceptArgs); + } + else + { + //Success + PrintConnectionInfo(acceptArgs, SocketAsyncOperation.Accept); + + OnClientConnected(); + } + } + else + { + //Error + Config.Log.Debug("Socket accept failed with error code {ec}", err); + //Safe to return args to the pool as long as the server is listening + SockAsyncArgPool.Return(acceptArgs); + } + } while (!IsCancelled); + } + catch (Exception ex) + { + Config.Log.Fatal(ex, "Accept thread failed with exception"); + } + finally + { + OnAcceptThreadExit(); + } + } + + [Conditional("DEBUG")] + internal void PrintConnectionInfo(ITcpConnectionDescriptor con, SocketAsyncOperation operation) + { + if (!Config.DebugTcpLog) + { + return; + } + + con.GetEndpoints(out IPEndPoint local, out IPEndPoint remote); + + switch (operation) + { + default: + Config.Log.Verbose("Socket {operation} on {local} -> {remote}", operation, local, remote); + break; + } + } + + + /* + * A reference counter for tracking + * accept threads + */ + private uint _acceptThreadsActive; + private long _connectedClients; + + private void OnClientConnected() => Interlocked.Increment(ref _connectedClients); + + private void OnClientDisconnected() => Interlocked.Decrement(ref _connectedClients); + + private void OnAcceptThreadStart() => Interlocked.Increment(ref _acceptThreadsActive); + + private void OnAcceptThreadExit() + { + /* + * Track active threads. When the last thread exists + * we can cleanup internal state + */ + + if (Interlocked.Decrement(ref _acceptThreadsActive) == 0) + { + Cleanup(); + } + } + + + private void Cleanup() + { + SockAsyncArgPool.Dispose(); + + //Dispose any queued client sockets that need to exit + while (WaitingSockets!.TryDequeue(out ITcpConnectionDescriptor? args)) + { + (args as IDisposable)!.Dispose(); + } + } + } +}
\ No newline at end of file diff --git a/lib/Net.Transport.SimpleTCP/src/TcpServer.cs b/lib/Net.Transport.SimpleTCP/src/TcpServer.cs index 08625d2..79d53fe 100644 --- a/lib/Net.Transport.SimpleTCP/src/TcpServer.cs +++ b/lib/Net.Transport.SimpleTCP/src/TcpServer.cs @@ -23,22 +23,16 @@ */ using System; -using System.Net; using System.Security; -using System.Threading; -using System.Diagnostics; using System.Net.Sockets; -using System.Threading.Tasks; -using System.Runtime.CompilerServices; using System.IO.Pipelines; -using VNLib.Utils.Async; using VNLib.Utils.Logging; -using VNLib.Utils.Memory.Caching; namespace VNLib.Net.Transport.Tcp { + /// <summary> /// <para> /// Provides a simple, high performance, single process, low/no allocation, @@ -49,18 +43,16 @@ namespace VNLib.Net.Transport.Tcp /// connections is expected. This class cannot be inherited /// </para> /// </summary> - public sealed class TcpServer : ICacheHolder + public sealed class TcpServer { private readonly TCPConfig _config; + private readonly PipeOptions _pipeOptions; /// <summary> /// The current <see cref="TcpServer"/> configuration /// </summary> public ref readonly TCPConfig Config => ref _config; - private readonly ObjectRental<AwaitableAsyncServerSocket> SockAsyncArgPool; - private readonly AsyncQueue<ITcpConnectionDescriptor> WaitingSockets; - /// <summary> /// Initializes a new <see cref="TcpServer"/> with the specified <see cref="TCPConfig"/> /// </summary> @@ -74,19 +66,14 @@ namespace VNLib.Net.Transport.Tcp if (pipeOptions == null) { //Pool is required when using default pipe options - _ = config.BufferPool ?? throw new ArgumentException("Buffer pool argument cannot be null"); + ArgumentNullException.ThrowIfNull(config.BufferPool); } - _ = config.Log ?? throw new ArgumentException("Log argument is required"); + ArgumentNullException.ThrowIfNull(config.Log, nameof(config.Log)); + + ArgumentOutOfRangeException.ThrowIfLessThan(config.MaxRecvBufferData, 4096); + ArgumentOutOfRangeException.ThrowIfLessThan(config.AcceptThreads, 1u); - if (config.MaxRecvBufferData < 4096) - { - throw new ArgumentException("MaxRecvBufferData size must be at least 4096 bytes to avoid data pipeline pefromance issues"); - } - if (config.AcceptThreads < 1) - { - throw new ArgumentException("Accept thread count must be greater than 0"); - } if (config.AcceptThreads > Environment.ProcessorCount) { config.Log.Debug("Suggestion: Setting accept threads to {pc}", Environment.ProcessorCount); @@ -96,7 +83,7 @@ namespace VNLib.Net.Transport.Tcp //Assign default pipe options pipeOptions ??= new( - config.BufferPool, + pool: config.BufferPool, readerScheduler: PipeScheduler.ThreadPool, writerScheduler: PipeScheduler.ThreadPool, pauseWriterThreshold: config.MaxRecvBufferData, @@ -104,50 +91,28 @@ namespace VNLib.Net.Transport.Tcp useSynchronizationContext: false ); - //Arguments constructor - [MethodImpl(MethodImplOptions.AggressiveInlining)] - AwaitableAsyncServerSocket ArgsConstructor() => new(pipeOptions); - - SockAsyncArgPool = ObjectRental.CreateReusable(ArgsConstructor, config.CacheQuota); - - //Init waiting socket queue, always multi-threaded - WaitingSockets = new(false, false); + _pipeOptions = pipeOptions; } - ///<inheritdoc/> - public void CacheClear() => SockAsyncArgPool.CacheClear(); - - ///<inheritdoc/> - public void CacheHardClear() => SockAsyncArgPool.CacheHardClear(); - /// <summary> /// Begins listening for incoming TCP connections on the configured socket /// </summary> - /// <param name="token">A token that is used to abort listening operations and close the socket</param> - /// <returns>A task that resolves when all accept threads have exited. The task does not need to be observed</returns> + /// <returns> + /// A new immutable <see cref="ITcpListner"/> + /// </returns> /// <exception cref="SocketException"></exception> /// <exception cref="SecurityException"></exception> /// <exception cref="ArgumentException"></exception> /// <exception cref="InvalidOperationException"></exception> - public Task Start(CancellationToken token) + public ITcpListner Listen() { - Socket serverSock; - - //make sure the token isnt already canceled - if (token.IsCancellationRequested) - { - throw new ArgumentException("Token is already canceled", nameof(token)); - } - //Configure socket on the current thread so exceptions will be raised to the caller - serverSock = new(_config.LocalEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); - //Bind socket - serverSock.Bind(_config.LocalEndPoint); - //Begin listening - serverSock.Listen(_config.BackLog); + Socket serverSock = new(_config.LocalEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); + serverSock.Bind(_config.LocalEndPoint); + //See if keepalive should be used - if (_config.TcpKeepalive) + if (_config.TcpKeepAliveTime > 0) { //Setup socket keepalive from config serverSock.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true); @@ -158,195 +123,13 @@ namespace VNLib.Net.Transport.Tcp //Invoke socket created callback _config.OnSocketCreated?.Invoke(serverSock); - //Clear canceled flag - StrongBox<bool> canceledFlag = new(false); - - Task[] acceptWorkers = new Task[_config.AcceptThreads]; - - //Start listening for connections - for (int i = 0; i < _config.AcceptThreads; i++) - { - acceptWorkers[i] = Task.Run(() => ExecAcceptAsync(serverSock, canceledFlag), token); - } - - CancellationTokenRegistration reg = default; - - //Cleanup callback - void cleanup() - { - //Set canceled flag - canceledFlag.Value = true; - - //Clean up socket - serverSock.Dispose(); - - //Cleanup pool - SockAsyncArgPool.CacheHardClear(); - - //Dispose any queued sockets - while (WaitingSockets!.TryDequeue(out ITcpConnectionDescriptor? args)) - { - (args as IDisposable)!.Dispose(); - } - - reg.Dispose(); - } - - //Register cleanup - reg = token.Register(cleanup, false); - - return Task.WhenAll(acceptWorkers); - } - - private async Task ExecAcceptAsync(Socket serverSock, StrongBox<bool> canceled) - { - Debug.Assert(serverSock != null, "Expected not-null server socket value"); - Debug.Assert(canceled != null && !canceled.Value, "Expected a valid canceled flag instance"); - - //Cache buffer sizes - int recBufferSize = serverSock.ReceiveBufferSize; - int sendBufferSize = serverSock.SendBufferSize; - - //Cache local endpoint for multi-server logging - EndPoint localEndpoint = serverSock.LocalEndPoint!; - - Debug.Assert(localEndpoint != null, "Expected a socket bound to a local endpoint"); - - try - { - while (!canceled.Value) - { - //Rent new args - AwaitableAsyncServerSocket acceptArgs = SockAsyncArgPool.Rent(); - - //Accept new connection - SocketError err = await acceptArgs.AcceptAsync(serverSock, recBufferSize, sendBufferSize); - - //Check canceled flag before proceeding - if (canceled.Value) - { - //dispose and bail - acceptArgs.Dispose(); - _config.Log.Verbose("Accept thread aborted for {socket}", localEndpoint); - } - else if (err == SocketError.Success) - { - // Add to waiting queue - if (!WaitingSockets!.TryEnque(acceptArgs)) - { - _ = await acceptArgs.CloseConnectionAsync(); - - /* - * Writing to log will likely compound resource exhaustion, but the user must be informed - * connections are being dropped. - */ - _config.Log.Warn("Socket {e} disconnected because the waiting queue is overflowing", acceptArgs.GetHashCode()); - - //Re-eqnue - SockAsyncArgPool.Return(acceptArgs); - } - - //Success - PrintConnectionInfo(acceptArgs, SocketAsyncOperation.Accept); - } - else - { - //Error - _config.Log.Debug("Socket accept failed with error code {ec}", err); - //Return args to pool - SockAsyncArgPool.Return(acceptArgs); - } - } - } - catch(Exception ex) - { - _config.Log.Fatal(ex, "Accept thread failed with exception"); - } - } - - - /// <summary> - /// Accepts a connection and returns the connection descriptor. - /// </summary> - /// <param name="cancellation">A token to cancel the operation</param> - /// <returns>The connection descriptor</returns> - /// <remarks> - /// NOTE: You must always call the <see cref="CloseConnectionAsync"/> and - /// destroy all references to it when you are done. You must also dispose the stream returned - /// from the <see cref="ITcpConnectionDescriptor.GetStream"/> method. - /// </remarks> - /// <exception cref="InvalidOperationException"></exception> - public ValueTask<ITcpConnectionDescriptor> AcceptConnectionAsync(CancellationToken cancellation) - { - //Try get args from queue - if (WaitingSockets.TryDequeue(out ITcpConnectionDescriptor? args)) - { - return ValueTask.FromResult(args); - } - - return WaitingSockets!.DequeueAsync(cancellation); - } - - /// <summary> - /// Cleanly closes an existing TCP connection obtained from <see cref="AcceptConnectionAsync(CancellationToken)"/> - /// and returns the instance to the pool for reuse. - /// <para> - /// If you set <paramref name="reuse"/> to true, the server will attempt to reuse the descriptor instance, you - /// must ensure that all previous references to the descriptor are destroyed. If the value is false, resources - /// are freed and the instance is disposed. - /// </para> - /// </summary> - /// <param name="descriptor">The existing descriptor to close</param> - /// <param name="reuse">A value that indicates if the server can safley reuse the descriptor instance</param> - /// <returns>A task that represents the closing operations</returns> - /// <exception cref="ArgumentNullException"></exception> - public async ValueTask CloseConnectionAsync(ITcpConnectionDescriptor descriptor, bool reuse) - { - ArgumentNullException.ThrowIfNull(descriptor); - - //Recover args - AwaitableAsyncServerSocket args = (AwaitableAsyncServerSocket)descriptor; - - PrintConnectionInfo(args, SocketAsyncOperation.Disconnect); - - //Close the socket and cleanup resources - SocketError err = await args.CloseConnectionAsync(); - - if (err != SocketError.Success) - { - _config.Log.Verbose("Socket disconnect failed with error code {ec}.", err); - } - - //See if we can reuse the args - if (reuse) - { - //Return to pool - SockAsyncArgPool.Return(args); - } - else - { - //Dispose - args.Dispose(); - } - } - + serverSock.Listen(_config.BackLog); - [Conditional("DEBUG")] - private void PrintConnectionInfo(ITcpConnectionDescriptor con, SocketAsyncOperation operation) - { - if (!_config.DebugTcpLog) - { - return; - } + TcpListenerNode listener = new(in Config, serverSock, _pipeOptions); - con.GetEndpoints(out IPEndPoint local, out IPEndPoint remote); + listener.StartWorkers(); - switch (operation) - { - default: - _config.Log.Verbose("Socket {operation} on {local} -> {remote}", operation, local, remote); - break; - } + return listener; } } -}
\ No newline at end of file +} |