aboutsummaryrefslogtreecommitdiff
path: root/lib/Net.Transport.SimpleTCP/src
diff options
context:
space:
mode:
Diffstat (limited to 'lib/Net.Transport.SimpleTCP/src')
-rw-r--r--lib/Net.Transport.SimpleTCP/src/ITcpConnectionDescriptor.cs54
-rw-r--r--lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs138
-rw-r--r--lib/Net.Transport.SimpleTCP/src/TcpServer.cs67
-rw-r--r--lib/Net.Transport.SimpleTCP/src/TcpServerExtensions.cs8
-rw-r--r--lib/Net.Transport.SimpleTCP/src/TransportEventContext.cs73
-rw-r--r--lib/Net.Transport.SimpleTCP/src/VnSocketAsyncArgs.cs16
6 files changed, 235 insertions, 121 deletions
diff --git a/lib/Net.Transport.SimpleTCP/src/ITcpConnectionDescriptor.cs b/lib/Net.Transport.SimpleTCP/src/ITcpConnectionDescriptor.cs
new file mode 100644
index 0000000..fe3013c
--- /dev/null
+++ b/lib/Net.Transport.SimpleTCP/src/ITcpConnectionDescriptor.cs
@@ -0,0 +1,54 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Transport.SimpleTCP
+* File: ITcpConnectionDescriptor.cs
+*
+* ITcpConnectionDescriptor.cs is part of VNLib.Net.Transport.SimpleTCP which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Transport.SimpleTCP is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 2 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Transport.SimpleTCP is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.IO;
+using System.Net.Sockets;
+
+
+namespace VNLib.Net.Transport.Tcp
+{
+ /// <summary>
+ /// An opaque TCP connection descriptor
+ /// </summary>
+ public interface ITcpConnectionDescriptor
+ {
+ internal Socket Socket { get; }
+
+ /// <summary>
+ /// Gets a stream wrapper around the connection.
+ /// </summary>
+ /// <remarks>
+ /// You must dispose of this stream when you are done with it.
+ /// </remarks>
+ Stream GetStream();
+
+ /// <summary>
+ /// Closes a connection and cleans up any resources
+ /// </summary>
+ /// <remarks>
+ /// You must destory any references to this connection descriptor after calling this method.
+ /// </remarks>
+ void CloseConnection();
+ }
+} \ No newline at end of file
diff --git a/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs b/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs
index dda859c..db87357 100644
--- a/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs
+++ b/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs
@@ -29,7 +29,6 @@ using System.Threading;
using System.Net.Sockets;
using System.IO.Pipelines;
using System.Threading.Tasks;
-using System.Runtime.CompilerServices;
using VNLib.Utils.Memory;
using VNLib.Utils.Memory.Caching;
@@ -43,62 +42,24 @@ namespace VNLib.Net.Transport.Tcp
/// </summary>
internal sealed class SocketPipeLineWorker : ITransportInterface, IReusable
{
- public void Prepare()
- {}
-
- public bool Release()
- {
- /*
- * If the pipeline has been started, then the pipes
- * will be completed by the worker threads (or by the streams)
- * and when release is called, there will no longer be
- * an observer for the result, which means the pipes
- * may be safely reset for reuse
- */
- if (_recvTask != null)
- {
- SendPipe.Reset();
- RecvPipe.Reset();
- }
- /*
- * If socket had an error and was not started,
- * it means there may be data written to the
- * recv pipe from the accept operation, that
- * needs to be cleared
- */
- else
- {
- //Complete the recvpipe then reset it to discard buffered data
- RecvPipe.Reader.Complete();
- RecvPipe.Writer.Complete();
- //now reset it
- RecvPipe.Reset();
- }
-
- //Cleanup tasks
- _recvTask = null;
- _sendTask = null;
-
- //Cleanup cts
- _cts?.Dispose();
- _cts = null;
-
- return true;
- }
+ /*
+ * [0] = Recv
+ * [1] = Send
+ * [2] = Send Complete
+ * [3] = Read Complete
+ */
- private Task? _recvTask;
- private Task? _sendTask;
-
- private CancellationTokenSource? _cts;
+ private readonly Task[] _tasks;
public readonly ReusableNetworkStream NetworkStream;
-
private readonly Pipe SendPipe;
private readonly Pipe RecvPipe;
private readonly Timer RecvTimer;
private readonly Timer SendTimer;
private readonly Stream RecvStream;
+ private CancellationTokenSource? _cts;
+
///<inheritdoc/>
public int SendTimeoutMs { get; set; }
@@ -127,6 +88,54 @@ namespace VNLib.Net.Transport.Tcp
SendTimeoutMs = Timeout.Infinite;
RecvTimeoutMs = Timeout.Infinite;
+
+ /*
+ * Store the operation tasks in an array, so they can be
+ * joined when the stream is closed
+ */
+ _tasks = new Task[4];
+ }
+
+ public void Prepare()
+ { }
+
+ public bool Release()
+ {
+ /*
+ * If the pipeline has been started, then the pipes
+ * will be completed by the worker threads (or by the streams)
+ * and when release is called, there will no longer be
+ * an observer for the result, which means the pipes
+ * may be safely reset for reuse
+ */
+ if (_tasks[0] != null)
+ {
+ SendPipe.Reset();
+ RecvPipe.Reset();
+ }
+ /*
+ * If socket had an error and was not started,
+ * it means there may be data written to the
+ * recv pipe from the accept operation, that
+ * needs to be cleared
+ */
+ else
+ {
+ //Complete the recvpipe then reset it to discard buffered data
+ RecvPipe.Reader.Complete();
+ RecvPipe.Writer.Complete();
+ //now reset it
+ RecvPipe.Reset();
+ }
+
+ //Cleanup tasks
+ Array.Clear(_tasks);
+
+ //Cleanup cts
+ _cts?.Dispose();
+ _cts = null;
+
+ return true;
}
/// <summary>
@@ -146,8 +155,8 @@ namespace VNLib.Net.Transport.Tcp
//Advance writer
RecvPipe.Writer.Advance(bytesTransferred);
//begin recv tasks, and pass inital data to be flushed flag
- _recvTask = RecvDoWorkAsync(client, bytesTransferred > 0);
- _sendTask = SendDoWorkAsync(client);
+ _tasks[0] = RecvDoWorkAsync(client, bytesTransferred > 0);
+ _tasks[1] = SendDoWorkAsync(client);
}
@@ -185,7 +194,6 @@ namespace VNLib.Net.Transport.Tcp
private async Task SendDoWorkAsync(Socket sock)
{
- Exception? cause = null;
try
{
//Enter work loop
@@ -247,23 +255,24 @@ namespace VNLib.Net.Transport.Tcp
break;
}
}
+
+ //All done, complete the send pipe reader
+ await SendPipe.Reader.CompleteAsync();
}
catch (Exception ex)
{
- cause = ex;
+ //Complete the send pipe reader
+ await SendPipe.Reader.CompleteAsync(ex);
}
finally
{
_sendReadRes = default;
-
- //Complete the send pipe writer
- await SendPipe.Reader.CompleteAsync(cause);
-
//Cancel the recv task
_cts!.Cancel();
}
}
+
private FlushResult _recvFlushRes;
private async Task RecvDoWorkAsync(Socket sock, bool initialData)
@@ -344,6 +353,7 @@ namespace VNLib.Net.Transport.Tcp
}
}
+
/// <summary>
/// The internal cleanup/dispose method to be called
/// when the pipeline is no longer needed
@@ -354,10 +364,9 @@ namespace VNLib.Net.Transport.Tcp
SendTimer.Dispose();
//Perform some managed cleanup
-
+
//Cleanup tasks
- _recvTask = null;
- _sendTask = null;
+ Array.Clear(_tasks);
//Cleanup cts
_cts?.Dispose();
@@ -544,17 +553,18 @@ namespace VNLib.Net.Transport.Tcp
Task ITransportInterface.CloseAsync()
{
//Complete the send pipe writer since stream is closed
- ValueTask vt = SendPipe.Writer.CompleteAsync();
+ _tasks[2] = SendPipe.Writer.CompleteAsync().AsTask();
+
//Complete the recv pipe reader since its no longer used
- ValueTask rv = RecvPipe.Reader.CompleteAsync();
+ _tasks[3] = RecvPipe.Reader.CompleteAsync().AsTask();
+
//Join worker tasks, no alloc if completed sync, otherwise alloc anyway
- return Task.WhenAll(vt.AsTask(), rv.AsTask(), _recvTask!, _sendTask!);
+ return Task.WhenAll(_tasks);
}
private static class ThrowHelpers
- {
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ {
public static void ThrowWriterCanceled()
{
throw new OperationCanceledException("The write operation was canceled by the underlying PipeWriter");
diff --git a/lib/Net.Transport.SimpleTCP/src/TcpServer.cs b/lib/Net.Transport.SimpleTCP/src/TcpServer.cs
index 6deeb56..b85aa33 100644
--- a/lib/Net.Transport.SimpleTCP/src/TcpServer.cs
+++ b/lib/Net.Transport.SimpleTCP/src/TcpServer.cs
@@ -65,9 +65,13 @@ namespace VNLib.Net.Transport.Tcp
/// <exception cref="ArgumentOutOfRangeException"></exception>
public TcpServer(TCPConfig config, PipeOptions? pipeOptions = null)
{
- Config = config;
//Check config
- _ = config.BufferPool ?? throw new ArgumentException("Buffer pool argument cannot be null");
+ if(pipeOptions == null)
+ {
+ //Pool is required when using default pipe options
+ _ = config.BufferPool ?? throw new ArgumentException("Buffer pool argument cannot be null");
+ }
+
_ = config.Log ?? throw new ArgumentException("Log argument is required");
if (config.MaxRecvBufferData < 4096)
@@ -82,10 +86,13 @@ namespace VNLib.Net.Transport.Tcp
{
config.Log.Debug("Suggestion: Setting accept threads to {pc}", Environment.ProcessorCount);
}
+
+ Config = config;
+
//Cache pipe options
PipeOptions = pipeOptions ?? new(
config.BufferPool,
- readerScheduler:PipeScheduler.ThreadPool,
+ readerScheduler:PipeScheduler.ThreadPool,
writerScheduler:PipeScheduler.ThreadPool,
pauseWriterThreshold: config.MaxRecvBufferData,
minimumSegmentSize: 8192,
@@ -240,12 +247,23 @@ namespace VNLib.Net.Transport.Tcp
args.Dispose();
return;
}
- //Check for error on accept, and if no error, enqueue the socket, otherwise disconnect the socket
- if (!args.EndAccept() || !WaitingSockets!.TryEnque(args))
+
+ //Check for error and log it
+ if (!args.EndAccept())
{
- //Disconnect the socket (will return the args to the pool)
args.Disconnect();
+ Config.Log.Debug("Socket accept failed with error code {ec}", args.SocketError);
+ return;
}
+
+ //Try to enqueue the args to the waiting queue, if the queue is full, disconnect the socket
+ if (!WaitingSockets!.TryEnque(args))
+ {
+ args.Disconnect();
+ Config.Log.Warn("Socket {e} disconnected because the waiting queue is overflowing", args.GetHashCode());
+ return;
+ }
+
//Accept a new connection
AcceptConnection();
}
@@ -265,29 +283,44 @@ namespace VNLib.Net.Transport.Tcp
return ValueTask.FromResult<TransportEventContext>(new(args, args.Stream));
}
- return AcceptAsyncCore(cancellation);
- }
+ return AcceptAsyncCore(this, cancellation);
- private async ValueTask<TransportEventContext> AcceptAsyncCore(CancellationToken cancellation)
- {
- //Await async
- VnSocketAsyncArgs args = await WaitingSockets.DequeueAsync(cancellation);
+ static async ValueTask<TransportEventContext> AcceptAsyncCore(TcpServer server, CancellationToken cancellation)
+ {
+ //Await async
+ VnSocketAsyncArgs args = await server.WaitingSockets!.DequeueAsync(cancellation);
- return new(args, args.Stream);
+ return new(args, args.Stream);
+ }
}
- internal ValueTask<VnSocketAsyncArgs> AcceptArgsAsync(CancellationToken cancellation)
+ /// <summary>
+ /// Accepts a connection and returns the connection descriptor.
+ /// </summary>
+ /// <param name="cancellation">A token to cancel the operation</param>
+ /// <returns>The connection descriptor</returns>
+ /// <remarks>
+ /// NOTE: You must always call the <see cref="ITcpConnectionDescriptor.CloseConnection"/> and
+ /// destroy all references to it when you are done. You must also dispose the stream returned
+ /// from the <see cref="ITcpConnectionDescriptor.GetStream"/> method.
+ /// </remarks>
+ /// <exception cref="InvalidOperationException"></exception>
+ public ValueTask<ITcpConnectionDescriptor> AcceptConnectionAsync(CancellationToken cancellation)
{
_ = WaitingSockets ?? throw new InvalidOperationException("Server is not listening");
//Try get args from queue
if (WaitingSockets.TryDequeue(out VnSocketAsyncArgs? args))
{
- return ValueTask.FromResult(args);
+ return ValueTask.FromResult<ITcpConnectionDescriptor>(args);
}
- else
+
+ return AcceptConnectionAsyncCore(this, cancellation);
+
+ static async ValueTask<ITcpConnectionDescriptor> AcceptConnectionAsyncCore(TcpServer server, CancellationToken cancellation)
{
- return WaitingSockets.DequeueAsync(cancellation);
+ //Await async
+ return await server.WaitingSockets!.DequeueAsync(cancellation);
}
}
}
diff --git a/lib/Net.Transport.SimpleTCP/src/TcpServerExtensions.cs b/lib/Net.Transport.SimpleTCP/src/TcpServerExtensions.cs
index a6a4327..3b86b5b 100644
--- a/lib/Net.Transport.SimpleTCP/src/TcpServerExtensions.cs
+++ b/lib/Net.Transport.SimpleTCP/src/TcpServerExtensions.cs
@@ -48,10 +48,10 @@ namespace VNLib.Net.Transport.Tcp
public static async Task<TransportEventContext> AcceptSslAsync(this TcpServer server, SslServerAuthenticationOptions options, CancellationToken cancellation = default)
{
//accept internal args
- VnSocketAsyncArgs args = await server.AcceptArgsAsync(cancellation);
+ ITcpConnectionDescriptor args = await server.AcceptConnectionAsync(cancellation);
//Begin authenication and make sure the socket stream is closed as its required to cleanup
- SslStream stream = new(args.Stream, false);
+ SslStream stream = new(args.GetStream(), false);
try
{
//auth the new connection
@@ -64,14 +64,14 @@ namespace VNLib.Net.Transport.Tcp
await stream.DisposeAsync();
//Disconnect socket
- args.Disconnect();
+ args.CloseConnection();
throw new AuthenticationException("Failed client/server TLS authentication", ex);
}
}
/// <summary>
- /// Safley closes an ssl connection
+ /// Safely closes an ssl connection
/// </summary>
/// <param name="ctx">The context to close the connection on</param>
/// <returns>A value task that completes when the connection is closed</returns>
diff --git a/lib/Net.Transport.SimpleTCP/src/TransportEventContext.cs b/lib/Net.Transport.SimpleTCP/src/TransportEventContext.cs
index 994b2ba..ec686ca 100644
--- a/lib/Net.Transport.SimpleTCP/src/TransportEventContext.cs
+++ b/lib/Net.Transport.SimpleTCP/src/TransportEventContext.cs
@@ -25,70 +25,75 @@
using System;
using System.IO;
using System.Net;
-using System.Net.Sockets;
-using System.Net.Security;
-using System.Security.Authentication;
-using System.Runtime.CompilerServices;
using System.Threading.Tasks;
-
namespace VNLib.Net.Transport.Tcp
{
+
/// <summary>
/// Represents the context of a transport connection. It includes the active socket
/// and a stream representing the active transport.
/// </summary>
public readonly record struct TransportEventContext
{
- /// <summary>
- /// The socket referrence to the incoming connection
- /// </summary>
- private readonly Socket Socket;
-
- private readonly VnSocketAsyncArgs _socketArgs;
+ private readonly ITcpConnectionDescriptor _descriptor;
/// <summary>
/// A copy of the local endpoint of the listening socket
/// </summary>
- public readonly IPEndPoint LocalEndPoint
- {
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- get => (Socket.LocalEndPoint as IPEndPoint)!;
- }
+ public readonly IPEndPoint LocalEndPoint;
/// <summary>
/// The <see cref="IPEndPoint"/> representing the client's connection information
/// </summary>
- public readonly IPEndPoint RemoteEndpoint
- {
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- get => (Socket.RemoteEndPoint as IPEndPoint)!;
- }
+ public readonly IPEndPoint RemoteEndpoint;
/// <summary>
- /// The transport stream to be actively read
+ /// The transport stream that wraps the connection
/// </summary>
public readonly Stream ConnectionStream;
- internal TransportEventContext(VnSocketAsyncArgs args, Stream @stream)
+ /// <summary>
+ /// Creates a new <see cref="TransportEventContext"/> wrapper for the given connection descriptor
+ /// and captures the default stream from the descriptor.
+ /// </summary>
+ /// <param name="descriptor">The connection to wrap</param>
+ public TransportEventContext(ITcpConnectionDescriptor descriptor):this(descriptor, descriptor.GetStream())
+ { }
+
+ /// <summary>
+ /// Creates a new <see cref="TransportEventContext"/> wrapper for the given connection descriptor
+ /// and your custom stream implementation.
+ /// </summary>
+ /// <param name="descriptor">The connection descriptor to wrap</param>
+ /// <param name="customStream">Your custom stream wrapper around the transport stream</param>
+ public TransportEventContext(ITcpConnectionDescriptor descriptor, Stream customStream)
{
- _socketArgs = args;
- Socket = args.AcceptSocket!;
- ConnectionStream = stream;
+ _descriptor = descriptor;
+ ConnectionStream = customStream;
+
+ //Call once and store locally
+ LocalEndPoint = (descriptor.Socket.LocalEndPoint as IPEndPoint)!;
+ RemoteEndpoint = (descriptor.Socket.RemoteEndPoint as IPEndPoint)!;
}
/// <summary>
- /// Closes a connection and cleans up any resources
+ /// Cleans up the stream and closes the connection descriptor
/// </summary>
- /// <returns></returns>
- public async ValueTask CloseConnectionAsync()
+ /// <returns>A value-task that completes when the resources have been cleaned up</returns>
+ public readonly async ValueTask CloseConnectionAsync()
{
- //dispose the stream and wait for buffered data to be sent
- await ConnectionStream.DisposeAsync();
-
- //Disconnect
- _socketArgs.Disconnect();
+ try
+ {
+ //dispose the stream and wait for buffered data to be sent
+ await ConnectionStream.DisposeAsync();
+ }
+ finally
+ {
+ //Disconnect
+ _descriptor.CloseConnection();
+ }
}
}
} \ No newline at end of file
diff --git a/lib/Net.Transport.SimpleTCP/src/VnSocketAsyncArgs.cs b/lib/Net.Transport.SimpleTCP/src/VnSocketAsyncArgs.cs
index 766a866..0481f21 100644
--- a/lib/Net.Transport.SimpleTCP/src/VnSocketAsyncArgs.cs
+++ b/lib/Net.Transport.SimpleTCP/src/VnSocketAsyncArgs.cs
@@ -36,13 +36,14 @@ namespace VNLib.Net.Transport.Tcp
/// Reusable <see cref="SocketAsyncEventArgs"/> that manages a pipeline for sending and recieving data.
/// on the connected socket
/// </summary>
- internal sealed class VnSocketAsyncArgs : SocketAsyncEventArgs, IReusable
+ internal sealed class VnSocketAsyncArgs : SocketAsyncEventArgs, ITcpConnectionDescriptor, IReusable
{
private readonly ISockAsyncArgsHandler _handler;
public readonly SocketPipeLineWorker SocketWorker;
public Stream Stream => SocketWorker.NetworkStream;
+
public VnSocketAsyncArgs(ISockAsyncArgsHandler handler, PipeOptions options) : base()
{
@@ -142,7 +143,17 @@ namespace VNLib.Net.Transport.Tcp
SocketError = SocketError.Success;
SocketFlags = SocketFlags.None;
}
-
+
+ ///<inheritdoc/>
+ Socket ITcpConnectionDescriptor.Socket => AcceptSocket!;
+
+ ///<inheritdoc/>
+ Stream ITcpConnectionDescriptor.GetStream() => SocketWorker.NetworkStream;
+
+ ///<inheritdoc/>
+ void ITcpConnectionDescriptor.CloseConnection() => Disconnect();
+
+
void IReusable.Prepare()
{
SocketWorker.Prepare();
@@ -174,5 +185,6 @@ namespace VNLib.Net.Transport.Tcp
//Cleanup socket worker
SocketWorker.DisposeInternal();
}
+
}
}