diff options
Diffstat (limited to 'lib/Net.Transport.SimpleTCP')
-rw-r--r-- | lib/Net.Transport.SimpleTCP/src/ITcpListner.cs | 80 | ||||
-rw-r--r-- | lib/Net.Transport.SimpleTCP/src/TCPConfig.cs | 4 | ||||
-rw-r--r-- | lib/Net.Transport.SimpleTCP/src/TcpListenerNode.cs | 291 | ||||
-rw-r--r-- | lib/Net.Transport.SimpleTCP/src/TcpServer.cs | 246 |
4 files changed, 393 insertions, 228 deletions
diff --git a/lib/Net.Transport.SimpleTCP/src/ITcpListner.cs b/lib/Net.Transport.SimpleTCP/src/ITcpListner.cs new file mode 100644 index 0000000..0abf73c --- /dev/null +++ b/lib/Net.Transport.SimpleTCP/src/ITcpListner.cs @@ -0,0 +1,80 @@ +/* +* Copyright (c) 2024 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Transport.SimpleTCP +* File: TcpServer.cs +* +* TcpServer.cs is part of VNLib.Net.Transport.SimpleTCP which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Transport.SimpleTCP is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 2 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Transport.SimpleTCP is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Utils.Memory.Caching; + +namespace VNLib.Net.Transport.Tcp +{ + /// <summary> + /// An immutable TCP listening instance that has been configured to accept incoming + /// connections from a <see cref="TcpServer"/> instance + /// </summary> + public interface ITcpListner : ICacheHolder + { + /// <summary> + /// Accepts a connection and returns the connection descriptor. + /// </summary> + /// <param name="cancellation">A token to cancel the operation</param> + /// <returns>The connection descriptor</returns> + /// <remarks> + /// NOTE: You must always call the <see cref="CloseConnectionAsync"/> and + /// destroy all references to it when you are done. You must also dispose the stream returned + /// from the <see cref="ITcpConnectionDescriptor.GetStream"/> method. + /// </remarks> + /// <exception cref="InvalidOperationException"></exception> + ValueTask<ITcpConnectionDescriptor> AcceptConnectionAsync(CancellationToken cancellation); + + /// <summary> + /// Cleanly closes an existing TCP connection obtained from <see cref="AcceptConnectionAsync(CancellationToken)"/> + /// and returns the instance to the pool for reuse. + /// <para> + /// If you set <paramref name="reuse"/> to true, the server will attempt to reuse the descriptor instance, you + /// must ensure that all previous references to the descriptor are destroyed. If the value is false, resources + /// are freed and the instance is disposed. + /// </para> + /// </summary> + /// <param name="descriptor">The existing descriptor to close</param> + /// <param name="reuse">A value that indicates if the server can safley reuse the descriptor instance</param> + /// <returns>A task that represents the closing operations</returns> + /// <exception cref="ArgumentNullException"></exception> + ValueTask CloseConnectionAsync(ITcpConnectionDescriptor descriptor, bool reuse); + + /// <summary> + /// Stops the listener loop and attempts to cleanup all resources, + /// you should consider waiting for <see cref="WaitForExitAsync"/> + /// before disposing the listener. + /// </summary> + void Close(); + + /// <summary> + /// Waits for all listening threads to exit before completing the task + /// </summary> + /// <returns>A task that completes when all listening threads exit</returns> + Task WaitForExitAsync(); + } +}
\ No newline at end of file diff --git a/lib/Net.Transport.SimpleTCP/src/TCPConfig.cs b/lib/Net.Transport.SimpleTCP/src/TCPConfig.cs index 1dbf6e4..9003e0d 100644 --- a/lib/Net.Transport.SimpleTCP/src/TCPConfig.cs +++ b/lib/Net.Transport.SimpleTCP/src/TCPConfig.cs @@ -66,6 +66,10 @@ namespace VNLib.Net.Transport.Tcp /// </summary> public readonly int MaxRecvBufferData { get; init; } /// <summary> + /// The maximum number of allowed socket connections to this server + /// </summary> + public readonly long MaxConnections { get; init; } + /// <summary> /// The listener socket backlog count /// </summary> public readonly int BackLog { get; init; } diff --git a/lib/Net.Transport.SimpleTCP/src/TcpListenerNode.cs b/lib/Net.Transport.SimpleTCP/src/TcpListenerNode.cs new file mode 100644 index 0000000..bb0a39b --- /dev/null +++ b/lib/Net.Transport.SimpleTCP/src/TcpListenerNode.cs @@ -0,0 +1,291 @@ +/* +* Copyright (c) 2024 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Transport.SimpleTCP +* File: TcpServer.cs +* +* TcpServer.cs is part of VNLib.Net.Transport.SimpleTCP which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Transport.SimpleTCP is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 2 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Transport.SimpleTCP is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Net; +using System.Threading; +using System.Diagnostics; +using System.Net.Sockets; +using System.Threading.Tasks; +using System.Runtime.CompilerServices; + +using System.IO.Pipelines; + +using VNLib.Utils.Async; +using VNLib.Utils.Logging; +using VNLib.Utils.Memory.Caching; + +namespace VNLib.Net.Transport.Tcp +{ + internal sealed class TcpListenerNode : ITcpListner + { + public readonly TCPConfig Config; + public readonly Socket ServerSocket; + public readonly ObjectRental<AwaitableAsyncServerSocket> SockAsyncArgPool; + public readonly AsyncQueue<ITcpConnectionDescriptor> WaitingSockets; + + private readonly int _recvBufferSize; + private readonly int _sendBufferSize; + + public bool IsCancelled; + private Task _onExitTask; + + public TcpListenerNode(in TCPConfig config, Socket serverSocket, PipeOptions pipeOptions) + { + Config = config; + ServerSocket = serverSocket; + + //Cache socket buffer sizes to avoid system calls + _recvBufferSize = ServerSocket.ReceiveBufferSize; + _sendBufferSize = ServerSocket.SendBufferSize; + + //Arguments constructor + [MethodImpl(MethodImplOptions.AggressiveInlining)] + AwaitableAsyncServerSocket ArgsConstructor() => new(pipeOptions); + + SockAsyncArgPool = ObjectRental.CreateReusable(ArgsConstructor, config.CacheQuota); + + //Init waiting socket queue, always multi-threaded + WaitingSockets = new(singleWriter: false, singleReader: false); + + _onExitTask = Task.CompletedTask; + } + + internal void StartWorkers() + { + Task[] acceptWorkers = new Task[Config.AcceptThreads]; + + //Start listening for connections + for (int i = 0; i < Config.AcceptThreads; i++) + { + acceptWorkers[i] = Task.Run(ExecAcceptAsync); + } + + _onExitTask = Task.WhenAll(acceptWorkers); + } + + ///<inheritdoc/> + public void Close() + { + IsCancelled = true; + + /* + * Disposing the server socket will cause all accept + * operations to fail and allow accept threads to exit + */ + ServerSocket.Dispose(); + } + + ///<inheritdoc/> + public void CacheClear() => SockAsyncArgPool.CacheClear(); + + ///<inheritdoc/> + public void CacheHardClear() => SockAsyncArgPool.CacheHardClear(); + + ///<inheritdoc/> + public Task WaitForExitAsync() => _onExitTask; + + + public ValueTask<ITcpConnectionDescriptor> AcceptConnectionAsync(CancellationToken cancellation) + { + //Try get args from queue + if (WaitingSockets.TryDequeue(out ITcpConnectionDescriptor? args)) + { + return ValueTask.FromResult(args); + } + + return WaitingSockets!.DequeueAsync(cancellation); + } + + + public async ValueTask CloseConnectionAsync(ITcpConnectionDescriptor descriptor, bool reuse) + { + ArgumentNullException.ThrowIfNull(descriptor); + + //Recover args + AwaitableAsyncServerSocket args = (AwaitableAsyncServerSocket)descriptor; + + PrintConnectionInfo(args, SocketAsyncOperation.Disconnect); + + /* + * Technically a user can mess this counter up by continually calling + * this function even if the connection is already closed. + */ + OnClientDisconnected(); + + //Close the socket and cleanup resources + SocketError err = await args.CloseConnectionAsync(); + + if (err != SocketError.Success) + { + Config.Log.Verbose("Socket disconnect failed with error code {ec}.", err); + } + + //Can only reuse if the server is still listening + reuse &= !IsCancelled; + + //See if we can reuse the args + if (reuse) + { + //Return to pool + SockAsyncArgPool.Return(args); + } + else + { + //Dispose + args.Dispose(); + } + } + + internal async Task ExecAcceptAsync() + { + Debug.Assert(!IsCancelled, "Expected a valid canceled flag instance"); + + OnAcceptThreadStart(); + + try + { + do + { + //Rent new args + AwaitableAsyncServerSocket acceptArgs = SockAsyncArgPool.Rent(); + + //Accept new connection + SocketError err = await acceptArgs.AcceptAsync(ServerSocket, _recvBufferSize, _sendBufferSize); + + //Check canceled flag before proceeding + if (IsCancelled) + { + //dispose and bail + acceptArgs.Dispose(); + Config.Log.Verbose("Accept thread aborted for {socket}", Config.LocalEndPoint); + } + else if (err == SocketError.Success) + { + bool maxConsReached = _connectedClients > Config.MaxConnections; + + //Add to waiting queue + if (maxConsReached || !WaitingSockets!.TryEnque(acceptArgs)) + { + /* + * If max connections are reached or the queue is overflowing, + * connections must be dropped + */ + + _ = await acceptArgs.CloseConnectionAsync(); + + /* + * Writing to log will likely compound resource exhaustion, but the user must be informed + * connections are being dropped. + */ + Config.Log.Warn("Socket {e} disconnected because the waiting queue is overflowing", acceptArgs.GetHashCode()); + + //Re-eqnue + SockAsyncArgPool.Return(acceptArgs); + } + else + { + //Success + PrintConnectionInfo(acceptArgs, SocketAsyncOperation.Accept); + + OnClientConnected(); + } + } + else + { + //Error + Config.Log.Debug("Socket accept failed with error code {ec}", err); + //Safe to return args to the pool as long as the server is listening + SockAsyncArgPool.Return(acceptArgs); + } + } while (!IsCancelled); + } + catch (Exception ex) + { + Config.Log.Fatal(ex, "Accept thread failed with exception"); + } + finally + { + OnAcceptThreadExit(); + } + } + + [Conditional("DEBUG")] + internal void PrintConnectionInfo(ITcpConnectionDescriptor con, SocketAsyncOperation operation) + { + if (!Config.DebugTcpLog) + { + return; + } + + con.GetEndpoints(out IPEndPoint local, out IPEndPoint remote); + + switch (operation) + { + default: + Config.Log.Verbose("Socket {operation} on {local} -> {remote}", operation, local, remote); + break; + } + } + + + /* + * A reference counter for tracking + * accept threads + */ + private uint _acceptThreadsActive; + private long _connectedClients; + + private void OnClientConnected() => Interlocked.Increment(ref _connectedClients); + + private void OnClientDisconnected() => Interlocked.Decrement(ref _connectedClients); + + private void OnAcceptThreadStart() => Interlocked.Increment(ref _acceptThreadsActive); + + private void OnAcceptThreadExit() + { + /* + * Track active threads. When the last thread exists + * we can cleanup internal state + */ + + if (Interlocked.Decrement(ref _acceptThreadsActive) == 0) + { + Cleanup(); + } + } + + + private void Cleanup() + { + SockAsyncArgPool.Dispose(); + + //Dispose any queued client sockets that need to exit + while (WaitingSockets!.TryDequeue(out ITcpConnectionDescriptor? args)) + { + (args as IDisposable)!.Dispose(); + } + } + } +}
\ No newline at end of file diff --git a/lib/Net.Transport.SimpleTCP/src/TcpServer.cs b/lib/Net.Transport.SimpleTCP/src/TcpServer.cs index 08625d2..ad8987e 100644 --- a/lib/Net.Transport.SimpleTCP/src/TcpServer.cs +++ b/lib/Net.Transport.SimpleTCP/src/TcpServer.cs @@ -23,22 +23,16 @@ */ using System; -using System.Net; using System.Security; -using System.Threading; -using System.Diagnostics; using System.Net.Sockets; -using System.Threading.Tasks; -using System.Runtime.CompilerServices; using System.IO.Pipelines; -using VNLib.Utils.Async; using VNLib.Utils.Logging; -using VNLib.Utils.Memory.Caching; namespace VNLib.Net.Transport.Tcp { + /// <summary> /// <para> /// Provides a simple, high performance, single process, low/no allocation, @@ -49,18 +43,16 @@ namespace VNLib.Net.Transport.Tcp /// connections is expected. This class cannot be inherited /// </para> /// </summary> - public sealed class TcpServer : ICacheHolder + public sealed class TcpServer { private readonly TCPConfig _config; + private readonly PipeOptions _pipeOptions; /// <summary> /// The current <see cref="TcpServer"/> configuration /// </summary> public ref readonly TCPConfig Config => ref _config; - private readonly ObjectRental<AwaitableAsyncServerSocket> SockAsyncArgPool; - private readonly AsyncQueue<ITcpConnectionDescriptor> WaitingSockets; - /// <summary> /// Initializes a new <see cref="TcpServer"/> with the specified <see cref="TCPConfig"/> /// </summary> @@ -96,7 +88,7 @@ namespace VNLib.Net.Transport.Tcp //Assign default pipe options pipeOptions ??= new( - config.BufferPool, + pool: config.BufferPool, readerScheduler: PipeScheduler.ThreadPool, writerScheduler: PipeScheduler.ThreadPool, pauseWriterThreshold: config.MaxRecvBufferData, @@ -104,48 +96,30 @@ namespace VNLib.Net.Transport.Tcp useSynchronizationContext: false ); - //Arguments constructor - [MethodImpl(MethodImplOptions.AggressiveInlining)] - AwaitableAsyncServerSocket ArgsConstructor() => new(pipeOptions); - - SockAsyncArgPool = ObjectRental.CreateReusable(ArgsConstructor, config.CacheQuota); - - //Init waiting socket queue, always multi-threaded - WaitingSockets = new(false, false); + _pipeOptions = pipeOptions; } - ///<inheritdoc/> - public void CacheClear() => SockAsyncArgPool.CacheClear(); - - ///<inheritdoc/> - public void CacheHardClear() => SockAsyncArgPool.CacheHardClear(); - /// <summary> /// Begins listening for incoming TCP connections on the configured socket /// </summary> - /// <param name="token">A token that is used to abort listening operations and close the socket</param> - /// <returns>A task that resolves when all accept threads have exited. The task does not need to be observed</returns> + /// <returns> + /// A new immutable <see cref="ITcpListner"/> + /// </returns> /// <exception cref="SocketException"></exception> /// <exception cref="SecurityException"></exception> /// <exception cref="ArgumentException"></exception> /// <exception cref="InvalidOperationException"></exception> - public Task Start(CancellationToken token) + public ITcpListner Listen() { Socket serverSock; - - //make sure the token isnt already canceled - if (token.IsCancellationRequested) - { - throw new ArgumentException("Token is already canceled", nameof(token)); - } - + //Configure socket on the current thread so exceptions will be raised to the caller serverSock = new(_config.LocalEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); - //Bind socket + serverSock.Bind(_config.LocalEndPoint); - //Begin listening + serverSock.Listen(_config.BackLog); - + //See if keepalive should be used if (_config.TcpKeepalive) { @@ -157,196 +131,12 @@ namespace VNLib.Net.Transport.Tcp //Invoke socket created callback _config.OnSocketCreated?.Invoke(serverSock); + + TcpListenerNode listener = new(in Config, serverSock, _pipeOptions); - //Clear canceled flag - StrongBox<bool> canceledFlag = new(false); - - Task[] acceptWorkers = new Task[_config.AcceptThreads]; - - //Start listening for connections - for (int i = 0; i < _config.AcceptThreads; i++) - { - acceptWorkers[i] = Task.Run(() => ExecAcceptAsync(serverSock, canceledFlag), token); - } - - CancellationTokenRegistration reg = default; - - //Cleanup callback - void cleanup() - { - //Set canceled flag - canceledFlag.Value = true; - - //Clean up socket - serverSock.Dispose(); - - //Cleanup pool - SockAsyncArgPool.CacheHardClear(); - - //Dispose any queued sockets - while (WaitingSockets!.TryDequeue(out ITcpConnectionDescriptor? args)) - { - (args as IDisposable)!.Dispose(); - } - - reg.Dispose(); - } - - //Register cleanup - reg = token.Register(cleanup, false); - - return Task.WhenAll(acceptWorkers); - } - - private async Task ExecAcceptAsync(Socket serverSock, StrongBox<bool> canceled) - { - Debug.Assert(serverSock != null, "Expected not-null server socket value"); - Debug.Assert(canceled != null && !canceled.Value, "Expected a valid canceled flag instance"); - - //Cache buffer sizes - int recBufferSize = serverSock.ReceiveBufferSize; - int sendBufferSize = serverSock.SendBufferSize; - - //Cache local endpoint for multi-server logging - EndPoint localEndpoint = serverSock.LocalEndPoint!; - - Debug.Assert(localEndpoint != null, "Expected a socket bound to a local endpoint"); - - try - { - while (!canceled.Value) - { - //Rent new args - AwaitableAsyncServerSocket acceptArgs = SockAsyncArgPool.Rent(); - - //Accept new connection - SocketError err = await acceptArgs.AcceptAsync(serverSock, recBufferSize, sendBufferSize); - - //Check canceled flag before proceeding - if (canceled.Value) - { - //dispose and bail - acceptArgs.Dispose(); - _config.Log.Verbose("Accept thread aborted for {socket}", localEndpoint); - } - else if (err == SocketError.Success) - { - // Add to waiting queue - if (!WaitingSockets!.TryEnque(acceptArgs)) - { - _ = await acceptArgs.CloseConnectionAsync(); - - /* - * Writing to log will likely compound resource exhaustion, but the user must be informed - * connections are being dropped. - */ - _config.Log.Warn("Socket {e} disconnected because the waiting queue is overflowing", acceptArgs.GetHashCode()); - - //Re-eqnue - SockAsyncArgPool.Return(acceptArgs); - } - - //Success - PrintConnectionInfo(acceptArgs, SocketAsyncOperation.Accept); - } - else - { - //Error - _config.Log.Debug("Socket accept failed with error code {ec}", err); - //Return args to pool - SockAsyncArgPool.Return(acceptArgs); - } - } - } - catch(Exception ex) - { - _config.Log.Fatal(ex, "Accept thread failed with exception"); - } - } - - - /// <summary> - /// Accepts a connection and returns the connection descriptor. - /// </summary> - /// <param name="cancellation">A token to cancel the operation</param> - /// <returns>The connection descriptor</returns> - /// <remarks> - /// NOTE: You must always call the <see cref="CloseConnectionAsync"/> and - /// destroy all references to it when you are done. You must also dispose the stream returned - /// from the <see cref="ITcpConnectionDescriptor.GetStream"/> method. - /// </remarks> - /// <exception cref="InvalidOperationException"></exception> - public ValueTask<ITcpConnectionDescriptor> AcceptConnectionAsync(CancellationToken cancellation) - { - //Try get args from queue - if (WaitingSockets.TryDequeue(out ITcpConnectionDescriptor? args)) - { - return ValueTask.FromResult(args); - } - - return WaitingSockets!.DequeueAsync(cancellation); - } - - /// <summary> - /// Cleanly closes an existing TCP connection obtained from <see cref="AcceptConnectionAsync(CancellationToken)"/> - /// and returns the instance to the pool for reuse. - /// <para> - /// If you set <paramref name="reuse"/> to true, the server will attempt to reuse the descriptor instance, you - /// must ensure that all previous references to the descriptor are destroyed. If the value is false, resources - /// are freed and the instance is disposed. - /// </para> - /// </summary> - /// <param name="descriptor">The existing descriptor to close</param> - /// <param name="reuse">A value that indicates if the server can safley reuse the descriptor instance</param> - /// <returns>A task that represents the closing operations</returns> - /// <exception cref="ArgumentNullException"></exception> - public async ValueTask CloseConnectionAsync(ITcpConnectionDescriptor descriptor, bool reuse) - { - ArgumentNullException.ThrowIfNull(descriptor); - - //Recover args - AwaitableAsyncServerSocket args = (AwaitableAsyncServerSocket)descriptor; - - PrintConnectionInfo(args, SocketAsyncOperation.Disconnect); - - //Close the socket and cleanup resources - SocketError err = await args.CloseConnectionAsync(); - - if (err != SocketError.Success) - { - _config.Log.Verbose("Socket disconnect failed with error code {ec}.", err); - } - - //See if we can reuse the args - if (reuse) - { - //Return to pool - SockAsyncArgPool.Return(args); - } - else - { - //Dispose - args.Dispose(); - } - } - - - [Conditional("DEBUG")] - private void PrintConnectionInfo(ITcpConnectionDescriptor con, SocketAsyncOperation operation) - { - if (!_config.DebugTcpLog) - { - return; - } - - con.GetEndpoints(out IPEndPoint local, out IPEndPoint remote); + listener.StartWorkers(); - switch (operation) - { - default: - _config.Log.Verbose("Socket {operation} on {local} -> {remote}", operation, local, remote); - break; - } + return listener; } } -}
\ No newline at end of file +} |