diff options
Diffstat (limited to 'lib/Net.Transport.SimpleTCP/src')
6 files changed, 235 insertions, 121 deletions
diff --git a/lib/Net.Transport.SimpleTCP/src/ITcpConnectionDescriptor.cs b/lib/Net.Transport.SimpleTCP/src/ITcpConnectionDescriptor.cs new file mode 100644 index 0000000..fe3013c --- /dev/null +++ b/lib/Net.Transport.SimpleTCP/src/ITcpConnectionDescriptor.cs @@ -0,0 +1,54 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Transport.SimpleTCP +* File: ITcpConnectionDescriptor.cs +* +* ITcpConnectionDescriptor.cs is part of VNLib.Net.Transport.SimpleTCP which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Transport.SimpleTCP is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 2 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Transport.SimpleTCP is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.IO; +using System.Net.Sockets; + + +namespace VNLib.Net.Transport.Tcp +{ + /// <summary> + /// An opaque TCP connection descriptor + /// </summary> + public interface ITcpConnectionDescriptor + { + internal Socket Socket { get; } + + /// <summary> + /// Gets a stream wrapper around the connection. + /// </summary> + /// <remarks> + /// You must dispose of this stream when you are done with it. + /// </remarks> + Stream GetStream(); + + /// <summary> + /// Closes a connection and cleans up any resources + /// </summary> + /// <remarks> + /// You must destory any references to this connection descriptor after calling this method. + /// </remarks> + void CloseConnection(); + } +}
\ No newline at end of file diff --git a/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs b/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs index dda859c..db87357 100644 --- a/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs +++ b/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs @@ -29,7 +29,6 @@ using System.Threading; using System.Net.Sockets; using System.IO.Pipelines; using System.Threading.Tasks; -using System.Runtime.CompilerServices; using VNLib.Utils.Memory; using VNLib.Utils.Memory.Caching; @@ -43,62 +42,24 @@ namespace VNLib.Net.Transport.Tcp /// </summary> internal sealed class SocketPipeLineWorker : ITransportInterface, IReusable { - public void Prepare() - {} - - public bool Release() - { - /* - * If the pipeline has been started, then the pipes - * will be completed by the worker threads (or by the streams) - * and when release is called, there will no longer be - * an observer for the result, which means the pipes - * may be safely reset for reuse - */ - if (_recvTask != null) - { - SendPipe.Reset(); - RecvPipe.Reset(); - } - /* - * If socket had an error and was not started, - * it means there may be data written to the - * recv pipe from the accept operation, that - * needs to be cleared - */ - else - { - //Complete the recvpipe then reset it to discard buffered data - RecvPipe.Reader.Complete(); - RecvPipe.Writer.Complete(); - //now reset it - RecvPipe.Reset(); - } - - //Cleanup tasks - _recvTask = null; - _sendTask = null; - - //Cleanup cts - _cts?.Dispose(); - _cts = null; - - return true; - } + /* + * [0] = Recv + * [1] = Send + * [2] = Send Complete + * [3] = Read Complete + */ - private Task? _recvTask; - private Task? _sendTask; - - private CancellationTokenSource? _cts; + private readonly Task[] _tasks; public readonly ReusableNetworkStream NetworkStream; - private readonly Pipe SendPipe; private readonly Pipe RecvPipe; private readonly Timer RecvTimer; private readonly Timer SendTimer; private readonly Stream RecvStream; + private CancellationTokenSource? _cts; + ///<inheritdoc/> public int SendTimeoutMs { get; set; } @@ -127,6 +88,54 @@ namespace VNLib.Net.Transport.Tcp SendTimeoutMs = Timeout.Infinite; RecvTimeoutMs = Timeout.Infinite; + + /* + * Store the operation tasks in an array, so they can be + * joined when the stream is closed + */ + _tasks = new Task[4]; + } + + public void Prepare() + { } + + public bool Release() + { + /* + * If the pipeline has been started, then the pipes + * will be completed by the worker threads (or by the streams) + * and when release is called, there will no longer be + * an observer for the result, which means the pipes + * may be safely reset for reuse + */ + if (_tasks[0] != null) + { + SendPipe.Reset(); + RecvPipe.Reset(); + } + /* + * If socket had an error and was not started, + * it means there may be data written to the + * recv pipe from the accept operation, that + * needs to be cleared + */ + else + { + //Complete the recvpipe then reset it to discard buffered data + RecvPipe.Reader.Complete(); + RecvPipe.Writer.Complete(); + //now reset it + RecvPipe.Reset(); + } + + //Cleanup tasks + Array.Clear(_tasks); + + //Cleanup cts + _cts?.Dispose(); + _cts = null; + + return true; } /// <summary> @@ -146,8 +155,8 @@ namespace VNLib.Net.Transport.Tcp //Advance writer RecvPipe.Writer.Advance(bytesTransferred); //begin recv tasks, and pass inital data to be flushed flag - _recvTask = RecvDoWorkAsync(client, bytesTransferred > 0); - _sendTask = SendDoWorkAsync(client); + _tasks[0] = RecvDoWorkAsync(client, bytesTransferred > 0); + _tasks[1] = SendDoWorkAsync(client); } @@ -185,7 +194,6 @@ namespace VNLib.Net.Transport.Tcp private async Task SendDoWorkAsync(Socket sock) { - Exception? cause = null; try { //Enter work loop @@ -247,23 +255,24 @@ namespace VNLib.Net.Transport.Tcp break; } } + + //All done, complete the send pipe reader + await SendPipe.Reader.CompleteAsync(); } catch (Exception ex) { - cause = ex; + //Complete the send pipe reader + await SendPipe.Reader.CompleteAsync(ex); } finally { _sendReadRes = default; - - //Complete the send pipe writer - await SendPipe.Reader.CompleteAsync(cause); - //Cancel the recv task _cts!.Cancel(); } } + private FlushResult _recvFlushRes; private async Task RecvDoWorkAsync(Socket sock, bool initialData) @@ -344,6 +353,7 @@ namespace VNLib.Net.Transport.Tcp } } + /// <summary> /// The internal cleanup/dispose method to be called /// when the pipeline is no longer needed @@ -354,10 +364,9 @@ namespace VNLib.Net.Transport.Tcp SendTimer.Dispose(); //Perform some managed cleanup - + //Cleanup tasks - _recvTask = null; - _sendTask = null; + Array.Clear(_tasks); //Cleanup cts _cts?.Dispose(); @@ -544,17 +553,18 @@ namespace VNLib.Net.Transport.Tcp Task ITransportInterface.CloseAsync() { //Complete the send pipe writer since stream is closed - ValueTask vt = SendPipe.Writer.CompleteAsync(); + _tasks[2] = SendPipe.Writer.CompleteAsync().AsTask(); + //Complete the recv pipe reader since its no longer used - ValueTask rv = RecvPipe.Reader.CompleteAsync(); + _tasks[3] = RecvPipe.Reader.CompleteAsync().AsTask(); + //Join worker tasks, no alloc if completed sync, otherwise alloc anyway - return Task.WhenAll(vt.AsTask(), rv.AsTask(), _recvTask!, _sendTask!); + return Task.WhenAll(_tasks); } private static class ThrowHelpers - { - [MethodImpl(MethodImplOptions.AggressiveInlining)] + { public static void ThrowWriterCanceled() { throw new OperationCanceledException("The write operation was canceled by the underlying PipeWriter"); diff --git a/lib/Net.Transport.SimpleTCP/src/TcpServer.cs b/lib/Net.Transport.SimpleTCP/src/TcpServer.cs index 6deeb56..b85aa33 100644 --- a/lib/Net.Transport.SimpleTCP/src/TcpServer.cs +++ b/lib/Net.Transport.SimpleTCP/src/TcpServer.cs @@ -65,9 +65,13 @@ namespace VNLib.Net.Transport.Tcp /// <exception cref="ArgumentOutOfRangeException"></exception> public TcpServer(TCPConfig config, PipeOptions? pipeOptions = null) { - Config = config; //Check config - _ = config.BufferPool ?? throw new ArgumentException("Buffer pool argument cannot be null"); + if(pipeOptions == null) + { + //Pool is required when using default pipe options + _ = config.BufferPool ?? throw new ArgumentException("Buffer pool argument cannot be null"); + } + _ = config.Log ?? throw new ArgumentException("Log argument is required"); if (config.MaxRecvBufferData < 4096) @@ -82,10 +86,13 @@ namespace VNLib.Net.Transport.Tcp { config.Log.Debug("Suggestion: Setting accept threads to {pc}", Environment.ProcessorCount); } + + Config = config; + //Cache pipe options PipeOptions = pipeOptions ?? new( config.BufferPool, - readerScheduler:PipeScheduler.ThreadPool, + readerScheduler:PipeScheduler.ThreadPool, writerScheduler:PipeScheduler.ThreadPool, pauseWriterThreshold: config.MaxRecvBufferData, minimumSegmentSize: 8192, @@ -240,12 +247,23 @@ namespace VNLib.Net.Transport.Tcp args.Dispose(); return; } - //Check for error on accept, and if no error, enqueue the socket, otherwise disconnect the socket - if (!args.EndAccept() || !WaitingSockets!.TryEnque(args)) + + //Check for error and log it + if (!args.EndAccept()) { - //Disconnect the socket (will return the args to the pool) args.Disconnect(); + Config.Log.Debug("Socket accept failed with error code {ec}", args.SocketError); + return; } + + //Try to enqueue the args to the waiting queue, if the queue is full, disconnect the socket + if (!WaitingSockets!.TryEnque(args)) + { + args.Disconnect(); + Config.Log.Warn("Socket {e} disconnected because the waiting queue is overflowing", args.GetHashCode()); + return; + } + //Accept a new connection AcceptConnection(); } @@ -265,29 +283,44 @@ namespace VNLib.Net.Transport.Tcp return ValueTask.FromResult<TransportEventContext>(new(args, args.Stream)); } - return AcceptAsyncCore(cancellation); - } + return AcceptAsyncCore(this, cancellation); - private async ValueTask<TransportEventContext> AcceptAsyncCore(CancellationToken cancellation) - { - //Await async - VnSocketAsyncArgs args = await WaitingSockets.DequeueAsync(cancellation); + static async ValueTask<TransportEventContext> AcceptAsyncCore(TcpServer server, CancellationToken cancellation) + { + //Await async + VnSocketAsyncArgs args = await server.WaitingSockets!.DequeueAsync(cancellation); - return new(args, args.Stream); + return new(args, args.Stream); + } } - internal ValueTask<VnSocketAsyncArgs> AcceptArgsAsync(CancellationToken cancellation) + /// <summary> + /// Accepts a connection and returns the connection descriptor. + /// </summary> + /// <param name="cancellation">A token to cancel the operation</param> + /// <returns>The connection descriptor</returns> + /// <remarks> + /// NOTE: You must always call the <see cref="ITcpConnectionDescriptor.CloseConnection"/> and + /// destroy all references to it when you are done. You must also dispose the stream returned + /// from the <see cref="ITcpConnectionDescriptor.GetStream"/> method. + /// </remarks> + /// <exception cref="InvalidOperationException"></exception> + public ValueTask<ITcpConnectionDescriptor> AcceptConnectionAsync(CancellationToken cancellation) { _ = WaitingSockets ?? throw new InvalidOperationException("Server is not listening"); //Try get args from queue if (WaitingSockets.TryDequeue(out VnSocketAsyncArgs? args)) { - return ValueTask.FromResult(args); + return ValueTask.FromResult<ITcpConnectionDescriptor>(args); } - else + + return AcceptConnectionAsyncCore(this, cancellation); + + static async ValueTask<ITcpConnectionDescriptor> AcceptConnectionAsyncCore(TcpServer server, CancellationToken cancellation) { - return WaitingSockets.DequeueAsync(cancellation); + //Await async + return await server.WaitingSockets!.DequeueAsync(cancellation); } } } diff --git a/lib/Net.Transport.SimpleTCP/src/TcpServerExtensions.cs b/lib/Net.Transport.SimpleTCP/src/TcpServerExtensions.cs index a6a4327..3b86b5b 100644 --- a/lib/Net.Transport.SimpleTCP/src/TcpServerExtensions.cs +++ b/lib/Net.Transport.SimpleTCP/src/TcpServerExtensions.cs @@ -48,10 +48,10 @@ namespace VNLib.Net.Transport.Tcp public static async Task<TransportEventContext> AcceptSslAsync(this TcpServer server, SslServerAuthenticationOptions options, CancellationToken cancellation = default) { //accept internal args - VnSocketAsyncArgs args = await server.AcceptArgsAsync(cancellation); + ITcpConnectionDescriptor args = await server.AcceptConnectionAsync(cancellation); //Begin authenication and make sure the socket stream is closed as its required to cleanup - SslStream stream = new(args.Stream, false); + SslStream stream = new(args.GetStream(), false); try { //auth the new connection @@ -64,14 +64,14 @@ namespace VNLib.Net.Transport.Tcp await stream.DisposeAsync(); //Disconnect socket - args.Disconnect(); + args.CloseConnection(); throw new AuthenticationException("Failed client/server TLS authentication", ex); } } /// <summary> - /// Safley closes an ssl connection + /// Safely closes an ssl connection /// </summary> /// <param name="ctx">The context to close the connection on</param> /// <returns>A value task that completes when the connection is closed</returns> diff --git a/lib/Net.Transport.SimpleTCP/src/TransportEventContext.cs b/lib/Net.Transport.SimpleTCP/src/TransportEventContext.cs index 994b2ba..ec686ca 100644 --- a/lib/Net.Transport.SimpleTCP/src/TransportEventContext.cs +++ b/lib/Net.Transport.SimpleTCP/src/TransportEventContext.cs @@ -25,70 +25,75 @@ using System; using System.IO; using System.Net; -using System.Net.Sockets; -using System.Net.Security; -using System.Security.Authentication; -using System.Runtime.CompilerServices; using System.Threading.Tasks; - namespace VNLib.Net.Transport.Tcp { + /// <summary> /// Represents the context of a transport connection. It includes the active socket /// and a stream representing the active transport. /// </summary> public readonly record struct TransportEventContext { - /// <summary> - /// The socket referrence to the incoming connection - /// </summary> - private readonly Socket Socket; - - private readonly VnSocketAsyncArgs _socketArgs; + private readonly ITcpConnectionDescriptor _descriptor; /// <summary> /// A copy of the local endpoint of the listening socket /// </summary> - public readonly IPEndPoint LocalEndPoint - { - [MethodImpl(MethodImplOptions.AggressiveInlining)] - get => (Socket.LocalEndPoint as IPEndPoint)!; - } + public readonly IPEndPoint LocalEndPoint; /// <summary> /// The <see cref="IPEndPoint"/> representing the client's connection information /// </summary> - public readonly IPEndPoint RemoteEndpoint - { - [MethodImpl(MethodImplOptions.AggressiveInlining)] - get => (Socket.RemoteEndPoint as IPEndPoint)!; - } + public readonly IPEndPoint RemoteEndpoint; /// <summary> - /// The transport stream to be actively read + /// The transport stream that wraps the connection /// </summary> public readonly Stream ConnectionStream; - internal TransportEventContext(VnSocketAsyncArgs args, Stream @stream) + /// <summary> + /// Creates a new <see cref="TransportEventContext"/> wrapper for the given connection descriptor + /// and captures the default stream from the descriptor. + /// </summary> + /// <param name="descriptor">The connection to wrap</param> + public TransportEventContext(ITcpConnectionDescriptor descriptor):this(descriptor, descriptor.GetStream()) + { } + + /// <summary> + /// Creates a new <see cref="TransportEventContext"/> wrapper for the given connection descriptor + /// and your custom stream implementation. + /// </summary> + /// <param name="descriptor">The connection descriptor to wrap</param> + /// <param name="customStream">Your custom stream wrapper around the transport stream</param> + public TransportEventContext(ITcpConnectionDescriptor descriptor, Stream customStream) { - _socketArgs = args; - Socket = args.AcceptSocket!; - ConnectionStream = stream; + _descriptor = descriptor; + ConnectionStream = customStream; + + //Call once and store locally + LocalEndPoint = (descriptor.Socket.LocalEndPoint as IPEndPoint)!; + RemoteEndpoint = (descriptor.Socket.RemoteEndPoint as IPEndPoint)!; } /// <summary> - /// Closes a connection and cleans up any resources + /// Cleans up the stream and closes the connection descriptor /// </summary> - /// <returns></returns> - public async ValueTask CloseConnectionAsync() + /// <returns>A value-task that completes when the resources have been cleaned up</returns> + public readonly async ValueTask CloseConnectionAsync() { - //dispose the stream and wait for buffered data to be sent - await ConnectionStream.DisposeAsync(); - - //Disconnect - _socketArgs.Disconnect(); + try + { + //dispose the stream and wait for buffered data to be sent + await ConnectionStream.DisposeAsync(); + } + finally + { + //Disconnect + _descriptor.CloseConnection(); + } } } }
\ No newline at end of file diff --git a/lib/Net.Transport.SimpleTCP/src/VnSocketAsyncArgs.cs b/lib/Net.Transport.SimpleTCP/src/VnSocketAsyncArgs.cs index 766a866..0481f21 100644 --- a/lib/Net.Transport.SimpleTCP/src/VnSocketAsyncArgs.cs +++ b/lib/Net.Transport.SimpleTCP/src/VnSocketAsyncArgs.cs @@ -36,13 +36,14 @@ namespace VNLib.Net.Transport.Tcp /// Reusable <see cref="SocketAsyncEventArgs"/> that manages a pipeline for sending and recieving data. /// on the connected socket /// </summary> - internal sealed class VnSocketAsyncArgs : SocketAsyncEventArgs, IReusable + internal sealed class VnSocketAsyncArgs : SocketAsyncEventArgs, ITcpConnectionDescriptor, IReusable { private readonly ISockAsyncArgsHandler _handler; public readonly SocketPipeLineWorker SocketWorker; public Stream Stream => SocketWorker.NetworkStream; + public VnSocketAsyncArgs(ISockAsyncArgsHandler handler, PipeOptions options) : base() { @@ -142,7 +143,17 @@ namespace VNLib.Net.Transport.Tcp SocketError = SocketError.Success; SocketFlags = SocketFlags.None; } - + + ///<inheritdoc/> + Socket ITcpConnectionDescriptor.Socket => AcceptSocket!; + + ///<inheritdoc/> + Stream ITcpConnectionDescriptor.GetStream() => SocketWorker.NetworkStream; + + ///<inheritdoc/> + void ITcpConnectionDescriptor.CloseConnection() => Disconnect(); + + void IReusable.Prepare() { SocketWorker.Prepare(); @@ -174,5 +185,6 @@ namespace VNLib.Net.Transport.Tcp //Cleanup socket worker SocketWorker.DisposeInternal(); } + } } |