From de94d788e9a47432a7630a8215896b8dd3628599 Mon Sep 17 00:00:00 2001 From: vnugent Date: Sun, 8 Jan 2023 16:01:54 -0500 Subject: Reorder + analyzer cleanup --- .../src/ITransportInterface.cs | 85 ++++ .../src/ReusableNetworkStream.cs | 153 ++++++ .../src/SocketPipeLineWorker.cs | 521 +++++++++++++++++++++ lib/Net.Transport.SimpleTCP/src/TCPConfig.cs | 97 ++++ lib/Net.Transport.SimpleTCP/src/TcpServer.cs | 289 ++++++++++++ .../src/TransportEventContext.cs | 123 +++++ .../src/VNLib.Net.Transport.SimpleTCP.csproj | 45 ++ .../src/VnSocketAsyncArgs.cs | 181 +++++++ 8 files changed, 1494 insertions(+) create mode 100644 lib/Net.Transport.SimpleTCP/src/ITransportInterface.cs create mode 100644 lib/Net.Transport.SimpleTCP/src/ReusableNetworkStream.cs create mode 100644 lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs create mode 100644 lib/Net.Transport.SimpleTCP/src/TCPConfig.cs create mode 100644 lib/Net.Transport.SimpleTCP/src/TcpServer.cs create mode 100644 lib/Net.Transport.SimpleTCP/src/TransportEventContext.cs create mode 100644 lib/Net.Transport.SimpleTCP/src/VNLib.Net.Transport.SimpleTCP.csproj create mode 100644 lib/Net.Transport.SimpleTCP/src/VnSocketAsyncArgs.cs (limited to 'lib/Net.Transport.SimpleTCP/src') diff --git a/lib/Net.Transport.SimpleTCP/src/ITransportInterface.cs b/lib/Net.Transport.SimpleTCP/src/ITransportInterface.cs new file mode 100644 index 0000000..7d21995 --- /dev/null +++ b/lib/Net.Transport.SimpleTCP/src/ITransportInterface.cs @@ -0,0 +1,85 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Transport.SimpleTCP +* File: ITransportInterface.cs +* +* ITransportInterface.cs is part of VNLib.Net.Transport.SimpleTCP which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Transport.SimpleTCP is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 2 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Transport.SimpleTCP is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + + +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace VNLib.Net.Transport.Tcp +{ + /// + /// Abstraction layer for TCP transport operations with + /// sync and async support. + /// + interface ITransportInterface + { + /// + /// Gets or sets the read timeout in milliseconds + /// + int RecvTimeoutMs { get; set; } + + /// + /// Gets or set the time (in milliseconds) the transport should wait for a send operation + /// + int SendTimeoutMs { get; set; } + + /// + /// Performs an asynchronous send operation + /// + /// The buffer containing the data to send to the client + /// A token to cancel the operation + /// A ValueTask that completes when the send operation is complete + ValueTask SendAsync(ReadOnlyMemory data, CancellationToken cancellation); + + /// + /// Performs an asynchronous send operation + /// + /// The data buffer to write received data to + /// A token to cancel the operation + /// A ValueTask that returns the number of bytes read into the buffer + ValueTask RecvAsync(Memory buffer, CancellationToken cancellation); + + /// + /// Performs a synchronous send operation + /// + /// The buffer to send to the client + void Send(ReadOnlySpan data); + + /// + /// Performs a synchronous receive operation + /// + /// The buffer to copy output data to + /// The number of bytes received + int Recv(Span buffer); + + /// + /// Raised when the interface is no longer required and resources + /// related to the connection should be released. + /// + /// A task that resolves when the operation is complete + Task CloseAsync(); + + } +} \ No newline at end of file diff --git a/lib/Net.Transport.SimpleTCP/src/ReusableNetworkStream.cs b/lib/Net.Transport.SimpleTCP/src/ReusableNetworkStream.cs new file mode 100644 index 0000000..f4a5491 --- /dev/null +++ b/lib/Net.Transport.SimpleTCP/src/ReusableNetworkStream.cs @@ -0,0 +1,153 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Transport.SimpleTCP +* File: ReusableNetworkStream.cs +* +* ReusableNetworkStream.cs is part of VNLib.Net.Transport.SimpleTCP which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Transport.SimpleTCP is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 2 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Transport.SimpleTCP is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +/* + * A special stream that sits betnween the socket/pipeline listener + * that marshals data between the application and the socket pipeline. + * This stream uses a timer to cancel recv events. Because of this and + * pipeline aspects, it supports full duplex IO but it is not thread safe. + * + * IE one thread can read and write, but not more + */ + + +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Utils.Extensions; + +namespace VNLib.Net.Transport.Tcp +{ + + /// + /// A reusable stream that marshals data between the socket pipeline and the application + /// + internal sealed class ReusableNetworkStream : Stream + { + #region stream basics + public override bool CanRead => true; + public override bool CanSeek => false; + public override bool CanWrite => true; + public override bool CanTimeout => true; + public override long Length => throw new NotSupportedException(); + public override long Position { get => throw new NotSupportedException(); set => throw new NotImplementedException(); } + public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); + public override void SetLength(long value) => throw new NotSupportedException(); + #endregion + + //Read timeout to use when receiving data + public override int ReadTimeout + { + get => Transport.RecvTimeoutMs; + //Allow -1 to set to infinite timeout + set => Transport.RecvTimeoutMs = value > -2 ? value : throw new ArgumentException("Write timeout must be a 32bit signed integer larger than 0"); + } + + // Write timeout is not currently used, becasue the writer managed socket timeouts + public override int WriteTimeout + { + get => Transport.SendTimeoutMs; + //Allow -1 to set to infinite timeout + set => Transport.SendTimeoutMs = value > -2 ? value : throw new ArgumentException("Write timeout must be a 32bit signed integer larger than -1"); + } + + //Timer used to cancel pipeline recv timeouts + private readonly ITransportInterface Transport; + + internal ReusableNetworkStream(ITransportInterface transport) + { + Transport = transport; + } + + /// + public override void Close() + { } + /// + public override Task FlushAsync(CancellationToken cancellationToken) => Task.CompletedTask; + /// + public override void Flush() + { } + + /// + public override int Read(byte[] buffer, int offset, int count) => Read(buffer.AsSpan(offset, count)); + /// + public override int Read(Span buffer) => Transport.Recv(buffer); + + /// + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + //Since read returns a value, it isnt any cheaper not to alloc a task around the value-task + return ReadAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask(); + } + /// + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + return Transport.RecvAsync(buffer, cancellationToken); + } + + /// + public override void Write(byte[] buffer, int offset, int count) => Write(buffer.AsSpan(offset, count)); + /// + public override void Write(ReadOnlySpan buffer) => Transport.Send(buffer); + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + //Allow synchronous complete to avoid alloc + return WriteAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask(); + } + + /// + /// + /// + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellation = default) + { + return Transport.SendAsync(buffer, cancellation); + } + + /* + * Override dispose to intercept base cleanup until the internal release + */ + /// + /// Not supported + /// + public new void Dispose() + { + //Call sync + Task closing = Transport.CloseAsync(); + closing.Wait(); + } + + public override ValueTask DisposeAsync() + { + return new ValueTask(Transport.CloseAsync()); + } + + public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) + { + throw new NotSupportedException("CopyToAsync is not supported"); + } + } +} \ No newline at end of file diff --git a/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs b/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs new file mode 100644 index 0000000..89c46e1 --- /dev/null +++ b/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs @@ -0,0 +1,521 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Transport.SimpleTCP +* File: SocketPipeLineWorker.cs +* +* SocketPipeLineWorker.cs is part of VNLib.Net.Transport.SimpleTCP which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Transport.SimpleTCP is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 2 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Transport.SimpleTCP is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.IO; +using System.Buffers; +using System.Threading; +using System.Net.Sockets; +using System.IO.Pipelines; +using System.Threading.Tasks; + +using VNLib.Utils.Memory; +using VNLib.Utils.Memory.Caching; +using VNLib.Utils.Extensions; + +namespace VNLib.Net.Transport.Tcp +{ + /// + /// A reuseable socket pipeline provider, that marshals data from a network stream + /// to a connected socket. + /// + internal sealed class SocketPipeLineWorker : ITransportInterface, IReusable + { + public void Prepare() + {} + + public bool Release() + { + /* + * If the pipeline has been started, then the pipes + * will be completed by the worker threads (or by the streams) + * and when release is called, there will no longer be + * an observer for the result, which means the pipes + * may be safely reset for reuse + */ + if (_recvTask != null) + { + //Since the + SendPipe.Reset(); + RecvPipe.Reset(); + } + /* + * If socket had an error and was not started, + * it means there may be data written to the + * recv pipe from the accept operation, that + * needs to be cleared + */ + else + { + //Complete the recvpipe then reset it to discard buffered data + RecvPipe.Reader.Complete(); + RecvPipe.Writer.Complete(); + //now reset it + RecvPipe.Reset(); + } + + //Cleanup tasks + _recvTask = null; + _sendTask = null; + + //Cleanup cts + _cts?.Dispose(); + _cts = null; + + return true; + } + + private Task? _recvTask; + private Task? _sendTask; + + private CancellationTokenSource? _cts; + + public readonly ReusableNetworkStream NetworkStream; + + private readonly Pipe SendPipe; + private readonly Pipe RecvPipe; + private readonly Timer RecvTimer; + private readonly Timer SendTimer; + private readonly Stream RecvStream; + + public int SendTimeoutMs { get; set; } + + public int RecvTimeoutMs { get; set; } + + + /// + /// Initalizes a new reusable socket pipeline worker + /// + /// + public SocketPipeLineWorker(PipeOptions pipeOptions) + { + //Init pipes + SendPipe = new(pipeOptions); + RecvPipe = new(pipeOptions); + + RecvStream = RecvPipe.Reader.AsStream(true); + + //Init timers to infinite + RecvTimer = new(OnRecvTimerElapsed, state: this, Timeout.Infinite, Timeout.Infinite); + SendTimer = new(OnSendTimerElapsed, state: this, Timeout.Infinite, Timeout.Infinite); + + //Init reusable network stream + NetworkStream = new(this); + + SendTimeoutMs = Timeout.Infinite; + RecvTimeoutMs = Timeout.Infinite; + } + + /// + /// Gets a buffer used during a socket accept operation + /// + /// The size hint of the buffer to get + /// A memory structure of the specified size + public Memory GetMemory(int bufferSize) => RecvPipe.Writer.GetMemory(bufferSize); + + /// + /// Begins async work to receive and send data on a connected socket + /// + /// The socket to read/write from + /// The number of bytes to be commited + public void Start(Socket client, int bytesTransferred) + { + //Advance writer + RecvPipe.Writer.Advance(bytesTransferred); + //begin recv tasks, and pass inital data to be flushed flag + _recvTask = RecvDoWorkAsync(client, bytesTransferred > 0); + _sendTask = SendDoWorkAsync(client); + } + + + /* + * NOTES + * + * Timers used to maintain resource exhuastion independent + * of the actual socket pipeline, so to preserve the state + * of the pipelines until the writer is closed. + * + * This choice was made to allow the api consumer to decide how to + * process a timeout without affecting the state of the pipelines + * or socket until the close event. + */ + + private void OnRecvTimerElapsed(object? state) + { + //cancel pending read on recv pipe when timout expires + RecvPipe.Reader.CancelPendingRead(); + } + + private void OnSendTimerElapsed(object? state) + { + //Cancel pending flush + SendPipe.Writer.CancelPendingFlush(); + } + + /* + * Pipeline worker tasks. Listen for data on the socket, + * and listen for data on the pipe to marshal data between + * the pipes and the socket + */ + + private async Task SendDoWorkAsync(Socket sock) + { + Exception? cause = null; + try + { + //Enter work loop + while (true) + { + //wait for data from the write pipe and write it to the socket + ReadResult result = await SendPipe.Reader.ReadAsync(); + //Catch error/cancel conditions and break the loop + if (result.IsCanceled || !sock.Connected || result.Buffer.IsEmpty) + { + break; + } + //get sequence + ReadOnlySequence buffer = result.Buffer; + + //Get enumerator to write memory segments + ReadOnlySequence.Enumerator enumerator = buffer.GetEnumerator(); + + //Begin enumerator + while (enumerator.MoveNext()) + { + + /* + * Using a foward only reader allows the following loop + * to track the ammount of data written to the socket + * until the entire segment has been sent or if it has + * move to the next segment + */ + + ForwardOnlyMemoryReader reader = new(enumerator.Current); + + while(reader.WindowSize > 0) + { + //Write segment to socket, and upate written data + int written = await sock.SendAsync(reader.Window, SocketFlags.None); + + if(written >= reader.WindowSize) + { + //All data was written + break; + } + + //Advance unread window to end of the written data + reader.Advance(written); + } + //Advance to next window/segment + } + + //Advance pipe + SendPipe.Reader.AdvanceTo(buffer.End); + + //Pipe has been completed and all data was written + if (result.IsCompleted) + { + break; + } + } + } + catch (Exception ex) + { + cause = ex; + } + finally + { + //Complete the send pipe writer + await SendPipe.Reader.CompleteAsync(cause); + + //Cancel the recv task + _cts!.Cancel(); + } + } + + private async Task RecvDoWorkAsync(Socket sock, bool initialData) + { + //init new cts + _cts = new(); + + Exception? cause = null; + try + { + //Avoid syscall? + int bufferSize = sock.ReceiveBufferSize; + + //If initial data was buffered, it needs to be published to the reader + if (initialData) + { + //Flush initial data + FlushResult res = await RecvPipe.Writer.FlushAsync(CancellationToken.None); + + if (res.IsCompleted || res.IsCanceled) + { + //Exit + return; + } + } + + //Enter work loop + while (true) + { + //Get buffer from pipe writer + Memory buffer = RecvPipe.Writer.GetMemory(bufferSize); + + //Wait for data or error from socket + int count = await sock.ReceiveAsync(buffer, SocketFlags.None, _cts.Token); + + //socket returned emtpy data + if (count == 0 || !sock.Connected) + { + break; + } + + //Advance/notify the pipe + RecvPipe.Writer.Advance(count); + + //Flush data at top of loop, since data is available from initial accept + FlushResult res = await RecvPipe.Writer.FlushAsync(CancellationToken.None); + + //Writing has completed, time to exit + if (res.IsCompleted || res.IsCanceled) + { + break; + } + } + } + //Normal exit + catch (OperationCanceledException) + {} + catch (SocketException se) + { + cause = se; + //Cancel sending reader task because the socket has an error and cannot be used + SendPipe.Reader.CancelPendingRead(); + } + catch (Exception ex) + { + cause = ex; + } + finally + { + //Stop timer incase exception + RecvTimer.Stop(); + + //Cleanup and complete the writer + await RecvPipe.Writer.CompleteAsync(cause); + //The recv reader is completed by the network stream + } + } + + /// + /// The internal cleanup/dispose method to be called + /// when the pipeline is no longer needed + /// + public void DisposeInternal() + { + RecvTimer.Dispose(); + SendTimer.Dispose(); + + //Perform some managed cleanup + + //Cleanup tasks + _recvTask = null; + _sendTask = null; + + //Cleanup cts + _cts?.Dispose(); + _cts = null; + } + + + private static async Task AwaitFlushTask(ValueTask valueTask, Timer? sendTimer) + { + try + { + FlushResult result = await valueTask.ConfigureAwait(false); + + if (result.IsCanceled) + { + throw new OperationCanceledException("The operation was canceled by the underlying PipeWriter"); + } + } + finally + { + sendTimer?.Stop(); + } + } + + private ValueTask SendWithTimerInternalAsync(ReadOnlyMemory data, CancellationToken cancellation) + { + //Start send timer + SendTimer.Restart(SendTimeoutMs); + try + { + //Send the segment + ValueTask result = SendPipe.Writer.WriteAsync(data, cancellation); + + //Task completed successfully, so + if (result.IsCompletedSuccessfully) + { + //Stop timer + SendTimer.Stop(); + + //Safe to get the rseult + FlushResult fr = result.Result; + //Check for canceled and throw + return fr.IsCanceled + ? throw new OperationCanceledException("The write operation was canceled by the underlying PipeWriter") + : ValueTask.CompletedTask; + } + else + { + //Wrap the task in a ValueTask since it must be awaited, and will happen on background thread + return new(AwaitFlushTask(result, SendTimer)); + } + } + catch + { + //Stop timer on exception + SendTimer.Stop(); + throw; + } + } + + private ValueTask SendWithoutTimerInternalAsync(ReadOnlyMemory data, CancellationToken cancellation) + { + //Send the segment + ValueTask result = SendPipe.Writer.WriteAsync(data, cancellation); + + //Task completed successfully, so + if (result.IsCompletedSuccessfully) + { + //Safe to get the rseult + FlushResult fr = result.Result; + //Check for canceled and throw + return fr.IsCanceled + ? throw new OperationCanceledException("The write operation was canceled by the underlying PipeWriter") + : ValueTask.CompletedTask; + } + else + { + //Wrap the task in a ValueTask since it must be awaited, and will happen on background thread + return new(AwaitFlushTask(result, null)); + } + } + + ValueTask ITransportInterface.SendAsync(ReadOnlyMemory data, CancellationToken cancellation) + { + //Use timer if timeout is set, dont otherwise + return SendTimeoutMs < 1 ? SendWithoutTimerInternalAsync(data, cancellation) : SendWithTimerInternalAsync(data, cancellation); + } + + + void ITransportInterface.Send(ReadOnlySpan data) + { + //Determine if the send timer should be used + Timer? _timer = SendTimeoutMs < 1 ? null : SendTimer; + + //Write data directly to the writer buffer + SendPipe.Writer.Write(data); + + //Start send timer + _timer?.Restart(SendTimeoutMs); + + try + { + //Send the segment + ValueTask result = SendPipe.Writer.FlushAsync(CancellationToken.None); + + //Task completed successfully, so + if (result.IsCompletedSuccessfully) + { + //Safe to get the rseult + FlushResult fr = result.Result; + + //Check for canceled and throw + if (fr.IsCanceled) + { + throw new OperationCanceledException("The write operation was canceled by the underlying PipeWriter"); + } + } + else + { + //Await the result + FlushResult fr = result.ConfigureAwait(false).GetAwaiter().GetResult(); + + if (fr.IsCanceled) + { + throw new OperationCanceledException("The write operation was canceled by the underlying PipeWriter"); + } + } + } + finally + { + //Stop timer + _timer?.Stop(); + } + } + + async ValueTask ITransportInterface.RecvAsync(Memory buffer, CancellationToken cancellation) + { + //Restart timer + RecvTimer.Restart(RecvTimeoutMs); + try + { + return await RecvStream.ReadAsync(buffer, cancellation); + } + finally + { + RecvTimer.Stop(); + } + } + + int ITransportInterface.Recv(Span buffer) + { + //Restart timer + RecvTimer.Restart(RecvTimeoutMs); + try + { + return RecvStream.Read(buffer); + } + finally + { + RecvTimer.Stop(); + } + } + + Task ITransportInterface.CloseAsync() + { + //Complete the send pipe writer since stream is closed + ValueTask vt = SendPipe.Writer.CompleteAsync(); + //Complete the recv pipe reader since its no longer used + ValueTask rv = RecvPipe.Reader.CompleteAsync(); + //Join worker tasks, no alloc if completed sync, otherwise alloc anyway + return Task.WhenAll(vt.AsTask(), rv.AsTask(), _recvTask!, _sendTask!); + } + + } +} diff --git a/lib/Net.Transport.SimpleTCP/src/TCPConfig.cs b/lib/Net.Transport.SimpleTCP/src/TCPConfig.cs new file mode 100644 index 0000000..6955e63 --- /dev/null +++ b/lib/Net.Transport.SimpleTCP/src/TCPConfig.cs @@ -0,0 +1,97 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Transport.SimpleTCP +* File: TCPConfig.cs +* +* TCPConfig.cs is part of VNLib.Net.Transport.SimpleTCP which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Transport.SimpleTCP is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 2 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Transport.SimpleTCP is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Net; +using System.Buffers; +using System.Net.Sockets; +using System.Net.Security; + +using VNLib.Utils.Logging; + +namespace VNLib.Net.Transport.Tcp +{ + /// + /// Represents the required configuration variables for the transport + /// + public readonly struct TCPConfig + { + /// + /// The the listening socket will bind to + /// + public readonly IPEndPoint LocalEndPoint { get; init; } + /// + /// The log provider used to write logging information to + /// + public readonly ILogProvider Log { get; init; } + /// + /// If TCP keepalive is enabled, the amount of time the connection is considered alive before another probe message is sent + /// + public readonly int TcpKeepAliveTime { get; init; } + /// + /// If TCP keepalive is enabled, the amount of time the connection will wait for a keepalive message + /// + public readonly int KeepaliveInterval { get; init; } + /// + /// Enables TCP keepalive + /// + public readonly bool TcpKeepalive { get; init; } + /// + /// The authentication options to use for processing TLS connections. This value must be set when a certificate has been specified + /// + public readonly SslServerAuthenticationOptions? AuthenticationOptions { get; init; } + /// + /// The maximum number of waiting WSA asynchronous socket accept operations + /// + public readonly uint AcceptThreads { get; init; } + /// + /// The maximum size (in bytes) the transport will buffer in + /// the receiving pipeline. + /// + public readonly int MaxRecvBufferData { get; init; } + /// + /// The listener socket backlog count + /// + public readonly int BackLog { get; init; } + /// + /// The to allocate transport buffers from + /// + public readonly MemoryPool BufferPool { get; init; } + /// + /// + /// The maxium number of event objects that will be cached + /// during normal operation + /// + /// + /// WARNING: Setting this value too low will cause significant CPU overhead and GC load + /// + /// + public readonly int CacheQuota { get; init; } + /// + /// An optional callback invoked after the socket has been created + /// for optional appliction specific socket configuration + /// + public Action? OnSocketCreated { get; init; } + } +} \ No newline at end of file diff --git a/lib/Net.Transport.SimpleTCP/src/TcpServer.cs b/lib/Net.Transport.SimpleTCP/src/TcpServer.cs new file mode 100644 index 0000000..fc0bcc5 --- /dev/null +++ b/lib/Net.Transport.SimpleTCP/src/TcpServer.cs @@ -0,0 +1,289 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Transport.SimpleTCP +* File: TcpServer.cs +* +* TcpServer.cs is part of VNLib.Net.Transport.SimpleTCP which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Transport.SimpleTCP is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 2 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Transport.SimpleTCP is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Security; +using System.Threading; +using System.Net.Sockets; +using System.Net.Security; +using System.IO.Pipelines; +using System.Threading.Tasks; +using System.Security.Authentication; +using System.Runtime.CompilerServices; + +using VNLib.Utils.Async; +using VNLib.Utils.Logging; +using VNLib.Utils.Memory.Caching; + +namespace VNLib.Net.Transport.Tcp +{ + /// + /// + /// Provides a simple, high performance, single process, low/no allocation, + /// asynchronous, TCP socket server. + /// + /// + /// IO operations are full duplex so pipe-lining reused + /// connections is expected. This class cannot be inherited + /// + /// + public sealed class TcpServer : ICacheHolder + { + /// + /// The current configuration + /// + public TCPConfig Config { get; } + + private readonly ObjectRental SockAsyncArgPool; + private readonly PipeOptions PipeOptions; + private readonly bool _usingTls; + + /// + /// Initializes a new with the specified + /// + /// Configuration to inalize with + /// Optional otherwise uses default + /// + /// + public TcpServer(TCPConfig config, PipeOptions? pipeOptions = null) + { + Config = config; + //Check config + _ = config.BufferPool ?? throw new ArgumentException("Buffer pool argument cannot be null"); + _ = config.Log ?? throw new ArgumentException("Log argument is required"); + + if (config.MaxRecvBufferData < 4096) + { + throw new ArgumentException("MaxRecvBufferData size must be at least 4096 bytes to avoid data pipeline pefromance issues"); + } + if(config.AcceptThreads < 1) + { + throw new ArgumentException("Accept thread count must be greater than 0"); + } + if(config.AcceptThreads > Environment.ProcessorCount) + { + config.Log.Debug("Suggestion: Setting accept threads to {pc}", Environment.ProcessorCount); + } + //Cache pipe options + PipeOptions = pipeOptions ?? new( + config.BufferPool, + readerScheduler:PipeScheduler.ThreadPool, + writerScheduler:PipeScheduler.ThreadPool, + pauseWriterThreshold: config.MaxRecvBufferData, + minimumSegmentSize: 8192, + useSynchronizationContext:false + ); + //store tls value + _usingTls = Config.AuthenticationOptions != null; + + SockAsyncArgPool = ObjectRental.CreateReusable(ArgsConstructor, Config.CacheQuota); + } + + /// + public void CacheClear() => SockAsyncArgPool.CacheClear(); + /// + public void CacheHardClear() => SockAsyncArgPool.CacheHardClear(); + + private AsyncQueue? WaitingSockets; + private Socket? ServerSock; + //private CancellationToken Token; + + private bool _canceledFlag; + + /// + /// Begins listening for incoming TCP connections on the configured socket + /// + /// A token that is used to abort listening operations and close the socket + /// + /// + /// + /// + public void Start(CancellationToken token) + { + //If the socket is still listening + if (ServerSock != null) + { + throw new InvalidOperationException("The server thread is currently listening and cannot be re-started"); + } + //make sure the token isnt already canceled + if (token.IsCancellationRequested) + { + throw new ArgumentException("Token is already canceled", nameof(token)); + } + + //Configure socket on the current thread so exceptions will be raised to the caller + ServerSock = new(Config.LocalEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); + //Bind socket + ServerSock.Bind(Config.LocalEndPoint); + //Begin listening + ServerSock.Listen(Config.BackLog); + + //See if keepalive should be used + if (Config.TcpKeepalive) + { + //Setup socket keepalive from config + ServerSock.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true); + ServerSock.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, Config.KeepaliveInterval); + ServerSock.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, Config.TcpKeepAliveTime); + } + + //Invoke socket created callback + Config.OnSocketCreated?.Invoke(ServerSock); + + //Init waiting socket queue + WaitingSockets = new(false, true); + + //Clear canceled flag + _canceledFlag = false; + + //Start listening for connections + for (int i = 0; i < Config.AcceptThreads; i++) + { + AcceptConnection(); + } + + //Cleanup callback + static void cleanup(object? state) + { + TcpServer server = (TcpServer)state!; + + //Set canceled flag + server._canceledFlag = true; + + //Clean up socket + server.ServerSock!.Dispose(); + server.ServerSock = null; + + server.SockAsyncArgPool.CacheHardClear(); + + //Dispose any queued sockets + while (server.WaitingSockets!.TryDequeue(out VnSocketAsyncArgs? args)) + { + args.Dispose(); + } + } + + //Register cleanup + _ = token.Register(cleanup, this, false); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private VnSocketAsyncArgs ArgsConstructor() + { + void ReturnCb(VnSocketAsyncArgs args) + { + //If the server has exited, dispose the args and dont return to pool + if (_canceledFlag) + { + args.Dispose(); + } + else + { + SockAsyncArgPool.Return(args); + } + } + + //Socket args accept callback functions for this + VnSocketAsyncArgs args = new(AcceptCompleted, ReturnCb, PipeOptions); + return args; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void AcceptConnection() + { + //Make sure cancellation isnt pending + if (_canceledFlag) + { + return; + } + //Rent new args + VnSocketAsyncArgs acceptArgs = SockAsyncArgPool!.Rent(); + //Accept another socket + if (!acceptArgs.BeginAccept(ServerSock!)) + { + //Completed synchronously + AcceptCompleted(acceptArgs); + } + //Completed async + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void AcceptCompleted(VnSocketAsyncArgs args) + { + //Examine last op for aborted error, if aborted, then the listening socket has exited + if (args.SocketError == SocketError.OperationAborted) + { + //Dispose args since server is exiting + args.Dispose(); + return; + } + //Check for error on accept, and if no error, enqueue the socket, otherwise disconnect the socket + if (!args.EndAccept() || !WaitingSockets!.TryEnque(args)) + { + //Disconnect the socket (will return the args to the pool) + args.Disconnect(); + } + //Accept a new connection + AcceptConnection(); + } + + + /// + /// Retreives a connected socket from the waiting queue + /// + /// The context of the connect + /// + public async ValueTask AcceptAsync(CancellationToken cancellation) + { + _ = WaitingSockets ?? throw new InvalidOperationException("Server is not listening"); + //Args is ready to use + VnSocketAsyncArgs args = await WaitingSockets.DequeueAsync(cancellation); + //See if tls is enabled, if so, start tls handshake + if (_usingTls) + { + //Begin authenication and make sure the socket stream is closed as its required to cleanup + SslStream stream = new(args.Stream, false); + try + { + //auth the new connection + await stream.AuthenticateAsServerAsync(Config.AuthenticationOptions!, cancellation); + return new(args, stream); + } + catch(Exception ex) + { + await stream.DisposeAsync(); + + //Disconnect socket + args.Disconnect(); + + throw new AuthenticationException("Failed client/server TLS authentication", ex); + } + } + else + { + return new(args, args.Stream); + } + } + } +} \ No newline at end of file diff --git a/lib/Net.Transport.SimpleTCP/src/TransportEventContext.cs b/lib/Net.Transport.SimpleTCP/src/TransportEventContext.cs new file mode 100644 index 0000000..fc04d0c --- /dev/null +++ b/lib/Net.Transport.SimpleTCP/src/TransportEventContext.cs @@ -0,0 +1,123 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Transport.SimpleTCP +* File: TransportEventContext.cs +* +* TransportEventContext.cs is part of VNLib.Net.Transport.SimpleTCP which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Transport.SimpleTCP is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 2 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Transport.SimpleTCP is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.IO; +using System.Net; +using System.Net.Sockets; +using System.Net.Security; +using System.Security.Authentication; +using System.Runtime.CompilerServices; +using System.Threading.Tasks; + + +namespace VNLib.Net.Transport.Tcp +{ + /// + /// Represents the context of a transport connection. It includes the active socket + /// and a stream representing the active transport. + /// + public readonly struct TransportEventContext + { + /// + /// The socket referrence to the incoming connection + /// + private readonly Socket Socket; + + internal readonly VnSocketAsyncArgs _socketArgs; + + /// + /// The transport security layer security protocol + /// + public readonly SslProtocols SslVersion { get; } = SslProtocols.None; + /// + /// A copy of the local endpoint of the listening socket + /// + public readonly IPEndPoint LocalEndPoint + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + get => (Socket.LocalEndPoint as IPEndPoint)!; + } + /// + /// The representing the client's connection information + /// + public readonly IPEndPoint RemoteEndpoint + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + get => (Socket.RemoteEndPoint as IPEndPoint)!; + } + /// + /// The transport stream to be actively read + /// + public readonly Stream ConnectionStream; + + + internal TransportEventContext(VnSocketAsyncArgs args, Stream @stream) + { + _socketArgs = args; + Socket = args.Socket!; + ConnectionStream = stream; + } + internal TransportEventContext(VnSocketAsyncArgs args, SslStream @stream):this(args, (Stream)stream) + { + SslVersion = stream.SslProtocol; + } + + /// + /// Closes a connection and cleans up any resources + /// + /// + /// + public async ValueTask CloseConnectionAsync() + { + //Var to capture ssl shudown exception + Exception? closeExp = null; + + //Verify ssl is being used and the socket is still 'connected' + if (SslVersion > SslProtocols.None && _socketArgs.Socket!.Connected) + { + try + { + await (ConnectionStream as SslStream)!.ShutdownAsync(); + } + catch (Exception ex) + { + closeExp = ex; + } + } + + //dispose the stream and wait for buffered data to be sent + await ConnectionStream.DisposeAsync(); + + //Disconnect + _socketArgs.Disconnect(); + + //if excp occured, re-throw + if (closeExp != null) + { + throw closeExp; + } + } + } +} \ No newline at end of file diff --git a/lib/Net.Transport.SimpleTCP/src/VNLib.Net.Transport.SimpleTCP.csproj b/lib/Net.Transport.SimpleTCP/src/VNLib.Net.Transport.SimpleTCP.csproj new file mode 100644 index 0000000..7a476da --- /dev/null +++ b/lib/Net.Transport.SimpleTCP/src/VNLib.Net.Transport.SimpleTCP.csproj @@ -0,0 +1,45 @@ + + + + net6.0 + VNLib.Net.Transport + 1.0.1.4 + VNLib Simple Transport Library + Provides a library for single process asynchronous, event driven, TCP socket listening and supporting structures to implement +simple high performance TCP servers with or without TLS security. + Vaughn Nugent + Copyright © 2022 Vaughn Nugent + https://www.vaughnnugent.com/resources + VNLib.Net.Transport.SimpleTCP + True + \\vaughnnugent.com\Internal\Folder Redirection\vman\Documents\Programming\Software\StrongNameingKey.snk + + + + + true + + enable + + True + + latest-all + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + + diff --git a/lib/Net.Transport.SimpleTCP/src/VnSocketAsyncArgs.cs b/lib/Net.Transport.SimpleTCP/src/VnSocketAsyncArgs.cs new file mode 100644 index 0000000..9f37762 --- /dev/null +++ b/lib/Net.Transport.SimpleTCP/src/VnSocketAsyncArgs.cs @@ -0,0 +1,181 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Transport.SimpleTCP +* File: VnSocketAsyncArgs.cs +* +* VnSocketAsyncArgs.cs is part of VNLib.Net.Transport.SimpleTCP which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Transport.SimpleTCP is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 2 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Transport.SimpleTCP is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.IO; +using System.Net.Sockets; +using System.IO.Pipelines; + +using VNLib.Utils.Memory.Caching; + + +namespace VNLib.Net.Transport.Tcp +{ + internal delegate void SocketCallback(VnSocketAsyncArgs args); + + /// + /// Reusable that manages a pipeline for sending and recieving data. + /// on the connected socket + /// + internal sealed class VnSocketAsyncArgs : SocketAsyncEventArgs, IReusable + { + private readonly SocketCallback SocketAccepted; + private readonly SocketCallback SocketDisconnected; + + public readonly SocketPipeLineWorker SocketWorker; + + public Socket? Socket => AcceptSocket; + + public VnSocketAsyncArgs(SocketCallback accepted, SocketCallback disconnected, PipeOptions options) : base() + { + SocketWorker = new(options); + SocketAccepted = accepted; + //Only reuse socketes if windows + DisconnectReuseSocket = OperatingSystem.IsWindows(); + SocketDisconnected = disconnected; + } + + public Stream Stream => SocketWorker.NetworkStream; + + /// + /// Begins an asynchronous accept operation on the current (bound) socket + /// + /// The server socket to accept the connection + /// True if the IO operation is pending + public bool BeginAccept(Socket sock) + { + //Store the semaphore in the user token event args + SocketError = SocketError.Success; + SocketFlags = SocketFlags.None; + + //Recv during accept is not supported on linux, this flag is set to false on linux + if (DisconnectReuseSocket) + { + //get buffer from the pipe to write initial accept data to + Memory buffer = SocketWorker.GetMemory(sock.ReceiveBufferSize); + SetBuffer(buffer); + } + + //accept async + return sock.AcceptAsync(this); + } + + /// + /// Determines if an asynchronous accept operation has completed successsfully + /// and the socket is connected. + /// + /// True if the accept was successful, and the accepted socket is connected, false otherwise + public bool EndAccept() + { + if(SocketError == SocketError.Success) + { + //remove ref to buffer + SetBuffer(null); + //start the socket worker + SocketWorker.Start(Socket!, BytesTransferred); + return true; + } + return false; + } + + /// + /// Begins an async disconnect operation on a currentl connected socket + /// + /// True if the operation is pending + public void Disconnect() + { + //Clear flags + SocketError = SocketError.Success; + //accept async + if (!Socket!.DisconnectAsync(this)) + { + //Invoke disconnected callback since op completed sync + EndDisconnect(); + //Invoke disconnected callback since op completed sync + SocketDisconnected(this); + } + } + + private void EndDisconnect() + { + //If the disconnection operation failed, do not reuse the socket on next accept + if (SocketError != SocketError.Success) + { + //Dispose the socket before clearing the socket + Socket?.Dispose(); + AcceptSocket = null; + } + } + + protected override void OnCompleted(SocketAsyncEventArgs e) + { + switch (e.LastOperation) + { + case SocketAsyncOperation.Accept: + //Invoke the accepted callback + SocketAccepted(this); + break; + case SocketAsyncOperation.Disconnect: + EndDisconnect(); + //Invoke disconnected callback since op completed sync + SocketDisconnected(this); + break; + default: + throw new InvalidOperationException("Invalid socket operation"); + } + //Clear flags/errors on completion + SocketError = SocketError.Success; + SocketFlags = SocketFlags.None; + } + + void IReusable.Prepare() + { + SocketWorker.Prepare(); + } + + bool IReusable.Release() + { + UserToken = null; + SocketWorker.Release(); + //if the sockeet is connected (or not windows), dispose it and clear the accept socket + if (AcceptSocket?.Connected == true || !DisconnectReuseSocket) + { + AcceptSocket?.Dispose(); + AcceptSocket = null; + } + return true; + } + + public new void Dispose() + { + //Dispose the base class + base.Dispose(); + //Dispose the socket if its set + AcceptSocket?.Dispose(); + AcceptSocket = null; + //Dispose the overlapped stream + SocketWorker.DisposeInternal(); + } + } +} -- cgit