aboutsummaryrefslogtreecommitdiff
path: root/lib/Net.Transport.SimpleTCP/src
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-01-08 16:01:54 -0500
committerLibravatar vnugent <public@vaughnnugent.com>2023-01-08 16:01:54 -0500
commitde94d788e9a47432a7630a8215896b8dd3628599 (patch)
tree666dec06eef861d101cb6948aff52a3d354c8d73 /lib/Net.Transport.SimpleTCP/src
parentbe6dc557a3b819248b014992eb96c1cb21f8112b (diff)
Reorder + analyzer cleanup
Diffstat (limited to 'lib/Net.Transport.SimpleTCP/src')
-rw-r--r--lib/Net.Transport.SimpleTCP/src/ITransportInterface.cs85
-rw-r--r--lib/Net.Transport.SimpleTCP/src/ReusableNetworkStream.cs153
-rw-r--r--lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs521
-rw-r--r--lib/Net.Transport.SimpleTCP/src/TCPConfig.cs97
-rw-r--r--lib/Net.Transport.SimpleTCP/src/TcpServer.cs289
-rw-r--r--lib/Net.Transport.SimpleTCP/src/TransportEventContext.cs123
-rw-r--r--lib/Net.Transport.SimpleTCP/src/VNLib.Net.Transport.SimpleTCP.csproj45
-rw-r--r--lib/Net.Transport.SimpleTCP/src/VnSocketAsyncArgs.cs181
8 files changed, 1494 insertions, 0 deletions
diff --git a/lib/Net.Transport.SimpleTCP/src/ITransportInterface.cs b/lib/Net.Transport.SimpleTCP/src/ITransportInterface.cs
new file mode 100644
index 0000000..7d21995
--- /dev/null
+++ b/lib/Net.Transport.SimpleTCP/src/ITransportInterface.cs
@@ -0,0 +1,85 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Transport.SimpleTCP
+* File: ITransportInterface.cs
+*
+* ITransportInterface.cs is part of VNLib.Net.Transport.SimpleTCP which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Transport.SimpleTCP is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 2 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Transport.SimpleTCP is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace VNLib.Net.Transport.Tcp
+{
+ /// <summary>
+ /// Abstraction layer for TCP transport operations with
+ /// sync and async support.
+ /// </summary>
+ interface ITransportInterface
+ {
+ /// <summary>
+ /// Gets or sets the read timeout in milliseconds
+ /// </summary>
+ int RecvTimeoutMs { get; set; }
+
+ /// <summary>
+ /// Gets or set the time (in milliseconds) the transport should wait for a send operation
+ /// </summary>
+ int SendTimeoutMs { get; set; }
+
+ /// <summary>
+ /// Performs an asynchronous send operation
+ /// </summary>
+ /// <param name="data">The buffer containing the data to send to the client</param>
+ /// <param name="cancellation">A token to cancel the operation</param>
+ /// <returns>A ValueTask that completes when the send operation is complete</returns>
+ ValueTask SendAsync(ReadOnlyMemory<byte> data, CancellationToken cancellation);
+
+ /// <summary>
+ /// Performs an asynchronous send operation
+ /// </summary>
+ /// <param name="buffer">The data buffer to write received data to</param>
+ /// <param name="cancellation">A token to cancel the operation</param>
+ /// <returns>A ValueTask that returns the number of bytes read into the buffer</returns>
+ ValueTask<int> RecvAsync(Memory<byte> buffer, CancellationToken cancellation);
+
+ /// <summary>
+ /// Performs a synchronous send operation
+ /// </summary>
+ /// <param name="data">The buffer to send to the client</param>
+ void Send(ReadOnlySpan<byte> data);
+
+ /// <summary>
+ /// Performs a synchronous receive operation
+ /// </summary>
+ /// <param name="buffer">The buffer to copy output data to</param>
+ /// <returns>The number of bytes received</returns>
+ int Recv(Span<byte> buffer);
+
+ /// <summary>
+ /// Raised when the interface is no longer required and resources
+ /// related to the connection should be released.
+ /// </summary>
+ /// <returns>A task that resolves when the operation is complete</returns>
+ Task CloseAsync();
+
+ }
+} \ No newline at end of file
diff --git a/lib/Net.Transport.SimpleTCP/src/ReusableNetworkStream.cs b/lib/Net.Transport.SimpleTCP/src/ReusableNetworkStream.cs
new file mode 100644
index 0000000..f4a5491
--- /dev/null
+++ b/lib/Net.Transport.SimpleTCP/src/ReusableNetworkStream.cs
@@ -0,0 +1,153 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Transport.SimpleTCP
+* File: ReusableNetworkStream.cs
+*
+* ReusableNetworkStream.cs is part of VNLib.Net.Transport.SimpleTCP which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Transport.SimpleTCP is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 2 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Transport.SimpleTCP is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+/*
+ * A special stream that sits betnween the socket/pipeline listener
+ * that marshals data between the application and the socket pipeline.
+ * This stream uses a timer to cancel recv events. Because of this and
+ * pipeline aspects, it supports full duplex IO but it is not thread safe.
+ *
+ * IE one thread can read and write, but not more
+ */
+
+
+using System;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+
+using VNLib.Utils.Extensions;
+
+namespace VNLib.Net.Transport.Tcp
+{
+
+ /// <summary>
+ /// A reusable stream that marshals data between the socket pipeline and the application
+ /// </summary>
+ internal sealed class ReusableNetworkStream : Stream
+ {
+ #region stream basics
+ public override bool CanRead => true;
+ public override bool CanSeek => false;
+ public override bool CanWrite => true;
+ public override bool CanTimeout => true;
+ public override long Length => throw new NotSupportedException();
+ public override long Position { get => throw new NotSupportedException(); set => throw new NotImplementedException(); }
+ public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
+ public override void SetLength(long value) => throw new NotSupportedException();
+ #endregion
+
+ //Read timeout to use when receiving data
+ public override int ReadTimeout
+ {
+ get => Transport.RecvTimeoutMs;
+ //Allow -1 to set to infinite timeout
+ set => Transport.RecvTimeoutMs = value > -2 ? value : throw new ArgumentException("Write timeout must be a 32bit signed integer larger than 0");
+ }
+
+ // Write timeout is not currently used, becasue the writer managed socket timeouts
+ public override int WriteTimeout
+ {
+ get => Transport.SendTimeoutMs;
+ //Allow -1 to set to infinite timeout
+ set => Transport.SendTimeoutMs = value > -2 ? value : throw new ArgumentException("Write timeout must be a 32bit signed integer larger than -1");
+ }
+
+ //Timer used to cancel pipeline recv timeouts
+ private readonly ITransportInterface Transport;
+
+ internal ReusableNetworkStream(ITransportInterface transport)
+ {
+ Transport = transport;
+ }
+
+ ///<inheritdoc/>
+ public override void Close()
+ { }
+ ///<inheritdoc/>
+ public override Task FlushAsync(CancellationToken cancellationToken) => Task.CompletedTask;
+ ///<inheritdoc/>
+ public override void Flush()
+ { }
+
+ ///<inheritdoc/>
+ public override int Read(byte[] buffer, int offset, int count) => Read(buffer.AsSpan(offset, count));
+ ///<inheritdoc/>
+ public override int Read(Span<byte> buffer) => Transport.Recv(buffer);
+
+ ///<inheritdoc/>
+ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ //Since read returns a value, it isnt any cheaper not to alloc a task around the value-task
+ return ReadAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
+ }
+ ///<inheritdoc/>
+ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
+ {
+ return Transport.RecvAsync(buffer, cancellationToken);
+ }
+
+ ///<inheritdoc/>
+ public override void Write(byte[] buffer, int offset, int count) => Write(buffer.AsSpan(offset, count));
+ ///<inheritdoc/>
+ public override void Write(ReadOnlySpan<byte> buffer) => Transport.Send(buffer);
+
+ public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ //Allow synchronous complete to avoid alloc
+ return WriteAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
+ }
+
+ ///<inheritdoc/>
+ ///<exception cref="IOException"></exception>
+ ///<exception cref="ObjectDisposedException"></exception>
+ public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellation = default)
+ {
+ return Transport.SendAsync(buffer, cancellation);
+ }
+
+ /*
+ * Override dispose to intercept base cleanup until the internal release
+ */
+ /// <summary>
+ /// Not supported
+ /// </summary>
+ public new void Dispose()
+ {
+ //Call sync
+ Task closing = Transport.CloseAsync();
+ closing.Wait();
+ }
+
+ public override ValueTask DisposeAsync()
+ {
+ return new ValueTask(Transport.CloseAsync());
+ }
+
+ public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
+ {
+ throw new NotSupportedException("CopyToAsync is not supported");
+ }
+ }
+} \ No newline at end of file
diff --git a/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs b/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs
new file mode 100644
index 0000000..89c46e1
--- /dev/null
+++ b/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs
@@ -0,0 +1,521 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Transport.SimpleTCP
+* File: SocketPipeLineWorker.cs
+*
+* SocketPipeLineWorker.cs is part of VNLib.Net.Transport.SimpleTCP which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Transport.SimpleTCP is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 2 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Transport.SimpleTCP is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.IO;
+using System.Buffers;
+using System.Threading;
+using System.Net.Sockets;
+using System.IO.Pipelines;
+using System.Threading.Tasks;
+
+using VNLib.Utils.Memory;
+using VNLib.Utils.Memory.Caching;
+using VNLib.Utils.Extensions;
+
+namespace VNLib.Net.Transport.Tcp
+{
+ /// <summary>
+ /// A reuseable socket pipeline provider, that marshals data from a network stream
+ /// to a connected socket.
+ /// </summary>
+ internal sealed class SocketPipeLineWorker : ITransportInterface, IReusable
+ {
+ public void Prepare()
+ {}
+
+ public bool Release()
+ {
+ /*
+ * If the pipeline has been started, then the pipes
+ * will be completed by the worker threads (or by the streams)
+ * and when release is called, there will no longer be
+ * an observer for the result, which means the pipes
+ * may be safely reset for reuse
+ */
+ if (_recvTask != null)
+ {
+ //Since the
+ SendPipe.Reset();
+ RecvPipe.Reset();
+ }
+ /*
+ * If socket had an error and was not started,
+ * it means there may be data written to the
+ * recv pipe from the accept operation, that
+ * needs to be cleared
+ */
+ else
+ {
+ //Complete the recvpipe then reset it to discard buffered data
+ RecvPipe.Reader.Complete();
+ RecvPipe.Writer.Complete();
+ //now reset it
+ RecvPipe.Reset();
+ }
+
+ //Cleanup tasks
+ _recvTask = null;
+ _sendTask = null;
+
+ //Cleanup cts
+ _cts?.Dispose();
+ _cts = null;
+
+ return true;
+ }
+
+ private Task? _recvTask;
+ private Task? _sendTask;
+
+ private CancellationTokenSource? _cts;
+
+ public readonly ReusableNetworkStream NetworkStream;
+
+ private readonly Pipe SendPipe;
+ private readonly Pipe RecvPipe;
+ private readonly Timer RecvTimer;
+ private readonly Timer SendTimer;
+ private readonly Stream RecvStream;
+
+ public int SendTimeoutMs { get; set; }
+
+ public int RecvTimeoutMs { get; set; }
+
+
+ /// <summary>
+ /// Initalizes a new reusable socket pipeline worker
+ /// </summary>
+ /// <param name="pipeOptions"></param>
+ public SocketPipeLineWorker(PipeOptions pipeOptions)
+ {
+ //Init pipes
+ SendPipe = new(pipeOptions);
+ RecvPipe = new(pipeOptions);
+
+ RecvStream = RecvPipe.Reader.AsStream(true);
+
+ //Init timers to infinite
+ RecvTimer = new(OnRecvTimerElapsed, state: this, Timeout.Infinite, Timeout.Infinite);
+ SendTimer = new(OnSendTimerElapsed, state: this, Timeout.Infinite, Timeout.Infinite);
+
+ //Init reusable network stream
+ NetworkStream = new(this);
+
+ SendTimeoutMs = Timeout.Infinite;
+ RecvTimeoutMs = Timeout.Infinite;
+ }
+
+ /// <summary>
+ /// Gets a buffer used during a socket accept operation
+ /// </summary>
+ /// <param name="bufferSize">The size hint of the buffer to get</param>
+ /// <returns>A memory structure of the specified size</returns>
+ public Memory<byte> GetMemory(int bufferSize) => RecvPipe.Writer.GetMemory(bufferSize);
+
+ /// <summary>
+ /// Begins async work to receive and send data on a connected socket
+ /// </summary>
+ /// <param name="client">The socket to read/write from</param>
+ /// <param name="bytesTransferred">The number of bytes to be commited</param>
+ public void Start(Socket client, int bytesTransferred)
+ {
+ //Advance writer
+ RecvPipe.Writer.Advance(bytesTransferred);
+ //begin recv tasks, and pass inital data to be flushed flag
+ _recvTask = RecvDoWorkAsync(client, bytesTransferred > 0);
+ _sendTask = SendDoWorkAsync(client);
+ }
+
+
+ /*
+ * NOTES
+ *
+ * Timers used to maintain resource exhuastion independent
+ * of the actual socket pipeline, so to preserve the state
+ * of the pipelines until the writer is closed.
+ *
+ * This choice was made to allow the api consumer to decide how to
+ * process a timeout without affecting the state of the pipelines
+ * or socket until the close event.
+ */
+
+ private void OnRecvTimerElapsed(object? state)
+ {
+ //cancel pending read on recv pipe when timout expires
+ RecvPipe.Reader.CancelPendingRead();
+ }
+
+ private void OnSendTimerElapsed(object? state)
+ {
+ //Cancel pending flush
+ SendPipe.Writer.CancelPendingFlush();
+ }
+
+ /*
+ * Pipeline worker tasks. Listen for data on the socket,
+ * and listen for data on the pipe to marshal data between
+ * the pipes and the socket
+ */
+
+ private async Task SendDoWorkAsync(Socket sock)
+ {
+ Exception? cause = null;
+ try
+ {
+ //Enter work loop
+ while (true)
+ {
+ //wait for data from the write pipe and write it to the socket
+ ReadResult result = await SendPipe.Reader.ReadAsync();
+ //Catch error/cancel conditions and break the loop
+ if (result.IsCanceled || !sock.Connected || result.Buffer.IsEmpty)
+ {
+ break;
+ }
+ //get sequence
+ ReadOnlySequence<byte> buffer = result.Buffer;
+
+ //Get enumerator to write memory segments
+ ReadOnlySequence<byte>.Enumerator enumerator = buffer.GetEnumerator();
+
+ //Begin enumerator
+ while (enumerator.MoveNext())
+ {
+
+ /*
+ * Using a foward only reader allows the following loop
+ * to track the ammount of data written to the socket
+ * until the entire segment has been sent or if it has
+ * move to the next segment
+ */
+
+ ForwardOnlyMemoryReader<byte> reader = new(enumerator.Current);
+
+ while(reader.WindowSize > 0)
+ {
+ //Write segment to socket, and upate written data
+ int written = await sock.SendAsync(reader.Window, SocketFlags.None);
+
+ if(written >= reader.WindowSize)
+ {
+ //All data was written
+ break;
+ }
+
+ //Advance unread window to end of the written data
+ reader.Advance(written);
+ }
+ //Advance to next window/segment
+ }
+
+ //Advance pipe
+ SendPipe.Reader.AdvanceTo(buffer.End);
+
+ //Pipe has been completed and all data was written
+ if (result.IsCompleted)
+ {
+ break;
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ cause = ex;
+ }
+ finally
+ {
+ //Complete the send pipe writer
+ await SendPipe.Reader.CompleteAsync(cause);
+
+ //Cancel the recv task
+ _cts!.Cancel();
+ }
+ }
+
+ private async Task RecvDoWorkAsync(Socket sock, bool initialData)
+ {
+ //init new cts
+ _cts = new();
+
+ Exception? cause = null;
+ try
+ {
+ //Avoid syscall?
+ int bufferSize = sock.ReceiveBufferSize;
+
+ //If initial data was buffered, it needs to be published to the reader
+ if (initialData)
+ {
+ //Flush initial data
+ FlushResult res = await RecvPipe.Writer.FlushAsync(CancellationToken.None);
+
+ if (res.IsCompleted || res.IsCanceled)
+ {
+ //Exit
+ return;
+ }
+ }
+
+ //Enter work loop
+ while (true)
+ {
+ //Get buffer from pipe writer
+ Memory<byte> buffer = RecvPipe.Writer.GetMemory(bufferSize);
+
+ //Wait for data or error from socket
+ int count = await sock.ReceiveAsync(buffer, SocketFlags.None, _cts.Token);
+
+ //socket returned emtpy data
+ if (count == 0 || !sock.Connected)
+ {
+ break;
+ }
+
+ //Advance/notify the pipe
+ RecvPipe.Writer.Advance(count);
+
+ //Flush data at top of loop, since data is available from initial accept
+ FlushResult res = await RecvPipe.Writer.FlushAsync(CancellationToken.None);
+
+ //Writing has completed, time to exit
+ if (res.IsCompleted || res.IsCanceled)
+ {
+ break;
+ }
+ }
+ }
+ //Normal exit
+ catch (OperationCanceledException)
+ {}
+ catch (SocketException se)
+ {
+ cause = se;
+ //Cancel sending reader task because the socket has an error and cannot be used
+ SendPipe.Reader.CancelPendingRead();
+ }
+ catch (Exception ex)
+ {
+ cause = ex;
+ }
+ finally
+ {
+ //Stop timer incase exception
+ RecvTimer.Stop();
+
+ //Cleanup and complete the writer
+ await RecvPipe.Writer.CompleteAsync(cause);
+ //The recv reader is completed by the network stream
+ }
+ }
+
+ /// <summary>
+ /// The internal cleanup/dispose method to be called
+ /// when the pipeline is no longer needed
+ /// </summary>
+ public void DisposeInternal()
+ {
+ RecvTimer.Dispose();
+ SendTimer.Dispose();
+
+ //Perform some managed cleanup
+
+ //Cleanup tasks
+ _recvTask = null;
+ _sendTask = null;
+
+ //Cleanup cts
+ _cts?.Dispose();
+ _cts = null;
+ }
+
+
+ private static async Task AwaitFlushTask(ValueTask<FlushResult> valueTask, Timer? sendTimer)
+ {
+ try
+ {
+ FlushResult result = await valueTask.ConfigureAwait(false);
+
+ if (result.IsCanceled)
+ {
+ throw new OperationCanceledException("The operation was canceled by the underlying PipeWriter");
+ }
+ }
+ finally
+ {
+ sendTimer?.Stop();
+ }
+ }
+
+ private ValueTask SendWithTimerInternalAsync(ReadOnlyMemory<byte> data, CancellationToken cancellation)
+ {
+ //Start send timer
+ SendTimer.Restart(SendTimeoutMs);
+ try
+ {
+ //Send the segment
+ ValueTask<FlushResult> result = SendPipe.Writer.WriteAsync(data, cancellation);
+
+ //Task completed successfully, so
+ if (result.IsCompletedSuccessfully)
+ {
+ //Stop timer
+ SendTimer.Stop();
+
+ //Safe to get the rseult
+ FlushResult fr = result.Result;
+ //Check for canceled and throw
+ return fr.IsCanceled
+ ? throw new OperationCanceledException("The write operation was canceled by the underlying PipeWriter")
+ : ValueTask.CompletedTask;
+ }
+ else
+ {
+ //Wrap the task in a ValueTask since it must be awaited, and will happen on background thread
+ return new(AwaitFlushTask(result, SendTimer));
+ }
+ }
+ catch
+ {
+ //Stop timer on exception
+ SendTimer.Stop();
+ throw;
+ }
+ }
+
+ private ValueTask SendWithoutTimerInternalAsync(ReadOnlyMemory<byte> data, CancellationToken cancellation)
+ {
+ //Send the segment
+ ValueTask<FlushResult> result = SendPipe.Writer.WriteAsync(data, cancellation);
+
+ //Task completed successfully, so
+ if (result.IsCompletedSuccessfully)
+ {
+ //Safe to get the rseult
+ FlushResult fr = result.Result;
+ //Check for canceled and throw
+ return fr.IsCanceled
+ ? throw new OperationCanceledException("The write operation was canceled by the underlying PipeWriter")
+ : ValueTask.CompletedTask;
+ }
+ else
+ {
+ //Wrap the task in a ValueTask since it must be awaited, and will happen on background thread
+ return new(AwaitFlushTask(result, null));
+ }
+ }
+
+ ValueTask ITransportInterface.SendAsync(ReadOnlyMemory<byte> data, CancellationToken cancellation)
+ {
+ //Use timer if timeout is set, dont otherwise
+ return SendTimeoutMs < 1 ? SendWithoutTimerInternalAsync(data, cancellation) : SendWithTimerInternalAsync(data, cancellation);
+ }
+
+
+ void ITransportInterface.Send(ReadOnlySpan<byte> data)
+ {
+ //Determine if the send timer should be used
+ Timer? _timer = SendTimeoutMs < 1 ? null : SendTimer;
+
+ //Write data directly to the writer buffer
+ SendPipe.Writer.Write(data);
+
+ //Start send timer
+ _timer?.Restart(SendTimeoutMs);
+
+ try
+ {
+ //Send the segment
+ ValueTask<FlushResult> result = SendPipe.Writer.FlushAsync(CancellationToken.None);
+
+ //Task completed successfully, so
+ if (result.IsCompletedSuccessfully)
+ {
+ //Safe to get the rseult
+ FlushResult fr = result.Result;
+
+ //Check for canceled and throw
+ if (fr.IsCanceled)
+ {
+ throw new OperationCanceledException("The write operation was canceled by the underlying PipeWriter");
+ }
+ }
+ else
+ {
+ //Await the result
+ FlushResult fr = result.ConfigureAwait(false).GetAwaiter().GetResult();
+
+ if (fr.IsCanceled)
+ {
+ throw new OperationCanceledException("The write operation was canceled by the underlying PipeWriter");
+ }
+ }
+ }
+ finally
+ {
+ //Stop timer
+ _timer?.Stop();
+ }
+ }
+
+ async ValueTask<int> ITransportInterface.RecvAsync(Memory<byte> buffer, CancellationToken cancellation)
+ {
+ //Restart timer
+ RecvTimer.Restart(RecvTimeoutMs);
+ try
+ {
+ return await RecvStream.ReadAsync(buffer, cancellation);
+ }
+ finally
+ {
+ RecvTimer.Stop();
+ }
+ }
+
+ int ITransportInterface.Recv(Span<byte> buffer)
+ {
+ //Restart timer
+ RecvTimer.Restart(RecvTimeoutMs);
+ try
+ {
+ return RecvStream.Read(buffer);
+ }
+ finally
+ {
+ RecvTimer.Stop();
+ }
+ }
+
+ Task ITransportInterface.CloseAsync()
+ {
+ //Complete the send pipe writer since stream is closed
+ ValueTask vt = SendPipe.Writer.CompleteAsync();
+ //Complete the recv pipe reader since its no longer used
+ ValueTask rv = RecvPipe.Reader.CompleteAsync();
+ //Join worker tasks, no alloc if completed sync, otherwise alloc anyway
+ return Task.WhenAll(vt.AsTask(), rv.AsTask(), _recvTask!, _sendTask!);
+ }
+
+ }
+}
diff --git a/lib/Net.Transport.SimpleTCP/src/TCPConfig.cs b/lib/Net.Transport.SimpleTCP/src/TCPConfig.cs
new file mode 100644
index 0000000..6955e63
--- /dev/null
+++ b/lib/Net.Transport.SimpleTCP/src/TCPConfig.cs
@@ -0,0 +1,97 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Transport.SimpleTCP
+* File: TCPConfig.cs
+*
+* TCPConfig.cs is part of VNLib.Net.Transport.SimpleTCP which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Transport.SimpleTCP is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 2 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Transport.SimpleTCP is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Net;
+using System.Buffers;
+using System.Net.Sockets;
+using System.Net.Security;
+
+using VNLib.Utils.Logging;
+
+namespace VNLib.Net.Transport.Tcp
+{
+ /// <summary>
+ /// Represents the required configuration variables for the transport
+ /// </summary>
+ public readonly struct TCPConfig
+ {
+ /// <summary>
+ /// The <see cref="IPEndPoint"/> the listening socket will bind to
+ /// </summary>
+ public readonly IPEndPoint LocalEndPoint { get; init; }
+ /// <summary>
+ /// The log provider used to write logging information to
+ /// </summary>
+ public readonly ILogProvider Log { get; init; }
+ /// <summary>
+ /// If TCP keepalive is enabled, the amount of time the connection is considered alive before another probe message is sent
+ /// </summary>
+ public readonly int TcpKeepAliveTime { get; init; }
+ /// <summary>
+ /// If TCP keepalive is enabled, the amount of time the connection will wait for a keepalive message
+ /// </summary>
+ public readonly int KeepaliveInterval { get; init; }
+ /// <summary>
+ /// Enables TCP keepalive
+ /// </summary>
+ public readonly bool TcpKeepalive { get; init; }
+ /// <summary>
+ /// The authentication options to use for processing TLS connections. This value must be set when a certificate has been specified
+ /// </summary>
+ public readonly SslServerAuthenticationOptions? AuthenticationOptions { get; init; }
+ /// <summary>
+ /// The maximum number of waiting WSA asynchronous socket accept operations
+ /// </summary>
+ public readonly uint AcceptThreads { get; init; }
+ /// <summary>
+ /// The maximum size (in bytes) the transport will buffer in
+ /// the receiving pipeline.
+ /// </summary>
+ public readonly int MaxRecvBufferData { get; init; }
+ /// <summary>
+ /// The listener socket backlog count
+ /// </summary>
+ public readonly int BackLog { get; init; }
+ /// <summary>
+ /// The <see cref="MemoryPool{T}"/> to allocate transport buffers from
+ /// </summary>
+ public readonly MemoryPool<byte> BufferPool { get; init; }
+ /// <summary>
+ /// <para>
+ /// The maxium number of event objects that will be cached
+ /// during normal operation
+ /// </para>
+ /// <para>
+ /// WARNING: Setting this value too low will cause significant CPU overhead and GC load
+ /// </para>
+ /// </summary>
+ public readonly int CacheQuota { get; init; }
+ /// <summary>
+ /// An optional callback invoked after the socket has been created
+ /// for optional appliction specific socket configuration
+ /// </summary>
+ public Action<Socket>? OnSocketCreated { get; init; }
+ }
+} \ No newline at end of file
diff --git a/lib/Net.Transport.SimpleTCP/src/TcpServer.cs b/lib/Net.Transport.SimpleTCP/src/TcpServer.cs
new file mode 100644
index 0000000..fc0bcc5
--- /dev/null
+++ b/lib/Net.Transport.SimpleTCP/src/TcpServer.cs
@@ -0,0 +1,289 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Transport.SimpleTCP
+* File: TcpServer.cs
+*
+* TcpServer.cs is part of VNLib.Net.Transport.SimpleTCP which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Transport.SimpleTCP is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 2 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Transport.SimpleTCP is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Security;
+using System.Threading;
+using System.Net.Sockets;
+using System.Net.Security;
+using System.IO.Pipelines;
+using System.Threading.Tasks;
+using System.Security.Authentication;
+using System.Runtime.CompilerServices;
+
+using VNLib.Utils.Async;
+using VNLib.Utils.Logging;
+using VNLib.Utils.Memory.Caching;
+
+namespace VNLib.Net.Transport.Tcp
+{
+ /// <summary>
+ /// <para>
+ /// Provides a simple, high performance, single process, low/no allocation,
+ /// asynchronous, TCP socket server.
+ /// </para>
+ /// <para>
+ /// IO operations are full duplex so pipe-lining reused
+ /// connections is expected. This class cannot be inherited
+ /// </para>
+ /// </summary>
+ public sealed class TcpServer : ICacheHolder
+ {
+ /// <summary>
+ /// The current <see cref="TcpServer"/> configuration
+ /// </summary>
+ public TCPConfig Config { get; }
+
+ private readonly ObjectRental<VnSocketAsyncArgs> SockAsyncArgPool;
+ private readonly PipeOptions PipeOptions;
+ private readonly bool _usingTls;
+
+ /// <summary>
+ /// Initializes a new <see cref="TcpServer"/> with the specified <see cref="TCPConfig"/>
+ /// </summary>
+ /// <param name="config">Configuration to inalize with</param>
+ /// <param name="pipeOptions">Optional <see cref="PipeOptions"/> otherwise uses default</param>
+ /// <exception cref="ArgumentException"></exception>
+ /// <exception cref="ArgumentOutOfRangeException"></exception>
+ public TcpServer(TCPConfig config, PipeOptions? pipeOptions = null)
+ {
+ Config = config;
+ //Check config
+ _ = config.BufferPool ?? throw new ArgumentException("Buffer pool argument cannot be null");
+ _ = config.Log ?? throw new ArgumentException("Log argument is required");
+
+ if (config.MaxRecvBufferData < 4096)
+ {
+ throw new ArgumentException("MaxRecvBufferData size must be at least 4096 bytes to avoid data pipeline pefromance issues");
+ }
+ if(config.AcceptThreads < 1)
+ {
+ throw new ArgumentException("Accept thread count must be greater than 0");
+ }
+ if(config.AcceptThreads > Environment.ProcessorCount)
+ {
+ config.Log.Debug("Suggestion: Setting accept threads to {pc}", Environment.ProcessorCount);
+ }
+ //Cache pipe options
+ PipeOptions = pipeOptions ?? new(
+ config.BufferPool,
+ readerScheduler:PipeScheduler.ThreadPool,
+ writerScheduler:PipeScheduler.ThreadPool,
+ pauseWriterThreshold: config.MaxRecvBufferData,
+ minimumSegmentSize: 8192,
+ useSynchronizationContext:false
+ );
+ //store tls value
+ _usingTls = Config.AuthenticationOptions != null;
+
+ SockAsyncArgPool = ObjectRental.CreateReusable(ArgsConstructor, Config.CacheQuota);
+ }
+
+ ///<inheritdoc/>
+ public void CacheClear() => SockAsyncArgPool.CacheClear();
+ ///<inheritdoc/>
+ public void CacheHardClear() => SockAsyncArgPool.CacheHardClear();
+
+ private AsyncQueue<VnSocketAsyncArgs>? WaitingSockets;
+ private Socket? ServerSock;
+ //private CancellationToken Token;
+
+ private bool _canceledFlag;
+
+ /// <summary>
+ /// Begins listening for incoming TCP connections on the configured socket
+ /// </summary>
+ /// <param name="token">A token that is used to abort listening operations and close the socket</param>
+ /// <exception cref="SocketException"></exception>
+ /// <exception cref="SecurityException"></exception>
+ /// <exception cref="ArgumentException"></exception>
+ /// <exception cref="InvalidOperationException"></exception>
+ public void Start(CancellationToken token)
+ {
+ //If the socket is still listening
+ if (ServerSock != null)
+ {
+ throw new InvalidOperationException("The server thread is currently listening and cannot be re-started");
+ }
+ //make sure the token isnt already canceled
+ if (token.IsCancellationRequested)
+ {
+ throw new ArgumentException("Token is already canceled", nameof(token));
+ }
+
+ //Configure socket on the current thread so exceptions will be raised to the caller
+ ServerSock = new(Config.LocalEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
+ //Bind socket
+ ServerSock.Bind(Config.LocalEndPoint);
+ //Begin listening
+ ServerSock.Listen(Config.BackLog);
+
+ //See if keepalive should be used
+ if (Config.TcpKeepalive)
+ {
+ //Setup socket keepalive from config
+ ServerSock.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
+ ServerSock.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, Config.KeepaliveInterval);
+ ServerSock.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, Config.TcpKeepAliveTime);
+ }
+
+ //Invoke socket created callback
+ Config.OnSocketCreated?.Invoke(ServerSock);
+
+ //Init waiting socket queue
+ WaitingSockets = new(false, true);
+
+ //Clear canceled flag
+ _canceledFlag = false;
+
+ //Start listening for connections
+ for (int i = 0; i < Config.AcceptThreads; i++)
+ {
+ AcceptConnection();
+ }
+
+ //Cleanup callback
+ static void cleanup(object? state)
+ {
+ TcpServer server = (TcpServer)state!;
+
+ //Set canceled flag
+ server._canceledFlag = true;
+
+ //Clean up socket
+ server.ServerSock!.Dispose();
+ server.ServerSock = null;
+
+ server.SockAsyncArgPool.CacheHardClear();
+
+ //Dispose any queued sockets
+ while (server.WaitingSockets!.TryDequeue(out VnSocketAsyncArgs? args))
+ {
+ args.Dispose();
+ }
+ }
+
+ //Register cleanup
+ _ = token.Register(cleanup, this, false);
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private VnSocketAsyncArgs ArgsConstructor()
+ {
+ void ReturnCb(VnSocketAsyncArgs args)
+ {
+ //If the server has exited, dispose the args and dont return to pool
+ if (_canceledFlag)
+ {
+ args.Dispose();
+ }
+ else
+ {
+ SockAsyncArgPool.Return(args);
+ }
+ }
+
+ //Socket args accept callback functions for this
+ VnSocketAsyncArgs args = new(AcceptCompleted, ReturnCb, PipeOptions);
+ return args;
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private void AcceptConnection()
+ {
+ //Make sure cancellation isnt pending
+ if (_canceledFlag)
+ {
+ return;
+ }
+ //Rent new args
+ VnSocketAsyncArgs acceptArgs = SockAsyncArgPool!.Rent();
+ //Accept another socket
+ if (!acceptArgs.BeginAccept(ServerSock!))
+ {
+ //Completed synchronously
+ AcceptCompleted(acceptArgs);
+ }
+ //Completed async
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private void AcceptCompleted(VnSocketAsyncArgs args)
+ {
+ //Examine last op for aborted error, if aborted, then the listening socket has exited
+ if (args.SocketError == SocketError.OperationAborted)
+ {
+ //Dispose args since server is exiting
+ args.Dispose();
+ return;
+ }
+ //Check for error on accept, and if no error, enqueue the socket, otherwise disconnect the socket
+ if (!args.EndAccept() || !WaitingSockets!.TryEnque(args))
+ {
+ //Disconnect the socket (will return the args to the pool)
+ args.Disconnect();
+ }
+ //Accept a new connection
+ AcceptConnection();
+ }
+
+
+ /// <summary>
+ /// Retreives a connected socket from the waiting queue
+ /// </summary>
+ /// <returns>The context of the connect</returns>
+ /// <exception cref="InvalidOperationException"></exception>
+ public async ValueTask<TransportEventContext> AcceptAsync(CancellationToken cancellation)
+ {
+ _ = WaitingSockets ?? throw new InvalidOperationException("Server is not listening");
+ //Args is ready to use
+ VnSocketAsyncArgs args = await WaitingSockets.DequeueAsync(cancellation);
+ //See if tls is enabled, if so, start tls handshake
+ if (_usingTls)
+ {
+ //Begin authenication and make sure the socket stream is closed as its required to cleanup
+ SslStream stream = new(args.Stream, false);
+ try
+ {
+ //auth the new connection
+ await stream.AuthenticateAsServerAsync(Config.AuthenticationOptions!, cancellation);
+ return new(args, stream);
+ }
+ catch(Exception ex)
+ {
+ await stream.DisposeAsync();
+
+ //Disconnect socket
+ args.Disconnect();
+
+ throw new AuthenticationException("Failed client/server TLS authentication", ex);
+ }
+ }
+ else
+ {
+ return new(args, args.Stream);
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/lib/Net.Transport.SimpleTCP/src/TransportEventContext.cs b/lib/Net.Transport.SimpleTCP/src/TransportEventContext.cs
new file mode 100644
index 0000000..fc04d0c
--- /dev/null
+++ b/lib/Net.Transport.SimpleTCP/src/TransportEventContext.cs
@@ -0,0 +1,123 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Transport.SimpleTCP
+* File: TransportEventContext.cs
+*
+* TransportEventContext.cs is part of VNLib.Net.Transport.SimpleTCP which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Transport.SimpleTCP is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 2 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Transport.SimpleTCP is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.IO;
+using System.Net;
+using System.Net.Sockets;
+using System.Net.Security;
+using System.Security.Authentication;
+using System.Runtime.CompilerServices;
+using System.Threading.Tasks;
+
+
+namespace VNLib.Net.Transport.Tcp
+{
+ /// <summary>
+ /// Represents the context of a transport connection. It includes the active socket
+ /// and a stream representing the active transport.
+ /// </summary>
+ public readonly struct TransportEventContext
+ {
+ /// <summary>
+ /// The socket referrence to the incoming connection
+ /// </summary>
+ private readonly Socket Socket;
+
+ internal readonly VnSocketAsyncArgs _socketArgs;
+
+ /// <summary>
+ /// The transport security layer security protocol
+ /// </summary>
+ public readonly SslProtocols SslVersion { get; } = SslProtocols.None;
+ /// <summary>
+ /// A copy of the local endpoint of the listening socket
+ /// </summary>
+ public readonly IPEndPoint LocalEndPoint
+ {
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ get => (Socket.LocalEndPoint as IPEndPoint)!;
+ }
+ /// <summary>
+ /// The <see cref="IPEndPoint"/> representing the client's connection information
+ /// </summary>
+ public readonly IPEndPoint RemoteEndpoint
+ {
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ get => (Socket.RemoteEndPoint as IPEndPoint)!;
+ }
+ /// <summary>
+ /// The transport stream to be actively read
+ /// </summary>
+ public readonly Stream ConnectionStream;
+
+
+ internal TransportEventContext(VnSocketAsyncArgs args, Stream @stream)
+ {
+ _socketArgs = args;
+ Socket = args.Socket!;
+ ConnectionStream = stream;
+ }
+ internal TransportEventContext(VnSocketAsyncArgs args, SslStream @stream):this(args, (Stream)stream)
+ {
+ SslVersion = stream.SslProtocol;
+ }
+
+ /// <summary>
+ /// Closes a connection and cleans up any resources
+ /// </summary>
+ /// <param name="ctx"></param>
+ /// <returns></returns>
+ public async ValueTask CloseConnectionAsync()
+ {
+ //Var to capture ssl shudown exception
+ Exception? closeExp = null;
+
+ //Verify ssl is being used and the socket is still 'connected'
+ if (SslVersion > SslProtocols.None && _socketArgs.Socket!.Connected)
+ {
+ try
+ {
+ await (ConnectionStream as SslStream)!.ShutdownAsync();
+ }
+ catch (Exception ex)
+ {
+ closeExp = ex;
+ }
+ }
+
+ //dispose the stream and wait for buffered data to be sent
+ await ConnectionStream.DisposeAsync();
+
+ //Disconnect
+ _socketArgs.Disconnect();
+
+ //if excp occured, re-throw
+ if (closeExp != null)
+ {
+ throw closeExp;
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/lib/Net.Transport.SimpleTCP/src/VNLib.Net.Transport.SimpleTCP.csproj b/lib/Net.Transport.SimpleTCP/src/VNLib.Net.Transport.SimpleTCP.csproj
new file mode 100644
index 0000000..7a476da
--- /dev/null
+++ b/lib/Net.Transport.SimpleTCP/src/VNLib.Net.Transport.SimpleTCP.csproj
@@ -0,0 +1,45 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+ <PropertyGroup>
+ <TargetFramework>net6.0</TargetFramework>
+ <RootNamespace>VNLib.Net.Transport</RootNamespace>
+ <Version>1.0.1.4</Version>
+ <Product>VNLib Simple Transport Library</Product>
+ <Description>Provides a library for single process asynchronous, event driven, TCP socket listening and supporting structures to implement
+simple high performance TCP servers with or without TLS security.</Description>
+ <Authors>Vaughn Nugent</Authors>
+ <Copyright>Copyright © 2022 Vaughn Nugent</Copyright>
+ <PackageProjectUrl>https://www.vaughnnugent.com/resources</PackageProjectUrl>
+ <AssemblyName>VNLib.Net.Transport.SimpleTCP</AssemblyName>
+ <SignAssembly>True</SignAssembly>
+ <AssemblyOriginatorKeyFile>\\vaughnnugent.com\Internal\Folder Redirection\vman\Documents\Programming\Software\StrongNameingKey.snk</AssemblyOriginatorKeyFile>
+ </PropertyGroup>
+
+ <!-- Resolve nuget dll files and store them in the output dir -->
+ <PropertyGroup>
+ <CopyLocalLockFileAssemblies>true</CopyLocalLockFileAssemblies>
+
+ <Nullable>enable</Nullable>
+
+ <GenerateDocumentationFile>True</GenerateDocumentationFile>
+
+ <AnalysisLevel>latest-all</AnalysisLevel>
+ </PropertyGroup>
+
+ <ItemGroup>
+ <PackageReference Include="ErrorProne.NET.CoreAnalyzers" Version="0.1.2">
+ <PrivateAssets>all</PrivateAssets>
+ <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
+ </PackageReference>
+ <PackageReference Include="ErrorProne.NET.Structs" Version="0.1.2">
+ <PrivateAssets>all</PrivateAssets>
+ <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
+ </PackageReference>
+ <PackageReference Include="System.IO.Pipelines" Version="6.0.3" />
+ </ItemGroup>
+
+ <ItemGroup>
+ <ProjectReference Include="..\..\Utils\src\VNLib.Utils.csproj" />
+ </ItemGroup>
+
+</Project>
diff --git a/lib/Net.Transport.SimpleTCP/src/VnSocketAsyncArgs.cs b/lib/Net.Transport.SimpleTCP/src/VnSocketAsyncArgs.cs
new file mode 100644
index 0000000..9f37762
--- /dev/null
+++ b/lib/Net.Transport.SimpleTCP/src/VnSocketAsyncArgs.cs
@@ -0,0 +1,181 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Transport.SimpleTCP
+* File: VnSocketAsyncArgs.cs
+*
+* VnSocketAsyncArgs.cs is part of VNLib.Net.Transport.SimpleTCP which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Transport.SimpleTCP is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 2 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Transport.SimpleTCP is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.IO;
+using System.Net.Sockets;
+using System.IO.Pipelines;
+
+using VNLib.Utils.Memory.Caching;
+
+
+namespace VNLib.Net.Transport.Tcp
+{
+ internal delegate void SocketCallback(VnSocketAsyncArgs args);
+
+ /// <summary>
+ /// Reusable <see cref="SocketAsyncEventArgs"/> that manages a pipeline for sending and recieving data.
+ /// on the connected socket
+ /// </summary>
+ internal sealed class VnSocketAsyncArgs : SocketAsyncEventArgs, IReusable
+ {
+ private readonly SocketCallback SocketAccepted;
+ private readonly SocketCallback SocketDisconnected;
+
+ public readonly SocketPipeLineWorker SocketWorker;
+
+ public Socket? Socket => AcceptSocket;
+
+ public VnSocketAsyncArgs(SocketCallback accepted, SocketCallback disconnected, PipeOptions options) : base()
+ {
+ SocketWorker = new(options);
+ SocketAccepted = accepted;
+ //Only reuse socketes if windows
+ DisconnectReuseSocket = OperatingSystem.IsWindows();
+ SocketDisconnected = disconnected;
+ }
+
+ public Stream Stream => SocketWorker.NetworkStream;
+
+ /// <summary>
+ /// Begins an asynchronous accept operation on the current (bound) socket
+ /// </summary>
+ /// <param name="sock">The server socket to accept the connection</param>
+ /// <returns>True if the IO operation is pending</returns>
+ public bool BeginAccept(Socket sock)
+ {
+ //Store the semaphore in the user token event args
+ SocketError = SocketError.Success;
+ SocketFlags = SocketFlags.None;
+
+ //Recv during accept is not supported on linux, this flag is set to false on linux
+ if (DisconnectReuseSocket)
+ {
+ //get buffer from the pipe to write initial accept data to
+ Memory<byte> buffer = SocketWorker.GetMemory(sock.ReceiveBufferSize);
+ SetBuffer(buffer);
+ }
+
+ //accept async
+ return sock.AcceptAsync(this);
+ }
+
+ /// <summary>
+ /// Determines if an asynchronous accept operation has completed successsfully
+ /// and the socket is connected.
+ /// </summary>
+ /// <returns>True if the accept was successful, and the accepted socket is connected, false otherwise</returns>
+ public bool EndAccept()
+ {
+ if(SocketError == SocketError.Success)
+ {
+ //remove ref to buffer
+ SetBuffer(null);
+ //start the socket worker
+ SocketWorker.Start(Socket!, BytesTransferred);
+ return true;
+ }
+ return false;
+ }
+
+ /// <summary>
+ /// Begins an async disconnect operation on a currentl connected socket
+ /// </summary>
+ /// <returns>True if the operation is pending</returns>
+ public void Disconnect()
+ {
+ //Clear flags
+ SocketError = SocketError.Success;
+ //accept async
+ if (!Socket!.DisconnectAsync(this))
+ {
+ //Invoke disconnected callback since op completed sync
+ EndDisconnect();
+ //Invoke disconnected callback since op completed sync
+ SocketDisconnected(this);
+ }
+ }
+
+ private void EndDisconnect()
+ {
+ //If the disconnection operation failed, do not reuse the socket on next accept
+ if (SocketError != SocketError.Success)
+ {
+ //Dispose the socket before clearing the socket
+ Socket?.Dispose();
+ AcceptSocket = null;
+ }
+ }
+
+ protected override void OnCompleted(SocketAsyncEventArgs e)
+ {
+ switch (e.LastOperation)
+ {
+ case SocketAsyncOperation.Accept:
+ //Invoke the accepted callback
+ SocketAccepted(this);
+ break;
+ case SocketAsyncOperation.Disconnect:
+ EndDisconnect();
+ //Invoke disconnected callback since op completed sync
+ SocketDisconnected(this);
+ break;
+ default:
+ throw new InvalidOperationException("Invalid socket operation");
+ }
+ //Clear flags/errors on completion
+ SocketError = SocketError.Success;
+ SocketFlags = SocketFlags.None;
+ }
+
+ void IReusable.Prepare()
+ {
+ SocketWorker.Prepare();
+ }
+
+ bool IReusable.Release()
+ {
+ UserToken = null;
+ SocketWorker.Release();
+ //if the sockeet is connected (or not windows), dispose it and clear the accept socket
+ if (AcceptSocket?.Connected == true || !DisconnectReuseSocket)
+ {
+ AcceptSocket?.Dispose();
+ AcceptSocket = null;
+ }
+ return true;
+ }
+
+ public new void Dispose()
+ {
+ //Dispose the base class
+ base.Dispose();
+ //Dispose the socket if its set
+ AcceptSocket?.Dispose();
+ AcceptSocket = null;
+ //Dispose the overlapped stream
+ SocketWorker.DisposeInternal();
+ }
+ }
+}