aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-12-30 21:15:51 -0500
committerLibravatar vnugent <public@vaughnnugent.com>2023-12-30 21:15:51 -0500
commit484c7bc63baab39904a1a715d199f879fd7a067a (patch)
tree949383bee253c5094536468be298c1ab1c03f7bf
parenta241620dd73f97496855284cf1fb1503debd28ab (diff)
tcp lib refactor, hardware accelerated memcopy, service stack features
-rw-r--r--lib/Hashing.Portable/README.md91
-rw-r--r--lib/Hashing.Portable/src/IdentityUtility/JsonWebKey.cs6
-rw-r--r--lib/Net.Http/src/Core/IHttpResponseBody.cs4
-rw-r--r--lib/Net.Http/src/Core/Response/HttpContextResponseWriting.cs4
-rw-r--r--lib/Net.Http/src/Core/Response/ResponseWriter.cs12
-rw-r--r--lib/Net.Transport.SimpleTCP/src/AwaitableAsyncServerSocket.cs376
-rw-r--r--lib/Net.Transport.SimpleTCP/src/ISocketIo.cs (renamed from lib/Net.Transport.SimpleTCP/src/ISockAsyncArgsHandler.cs)28
-rw-r--r--lib/Net.Transport.SimpleTCP/src/ITcpConnectionDescriptor.cs20
-rw-r--r--lib/Net.Transport.SimpleTCP/src/ITransportInterface.cs33
-rw-r--r--lib/Net.Transport.SimpleTCP/src/ReusableNetworkStream.cs27
-rw-r--r--lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs439
-rw-r--r--lib/Net.Transport.SimpleTCP/src/TCPConfig.cs9
-rw-r--r--lib/Net.Transport.SimpleTCP/src/TcpServer.cs331
-rw-r--r--lib/Net.Transport.SimpleTCP/src/TcpServerExtensions.cs104
-rw-r--r--lib/Net.Transport.SimpleTCP/src/TransportEventContext.cs99
-rw-r--r--lib/Net.Transport.SimpleTCP/src/VnSocketAsyncArgs.cs190
-rw-r--r--lib/Plugins.Essentials.ServiceStack/src/Construction/HttpServiceStackBuilder.cs16
-rw-r--r--lib/Plugins.Essentials.ServiceStack/src/PluginStackInitializer.cs21
-rw-r--r--lib/Plugins.Runtime/src/LoaderExtensions.cs77
-rw-r--r--lib/Utils.Cryptography/argon2/CMakeLists.txt2
-rw-r--r--lib/Utils.Cryptography/monocypher/Taskfile.yaml2
-rw-r--r--lib/Utils/src/Extensions/MemoryExtensions.cs8
-rw-r--r--lib/Utils/src/IIndexable.cs4
-rw-r--r--lib/Utils/src/Memory/Caching/ObjectRental.cs50
-rw-r--r--lib/Utils/src/Memory/Caching/ObjectRentalBase.cs69
-rw-r--r--lib/Utils/src/Memory/Caching/ThreadLocalObjectStorage.cs22
-rw-r--r--lib/Utils/src/Memory/MemoryUtil.cs331
27 files changed, 1307 insertions, 1068 deletions
diff --git a/lib/Hashing.Portable/README.md b/lib/Hashing.Portable/README.md
index 08a1f44..95709e7 100644
--- a/lib/Hashing.Portable/README.md
+++ b/lib/Hashing.Portable/README.md
@@ -1,89 +1,18 @@
# VNLib.Hashing.Portable
-This library is a collection of common cryptographic functions, optimized using the VNLib.Utils
-library for interop and memory management.
+*A collection of common cryptographic functions, optimized using the VNLib.Utils library for interop and memory management.*
-#### Builds
-Debug build w/ symbols & xml docs, release builds, NuGet packages, and individually packaged source code are available on my [website](https://www.vaughnnugent.com/resources/software). All tar-gzip (.tgz) files will have an associated .sha384 appended checksum of the desired download file.
+This library supports optional loading of native libraries such as Argon2 and vnlib_monocyper at runtime for extended feature support such as Argon2id password hashing, Blake2 support, and more.
-## Argon2
-This library contains an native library interface with the Argon2 Cryptographic Hashing library. If you wish to use the Argon2 hashing functions, you must include the [Argon2 native library](https://github.com/P-H-C/phc-winner-argon2) in your project, and accept the license.
+## Builds
+Debug build w/ symbols & xml docs, release builds, NuGet packages, and individually packaged source code are available on my website (link below).All tar-gzip (.tgz) files will have an associated checksum and PGP signature of the desired download file.
-The Argon2 native libary is lazy loaded and therefor not required for the other functions in this library, if it is not included. You may specify the exact path to the native library by setting the `ARGON2_DLL_PATH`environment variable to the value of the path.
+## Docs and Guides
+Documentation, specifications, and setup guides are available on my website.
-**Notice:**
-This library does not, modify, contribute, or affect the functionality of the Argon2 library in any way.
+[Docs and Articles](https://www.vaughnnugent.com/resources/software/articles?tags=docs,_vnlib.hashing.portable)
+[Builds and Source](https://www.vaughnnugent.com/resources/software/modules/VNLib.Core)
-### Usage:
-```
-//Using the managed hash version, inputs may be binary or utf8 chars
-string encodedHash = VnArgon2.Hash2id(<password>,<salt>,<secret>,...<argon params>)
-
-//The 'raw' or 'passthru' 2id managed hashing method, binary only
-VnArgon2.Hash2id(<passbytes>,<saltbytes><secretbytes>,<rawHashOutput>,...<params>)
-
-//Verification used CryptographicOperations.FixedTimeEquals for comparison
-//managed verification, only valid with previously hashed methods
-bool valid = VnArgon2.Verify2id(<rawPass>,<hash>,<encodedHash>)
-
-//Binary only 'raw' or 'passthru' 2id managed verification
-bool valid = VnArgon2.Verify2id(<rawPass>,<salt>,<secret>,<rawHashBytes>)
-```
-
-## Other Classes
-
-The ManagedHash and RandomHash classes are simple "shortcut" methods for common hashing operations with common data encoding/decoding.
-
-The IdentityUtility namespace includes classes and methods for generating and validating JWE types, such as JWT (Json Web Token) and JWK (Json Web Key), and their various extension/helper methods.
-
-### Basic Usage
-```
-//RandomHash
-byte[] cngBytes = RandomHash.GetRandomBytes();
-RandomHash.GetRandomBytes(<binary span>);
-string base64 = RandomHash.GetRandomBase64(<size>);
-string base32 = RandomHash.GetRandomBase32(<size>);
-string hex = RandomHash.GetRandomHex(<size>);
-string encodedHash = RandomHash.GetRandomHash(<hashAlg>,<size>,<encoding>);
-GUID cngGuid = RandomHash.GetSecureGuid();
-
-//Managed hash
-ERRNO result = ManagedHash.ComputeHash(<data>,<args>);
-string encoded = ManagedHash.ComputeHash(<data>,<args>);
-byte[] rawHash = ManagedHash.ComputeHash(<data>,<args>);
-
-//HMAC
-ERRNO result = ManagedHash.ComputeHmac(<key>,<data>,<args>);
-string encoded = ManagedHash.ComputeHmac(<key>,<data>,<args>);
-byte[] rawHash = ManagedHash.ComputeHmac(<key>,<data>,<args>);
-
-
-//Parse jwt
-using JsonWebToken jwt = JsonWebToken.Parse(<jwtEncodedString>);
-bool valid = jwt.verify(<Algorithm>,<hashMethod>...);
-//Get the payload (or header, they use the same methods)
-T payload = jwt.GetPaylod<T>();//OR
-JsonDocument payload = jwt.GetPayload();
-
-//Create new JWT
-using JsonWebToken jwt = new(<optionalHeap>);
-jwt.WriteHeader(<object or binary>); //Set header
-
-jwt.WritePayload(<object or binary>); //Set by serializing it, or binary
-
-//OR init fluent payload builder
-jwt.InitPayloadClaim()
- .AddClaim(<string name>, <object value>)
- ...
- .CommitClaims(); //Serializes the claims and writes them to the JWT payload
-
-jwt.Sign(<HashAlgorithm, RSA, ECDsa>... <params>); //Sign the JWT
-
-string jwtData = jwt.Compile(); //Serialize the JWT
-```
-
-### License
-
-The software in this repository is licensed under the GNU GPL version 2.0 (or any later version).
-See the LICENSE files for more information. \ No newline at end of file
+## License
+The software in this repository is licensed under the GNU GPL version 2.0 (or any later version). See the LICENSE files for more information. \ No newline at end of file
diff --git a/lib/Hashing.Portable/src/IdentityUtility/JsonWebKey.cs b/lib/Hashing.Portable/src/IdentityUtility/JsonWebKey.cs
index a741762..576c096 100644
--- a/lib/Hashing.Portable/src/IdentityUtility/JsonWebKey.cs
+++ b/lib/Hashing.Portable/src/IdentityUtility/JsonWebKey.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Hashing.Portable
@@ -269,7 +269,7 @@ namespace VNLib.Hashing.IdentityUtility
{
using ECDsa? eCDsa = GetECDsaPrivateKey(jwk);
_ = eCDsa ?? throw new InvalidOperationException("JWK Does not contain an ECDsa private key");
- token.Sign(eCDsa, HashAlg.SHA512);
+ token.Sign(eCDsa, HashAlg.SHA256);
return;
}
default:
@@ -431,7 +431,7 @@ namespace VNLib.Hashing.IdentityUtility
if(base64.Length <= 64)
{
//Use stack buffer
- Span<byte> buffer = stackalloc byte[84];
+ Span<byte> buffer = stackalloc byte[64];
//base64url decode
ERRNO count = VnEncoding.Base64UrlDecode(base64, buffer);
diff --git a/lib/Net.Http/src/Core/IHttpResponseBody.cs b/lib/Net.Http/src/Core/IHttpResponseBody.cs
index facf8b0..0fb26c4 100644
--- a/lib/Net.Http/src/Core/IHttpResponseBody.cs
+++ b/lib/Net.Http/src/Core/IHttpResponseBody.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Http
@@ -71,6 +71,6 @@ namespace VNLib.Net.Http.Core
/// <param name="writer">The response output writer</param>
/// <param name="buffer">An optional buffer used to buffer responses</param>
/// <returns>A task that resolves when the response is completed</returns>
- Task WriteEntityAsync(IResponseCompressor comp, IResponseDataWriter writer, Memory<byte> buffer);
+ Task WriteEntityAsync<TComp>(TComp comp, IResponseDataWriter writer, Memory<byte> buffer) where TComp : IResponseCompressor;
}
} \ No newline at end of file
diff --git a/lib/Net.Http/src/Core/Response/HttpContextResponseWriting.cs b/lib/Net.Http/src/Core/Response/HttpContextResponseWriting.cs
index 98c6043..918ffe1 100644
--- a/lib/Net.Http/src/Core/Response/HttpContextResponseWriting.cs
+++ b/lib/Net.Http/src/Core/Response/HttpContextResponseWriting.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Http
@@ -45,7 +45,7 @@ namespace VNLib.Net.Http.Core
ValueTask discardTask = Request.InputStream.DiscardRemainingAsync();
- //See if discard is needed
+ //See if response data needs to be written
if (ResponseBody.HasData)
{
//Parallel the write and discard
diff --git a/lib/Net.Http/src/Core/Response/ResponseWriter.cs b/lib/Net.Http/src/Core/Response/ResponseWriter.cs
index 75ef790..a030784 100644
--- a/lib/Net.Http/src/Core/Response/ResponseWriter.cs
+++ b/lib/Net.Http/src/Core/Response/ResponseWriter.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Http
@@ -158,7 +158,7 @@ namespace VNLib.Net.Http.Core.Response
}
///<inheritdoc/>
- public async Task WriteEntityAsync(IResponseCompressor comp, IResponseDataWriter writer, Memory<byte> buffer)
+ public async Task WriteEntityAsync<TComp>(TComp comp, IResponseDataWriter writer, Memory<byte> buffer) where TComp : IResponseCompressor
{
//Write a sliding window response
if (_userState.MemResponse != null)
@@ -254,7 +254,8 @@ namespace VNLib.Net.Http.Core.Response
} while (true);
}
- private static bool CompressNextSegment(IMemoryResponseReader reader, IResponseCompressor comp, IResponseDataWriter writer)
+ private static bool CompressNextSegment<TComp>(IMemoryResponseReader reader, TComp comp, IResponseDataWriter writer)
+ where TComp: IResponseCompressor
{
//Read the next segment
ReadOnlyMemory<byte> readSegment = comp.BlockSize > 0 ? reader.GetRemainingConstrained(comp.BlockSize) : reader.GetMemory();
@@ -271,7 +272,8 @@ namespace VNLib.Net.Http.Core.Response
return writer.Advance(res.BytesWritten) == 0;
}
- private static bool CompressNextSegment(ref ForwardOnlyMemoryReader<byte> reader, IResponseCompressor comp, IResponseDataWriter writer)
+ private static bool CompressNextSegment<TComp>(ref ForwardOnlyMemoryReader<byte> reader, TComp comp, IResponseDataWriter writer)
+ where TComp: IResponseCompressor
{
//Get output buffer
Memory<byte> output = writer.GetMemory();
@@ -316,8 +318,8 @@ namespace VNLib.Net.Http.Core.Response
public ResponsBodyDataState(IMemoryResponseReader reader)
{
Legnth = reader.Remaining;
- Stream = null;
MemResponse = reader;
+ Stream = null;
IsSet = true;
}
diff --git a/lib/Net.Transport.SimpleTCP/src/AwaitableAsyncServerSocket.cs b/lib/Net.Transport.SimpleTCP/src/AwaitableAsyncServerSocket.cs
new file mode 100644
index 0000000..8bedf89
--- /dev/null
+++ b/lib/Net.Transport.SimpleTCP/src/AwaitableAsyncServerSocket.cs
@@ -0,0 +1,376 @@
+/*
+* Copyright (c) 2024 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Transport.SimpleTCP
+* File: AwaitableAsyncServerSocket.cs
+*
+* AwaitableAsyncServerSocket.cs is part of VNLib.Net.Transport.SimpleTCP which
+* is part of the larger VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Transport.SimpleTCP is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 2 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Transport.SimpleTCP is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.IO;
+using System.Net;
+using System.Diagnostics;
+using System.Net.Sockets;
+using System.IO.Pipelines;
+using System.Threading.Tasks;
+using System.Threading.Tasks.Sources;
+using System.Runtime.InteropServices;
+
+using VNLib.Utils.Memory.Caching;
+
+namespace VNLib.Net.Transport.Tcp
+{
+ internal sealed class AwaitableAsyncServerSocket :
+ ITcpConnectionDescriptor,
+ IDisposable,
+ ISocketIo,
+ IReusable
+ {
+ private static readonly bool IsWindows = OperatingSystem.IsWindows();
+
+ private Socket? _socket;
+
+ public readonly SocketPipeLineWorker SocketWorker;
+ private readonly AwaitableValueSocketEventArgs _recvArgs = new();
+ private readonly AwaitableValueSocketEventArgs _allArgs = new();
+
+ private Task _sendTask = Task.CompletedTask;
+ private Task _recvTask = Task.CompletedTask;
+
+ public AwaitableAsyncServerSocket(PipeOptions options) : base()
+ {
+ SocketWorker = new(options);
+
+ //Set reuse flags now
+ _recvArgs.DisconnectReuseSocket = IsWindows;
+ _allArgs.DisconnectReuseSocket = IsWindows;
+ }
+
+
+ public async ValueTask<SocketError> AcceptAsync(Socket serverSocket, int recvBuffSize, int sendBuffSize)
+ {
+ /*
+ * WSA allows the kernel to wait for data during an accept before
+ * invoking user-space callback to save a kernel trap. Since this is
+ * only available on Windows
+ */
+ if (IsWindows)
+ {
+ //get buffer from the pipe to write initial accept data to
+ Memory<byte> buffer = SocketWorker.GetMemory(recvBuffSize);
+ _allArgs.SetBuffer(buffer);
+
+ //Also on windows we can reuse the previous socket if its set
+ _allArgs.AcceptSocket = _socket;
+ }
+
+ //Begin the accept
+ SocketError error = await _allArgs.AcceptAsync(serverSocket);
+
+ if(error == SocketError.Success)
+ {
+ //Store socket on success
+ _socket = _allArgs.AcceptSocket!;
+
+ //It is safe to start the pipeline now
+ _sendTask = SocketWorker.SendDoWorkAsync(this, sendBuffSize);
+
+ /*
+ * Passing the number of transferred bytes to the recv task will cause accepted
+ * data to be published (if zero thats fine too)
+ */
+ _recvTask = SocketWorker.RecvDoWorkAsync(this, _allArgs.BytesTransferred, recvBuffSize);
+ }
+
+ //Clear the buffer reference
+ _allArgs.SetBuffer(default);
+
+ return error;
+ }
+
+
+ public async ValueTask<SocketError> CloseConnectionAsync()
+ {
+ _ = _socket ?? throw new InvalidOperationException("Socket is not connected");
+
+ //Wait for the pipeline to end before disconnecting the socket
+ await SocketWorker.ShutDownClientPipeAsync();
+
+ //Wait for the send task to complete before disconnecting
+ await _sendTask.ConfigureAwait(false);
+
+ //Disconnect the socket
+ SocketError error = await _allArgs.DisconnectAsync(_socket);
+
+ /*
+ * Release hooks will take care of socket cleanup
+ * if it's required.
+ */
+
+ //Wait for recv to complete
+ await _recvTask.ConfigureAwait(false);
+
+ return error;
+ }
+
+ ///<inheritdoc/>
+ ValueTask<int> ISocketIo.SendAsync(ReadOnlyMemory<byte> buffer, SocketFlags socketFlags)
+ {
+ //Socket must always be defined as this function is called from the pipeline
+ Debug.Assert(_socket != null, "Socket is not connected");
+
+ //Get memory from readonly memory and set the send buffer
+ Memory<byte> asMemory = MemoryMarshal.AsMemory(buffer);
+ _allArgs.SetBuffer(asMemory);
+
+ return _allArgs.SendAsync(_socket, socketFlags);
+ }
+
+ ///<inheritdoc/>
+ ValueTask<int> ISocketIo.ReceiveAsync(Memory<byte> buffer, SocketFlags socketFlags)
+ {
+ //Socket must always be defined as this function is called from the pipeline
+ Debug.Assert(_socket != null, "Socket is not connected");
+
+ _recvArgs.SetBuffer(buffer);
+ return _recvArgs.ReceiveAsync(_socket, socketFlags);
+ }
+
+ void IReusable.Prepare()
+ {
+ _allArgs.Prepare();
+ _recvArgs.Prepare();
+ SocketWorker.Prepare();
+ }
+
+ bool IReusable.Release()
+ {
+ //Release should never be called before the pipeline is complete
+ Debug.Assert(_sendTask.IsCompleted, "Socket was released before send task completed");
+ Debug.Assert(_recvTask.IsCompleted, "Socket was released before recv task completed");
+
+ _allArgs.Release();
+ _recvArgs.Release();
+
+ //if the sockeet is connected (or not windows), dispose it and clear the accept socket
+ if (_socket?.Connected == true || !IsWindows)
+ {
+ _socket?.Dispose();
+ _socket = null;
+ }
+
+ return SocketWorker.Release();
+ }
+
+ ///<inheritdoc/>
+ Stream ITcpConnectionDescriptor.GetStream() => SocketWorker.NetworkStream;
+
+ ///<inheritdoc/>
+ void ITcpConnectionDescriptor.GetEndpoints(out IPEndPoint localEndpoint, out IPEndPoint remoteEndpoint)
+ {
+ localEndpoint = (_socket!.LocalEndPoint as IPEndPoint)!;
+ remoteEndpoint = (_socket!.RemoteEndPoint as IPEndPoint)!;
+ }
+
+ ///<inheritdoc/>
+ public void Dispose()
+ {
+ //Dispose the socket if its set
+ _socket?.Dispose();
+ _socket = null;
+
+ _allArgs.Dispose();
+ _recvArgs.Dispose();
+
+ //Cleanup socket worker
+ SocketWorker.DisposeInternal();
+ }
+
+ private sealed class AwaitableValueSocketEventArgs :
+ SocketAsyncEventArgs,
+ IValueTaskSource<SocketError>,
+ IValueTaskSource<int>
+ {
+ private ManualResetValueTaskSourceCore<int> AsyncTaskCore;
+
+ public void Prepare()
+ {
+ SocketError = SocketError.Success;
+ SocketFlags = SocketFlags.None;
+ }
+
+ public void Release()
+ {
+ AcceptSocket = null;
+ UserToken = null;
+ }
+
+ protected override void OnCompleted(SocketAsyncEventArgs e)
+ {
+
+ switch (e.LastOperation)
+ {
+ case SocketAsyncOperation.Receive:
+ case SocketAsyncOperation.Send:
+
+ //Clear buffer after async op
+ SetBuffer(default);
+
+ //If the operation was successfull, set the number of bytes transferred
+ if (SocketError == SocketError.Success)
+ {
+ AsyncTaskCore.SetResult(e.BytesTransferred);
+ }
+ else
+ {
+ AsyncTaskCore.SetException(new SocketException((int)SocketError));
+ }
+ break;
+
+ case SocketAsyncOperation.Accept:
+ AsyncTaskCore.SetResult((int)e.SocketError);
+ break;
+
+ case SocketAsyncOperation.Disconnect:
+ AsyncTaskCore.SetResult((int)e.SocketError);
+ break;
+
+ default:
+ AsyncTaskCore.SetException(new InvalidOperationException("Invalid socket operation"));
+ break;
+ }
+
+ //Clear flags/errors on completion
+ SocketError = SocketError.Success;
+ SocketFlags = SocketFlags.None;
+ }
+
+ /// <summary>
+ /// Begins an asynchronous accept operation on the current (bound) socket
+ /// </summary>
+ /// <param name="sock">The server socket to accept the connection</param>
+ /// <returns>True if the IO operation is pending</returns>
+ public ValueTask<SocketError> AcceptAsync(Socket sock)
+ {
+ //Store the semaphore in the user token event args
+ SocketError = SocketError.Success;
+ SocketFlags = SocketFlags.None;
+
+ //Reset task source
+ AsyncTaskCore = default;
+
+ if(!sock.AcceptAsync(this))
+ {
+ //Async op pending, return the task
+ return ValueTask.FromResult(SocketError);
+ }
+
+ //Async accept
+ return new ValueTask<SocketError>(this, AsyncTaskCore.Version);
+ }
+
+ /// <summary>
+ /// Begins an async disconnect operation on a currentl connected socket
+ /// </summary>
+ /// <returns>True if the operation is pending</returns>
+ public ValueTask<SocketError> DisconnectAsync(Socket serverSock)
+ {
+ //Clear flags
+ SocketError = SocketError.Success;
+
+ //Reset task source
+ AsyncTaskCore = default;
+
+ //accept async
+ if (!serverSock.DisconnectAsync(this))
+ {
+ return ValueTask.FromResult(SocketError);
+ }
+
+ //Async accept
+ return new ValueTask<SocketError>(this, AsyncTaskCore.Version);
+ }
+
+
+ public ValueTask<int> SendAsync(Socket socket, SocketFlags flags)
+ {
+ //Store the semaphore in the user token event args
+ SocketError = SocketError.Success;
+ SocketFlags = flags;
+
+ //Clear task source
+ AsyncTaskCore = default;
+
+ if (socket.SendAsync(this))
+ {
+ //Async accept
+ return new ValueTask<int>(this, AsyncTaskCore.Version);
+ }
+
+ //Sync send
+ return SocketError switch
+ {
+ SocketError.Success => ValueTask.FromResult(BytesTransferred),
+ _ => ValueTask.FromException<int>(new SocketException((int)SocketError))
+ };
+ }
+
+ public ValueTask<int> ReceiveAsync(Socket socket, SocketFlags flags)
+ {
+ //Store the semaphore in the user token event args
+ SocketError = SocketError.Success;
+ SocketFlags = flags;
+
+ //Clear task source
+ AsyncTaskCore = default;
+
+ if (socket.ReceiveAsync(this))
+ {
+ //Async accept
+ return new ValueTask<int>(this, AsyncTaskCore.Version);
+ }
+
+ //Clear buffer
+ SetBuffer(default);
+
+ //Sync send
+ return SocketError switch
+ {
+ SocketError.Success => ValueTask.FromResult(BytesTransferred),
+ _ => ValueTask.FromException<int>(new SocketException((int)SocketError))
+ };
+ }
+
+
+ ///<inheritdoc/>
+ public SocketError GetResult(short token) => (SocketError)AsyncTaskCore.GetResult(token);
+
+ ///<inheritdoc/>
+ public ValueTaskSourceStatus GetStatus(short token) => AsyncTaskCore.GetStatus(token);
+
+ ///<inheritdoc/>
+ public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags)
+ => AsyncTaskCore.OnCompleted(continuation, state, token, flags);
+
+ ///<inheritdoc/>
+ int IValueTaskSource<int>.GetResult(short token) => AsyncTaskCore.GetResult(token);
+ }
+ }
+
+}
diff --git a/lib/Net.Transport.SimpleTCP/src/ISockAsyncArgsHandler.cs b/lib/Net.Transport.SimpleTCP/src/ISocketIo.cs
index 76fb0ae..d9ebb21 100644
--- a/lib/Net.Transport.SimpleTCP/src/ISockAsyncArgsHandler.cs
+++ b/lib/Net.Transport.SimpleTCP/src/ISocketIo.cs
@@ -1,12 +1,12 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Transport.SimpleTCP
-* File: ISockAsyncArgsHandler.cs
+* File: ISocketIo.cs
*
-* ISockAsyncArgsHandler.cs is part of VNLib.Net.Transport.SimpleTCP which
-* is part of the larger VNLib collection of libraries and utilities.
+* ISocketIo.cs is part of VNLib.Net.Transport.SimpleTCP which is part of the larger
+* VNLib collection of libraries and utilities.
*
* VNLib.Net.Transport.SimpleTCP is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
@@ -22,20 +22,18 @@
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
+using System;
+using System.Net.Sockets;
+using System.Threading.Tasks;
+
+
+
namespace VNLib.Net.Transport.Tcp
{
- internal interface ISockAsyncArgsHandler
+ internal interface ISocketIo
{
- /// <summary>
- /// Called when an asynchronous accept operation has completed
- /// </summary>
- /// <param name="args">The arguments that completed the accept operation</param>
- void OnSocketAccepted(VnSocketAsyncArgs args);
+ ValueTask<int> SendAsync(ReadOnlyMemory<byte> buffer, SocketFlags socketFlags);
- /// <summary>
- /// Called when an asynchronous disconnect operation has completed
- /// </summary>
- /// <param name="args">The args that are disconnecting</param>
- void OnSocketDisconnected(VnSocketAsyncArgs args);
+ ValueTask<int> ReceiveAsync(Memory<byte> buffer, SocketFlags socketFlags);
}
}
diff --git a/lib/Net.Transport.SimpleTCP/src/ITcpConnectionDescriptor.cs b/lib/Net.Transport.SimpleTCP/src/ITcpConnectionDescriptor.cs
index fe3013c..e722e46 100644
--- a/lib/Net.Transport.SimpleTCP/src/ITcpConnectionDescriptor.cs
+++ b/lib/Net.Transport.SimpleTCP/src/ITcpConnectionDescriptor.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Transport.SimpleTCP
@@ -23,8 +23,7 @@
*/
using System.IO;
-using System.Net.Sockets;
-
+using System.Net;
namespace VNLib.Net.Transport.Tcp
{
@@ -33,7 +32,12 @@ namespace VNLib.Net.Transport.Tcp
/// </summary>
public interface ITcpConnectionDescriptor
{
- internal Socket Socket { get; }
+ /// <summary>
+ /// Gets the local and remote endpoints of the connection
+ /// </summary>
+ /// <param name="localEndpoint">The local socket connection</param>
+ /// <param name="remoteEndpoint">The remote client connection endpoint</param>
+ void GetEndpoints(out IPEndPoint localEndpoint, out IPEndPoint remoteEndpoint);
/// <summary>
/// Gets a stream wrapper around the connection.
@@ -42,13 +46,5 @@ namespace VNLib.Net.Transport.Tcp
/// You must dispose of this stream when you are done with it.
/// </remarks>
Stream GetStream();
-
- /// <summary>
- /// Closes a connection and cleans up any resources
- /// </summary>
- /// <remarks>
- /// You must destory any references to this connection descriptor after calling this method.
- /// </remarks>
- void CloseConnection();
}
} \ No newline at end of file
diff --git a/lib/Net.Transport.SimpleTCP/src/ITransportInterface.cs b/lib/Net.Transport.SimpleTCP/src/ITransportInterface.cs
index 7d21995..5d16c69 100644
--- a/lib/Net.Transport.SimpleTCP/src/ITransportInterface.cs
+++ b/lib/Net.Transport.SimpleTCP/src/ITransportInterface.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Transport.SimpleTCP
@@ -36,50 +36,37 @@ namespace VNLib.Net.Transport.Tcp
interface ITransportInterface
{
/// <summary>
- /// Gets or sets the read timeout in milliseconds
- /// </summary>
- int RecvTimeoutMs { get; set; }
-
- /// <summary>
- /// Gets or set the time (in milliseconds) the transport should wait for a send operation
- /// </summary>
- int SendTimeoutMs { get; set; }
-
- /// <summary>
/// Performs an asynchronous send operation
/// </summary>
/// <param name="data">The buffer containing the data to send to the client</param>
+ /// <param name="timeout">The timeout in milliseconds</param>
/// <param name="cancellation">A token to cancel the operation</param>
/// <returns>A ValueTask that completes when the send operation is complete</returns>
- ValueTask SendAsync(ReadOnlyMemory<byte> data, CancellationToken cancellation);
+ ValueTask SendAsync(ReadOnlyMemory<byte> data, int timeout, CancellationToken cancellation);
/// <summary>
/// Performs an asynchronous send operation
/// </summary>
/// <param name="buffer">The data buffer to write received data to</param>
+ /// <param name="timeout">The timeout in milliseconds</param>
/// <param name="cancellation">A token to cancel the operation</param>
/// <returns>A ValueTask that returns the number of bytes read into the buffer</returns>
- ValueTask<int> RecvAsync(Memory<byte> buffer, CancellationToken cancellation);
-
+ ValueTask<int> RecvAsync(Memory<byte> buffer, int timeout, CancellationToken cancellation);
+
/// <summary>
/// Performs a synchronous send operation
/// </summary>
+ /// <param name="timeout">The timeout in milliseconds</param>
/// <param name="data">The buffer to send to the client</param>
- void Send(ReadOnlySpan<byte> data);
+ void Send(ReadOnlySpan<byte> data, int timeout);
/// <summary>
/// Performs a synchronous receive operation
/// </summary>
+ /// <param name="timeout">The timeout in milliseconds</param>
/// <param name="buffer">The buffer to copy output data to</param>
/// <returns>The number of bytes received</returns>
- int Recv(Span<byte> buffer);
-
- /// <summary>
- /// Raised when the interface is no longer required and resources
- /// related to the connection should be released.
- /// </summary>
- /// <returns>A task that resolves when the operation is complete</returns>
- Task CloseAsync();
+ int Recv(Span<byte> buffer, int timeout);
}
} \ No newline at end of file
diff --git a/lib/Net.Transport.SimpleTCP/src/ReusableNetworkStream.cs b/lib/Net.Transport.SimpleTCP/src/ReusableNetworkStream.cs
index ce989b3..00b5d7c 100644
--- a/lib/Net.Transport.SimpleTCP/src/ReusableNetworkStream.cs
+++ b/lib/Net.Transport.SimpleTCP/src/ReusableNetworkStream.cs
@@ -62,20 +62,23 @@ namespace VNLib.Net.Transport.Tcp
public override void CopyTo(Stream destination, int bufferSize) => throw new NotSupportedException("CopyTo is not supported");
#endregion
+ private int _recvTimeoutMs;
+ private int _sendTimeoutMs;
+
//Read timeout to use when receiving data
public override int ReadTimeout
{
- get => Transport.RecvTimeoutMs;
+ get => _recvTimeoutMs;
//Allow -1 to set to infinite timeout
- set => Transport.RecvTimeoutMs = value > -2 ? value : throw new ArgumentException("Write timeout must be a 32bit signed integer larger than 0");
+ set => _recvTimeoutMs = value > -2 ? value : throw new ArgumentException("Write timeout must be a 32bit signed integer larger than 0");
}
// Write timeout is not currently used, becasue the writer managed socket timeouts
public override int WriteTimeout
{
- get => Transport.SendTimeoutMs;
+ get => _sendTimeoutMs;
//Allow -1 to set to infinite timeout
- set => Transport.SendTimeoutMs = value > -2 ? value : throw new ArgumentException("Write timeout must be a 32bit signed integer larger than -1");
+ set => _sendTimeoutMs = value > -2 ? value : throw new ArgumentException("Write timeout must be a 32bit signed integer larger than -1");
}
//Timer used to cancel pipeline recv timeouts
@@ -88,11 +91,7 @@ namespace VNLib.Net.Transport.Tcp
///<inheritdoc/>
public override void Close()
- {
- //Call sync
- Task closing = Transport.CloseAsync();
- closing.GetAwaiter().GetResult();
- }
+ { }
///<inheritdoc/>
public override Task FlushAsync(CancellationToken cancellationToken) => Task.CompletedTask;
@@ -105,7 +104,7 @@ namespace VNLib.Net.Transport.Tcp
public override int Read(byte[] buffer, int offset, int count) => Read(buffer.AsSpan(offset, count));
///<inheritdoc/>
- public override int Read(Span<byte> buffer) => Transport.Recv(buffer);
+ public override int Read(Span<byte> buffer) => Transport.Recv(buffer, _recvTimeoutMs);
///<inheritdoc/>
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
@@ -114,13 +113,13 @@ namespace VNLib.Net.Transport.Tcp
///<inheritdoc/>
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
- => Transport.RecvAsync(buffer, cancellationToken);
+ => Transport.RecvAsync(buffer, _recvTimeoutMs, cancellationToken);
///<inheritdoc/>
public override void Write(byte[] buffer, int offset, int count) => Write(buffer.AsSpan(offset, count));
///<inheritdoc/>
- public override void Write(ReadOnlySpan<byte> buffer) => Transport.Send(buffer);
+ public override void Write(ReadOnlySpan<byte> buffer) => Transport.Send(buffer, _sendTimeoutMs);
///<inheritdoc/>
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
@@ -130,12 +129,12 @@ namespace VNLib.Net.Transport.Tcp
///<exception cref="IOException"></exception>
///<exception cref="ObjectDisposedException"></exception>
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellation = default)
- => Transport.SendAsync(buffer, cancellation);
+ => Transport.SendAsync(buffer, _sendTimeoutMs, cancellation);
/*
* Override dispose to intercept base cleanup until the internal release
*/
- public override ValueTask DisposeAsync() => new (Transport.CloseAsync());
+ public override ValueTask DisposeAsync() => ValueTask.CompletedTask;
}
} \ No newline at end of file
diff --git a/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs b/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs
index c0321a4..30f89fe 100644
--- a/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs
+++ b/lib/Net.Transport.SimpleTCP/src/SocketPipeLineWorker.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Transport.SimpleTCP
@@ -38,21 +38,13 @@ using VNLib.Utils.Extensions;
namespace VNLib.Net.Transport.Tcp
{
+
/// <summary>
/// A reuseable socket pipeline provider, that marshals data from a network stream
/// to a connected socket.
/// </summary>
internal sealed class SocketPipeLineWorker : ITransportInterface, IReusable
{
- /*
- * [0] = Recv
- * [1] = Send
- * [2] = Send Complete
- * [3] = Read Complete
- */
-
- private readonly Task[] _tasks;
-
public readonly ReusableNetworkStream NetworkStream;
private readonly Pipe SendPipe;
private readonly Pipe RecvPipe;
@@ -60,15 +52,6 @@ namespace VNLib.Net.Transport.Tcp
private readonly Timer SendTimer;
private readonly Stream RecvStream;
- private CancellationTokenSource? _cts;
-
- ///<inheritdoc/>
- public int SendTimeoutMs { get; set; }
-
- ///<inheritdoc/>
- public int RecvTimeoutMs { get; set; }
-
-
/// <summary>
/// Initalizes a new reusable socket pipeline worker
/// </summary>
@@ -87,57 +70,21 @@ namespace VNLib.Net.Transport.Tcp
//Init reusable network stream
NetworkStream = new(this);
-
- SendTimeoutMs = Timeout.Infinite;
- RecvTimeoutMs = Timeout.Infinite;
-
- /*
- * Store the operation tasks in an array, so they can be
- * joined when the stream is closed
- */
- _tasks = new Task[4];
}
public void Prepare()
- { }
+ {
+ NetworkStream.ReadTimeout = Timeout.Infinite;
+ NetworkStream.WriteTimeout = Timeout.Infinite;
+ }
public bool Release()
{
_sysSocketBufferSize = 0;
- /*
- * If the pipeline has been started, then the pipes
- * will be completed by the worker threads (or by the streams)
- * and when release is called, there will no longer be
- * an observer for the result, which means the pipes
- * may be safely reset for reuse
- */
- if (_tasks[0] != null)
- {
- SendPipe.Reset();
- RecvPipe.Reset();
- }
- /*
- * If socket had an error and was not started,
- * it means there may be data written to the
- * recv pipe from the accept operation, that
- * needs to be cleared
- */
- else
- {
- //Complete the recvpipe then reset it to discard buffered data
- RecvPipe.Reader.Complete();
- RecvPipe.Writer.Complete();
- //now reset it
- RecvPipe.Reset();
- }
-
- //Cleanup tasks
- Array.Clear(_tasks);
-
- //Cleanup cts
- _cts?.Dispose();
- _cts = null;
+ //Reset pipes for use
+ SendPipe.Reset();
+ RecvPipe.Reset();
return true;
}
@@ -149,21 +96,6 @@ namespace VNLib.Net.Transport.Tcp
/// <returns>A memory structure of the specified size</returns>
public Memory<byte> GetMemory(int bufferSize) => RecvPipe.Writer.GetMemory(bufferSize);
- /// <summary>
- /// Begins async work to receive and send data on a connected socket
- /// </summary>
- /// <param name="client">The socket to read/write from</param>
- /// <param name="bytesTransferred">The number of bytes to be commited</param>
- public void Start(Socket client, int bytesTransferred)
- {
- //Advance writer
- RecvPipe.Writer.Advance(bytesTransferred);
- //begin recv tasks, and pass inital data to be flushed flag
- _tasks[0] = RecvDoWorkAsync(client, bytesTransferred > 0);
- _tasks[1] = SendDoWorkAsync(client);
- }
-
-
/*
* NOTES
*
@@ -195,11 +127,19 @@ namespace VNLib.Net.Transport.Tcp
*/
private ReadResult _sendReadRes;
+ private int _sysSocketBufferSize;
- private async Task SendDoWorkAsync(Socket sock)
+ public async Task SendDoWorkAsync<TIO>(TIO sock, int sendBufferSize)
+ where TIO : ISocketIo
{
+ Exception? errCause = null;
+ ReadOnlySequence<byte>.Enumerator enumerator;
+ ForwardOnlyMemoryReader<byte> segmentReader;
+
try
{
+ _sysSocketBufferSize = sendBufferSize;
+
//Enter work loop
while (true)
{
@@ -207,7 +147,7 @@ namespace VNLib.Net.Transport.Tcp
_sendReadRes = await SendPipe.Reader.ReadAsync(CancellationToken.None);
//Catch error/cancel conditions and break the loop
- if (_sendReadRes.IsCanceled || !sock.Connected || _sendReadRes.Buffer.IsEmpty)
+ if (_sendReadRes.IsCanceled || _sendReadRes.Buffer.IsEmpty)
{
break;
}
@@ -218,7 +158,7 @@ namespace VNLib.Net.Transport.Tcp
*/
//Get enumerator to write memory segments
- ReadOnlySequence<byte>.Enumerator enumerator = _sendReadRes.Buffer.GetEnumerator();
+ enumerator = _sendReadRes.Buffer.GetEnumerator();
//Begin enumerator
while (enumerator.MoveNext())
@@ -231,21 +171,27 @@ namespace VNLib.Net.Transport.Tcp
* move to the next segment
*/
- ForwardOnlyMemoryReader<byte> reader = new(enumerator.Current);
+ segmentReader = new(enumerator.Current);
- while(reader.WindowSize > 0)
+ while(segmentReader.WindowSize > 0)
{
//Write segment to socket, and upate written data
- int written = await sock.SendAsync(reader.Window, SocketFlags.None);
+ int written = await sock.SendAsync(segmentReader.Window, SocketFlags.None)
+ .ConfigureAwait(false);
- if(written == reader.WindowSize)
+ if(written < 0)
+ {
+ goto ExitOnSocketErr;
+ }
+
+ if(written == segmentReader.WindowSize)
{
//All data was written
break;
}
//Advance unread window to end of the written data
- reader.Advance(written);
+ segmentReader.Advance(written);
}
//Advance to next window/segment
}
@@ -260,44 +206,45 @@ namespace VNLib.Net.Transport.Tcp
}
}
- //All done, complete the send pipe reader
- await SendPipe.Reader.CompleteAsync();
+ ExitOnSocketErr:
+ ;
+
}
catch (Exception ex)
{
- //Complete the send pipe reader
- await SendPipe.Reader.CompleteAsync(ex);
+ errCause = ex;
}
finally
{
_sendReadRes = default;
- //Cancel the recv task
- _cts!.Cancel();
+
+ //Complete the send pipe reader
+ await SendPipe.Reader.CompleteAsync(errCause);
}
}
private FlushResult _recvFlushRes;
- private int _sysSocketBufferSize;
- private async Task RecvDoWorkAsync(Socket sock, bool initialData)
- {
- //init new cts
- _cts = new();
-
+ public async Task RecvDoWorkAsync<TIO>(TIO sock, int bytesTransferred, int recvBufferSize)
+ where TIO : ISocketIo
+ {
Exception? cause = null;
+ Memory<byte> buffer;
+
try
{
- //Avoid syscall?
- _sysSocketBufferSize = sock.ReceiveBufferSize;
-
//If initial data was buffered, it needs to be published to the reader
- if (initialData)
+ if (bytesTransferred > 0)
{
+ //Advance the write to written data from accept
+ RecvPipe.Writer.Advance(bytesTransferred);
+
//Flush initial data
- FlushResult res = await RecvPipe.Writer.FlushAsync(CancellationToken.None);
+ _recvFlushRes = await RecvPipe.Writer.FlushAsync(CancellationToken.None);
- if (res.IsCompleted || res.IsCanceled)
+ //Check flush result for error/cancel
+ if (IsPipeClosedAfterFlush(ref _recvFlushRes))
{
//Exit
return;
@@ -308,14 +255,14 @@ namespace VNLib.Net.Transport.Tcp
while (true)
{
//Get buffer from pipe writer
- Memory<byte> buffer = RecvPipe.Writer.GetMemory(_sysSocketBufferSize);
+ buffer = RecvPipe.Writer.GetMemory(recvBufferSize);
//Wait for data or error from socket
- int count = await sock.ReceiveAsync(buffer, SocketFlags.None, _cts.Token);
+ int count = await sock.ReceiveAsync(buffer, SocketFlags.None);
- //socket returned emtpy data
- if (count == 0 || !sock.Connected)
+ if(count <= 0)
{
+ //Connection is softly closing, exit
break;
}
@@ -326,24 +273,18 @@ namespace VNLib.Net.Transport.Tcp
_recvFlushRes = await RecvPipe.Writer.FlushAsync(CancellationToken.None);
//Writing has completed, time to exit
- if (_recvFlushRes.IsCompleted || _recvFlushRes.IsCanceled)
+ if (IsPipeClosedAfterFlush(ref _recvFlushRes))
{
break;
}
}
}
- //Normal exit
- catch (OperationCanceledException)
- {}
- catch (SocketException se)
- {
- cause = se;
- //Cancel sending reader task because the socket has an error and cannot be used
- SendPipe.Reader.CancelPendingRead();
- }
catch (Exception ex)
{
cause = ex;
+
+ //Cancel sending reader task because the socket has an error and cannot be used
+ SendPipe.Reader.CancelPendingRead();
}
finally
{
@@ -359,6 +300,9 @@ namespace VNLib.Net.Transport.Tcp
}
+ private static bool IsPipeClosedAfterFlush(ref FlushResult result) => result.IsCanceled || result.IsCompleted;
+
+
/// <summary>
/// The internal cleanup/dispose method to be called
/// when the pipeline is no longer needed
@@ -367,43 +311,41 @@ namespace VNLib.Net.Transport.Tcp
{
RecvTimer.Dispose();
SendTimer.Dispose();
+ }
- //Perform some managed cleanup
-
- //Cleanup tasks
- Array.Clear(_tasks);
-
- //Cleanup cts
- _cts?.Dispose();
- _cts = null;
+ /// <summary>
+ /// Must be called when the pipeline is requested to be closed
+ /// </summary>
+ /// <returns>A value task that complets when the piepline is completed</returns>
+ internal async ValueTask ShutDownClientPipeAsync()
+ {
+ //Complete the data input so sending completes
+ await SendPipe.Writer.CompleteAsync();
+ await RecvPipe.Reader.CompleteAsync();
}
-
- private static async Task AwaitFlushTask(ValueTask<FlushResult> valueTask, Timer? sendTimer)
+
+ private static async Task AwaitFlushTask<TTimer>(ValueTask<FlushResult> valueTask, TTimer timer)
+ where TTimer : INetTimer
{
try
{
FlushResult result = await valueTask.ConfigureAwait(false);
-
- if (result.IsCanceled)
- {
- ThrowHelpers.ThrowWriterCanceled();
- }
+ ThrowHelpers.ThrowIfWriterCanceled(result.IsCanceled);
}
finally
{
- sendTimer?.Stop();
+ timer.Stop();
}
}
- private ValueTask SendWithTimerInternalAsync(ReadOnlyMemory<byte> data, CancellationToken cancellation)
+ private ValueTask SendWithTimerInternalAsync<TTimer>(in TTimer timer, CancellationToken cancellation)
+ where TTimer : INetTimer
{
//Start send timer
- SendTimer.Restart(SendTimeoutMs);
+ timer.Start();
try
{
- CopyAndPublishDataOnSendPipe(data);
-
//Send the segment
ValueTask<FlushResult> result = SendPipe.Writer.FlushAsync(cancellation);
@@ -411,140 +353,111 @@ namespace VNLib.Net.Transport.Tcp
if (result.IsCompleted)
{
//Stop timer
- SendTimer.Stop();
+ timer.Stop();
//safe to get the flush result sync, may throw, so preserve the call stack
FlushResult fr = result.GetAwaiter().GetResult();
-
+
//Check for canceled and throw
return fr.IsCanceled
- ? throw new OperationCanceledException("The write operation was canceled by the underlying PipeWriter")
+ ? ValueTask.FromException(new OperationCanceledException("The write operation was canceled by the underlying PipeWriter"))
: ValueTask.CompletedTask;
}
else
{
//Wrap the task in a ValueTask since it must be awaited, and will happen on background thread
- return new(AwaitFlushTask(result, SendTimer));
+ return new(AwaitFlushTask(result, timer));
}
}
catch
{
//Stop timer on exception
- SendTimer.Stop();
+ timer.Stop();
throw;
}
}
- private ValueTask SendWithoutTimerInternalAsync(ReadOnlyMemory<byte> data, CancellationToken cancellation)
+ private ValueTask SendAsync(ReadOnlySpan<byte> data, int timeout, CancellationToken cancellation)
{
- CopyAndPublishDataOnSendPipe(data);
-
- //Send the segment
- ValueTask<FlushResult> result = SendPipe.Writer.FlushAsync(cancellation);
+ //Publish send data to send pipe
+ CopyAndPublishDataOnSendPipe(data, _sysSocketBufferSize, SendPipe.Writer);
- //Task completed successfully, so
- if (result.IsCompleted)
+ //See if timer is required
+ if (timeout < 1)
{
- /*
- * We can get the flush result synchronously, it may throw
- * so preserve the call stack
- */
- FlushResult fr = result.GetAwaiter().GetResult();
-
- //Check for canceled and throw
- return fr.IsCanceled
- ? throw new OperationCanceledException("The write operation was canceled by the underlying PipeWriter")
- : ValueTask.CompletedTask;
+ NoOpTimerWrapper noOpTimer = default;
+
+ //no timer
+ return SendWithTimerInternalAsync(in noOpTimer, cancellation);
}
else
{
- //Wrap the task in a ValueTask since it must be awaited, and will happen on background thread
- return new(AwaitFlushTask(result, null));
+ TpTimerWrapper sendTimer = new(SendTimer, timeout);
+
+ //Pass new send timer to send method
+ return SendWithTimerInternalAsync(in sendTimer, cancellation);
}
}
- private void CopyAndPublishDataOnSendPipe(ReadOnlyMemory<byte> src)
+ ValueTask ITransportInterface.SendAsync(ReadOnlyMemory<byte> data, int timeout, CancellationToken cancellation)
{
- Debug.Assert(_sysSocketBufferSize > 0, "A call to CopyAndPublishDataOnSendPipe was made before a socket was connected");
+ return SendAsync(data.Span, timeout, cancellation);
+ }
+
+ private static void CopyAndPublishDataOnSendPipe<TWriter>(ReadOnlySpan<byte> src, int bufferSize, TWriter writer)
+ where TWriter: IBufferWriter<byte>
+ {
+ Debug.Assert(bufferSize > 0, "A call to CopyAndPublishDataOnSendPipe was made before a socket was connected");
+
+ ref byte srcRef = ref MemoryMarshal.GetReference(src);
/*
- * Clamp the buffer size to the system socket buffer size. If the
- * buffer is larger then, we will need to publish multiple segments
+ * Only publish blocks up to the size of the socket buffer
+ * If blocks are larger than the socket buffer, they will
+ * be published in chunks up to the size of the socket buffer
*/
- if(src.Length > _sysSocketBufferSize)
+ uint written = 0;
+ while (written < src.Length)
{
- //Store local src buffer reference to copy to
- ref byte srcRef = ref MemoryMarshal.GetReference(src.Span);
-
- uint written = 0;
- while (written < src.Length)
- {
- int dataToCopy = (int)Math.Min(_sysSocketBufferSize, src.Length - written);
+ //Clamp the data to copy to the size of the socket buffer
+ int dataToCopy = (int)Math.Min(bufferSize, src.Length - written);
- //Get a new buffer span, and ref
- Span<byte> dest = SendPipe.Writer.GetSpan(dataToCopy);
- ref byte destRef = ref MemoryMarshal.GetReference(dest);
+ //Get a new buffer span, as large as the data to copy
+ Span<byte> dest = writer.GetSpan(dataToCopy);
+ ref byte destRef = ref MemoryMarshal.GetReference(dest);
- //Copy data to the buffer at the new position
- MemoryUtil.Memmove(ref srcRef, written, ref destRef, 0, (uint)dataToCopy);
+ //Copy data to the buffer at the new position (attempt to use hardware acceleration)
+ MemoryUtil.AcceleratedMemmove(ref srcRef, written, ref destRef, 0, (uint)dataToCopy);
- //Advance the writer by the number of bytes written
- SendPipe.Writer.Advance(dataToCopy);
+ //Advance the writer by the number of bytes written
+ writer.Advance(dataToCopy);
- //Increment the written count
- written += (uint)dataToCopy;
- }
+ //Increment the written count
+ written += (uint)dataToCopy;
}
- else
- {
- //Single segment, just copy to the writer
- Span<byte> dest = SendPipe.Writer.GetSpan(src.Length);
- src.Span.CopyTo(dest);
- SendPipe.Writer.Advance(src.Length);
- }
- }
+ }
- ValueTask ITransportInterface.SendAsync(ReadOnlyMemory<byte> data, CancellationToken cancellation)
- {
- //Use timer if timeout is set, dont otherwise
- return SendTimeoutMs < 1 ? SendWithoutTimerInternalAsync(data, cancellation) : SendWithTimerInternalAsync(data, cancellation);
- }
-
- void ITransportInterface.Send(ReadOnlySpan<byte> data)
+ void ITransportInterface.Send(ReadOnlySpan<byte> data, int timeout)
{
- //Determine if the send timer should be used
- Timer? _timer = SendTimeoutMs < 1 ? null : SendTimer;
-
- //Write data directly to the writer buffer
- SendPipe.Writer.Write(data);
+ //Call async send and wait for completion
+ ValueTask result = SendAsync(data, timeout, CancellationToken.None);
- //Start send timer
- _timer?.Restart(SendTimeoutMs);
-
- try
+ //If the task is completed, then it was sync, so get the result
+ if (result.IsCompleted)
{
- //Send the segment
- ValueTask<FlushResult> result = SendPipe.Writer.FlushAsync(CancellationToken.None);
-
- //Await the result synchronously
- FlushResult fr = result.ConfigureAwait(false).GetAwaiter().GetResult();
-
- if (fr.IsCanceled)
- {
- ThrowHelpers.ThrowWriterCanceled();
- }
+ result.GetAwaiter().GetResult();
}
- finally
+ //Otherwise convert to task then await it
+ else
{
- //Stop timer
- _timer?.Stop();
+ result.AsTask().GetAwaiter().GetResult();
}
}
-
- ValueTask<int> ITransportInterface.RecvAsync(Memory<byte> buffer, CancellationToken cancellation)
+ private ValueTask<int> RecvWithTimeoutAsync<TTimer>(Memory<byte> data, in TTimer timer, CancellationToken cancellation)
+ where TTimer : INetTimer
{
- static async Task<int> AwaitAsyncRead(ValueTask<int> task, Timer recvTimer)
+ static async Task<int> AwaitAsyncRead(ValueTask<int> task, TTimer recvTimer)
{
try
{
@@ -556,12 +469,12 @@ namespace VNLib.Net.Transport.Tcp
}
}
- //Restart recv timer
- RecvTimer.Restart(RecvTimeoutMs);
+ //Restart timer
+ timer.Start();
try
{
//Read async and get the value task
- ValueTask<int> result = RecvStream.ReadAsync(buffer, cancellation);
+ ValueTask<int> result = RecvStream.ReadAsync(data, cancellation);
if (result.IsCompleted)
{
@@ -576,20 +489,39 @@ namespace VNLib.Net.Transport.Tcp
else
{
//return async as value task
- return new(AwaitAsyncRead(result, RecvTimer));
+ return new(AwaitAsyncRead(result, timer));
}
}
catch
{
- RecvTimer.Stop();
+ timer.Stop();
throw;
}
}
- int ITransportInterface.Recv(Span<byte> buffer)
+ ValueTask<int> ITransportInterface.RecvAsync(Memory<byte> buffer, int timeout, CancellationToken cancellation)
+ {
+ //See if timer is required
+ if (timeout < 1)
+ {
+ NoOpTimerWrapper noOpTimer = default;
+
+ //no timer
+ return RecvWithTimeoutAsync(buffer, in noOpTimer, cancellation);
+ }
+ else
+ {
+ TpTimerWrapper recvTimer = new(RecvTimer, timeout);
+
+ //Pass new send timer to send method
+ return RecvWithTimeoutAsync(buffer, in recvTimer, cancellation);
+ }
+ }
+
+ int ITransportInterface.Recv(Span<byte> buffer, int timeout)
{
//Restart timer
- RecvTimer.Restart(RecvTimeoutMs);
+ RecvTimer.Restart(timeout);
try
{
return RecvStream.Read(buffer);
@@ -599,26 +531,47 @@ namespace VNLib.Net.Transport.Tcp
RecvTimer.Stop();
}
}
+
- Task ITransportInterface.CloseAsync()
- {
- //Complete the send pipe writer since stream is closed
- _tasks[2] = SendPipe.Writer.CompleteAsync().AsTask();
+ private static class ThrowHelpers
+ {
+ public static void ThrowIfWriterCanceled(bool isCancelled)
+ {
+ if(isCancelled)
+ {
+ throw new OperationCanceledException("The write operation was canceled by the underlying PipeWriter");
+ }
+ }
+ }
- //Complete the recv pipe reader since its no longer used
- _tasks[3] = RecvPipe.Reader.CompleteAsync().AsTask();
+ private interface INetTimer
+ {
+ void Start();
- //Join worker tasks, no alloc if completed sync, otherwise alloc anyway
- return Task.WhenAll(_tasks);
+ void Stop();
}
-
- private static class ThrowHelpers
- {
- public static void ThrowWriterCanceled()
+ private readonly struct TpTimerWrapper : INetTimer
+ {
+ private readonly Timer _timer;
+ private readonly int _timeout;
+
+ public TpTimerWrapper(Timer timer, int timeout)
{
- throw new OperationCanceledException("The write operation was canceled by the underlying PipeWriter");
+ _timer = timer;
+ _timeout = timeout;
}
+
+ public readonly void Start() => _timer.Restart(_timeout);
+
+ public readonly void Stop() => _timer.Stop();
+ }
+
+ private readonly struct NoOpTimerWrapper : INetTimer
+ {
+ public readonly void Start() { }
+
+ public readonly void Stop() { }
}
}
}
diff --git a/lib/Net.Transport.SimpleTCP/src/TCPConfig.cs b/lib/Net.Transport.SimpleTCP/src/TCPConfig.cs
index a6a416c..1dbf6e4 100644
--- a/lib/Net.Transport.SimpleTCP/src/TCPConfig.cs
+++ b/lib/Net.Transport.SimpleTCP/src/TCPConfig.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Transport.SimpleTCP
@@ -87,6 +87,11 @@ namespace VNLib.Net.Transport.Tcp
/// An optional callback invoked after the socket has been created
/// for optional appliction specific socket configuration
/// </summary>
- public Action<Socket>? OnSocketCreated { get; init; }
+ public readonly Action<Socket>? OnSocketCreated { get; init; }
+ /// <summary>
+ /// Enables verbose logging of TCP operations using the <see cref="LogLevel.Verbose"/>
+ /// level
+ /// </summary>
+ public readonly bool DebugTcpLog { get; init; }
}
} \ No newline at end of file
diff --git a/lib/Net.Transport.SimpleTCP/src/TcpServer.cs b/lib/Net.Transport.SimpleTCP/src/TcpServer.cs
index b85aa33..67a1751 100644
--- a/lib/Net.Transport.SimpleTCP/src/TcpServer.cs
+++ b/lib/Net.Transport.SimpleTCP/src/TcpServer.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Transport.SimpleTCP
@@ -23,13 +23,16 @@
*/
using System;
+using System.Net;
using System.Security;
using System.Threading;
+using System.Diagnostics;
using System.Net.Sockets;
-using System.IO.Pipelines;
using System.Threading.Tasks;
using System.Runtime.CompilerServices;
+using System.IO.Pipelines;
+
using VNLib.Utils.Async;
using VNLib.Utils.Logging;
using VNLib.Utils.Memory.Caching;
@@ -46,15 +49,17 @@ namespace VNLib.Net.Transport.Tcp
/// connections is expected. This class cannot be inherited
/// </para>
/// </summary>
- public sealed class TcpServer : ICacheHolder, ISockAsyncArgsHandler
+ public sealed class TcpServer : ICacheHolder
{
+ private readonly TCPConfig _config;
+
/// <summary>
/// The current <see cref="TcpServer"/> configuration
/// </summary>
- public TCPConfig Config { get; }
+ public ref readonly TCPConfig Config => ref _config;
- private readonly ObjectRental<VnSocketAsyncArgs> SockAsyncArgPool;
- private readonly PipeOptions PipeOptions;
+ private readonly ObjectRental<AwaitableAsyncServerSocket> SockAsyncArgPool;
+ private readonly AsyncQueue<ITcpConnectionDescriptor> WaitingSockets;
/// <summary>
/// Initializes a new <see cref="TcpServer"/> with the specified <see cref="TCPConfig"/>
@@ -66,40 +71,47 @@ namespace VNLib.Net.Transport.Tcp
public TcpServer(TCPConfig config, PipeOptions? pipeOptions = null)
{
//Check config
- if(pipeOptions == null)
+ if (pipeOptions == null)
{
//Pool is required when using default pipe options
_ = config.BufferPool ?? throw new ArgumentException("Buffer pool argument cannot be null");
}
-
+
_ = config.Log ?? throw new ArgumentException("Log argument is required");
-
+
if (config.MaxRecvBufferData < 4096)
{
throw new ArgumentException("MaxRecvBufferData size must be at least 4096 bytes to avoid data pipeline pefromance issues");
}
- if(config.AcceptThreads < 1)
+ if (config.AcceptThreads < 1)
{
throw new ArgumentException("Accept thread count must be greater than 0");
}
- if(config.AcceptThreads > Environment.ProcessorCount)
+ if (config.AcceptThreads > Environment.ProcessorCount)
{
config.Log.Debug("Suggestion: Setting accept threads to {pc}", Environment.ProcessorCount);
}
- Config = config;
+ _config = config;
- //Cache pipe options
- PipeOptions = pipeOptions ?? new(
+ //Assign default pipe options
+ pipeOptions ??= new(
config.BufferPool,
- readerScheduler:PipeScheduler.ThreadPool,
- writerScheduler:PipeScheduler.ThreadPool,
- pauseWriterThreshold: config.MaxRecvBufferData,
+ readerScheduler: PipeScheduler.ThreadPool,
+ writerScheduler: PipeScheduler.ThreadPool,
+ pauseWriterThreshold: config.MaxRecvBufferData,
minimumSegmentSize: 8192,
- useSynchronizationContext:false
- );
+ useSynchronizationContext: false
+ );
+
+ //Arguments constructor
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ AwaitableAsyncServerSocket ArgsConstructor() => new(pipeOptions);
- SockAsyncArgPool = ObjectRental.CreateReusable(ArgsConstructor, Config.CacheQuota);
+ SockAsyncArgPool = ObjectRental.CreateReusable(ArgsConstructor, config.CacheQuota);
+
+ //Init waiting socket queue, always multi-threaded
+ WaitingSockets = new(false, false);
}
///<inheritdoc/>
@@ -107,192 +119,151 @@ namespace VNLib.Net.Transport.Tcp
///<inheritdoc/>
public void CacheHardClear() => SockAsyncArgPool.CacheHardClear();
-
- private AsyncQueue<VnSocketAsyncArgs>? WaitingSockets;
- private Socket? ServerSock;
- private bool _canceledFlag;
/// <summary>
/// Begins listening for incoming TCP connections on the configured socket
/// </summary>
/// <param name="token">A token that is used to abort listening operations and close the socket</param>
+ /// <returns>A task that resolves when all accept threads have exited. The task does not need to be observed</returns>
/// <exception cref="SocketException"></exception>
/// <exception cref="SecurityException"></exception>
/// <exception cref="ArgumentException"></exception>
/// <exception cref="InvalidOperationException"></exception>
- public void Start(CancellationToken token)
+ public Task Start(CancellationToken token)
{
- //If the socket is still listening
- if (ServerSock != null)
- {
- throw new InvalidOperationException("The server thread is currently listening and cannot be re-started");
- }
+ Socket serverSock;
//make sure the token isnt already canceled
if (token.IsCancellationRequested)
{
throw new ArgumentException("Token is already canceled", nameof(token));
}
-
+
//Configure socket on the current thread so exceptions will be raised to the caller
- ServerSock = new(Config.LocalEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
+ serverSock = new(_config.LocalEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
//Bind socket
- ServerSock.Bind(Config.LocalEndPoint);
+ serverSock.Bind(_config.LocalEndPoint);
//Begin listening
- ServerSock.Listen(Config.BackLog);
+ serverSock.Listen(_config.BackLog);
//See if keepalive should be used
- if (Config.TcpKeepalive)
- {
+ if (_config.TcpKeepalive)
+ {
//Setup socket keepalive from config
- ServerSock.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
- ServerSock.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, Config.KeepaliveInterval);
- ServerSock.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, Config.TcpKeepAliveTime);
+ serverSock.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
+ serverSock.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, _config.KeepaliveInterval);
+ serverSock.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, _config.TcpKeepAliveTime);
}
//Invoke socket created callback
- Config.OnSocketCreated?.Invoke(ServerSock);
-
- //Init waiting socket queue
- WaitingSockets = new(false, false);
+ _config.OnSocketCreated?.Invoke(serverSock);
//Clear canceled flag
- _canceledFlag = false;
+ StrongBox<bool> canceledFlag = new(false);
+
+ Task[] acceptWorkers = new Task[_config.AcceptThreads];
//Start listening for connections
- for (int i = 0; i < Config.AcceptThreads; i++)
+ for (int i = 0; i < _config.AcceptThreads; i++)
{
- AcceptConnection();
+ acceptWorkers[i] = Task.Run(() => ExecAcceptAsync(serverSock, canceledFlag), token);
}
+ CancellationTokenRegistration reg = default;
+
//Cleanup callback
- static void cleanup(object? state)
+ void cleanup()
{
- TcpServer server = (TcpServer)state!;
-
//Set canceled flag
- server._canceledFlag = true;
-
+ canceledFlag.Value = true;
+
//Clean up socket
- server.ServerSock!.Dispose();
- server.ServerSock = null;
+ serverSock.Dispose();
- server.SockAsyncArgPool.CacheHardClear();
+ //Cleanup pool
+ SockAsyncArgPool.CacheHardClear();
//Dispose any queued sockets
- while (server.WaitingSockets!.TryDequeue(out VnSocketAsyncArgs? args))
+ while (WaitingSockets!.TryDequeue(out ITcpConnectionDescriptor? args))
{
- args.Dispose();
+ (args as IDisposable)!.Dispose();
}
+
+ reg.Dispose();
}
-
+
//Register cleanup
- _ = token.Register(cleanup, this, false);
- }
-
+ reg = token.Register(cleanup, false);
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- private VnSocketAsyncArgs ArgsConstructor()
- {
- //Socket args accept callback functions for this
- VnSocketAsyncArgs args = new(this, PipeOptions);
- return args;
+ return Task.WhenAll(acceptWorkers);
}
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- void ISockAsyncArgsHandler.OnSocketAccepted(VnSocketAsyncArgs args) => AcceptCompleted(args);
-
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- void ISockAsyncArgsHandler.OnSocketDisconnected(VnSocketAsyncArgs args)
+ private async Task ExecAcceptAsync(Socket serverSock, StrongBox<bool> canceled)
{
- //If the is closed, dispose the args and exit
- if (_canceledFlag)
- {
- args.Dispose();
- }
- else
- {
- SockAsyncArgPool.Return(args);
- }
- }
+ Debug.Assert(serverSock != null, "Expected not-null server socket value");
+ Debug.Assert(canceled != null && !canceled.Value, "Expected a valid canceled flag instance");
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- private void AcceptConnection()
- {
- //Make sure cancellation isnt pending
- if (_canceledFlag)
- {
- return;
- }
+ //Cache buffer sizes
+ int recBufferSize = serverSock.ReceiveBufferSize;
+ int sendBufferSize = serverSock.SendBufferSize;
- //Rent new args
- VnSocketAsyncArgs acceptArgs = SockAsyncArgPool!.Rent();
+ //Cache local endpoint for multi-server logging
+ EndPoint localEndpoint = serverSock.LocalEndPoint!;
- //Accept another socket
- if (!acceptArgs.BeginAccept(ServerSock!))
- {
- //Completed synchronously
- AcceptCompleted(acceptArgs);
- }
- //Completed async
- }
+ Debug.Assert(localEndpoint != null, "Expected a socket bound to a local endpoint");
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- private void AcceptCompleted(VnSocketAsyncArgs args)
- {
- //Examine last op for aborted error, if aborted, then the listening socket has exited
- if (args.SocketError == SocketError.OperationAborted)
+ try
{
- //Dispose args since server is exiting
- args.Dispose();
- return;
- }
-
- //Check for error and log it
- if (!args.EndAccept())
- {
- args.Disconnect();
- Config.Log.Debug("Socket accept failed with error code {ec}", args.SocketError);
- return;
+ while (!canceled.Value)
+ {
+ //Rent new args
+ AwaitableAsyncServerSocket acceptArgs = SockAsyncArgPool.Rent();
+
+ //Accept new connection
+ SocketError err = await acceptArgs.AcceptAsync(serverSock, recBufferSize, sendBufferSize);
+
+ //Check canceled flag before proceeding
+ if (canceled.Value)
+ {
+ //dispose and bail
+ acceptArgs.Dispose();
+ _config.Log.Verbose("Accept thread aborted for {socket}", localEndpoint);
+ }
+ else if (err == SocketError.Success)
+ {
+ // Add to waiting queue
+ if (!WaitingSockets!.TryEnque(acceptArgs))
+ {
+ _ = await acceptArgs.CloseConnectionAsync();
+
+ /*
+ * Writing to log will likely compound resource exhaustion, but the user must be informed
+ * connections are being dropped.
+ */
+ _config.Log.Warn("Socket {e} disconnected because the waiting queue is overflowing", acceptArgs.GetHashCode());
+
+ //Re-eqnue
+ SockAsyncArgPool.Return(acceptArgs);
+ }
+
+ //Success
+ PrintConnectionInfo(acceptArgs, SocketAsyncOperation.Accept);
+ }
+ else
+ {
+ //Error
+ _config.Log.Debug("Socket accept failed with error code {ec}", err);
+ //Return args to pool
+ SockAsyncArgPool.Return(acceptArgs);
+ }
+ }
}
-
- //Try to enqueue the args to the waiting queue, if the queue is full, disconnect the socket
- if (!WaitingSockets!.TryEnque(args))
+ catch(Exception ex)
{
- args.Disconnect();
- Config.Log.Warn("Socket {e} disconnected because the waiting queue is overflowing", args.GetHashCode());
- return;
+ _config.Log.Fatal(ex, "Accept thread failed with exception");
}
-
- //Accept a new connection
- AcceptConnection();
}
-
- /// <summary>
- /// Retreives a connected socket from the waiting queue
- /// </summary>
- /// <returns>The context of the connect</returns>
- /// <exception cref="InvalidOperationException"></exception>
- public ValueTask<TransportEventContext> AcceptAsync(CancellationToken cancellation)
- {
- _ = WaitingSockets ?? throw new InvalidOperationException("Server is not listening");
-
- //Try get args from queue
- if(WaitingSockets.TryDequeue(out VnSocketAsyncArgs? args))
- {
- return ValueTask.FromResult<TransportEventContext>(new(args, args.Stream));
- }
- return AcceptAsyncCore(this, cancellation);
-
- static async ValueTask<TransportEventContext> AcceptAsyncCore(TcpServer server, CancellationToken cancellation)
- {
- //Await async
- VnSocketAsyncArgs args = await server.WaitingSockets!.DequeueAsync(cancellation);
-
- return new(args, args.Stream);
- }
- }
/// <summary>
/// Accepts a connection and returns the connection descriptor.
@@ -300,7 +271,7 @@ namespace VNLib.Net.Transport.Tcp
/// <param name="cancellation">A token to cancel the operation</param>
/// <returns>The connection descriptor</returns>
/// <remarks>
- /// NOTE: You must always call the <see cref="ITcpConnectionDescriptor.CloseConnection"/> and
+ /// NOTE: You must always call the <see cref="CloseConnectionAsync"/> and
/// destroy all references to it when you are done. You must also dispose the stream returned
/// from the <see cref="ITcpConnectionDescriptor.GetStream"/> method.
/// </remarks>
@@ -310,17 +281,65 @@ namespace VNLib.Net.Transport.Tcp
_ = WaitingSockets ?? throw new InvalidOperationException("Server is not listening");
//Try get args from queue
- if (WaitingSockets.TryDequeue(out VnSocketAsyncArgs? args))
+ if (WaitingSockets.TryDequeue(out ITcpConnectionDescriptor? args))
+ {
+ return ValueTask.FromResult(args);
+ }
+
+ return WaitingSockets!.DequeueAsync(cancellation);
+ }
+
+ /// <summary>
+ /// Cleanly closes an existing TCP connection obtained from <see cref="AcceptConnectionAsync(CancellationToken)"/>
+ /// and returns the instance to the pool for reuse.
+ /// <para>
+ /// You should destroy all references to the
+ /// connection descriptor and dispose the stream returned from <see cref="ITcpConnectionDescriptor.GetStream"/>
+ /// </para>
+ /// </summary>
+ /// <param name="descriptor">The existing descriptor to close</param>
+ /// <returns>A task that represents the closing operations</returns>
+ /// <exception cref="ArgumentNullException"></exception>
+ public async ValueTask CloseConnectionAsync(ITcpConnectionDescriptor descriptor)
+ {
+ ArgumentNullException.ThrowIfNull(descriptor, nameof(descriptor));
+
+ //Recover args
+ AwaitableAsyncServerSocket args = (AwaitableAsyncServerSocket)descriptor;
+
+ PrintConnectionInfo(args, SocketAsyncOperation.Disconnect);
+
+ //Close the socket and cleanup resources
+ SocketError err = await args.CloseConnectionAsync();
+
+ if (err == SocketError.Success)
{
- return ValueTask.FromResult<ITcpConnectionDescriptor>(args);
+ //Return to pool
+ SockAsyncArgPool.Return(args);
+ }
+ else
+ {
+ args.Dispose();
+ _config.Log.Verbose("Socket disconnected failed with error code {ec}. Resources disposed", err);
+ }
+ }
+
+
+ [Conditional("DEBUG")]
+ private void PrintConnectionInfo(ITcpConnectionDescriptor con, SocketAsyncOperation operation)
+ {
+ if (!_config.DebugTcpLog)
+ {
+ return;
}
- return AcceptConnectionAsyncCore(this, cancellation);
+ con.GetEndpoints(out IPEndPoint local, out IPEndPoint remote);
- static async ValueTask<ITcpConnectionDescriptor> AcceptConnectionAsyncCore(TcpServer server, CancellationToken cancellation)
+ switch (operation)
{
- //Await async
- return await server.WaitingSockets!.DequeueAsync(cancellation);
+ default:
+ _config.Log.Verbose("Socket {operation} on {local} -> {remote}", operation, local, remote);
+ break;
}
}
}
diff --git a/lib/Net.Transport.SimpleTCP/src/TcpServerExtensions.cs b/lib/Net.Transport.SimpleTCP/src/TcpServerExtensions.cs
deleted file mode 100644
index 3b86b5b..0000000
--- a/lib/Net.Transport.SimpleTCP/src/TcpServerExtensions.cs
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
-* Copyright (c) 2023 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Net.Transport.SimpleTCP
-* File: TcpServerExtensions.cs
-*
-* TcpServerExtensions.cs is part of VNLib.Net.Transport.SimpleTCP which is
-* part of the larger VNLib collection of libraries and utilities.
-*
-* VNLib.Net.Transport.SimpleTCP is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 2 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Net.Transport.SimpleTCP is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Threading;
-using System.Net.Security;
-using System.Threading.Tasks;
-using System.Security.Authentication;
-using System.Runtime.CompilerServices;
-
-namespace VNLib.Net.Transport.Tcp
-{
- /// <summary>
- /// Exposes extension methods for <see cref="TcpServer"/>
- /// </summary>
- public static class TcpServerExtensions
- {
-
- /// <summary>
- /// Accepts a new ssl connection and attemts to authenticate it as a server
- /// </summary>
- /// <param name="server"></param>
- /// <param name="options">The ssl server authentication options used to initalize the connection</param>
- /// <param name="cancellation">A token to cancel the async accept operation</param>
- /// <returns>A <see cref="ValueTask"/> that resolve the <see cref="TransportEventContext"/> around the connection</returns>
- /// <exception cref="AuthenticationException"></exception>
- public static async Task<TransportEventContext> AcceptSslAsync(this TcpServer server, SslServerAuthenticationOptions options, CancellationToken cancellation = default)
- {
- //accept internal args
- ITcpConnectionDescriptor args = await server.AcceptConnectionAsync(cancellation);
-
- //Begin authenication and make sure the socket stream is closed as its required to cleanup
- SslStream stream = new(args.GetStream(), false);
- try
- {
- //auth the new connection
- await stream.AuthenticateAsServerAsync(options, cancellation);
- return new(args, stream);
- }
- catch (Exception ex)
- {
- //Cleanup the socket when auth fails
- await stream.DisposeAsync();
-
- //Disconnect socket
- args.CloseConnection();
-
- throw new AuthenticationException("Failed client/server TLS authentication", ex);
- }
- }
-
- /// <summary>
- /// Safely closes an ssl connection
- /// </summary>
- /// <param name="ctx">The context to close the connection on</param>
- /// <returns>A value task that completes when the connection is closed</returns>
- public static async ValueTask CloseSslConnectionAsync(this TransportEventContext ctx)
- {
- try
- {
- //Close the ssl stream
- await (ctx.ConnectionStream as SslStream)!.ShutdownAsync();
- }
- finally
- {
- //Always close the connection
- await ctx.CloseConnectionAsync();
- }
- }
-
- /// <summary>
- /// Gets the SslProtocol for an ssl connection
- /// </summary>
- /// <param name="ctx">The <see cref="TransportEventContext"/> that contains the ssl connection stream</param>
- /// <returns>The current <see cref="SslProtocols"/> of the connection</returns>
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- public static SslProtocols GetSslProtocol(this in TransportEventContext ctx)
- {
- return (ctx.ConnectionStream as SslStream)!.SslProtocol;
- }
-
- }
-} \ No newline at end of file
diff --git a/lib/Net.Transport.SimpleTCP/src/TransportEventContext.cs b/lib/Net.Transport.SimpleTCP/src/TransportEventContext.cs
deleted file mode 100644
index ec686ca..0000000
--- a/lib/Net.Transport.SimpleTCP/src/TransportEventContext.cs
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
-* Copyright (c) 2023 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Net.Transport.SimpleTCP
-* File: TransportEventContext.cs
-*
-* TransportEventContext.cs is part of VNLib.Net.Transport.SimpleTCP which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Net.Transport.SimpleTCP is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 2 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Net.Transport.SimpleTCP is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.IO;
-using System.Net;
-using System.Threading.Tasks;
-
-namespace VNLib.Net.Transport.Tcp
-{
-
- /// <summary>
- /// Represents the context of a transport connection. It includes the active socket
- /// and a stream representing the active transport.
- /// </summary>
- public readonly record struct TransportEventContext
- {
- private readonly ITcpConnectionDescriptor _descriptor;
-
- /// <summary>
- /// A copy of the local endpoint of the listening socket
- /// </summary>
- public readonly IPEndPoint LocalEndPoint;
-
- /// <summary>
- /// The <see cref="IPEndPoint"/> representing the client's connection information
- /// </summary>
- public readonly IPEndPoint RemoteEndpoint;
-
- /// <summary>
- /// The transport stream that wraps the connection
- /// </summary>
- public readonly Stream ConnectionStream;
-
-
- /// <summary>
- /// Creates a new <see cref="TransportEventContext"/> wrapper for the given connection descriptor
- /// and captures the default stream from the descriptor.
- /// </summary>
- /// <param name="descriptor">The connection to wrap</param>
- public TransportEventContext(ITcpConnectionDescriptor descriptor):this(descriptor, descriptor.GetStream())
- { }
-
- /// <summary>
- /// Creates a new <see cref="TransportEventContext"/> wrapper for the given connection descriptor
- /// and your custom stream implementation.
- /// </summary>
- /// <param name="descriptor">The connection descriptor to wrap</param>
- /// <param name="customStream">Your custom stream wrapper around the transport stream</param>
- public TransportEventContext(ITcpConnectionDescriptor descriptor, Stream customStream)
- {
- _descriptor = descriptor;
- ConnectionStream = customStream;
-
- //Call once and store locally
- LocalEndPoint = (descriptor.Socket.LocalEndPoint as IPEndPoint)!;
- RemoteEndpoint = (descriptor.Socket.RemoteEndPoint as IPEndPoint)!;
- }
-
- /// <summary>
- /// Cleans up the stream and closes the connection descriptor
- /// </summary>
- /// <returns>A value-task that completes when the resources have been cleaned up</returns>
- public readonly async ValueTask CloseConnectionAsync()
- {
- try
- {
- //dispose the stream and wait for buffered data to be sent
- await ConnectionStream.DisposeAsync();
- }
- finally
- {
- //Disconnect
- _descriptor.CloseConnection();
- }
- }
- }
-} \ No newline at end of file
diff --git a/lib/Net.Transport.SimpleTCP/src/VnSocketAsyncArgs.cs b/lib/Net.Transport.SimpleTCP/src/VnSocketAsyncArgs.cs
deleted file mode 100644
index 0481f21..0000000
--- a/lib/Net.Transport.SimpleTCP/src/VnSocketAsyncArgs.cs
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
-* Copyright (c) 2023 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Net.Transport.SimpleTCP
-* File: VnSocketAsyncArgs.cs
-*
-* VnSocketAsyncArgs.cs is part of VNLib.Net.Transport.SimpleTCP which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Net.Transport.SimpleTCP is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 2 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Net.Transport.SimpleTCP is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.IO;
-using System.Net.Sockets;
-using System.IO.Pipelines;
-
-using VNLib.Utils.Memory.Caching;
-
-namespace VNLib.Net.Transport.Tcp
-{
-
- /// <summary>
- /// Reusable <see cref="SocketAsyncEventArgs"/> that manages a pipeline for sending and recieving data.
- /// on the connected socket
- /// </summary>
- internal sealed class VnSocketAsyncArgs : SocketAsyncEventArgs, ITcpConnectionDescriptor, IReusable
- {
- private readonly ISockAsyncArgsHandler _handler;
-
- public readonly SocketPipeLineWorker SocketWorker;
-
- public Stream Stream => SocketWorker.NetworkStream;
-
-
- public VnSocketAsyncArgs(ISockAsyncArgsHandler handler, PipeOptions options) : base()
- {
- SocketWorker = new(options);
- _handler = handler;
- //Only reuse socketes if windows
- DisconnectReuseSocket = OperatingSystem.IsWindows();
- }
-
- /// <summary>
- /// Begins an asynchronous accept operation on the current (bound) socket
- /// </summary>
- /// <param name="sock">The server socket to accept the connection</param>
- /// <returns>True if the IO operation is pending</returns>
- public bool BeginAccept(Socket sock)
- {
- //Store the semaphore in the user token event args
- SocketError = SocketError.Success;
- SocketFlags = SocketFlags.None;
-
- //Recv during accept is only supported on windows, this flag is true when on windows
- if (DisconnectReuseSocket)
- {
- //get buffer from the pipe to write initial accept data to
- Memory<byte> buffer = SocketWorker.GetMemory(sock.ReceiveBufferSize);
- SetBuffer(buffer);
- }
-
- //accept async
- return sock.AcceptAsync(this);
- }
-
- /// <summary>
- /// Determines if an asynchronous accept operation has completed successsfully
- /// and the socket is connected.
- /// </summary>
- /// <returns>True if the accept was successful, and the accepted socket is connected, false otherwise</returns>
- public bool EndAccept()
- {
- if(SocketError == SocketError.Success)
- {
- //remove ref to buffer
- SetBuffer(null);
- //start the socket worker
- SocketWorker.Start(AcceptSocket!, BytesTransferred);
- return true;
- }
- return false;
- }
-
- /// <summary>
- /// Begins an async disconnect operation on a currentl connected socket
- /// </summary>
- /// <returns>True if the operation is pending</returns>
- public void Disconnect()
- {
- //Clear flags
- SocketError = SocketError.Success;
- //accept async
- if (!AcceptSocket!.DisconnectAsync(this))
- {
- //Invoke disconnected callback since op completed sync
- EndDisconnect();
- //Invoke disconnected callback since op completed sync
- _handler.OnSocketDisconnected(this);
- }
- }
-
- private void EndDisconnect()
- {
- //If the disconnection operation failed, do not reuse the socket on next accept
- if (SocketError != SocketError.Success)
- {
- //Dispose the socket before clearing the socket
- AcceptSocket?.Dispose();
- AcceptSocket = null;
- }
- }
-
- protected override void OnCompleted(SocketAsyncEventArgs e)
- {
- switch (e.LastOperation)
- {
- case SocketAsyncOperation.Accept:
- //Invoke the accepted callback
- _handler.OnSocketAccepted(this);
- break;
- case SocketAsyncOperation.Disconnect:
- EndDisconnect();
- //Invoke disconnected callback since op completed sync
- _handler.OnSocketDisconnected(this);
- break;
- default:
- throw new InvalidOperationException("Invalid socket operation");
- }
- //Clear flags/errors on completion
- SocketError = SocketError.Success;
- SocketFlags = SocketFlags.None;
- }
-
- ///<inheritdoc/>
- Socket ITcpConnectionDescriptor.Socket => AcceptSocket!;
-
- ///<inheritdoc/>
- Stream ITcpConnectionDescriptor.GetStream() => SocketWorker.NetworkStream;
-
- ///<inheritdoc/>
- void ITcpConnectionDescriptor.CloseConnection() => Disconnect();
-
-
- void IReusable.Prepare()
- {
- SocketWorker.Prepare();
- }
-
- bool IReusable.Release()
- {
- UserToken = null;
- SocketWorker.Release();
-
- //if the sockeet is connected (or not windows), dispose it and clear the accept socket
- if (AcceptSocket?.Connected == true || !DisconnectReuseSocket)
- {
- AcceptSocket?.Dispose();
- AcceptSocket = null;
- }
- return true;
- }
-
- public new void Dispose()
- {
- //Dispose the base class
- base.Dispose();
-
- //Dispose the socket if its set
- AcceptSocket?.Dispose();
- AcceptSocket = null;
-
- //Cleanup socket worker
- SocketWorker.DisposeInternal();
- }
-
- }
-}
diff --git a/lib/Plugins.Essentials.ServiceStack/src/Construction/HttpServiceStackBuilder.cs b/lib/Plugins.Essentials.ServiceStack/src/Construction/HttpServiceStackBuilder.cs
index 91184bd..f100c1d 100644
--- a/lib/Plugins.Essentials.ServiceStack/src/Construction/HttpServiceStackBuilder.cs
+++ b/lib/Plugins.Essentials.ServiceStack/src/Construction/HttpServiceStackBuilder.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Plugins.Essentials.ServiceStack
@@ -51,6 +51,7 @@ namespace VNLib.Plugins.Essentials.ServiceStack.Construction
private Func<ServiceGroup, IHttpServer>? _getServers;
private Func<IPluginStack>? _getPlugins;
private IManualPlugin[]? manualPlugins;
+ private bool loadConcurrently;
/// <summary>
/// Uses the supplied callback to get a collection of virtual hosts
@@ -121,6 +122,17 @@ namespace VNLib.Plugins.Essentials.ServiceStack.Construction
}
/// <summary>
+ /// Sets the load concurrency flag for the plugin stack
+ /// </summary>
+ /// <param name="value">True to enable concurrent loading, false for serial loading</param>
+ /// <returns>The current instance for chaining</returns>
+ public HttpServiceStackBuilder LoadPluginsConcurrently(bool value)
+ {
+ loadConcurrently = value;
+ return this;
+ }
+
+ /// <summary>
/// Builds the new <see cref="HttpServiceStack"/> from the configured callbacks
/// </summary>
/// <returns>The newly constructed <see cref="HttpServiceStack"/> that may be used to manage your http services</returns>
@@ -165,7 +177,7 @@ namespace VNLib.Plugins.Essentials.ServiceStack.Construction
plugins ??= new EmptyPluginStack();
#pragma warning restore CA2000 // Dispose objects before losing scope
- return new (domain.GetListener(), plugins, manualPlugins);
+ return new (domain.GetListener(), plugins, manualPlugins, loadConcurrently);
}
/*
diff --git a/lib/Plugins.Essentials.ServiceStack/src/PluginStackInitializer.cs b/lib/Plugins.Essentials.ServiceStack/src/PluginStackInitializer.cs
index 8a4e801..c2ff1e4 100644
--- a/lib/Plugins.Essentials.ServiceStack/src/PluginStackInitializer.cs
+++ b/lib/Plugins.Essentials.ServiceStack/src/PluginStackInitializer.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Plugins.Essentials.ServiceStack
@@ -40,7 +40,8 @@ using VNLib.Plugins.Attributes;
namespace VNLib.Plugins.Essentials.ServiceStack
{
- internal sealed record class PluginStackInitializer(PluginLoadEventListener Listener, IPluginStack Stack, IManualPlugin[] ManualPlugins) : IPluginInitializer
+ internal sealed record class PluginStackInitializer(PluginLoadEventListener Listener, IPluginStack Stack, IManualPlugin[] ManualPlugins, bool ConcurrentLoad)
+ : IPluginInitializer
{
private readonly LinkedList<IManagedPlugin> _managedPlugins = new();
private readonly LinkedList<ManualPluginWrapper> _manualPlugins = new();
@@ -88,8 +89,20 @@ namespace VNLib.Plugins.Essentials.ServiceStack
}
}
- //Load stage, load only initialized plugins
- Parallel.ForEach(_loadedPlugins, wp => LoadPlugin(wp, debugLog));
+ /*
+ * Load stage, load only initialized plugins.
+ *
+ * Optionally single-threaded or parallel
+ */
+
+ if (ConcurrentLoad)
+ {
+ Parallel.ForEach(_loadedPlugins, wp => LoadPlugin(wp, debugLog));
+ }
+ else
+ {
+ _loadedPlugins.TryForeach(_loadedPlugins => LoadPlugin(_loadedPlugins, debugLog));
+ }
return _loadedPlugins.ToArray();
}
diff --git a/lib/Plugins.Runtime/src/LoaderExtensions.cs b/lib/Plugins.Runtime/src/LoaderExtensions.cs
index 2c3caf5..d167488 100644
--- a/lib/Plugins.Runtime/src/LoaderExtensions.cs
+++ b/lib/Plugins.Runtime/src/LoaderExtensions.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Plugins.Runtime
@@ -266,10 +266,63 @@ namespace VNLib.Plugins.Runtime
/// <param name="builder"></param>
/// <param name="hostConfig">An optional configuration element to pass to the plugin's host config element</param>
/// <returns>The current builder instance for chaining</returns>
- public static PluginStackBuilder WithLocalJsonConfig(this PluginStackBuilder builder, in JsonElement? hostConfig)
+ public static PluginStackBuilder WithLocalJsonConfig(
+ this PluginStackBuilder builder,
+ in JsonElement? hostConfig
+ )
+ => WithJsonConfig(builder, in hostConfig, null);
+
+ /// <summary>
+ /// Configures the plugin stack to retrieve plugin-local json configuration files
+ /// from the same directory as the plugin assembly file.
+ /// </summary>
+ /// <param name="builder"></param>
+ /// <param name="configDir">The directory containing all configuration files for the stack</param>
+ /// <param name="hostConfig">An optional configuration element to pass to the plugin's host config element</param>
+ /// <returns>The current builder instance for chaining</returns>
+ public static PluginStackBuilder WithJsonConfigDir(
+ this PluginStackBuilder builder,
+ in JsonElement? hostConfig,
+ DirectoryInfo configDir
+ )
+ {
+ /*
+ * Local function forces config files to be located in the
+ * specified directory.
+ */
+ string AltDirConfigFileFinder(IPluginAssemblyLoadConfig asmConfig)
+ {
+ //Get the plugin config file name
+ string configFileName = Path.ChangeExtension(asmConfig.AssemblyFile, ".json");
+ configFileName = Path.GetFileName(configFileName);
+
+ //Search for the file within the config directory
+ return Path.Combine(configDir.FullName, configFileName);
+ }
+
+ //Use the alternate directory finder
+ return WithJsonConfig(builder, in hostConfig, AltDirConfigFileFinder);
+ }
+
+ /// <summary>
+ /// Configures the plugin stack to retrieve a json configuration file from the specified callbac function,
+ /// or local to the assembly if the callback is null.
+ /// </summary>
+ /// <param name="builder"></param>
+ /// <param name="getPluginJsonFile">An optional callback function that finds the plugin json config file from its assembly path</param>
+ /// <param name="hostConfig">An optional configuration element to pass to the plugin's host config element</param>
+ /// <returns>The current builder instance for chaining</returns>
+ public static PluginStackBuilder WithJsonConfig(
+ this PluginStackBuilder builder,
+ in JsonElement? hostConfig,
+ Func<IPluginAssemblyLoadConfig, string>? getPluginJsonFile
+ )
{
ArgumentNullException.ThrowIfNull(builder, nameof(builder));
+ //Set default callback if not specified
+ getPluginJsonFile ??= GetDefaultFileNameCallback;
+
LocalFilePluginConfigReader reader;
//Host config is optional
@@ -283,17 +336,26 @@ namespace VNLib.Plugins.Runtime
}
//Create a reader from the binary
- reader = new LocalFilePluginConfigReader(ms.ToArray());
+ reader = new LocalFilePluginConfigReader(ms.ToArray(), getPluginJsonFile);
}
else
{
//Empty json
byte[] emptyJson = Encoding.UTF8.GetBytes("{}");
- reader = new LocalFilePluginConfigReader(emptyJson);
+ reader = new LocalFilePluginConfigReader(emptyJson, getPluginJsonFile);
}
//Store binary
return builder.WithConfigurationReader(reader);
+
+ static string GetDefaultFileNameCallback(IPluginAssemblyLoadConfig asmConfig)
+ {
+ /*
+ * Just changing the asm file extention means the the json file
+ * will be in the same directory as the plugin assembly file
+ */
+ return Path.ChangeExtension(asmConfig.AssemblyFile, ".json");
+ }
}
/// <summary>
@@ -381,7 +443,8 @@ namespace VNLib.Plugins.Runtime
* The json file is local for the specific plugin and is not shared between plugins. The host
* configuration is also required
*/
- private sealed record class LocalFilePluginConfigReader(ReadOnlyMemory<byte> HostJson) : IPluginConfigReader
+ private sealed record class LocalFilePluginConfigReader(ReadOnlyMemory<byte> HostJson, Func<IPluginAssemblyLoadConfig, string> GetConfigFilePathCallback)
+ : IPluginConfigReader
{
public void ReadPluginConfigData(IPluginAssemblyLoadConfig asmConfig, Stream configData)
{
@@ -392,8 +455,8 @@ namespace VNLib.Plugins.Runtime
CommentHandling = JsonCommentHandling.Skip,
};
- //Config file is the same name as the assembly but with a json extension
- string pluginConfigFile = Path.ChangeExtension(asmConfig.AssemblyFile, ".json");
+ //Get the plugin config file name
+ string pluginConfigFile = GetConfigFilePathCallback(asmConfig);
using JsonDocument hConfig = JsonDocument.Parse(HostJson, jdo);
diff --git a/lib/Utils.Cryptography/argon2/CMakeLists.txt b/lib/Utils.Cryptography/argon2/CMakeLists.txt
index a153c7b..5a75786 100644
--- a/lib/Utils.Cryptography/argon2/CMakeLists.txt
+++ b/lib/Utils.Cryptography/argon2/CMakeLists.txt
@@ -26,7 +26,7 @@ add_library(${CMAKE_PROJECT_NAME}_static STATIC ${ARGON_SRCS} ${HEADERS})
#if on unix lib will be appended, so we can adjust
if(UNIX)
- set_target_properties(${CMAKE_PROJECT_NAME} ${CMAKE_PROJECT_NAME}_static PROPERTIES OUTPUT_NAME _argon2)
+ set_target_properties(${CMAKE_PROJECT_NAME} ${CMAKE_PROJECT_NAME}_static PROPERTIES OUTPUT_NAME argon2)
endif()
#Setup the compiler options
diff --git a/lib/Utils.Cryptography/monocypher/Taskfile.yaml b/lib/Utils.Cryptography/monocypher/Taskfile.yaml
index 8619d2c..de4278f 100644
--- a/lib/Utils.Cryptography/monocypher/Taskfile.yaml
+++ b/lib/Utils.Cryptography/monocypher/Taskfile.yaml
@@ -96,7 +96,7 @@ tasks:
dir: '{{.USER_WORKING_DIR}}'
cmds:
#pack monocypher source code and create the archive
- - powershell -Command "tar --exclude build/* --exclude bin/* --exclude .git/* -cvf 'bin/src.tgz' ."
+ - powershell -Command "tar --exclude build/* --exclude bin/* --exclude .git/* -czvf 'bin/src.tgz' ."
clean:
ignore_error: true
diff --git a/lib/Utils/src/Extensions/MemoryExtensions.cs b/lib/Utils/src/Extensions/MemoryExtensions.cs
index 58d3d23..28969ea 100644
--- a/lib/Utils/src/Extensions/MemoryExtensions.cs
+++ b/lib/Utils/src/Extensions/MemoryExtensions.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Utils
@@ -823,8 +823,12 @@ namespace VNLib.Utils.Extensions
public static Span<T> AsSpan<T>(this IMemoryHandle<T> handle, nint start)
{
_ = handle ?? throw new ArgumentNullException(nameof(handle));
+ if(start < 0 || (uint)start > handle.Length)
+ {
+ throw new ArgumentOutOfRangeException(nameof(start));
+ }
//calculate a remaining count
- int count = (int)(handle.Length - (nuint)start);
+ int count = checked((int)(handle.Length - (uint)start));
//call the other overload
return AsSpan(handle, start, count);
}
diff --git a/lib/Utils/src/IIndexable.cs b/lib/Utils/src/IIndexable.cs
index 129d703..d4481a0 100644
--- a/lib/Utils/src/IIndexable.cs
+++ b/lib/Utils/src/IIndexable.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Utils
@@ -22,8 +22,6 @@
* along with VNLib.Utils. If not, see http://www.gnu.org/licenses/.
*/
-using System;
-
namespace VNLib.Utils
{
/// <summary>
diff --git a/lib/Utils/src/Memory/Caching/ObjectRental.cs b/lib/Utils/src/Memory/Caching/ObjectRental.cs
index 86ea63a..e80b0b5 100644
--- a/lib/Utils/src/Memory/Caching/ObjectRental.cs
+++ b/lib/Utils/src/Memory/Caching/ObjectRental.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Utils
@@ -25,6 +25,7 @@
using System;
using System.Diagnostics;
using System.Collections.Concurrent;
+using System.Runtime.CompilerServices;
namespace VNLib.Utils.Memory.Caching
{
@@ -38,7 +39,7 @@ namespace VNLib.Utils.Memory.Caching
{
protected readonly ConcurrentStack<T> Storage;
- protected readonly Action<T>? ReturnAction;
+ protected readonly Func<T, bool>? ReturnAction;
protected readonly Action<T>? RentAction;
protected readonly Func<T> Constructor;
@@ -73,11 +74,11 @@ namespace VNLib.Utils.Memory.Caching
/// <param name="rentCb">The pre-retnal preperation action</param>
/// <param name="returnCb">The pre-return cleanup action</param>
/// <param name="quota">The maximum number of elements to cache in the store</param>
- protected internal ObjectRental(Func<T> constructor, Action<T>? rentCb, Action<T>? returnCb, int quota) : this(quota)
+ protected internal ObjectRental(Func<T> constructor, Action<T>? rentCb, Func<T, bool>? returnCb, int quota) : this(quota)
{
- this.RentAction = rentCb;
- this.ReturnAction = returnCb;
- this.Constructor = constructor;
+ RentAction = rentCb;
+ ReturnAction = returnCb;
+ Constructor = constructor;
}
/// <inheritdoc/>
@@ -108,23 +109,27 @@ namespace VNLib.Utils.Memory.Caching
_ = item ?? throw new ArgumentNullException(nameof(item));
Check();
-
- //Invoke return callback if set
- ReturnAction?.Invoke(item);
+
+ //Invoke return callback if set and check if the item should be returned
+ if (ReturnAction != null && ReturnAction(item) == false)
+ {
+ //If the return action returns false, the item should not be returned to the store
+ DisposeIfDisposeable(item);
+ return;
+ }
//Check quota limit (Doesnt need to be perfect)
if (Storage.Count < QuotaLimit)
{
//Store the object
Storage.Push(item);
+ return;
}
- else if(IsDisposableType)
- {
- //If the element was not added and is disposeable, we can dispose the element
- (item as IDisposable)!.Dispose();
- //Write debug message
- Debug.WriteLine("Object rental disposed an object over quota");
- }
+
+ //Cleanup the object
+ DisposeIfDisposeable(item);
+ //Write debug message
+ Debug.WriteLine("Object rental disposed an object over quota");
}
/// <remarks>
@@ -154,6 +159,19 @@ namespace VNLib.Utils.Memory.Caching
/// <returns></returns>
protected T[] GetElementsWithLock() => Storage.ToArray();
+ /// <summary>
+ /// Disposes the item if it implements <see cref="IDisposable"/>
+ /// based on a cached value of the type for performance
+ /// </summary>
+ /// <param name="item">The item to clean up</param>
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ protected void DisposeIfDisposeable(T item)
+ {
+ if (IsDisposableType)
+ {
+ (item as IDisposable)!.Dispose();
+ }
+ }
/// <inheritdoc/>
/// <exception cref="ObjectDisposedException"></exception>
diff --git a/lib/Utils/src/Memory/Caching/ObjectRentalBase.cs b/lib/Utils/src/Memory/Caching/ObjectRentalBase.cs
index a68f8e7..b60cf06 100644
--- a/lib/Utils/src/Memory/Caching/ObjectRentalBase.cs
+++ b/lib/Utils/src/Memory/Caching/ObjectRentalBase.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Utils
@@ -48,13 +48,22 @@ namespace VNLib.Utils.Memory.Caching
/// <param name="rentCb">Function responsible for preparing an instance to be rented</param>
/// <param name="returnCb">Function responsible for cleaning up an instance before reuse</param>
/// <param name="quota">The maximum number of elements that will be cached</param>
- public static ObjectRental<TNew> Create<TNew>(Action<TNew>? rentCb, Action<TNew>? returnCb, int quota = 0) where TNew : class, new()
+ public static ObjectRental<TNew> Create<TNew>(Action<TNew>? rentCb, Func<TNew, bool>? returnCb, int quota = 0) where TNew : class, new()
{
static TNew constructor() => new();
return Create(constructor, rentCb, returnCb, quota);
}
/// <summary>
+ /// Creates a new <see cref="ObjectRental{T}"/> store with generic rental and return callback handlers
+ /// </summary>
+ /// <param name="rentCb">Function responsible for preparing an instance to be rented</param>
+ /// <param name="returnCb">Function responsible for cleaning up an instance before reuse</param>
+ /// <param name="quota">The maximum number of elements that will be cached</param>
+ public static ObjectRental<TNew> Create<TNew>(Action<TNew>? rentCb, Action<TNew>? returnCb, int quota = 0) where TNew : class, new()
+ => Create(rentCb, FromAction(returnCb), quota);
+
+ /// <summary>
/// Creates a new <see cref="ObjectRental{T}"/> store with a generic constructor function
/// </summary>
/// <param name="constructor">The function invoked to create a new instance when required</param>
@@ -69,9 +78,20 @@ namespace VNLib.Utils.Memory.Caching
/// <param name="rentCb">Function responsible for preparing an instance to be rented</param>
/// <param name="returnCb">Function responsible for cleaning up an instance before reuse</param>
/// <param name="quota">The maximum number of elements that will be cached</param>
- public static ObjectRental<TNew> Create<TNew>(Func<TNew> constructor, Action<TNew>? rentCb, Action<TNew>? returnCb, int quota = 0) where TNew : class
+ public static ObjectRental<TNew> Create<TNew>(Func<TNew> constructor, Action<TNew>? rentCb, Func<TNew, bool>? returnCb, int quota = 0) where TNew : class
=> new(constructor, rentCb, returnCb, quota);
+
+ /// <summary>
+ /// Creates a new <see cref="ObjectRental{T}"/> store with generic rental and return callback handlers
+ /// </summary>
+ /// <param name="constructor">The function invoked to create a new instance when required</param>
+ /// <param name="rentCb">Function responsible for preparing an instance to be rented</param>
+ /// <param name="returnCb">Function responsible for cleaning up an instance before reuse</param>
+ /// <param name="quota">The maximum number of elements that will be cached</param>
+ public static ObjectRental<TNew> Create<TNew>(Func<TNew> constructor, Action<TNew>? rentCb, Action<TNew>? returnCb, int quota = 0) where TNew : class
+ => Create(constructor, rentCb, FromAction(returnCb), quota);
+
/// <summary>
/// Creates a new <see cref="ThreadLocalObjectStorage{TNew}"/> store with generic rental and return callback handlers
/// </summary>
@@ -80,10 +100,32 @@ namespace VNLib.Utils.Memory.Caching
/// <param name="rentCb">Function responsible for preparing an instance to be rented</param>
/// <param name="returnCb">Function responsible for cleaning up an instance before reuse</param>
/// <returns>The initialized store</returns>
- public static ThreadLocalObjectStorage<TNew> CreateThreadLocal<TNew>(Func<TNew> constructor, Action<TNew>? rentCb, Action<TNew>? returnCb) where TNew : class
+ public static ThreadLocalObjectStorage<TNew> CreateThreadLocal<TNew>(Func<TNew> constructor, Action<TNew>? rentCb, Func<TNew, bool>? returnCb) where TNew : class
=> new (constructor, rentCb, returnCb);
/// <summary>
+ /// Creates a new <see cref="ThreadLocalObjectStorage{TNew}"/> store with generic rental and return callback handlers
+ /// </summary>
+ /// <typeparam name="TNew"></typeparam>
+ /// <param name="constructor">The function invoked to create a new instance when required</param>
+ /// <param name="rentCb">Function responsible for preparing an instance to be rented</param>
+ /// <param name="returnCb">Function responsible for cleaning up an instance before reuse</param>
+ /// <returns>The initialized store</returns>
+ public static ThreadLocalObjectStorage<TNew> CreateThreadLocal<TNew>(Func<TNew> constructor, Action<TNew>? rentCb, Action<TNew>? returnCb) where TNew : class
+ => CreateThreadLocal(constructor, rentCb, FromAction(returnCb));
+
+ /// <summary>
+ /// Creates a new <see cref="ThreadLocalObjectStorage{T}"/> store with generic rental and return callback handlers
+ /// </summary>
+ /// <param name="rentCb">Function responsible for preparing an instance to be rented</param>
+ /// <param name="returnCb">Function responsible for cleaning up an instance before reuse</param>
+ public static ThreadLocalObjectStorage<TNew> CreateThreadLocal<TNew>(Action<TNew>? rentCb, Func<TNew, bool>? returnCb) where TNew : class, new()
+ {
+ static TNew constructor() => new();
+ return CreateThreadLocal(constructor, rentCb, returnCb);
+ }
+
+ /// <summary>
/// Creates a new <see cref="ThreadLocalObjectStorage{T}"/> store with generic rental and return callback handlers
/// </summary>
/// <param name="rentCb">Function responsible for preparing an instance to be rented</param>
@@ -154,6 +196,23 @@ namespace VNLib.Utils.Memory.Caching
private static void StaticOnReusablePrepare<T>(T item) where T: IReusable => item.Prepare();
- private static void StaticOnReusableRelease<T>(T item) where T : IReusable => item.Release();
+ private static bool StaticOnReusableRelease<T>(T item) where T : IReusable => item.Release();
+
+
+ private static Func<T, bool>? FromAction<T>(Action<T>? callback)
+ {
+ //Propagate null callback
+ if(callback is null)
+ {
+ return null;
+ }
+
+ //Local function always returns true
+ return (T item) =>
+ {
+ callback(item);
+ return true;
+ };
+ }
}
}
diff --git a/lib/Utils/src/Memory/Caching/ThreadLocalObjectStorage.cs b/lib/Utils/src/Memory/Caching/ThreadLocalObjectStorage.cs
index a2e5ef6..50915fb 100644
--- a/lib/Utils/src/Memory/Caching/ThreadLocalObjectStorage.cs
+++ b/lib/Utils/src/Memory/Caching/ThreadLocalObjectStorage.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Utils
@@ -37,7 +37,7 @@ namespace VNLib.Utils.Memory.Caching
{
protected ThreadLocal<T> Store { get; }
- internal ThreadLocalObjectStorage(Func<T> constructor, Action<T>? rentCb, Action<T>? returnCb)
+ internal ThreadLocalObjectStorage(Func<T> constructor, Action<T>? rentCb, Func<T, bool>? returnCb)
:base(constructor, rentCb, returnCb, 0)
{
Store = new(Constructor);
@@ -51,10 +51,10 @@ namespace VNLib.Utils.Memory.Caching
public override T Rent()
{
Check();
- //Get the tlocal value
+ //Get the tlocal value or init if null (and assign to thread-local slot)
T value = Store.Value!;
//Invoke the rent action if set
- base.RentAction?.Invoke(value);
+ RentAction?.Invoke(value);
return value;
}
@@ -63,15 +63,19 @@ namespace VNLib.Utils.Memory.Caching
public override void Return(T item)
{
Check();
+
//Invoke the rent action
- base.ReturnAction?.Invoke(item);
+ if(ReturnAction != null && ReturnAction.Invoke(item) == false)
+ {
+ //Assign a new value to the thread-local slot and clean-up the old one
+ Store.Value = Constructor();
+
+ DisposeIfDisposeable(item);
+ }
}
///<inheritdoc/>
- public override T[] GetItems()
- {
- return Store.Values.ToArray();
- }
+ public override T[] GetItems() => Store.Values.ToArray();
///<inheritdoc/>
protected override void Free()
diff --git a/lib/Utils/src/Memory/MemoryUtil.cs b/lib/Utils/src/Memory/MemoryUtil.cs
index 5d979c9..6d0690b 100644
--- a/lib/Utils/src/Memory/MemoryUtil.cs
+++ b/lib/Utils/src/Memory/MemoryUtil.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Utils
@@ -31,6 +31,9 @@ using System.Globalization;
using System.Runtime.InteropServices;
using System.Runtime.CompilerServices;
+using System.Runtime.Intrinsics;
+using System.Runtime.Intrinsics.X86;
+
using VNLib.Utils.Resources;
using VNLib.Utils.Memory.Diagnostics;
@@ -89,7 +92,10 @@ namespace VNLib.Utils.Memory
//Cache the system page size
private static readonly int SystemPageSize = Environment.SystemPageSize;
-
+
+ //Cache avx2 support
+ private static readonly bool IsAvx2Supported = Avx2.IsSupported;
+
/// <summary>
/// Provides a shared heap instance for the process to allocate memory from.
/// </summary>
@@ -469,9 +475,7 @@ namespace VNLib.Utils.Memory
//Recover byte reference of target struct
ref byte dst = ref Unsafe.As<T, byte>(ref target);
- //Memmove
- bool result = MemmoveByRef(ref source, 0, ref dst, 0, (nuint)sizeof(T));
- Debug.Assert(result, "Memmove 32bit copy failed");
+ Unsafe.CopyBlockUnaligned(ref dst, ref source, (uint)sizeof(T));
}
/// <summary>
@@ -501,8 +505,7 @@ namespace VNLib.Utils.Memory
ref byte src = ref Unsafe.As<T, byte>(ref source);
//Memmove
- bool result = MemmoveByRef(ref src, 0, ref target, 0, (nuint)sizeof(T));
- Debug.Assert(result, "Memmove 32bit copy failed");
+ Unsafe.CopyBlockUnaligned(ref target, ref src, (uint)sizeof(T));
}
@@ -701,7 +704,23 @@ namespace VNLib.Utils.Memory
/// <param name="source">A reference to the source structure to copy from</param>
/// <param name="target">A reference to the target structure to copy to</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- public static void CloneStruct<T>(ref T source, ref T target) where T : struct => Memmove(ref source, 0, ref target, 0, 1);
+ public static void CloneStruct<T>(ref T source, ref T target) where T : unmanaged
+ {
+ if (Unsafe.IsNullRef(ref source))
+ {
+ throw new ArgumentNullException(nameof(source));
+ }
+ if (Unsafe.IsNullRef(ref target))
+ {
+ throw new ArgumentNullException(nameof(target));
+ }
+
+ //Byte refs
+ ref byte src = ref Unsafe.As<T, byte>(ref source);
+ ref byte dst = ref Unsafe.As<T, byte>(ref target);
+
+ Unsafe.CopyBlockUnaligned(ref dst, ref src, (uint)sizeof(T));
+ }
/// <summary>
/// Copies the memory of the structure pointed to by the source pointer to the target
@@ -750,13 +769,11 @@ namespace VNLib.Utils.Memory
CheckBounds(dest, destOffset, (uint)count);
//Use memmove by ref
- bool success = MemmoveByRef(
- ref MemoryMarshal.GetReference(source),
- (uint)sourceOffset,
- ref dest.GetReference(),
- (uint)destOffset,
- //Get byte ref and byte count
- ByteCount<T>((uint)count)
+ bool success = CopyUtilCore.MemmoveByRef(
+ ref Refs.AsByte(source, (nuint)sourceOffset),
+ ref Refs.AsByte(dest, destOffset),
+ ByteCount<T>((uint)count),
+ false
);
Debug.Assert(success, "Memmove by ref call failed during a 32bit copy");
@@ -804,12 +821,11 @@ namespace VNLib.Utils.Memory
CheckBounds(dest, destOffset, count);
//Use memmove by ref
- bool success = MemmoveByRef(
- ref source.GetReference(),
- (uint)sourceOffset,
- ref MemoryMarshal.GetReference(dest),
- (uint)destOffset,
- ByteCount<T>((uint)count)
+ bool success = CopyUtilCore.MemmoveByRef(
+ ref Refs.AsByte(source, (nuint)sourceOffset),
+ ref Refs.AsByte(dest, (nuint)destOffset),
+ ByteCount<T>((uint)count),
+ false
);
Debug.Assert(success, "Memmove by ref call failed during a 32bit copy");
@@ -854,7 +870,13 @@ namespace VNLib.Utils.Memory
//Get byte ref and byte count
nuint byteCount = ByteCount<T>(count);
- if (!MemmoveByRef(ref source.GetReference(), sourceOffset, ref dest.GetReference(), destOffset, byteCount))
+ if (!CopyUtilCore.MemmoveByRef(
+ ref Refs.AsByte(source, sourceOffset),
+ ref Refs.AsByte(dest, destOffset),
+ byteCount,
+ false
+ )
+ )
{
//Copying block larger than 32bit must be done with pointers
using MemoryHandle srcH = source.Pin(0);
@@ -888,7 +910,7 @@ namespace VNLib.Utils.Memory
}
/// <summary>
- /// Preforms a fast referrence based copy on very large blocks of memory
+ /// Performs a fast reference based copy on very large blocks of memory
/// using pinning and pointers only when the number of bytes to copy is
/// larger than <see cref="UInt32.MaxValue"/>
/// </summary>
@@ -918,7 +940,13 @@ namespace VNLib.Utils.Memory
nuint byteCount = ByteCount<T>(count);
//Try to memove by ref first, otherwise fallback to pinning
- if (!MemmoveByRef(ref source.GetReference(), sourceOffset, ref MemoryMarshal.GetArrayDataReference(dest), destOffset, byteCount))
+ if (!CopyUtilCore.MemmoveByRef(
+ ref Refs.AsByte(source, sourceOffset),
+ ref Refs.AsByte(dest, destOffset),
+ byteCount,
+ false
+ )
+ )
{
//Copying block larger than 32bit must be done with pointers
using MemoryHandle srcH = source.Pin(0);
@@ -933,9 +961,8 @@ namespace VNLib.Utils.Memory
}
}
-
/// <summary>
- /// Preforms a fast referrence based copy on very large blocks of memory
+ /// Performs a fast reference based copy on very large blocks of memory
/// using pinning and pointers only when the number of bytes to copy is
/// larger than <see cref="UInt32.MaxValue"/>
/// </summary>
@@ -965,7 +992,13 @@ namespace VNLib.Utils.Memory
nuint byteCount = ByteCount<T>(count);
//Try to memove by ref first, otherwise fallback to pinning
- if (!MemmoveByRef(ref MemoryMarshal.GetArrayDataReference(source), sourceOffset, ref dest.GetReference(), destOffset, byteCount))
+ if (!CopyUtilCore.MemmoveByRef(
+ ref Refs.AsByte(source, sourceOffset),
+ ref Refs.AsByte(dest, destOffset),
+ byteCount,
+ false
+ )
+ )
{
//Copying block larger than 32bit must be done with pointers
using MemoryHandle srcH = PinArrayAndGetHandle(source, 0);
@@ -996,61 +1029,65 @@ namespace VNLib.Utils.Memory
/// <param name="elementCount"></param>
/// <exception cref="ArgumentException"></exception>
/// <exception cref="ArgumentNullException"></exception>
- public static void Memmove<T>(ref T src, nuint srcOffset, ref T dst, nuint dstOffset, nuint elementCount) where T : struct
+ public static void Memmove<T>(ref T src, nuint srcOffset, ref T dst, nuint dstOffset, nuint elementCount) where T : struct
+ => Memmove(ref src, srcOffset, ref dst, dstOffset, elementCount, false);
+
+ /// <summary>
+ /// Low level api for copying data from source memory to destination memory of an
+ /// umanged data type. This call attempts to force hadrware acceleration if supported.
+ /// <para>
+ /// Understand that using this function attempts to force hardware acceleration, which
+ /// may hurt performance if the data is not large enough to justify the overhead.
+ /// </para>
+ /// <para>
+ /// If the <see cref="Avx.IsSupported"/> flag is false, this function will fallback to
+ /// the default method used by <see cref="Memmove{T}(ref T, nuint, ref T, nuint, nuint)"/>
+ /// </para>
+ /// </summary>
+ /// <remarks>
+ /// WARNING: It's not possible to do bounds checking when using references. Be sure you
+ /// know what you are doing!
+ /// </remarks>
+ /// <typeparam name="T">The unmanaged or structure type to copy</typeparam>
+ /// <param name="src">A reference to the source data to copy from</param>
+ /// <param name="srcOffset">The offset (in elements) from the reference to begin the copy from</param>
+ /// <param name="dst">The detination</param>
+ /// <param name="dstOffset"></param>
+ /// <param name="elementCount"></param>
+ /// <exception cref="ArgumentException"></exception>
+ /// <exception cref="ArgumentNullException"></exception>
+ public static void AcceleratedMemmove<T>(ref T src, nuint srcOffset, ref T dst, nuint dstOffset, nuint elementCount) where T : struct
+ => Memmove(ref src, srcOffset, ref dst, dstOffset, elementCount, IsAvx2Supported);
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private static void Memmove<T>(ref T src, nuint srcOffset, ref T dst, nuint dstOffset, nuint elementCount, bool useAcceleration) where T : struct
{
- if(Unsafe.IsNullRef(ref src))
+ if (Unsafe.IsNullRef(ref src))
{
throw new ArgumentNullException(nameof(src));
}
- if(Unsafe.IsNullRef(ref dst))
+ if (Unsafe.IsNullRef(ref dst))
{
throw new ArgumentNullException(nameof(dst));
}
- if(elementCount == 0)
+ if (elementCount == 0)
{
return;
}
- if(!MemmoveByRef(ref src, srcOffset, ref dst, dstOffset, ByteCount<T>(elementCount)))
+ if (!CopyUtilCore.MemmoveByRef(
+ ref Refs.AsByte(ref src, srcOffset),
+ ref Refs.AsByte(ref dst, dstOffset),
+ ByteCount<T>(elementCount),
+ useAcceleration)
+ )
{
throw new ArgumentException("The number of bytes to copy was larger than Uint32.MaxValue and was unsupported on this platform", nameof(elementCount));
}
}
- [MethodImpl(MethodImplOptions.AggressiveInlining | MethodImplOptions.AggressiveOptimization)]
- private static bool MemmoveByRef<T>(ref T src, nuint srcOffset, ref T dst, nuint dstOffset, nuint byteCount) where T : struct
- {
- Debug.Assert(!Unsafe.IsNullRef(ref src), "Null source reference passed to MemmoveByRef");
- Debug.Assert(!Unsafe.IsNullRef(ref dst), "Null destination reference passed to MemmoveByRef");
-
- //Get offset referrences to the source and destination
- ref T srcOffsetPtr = ref Unsafe.Add(ref src, srcOffset);
- ref T dstOffsetPtr = ref Unsafe.Add(ref dst, dstOffset);
-
- //Cast to byte pointers
- ref byte srcByte = ref Unsafe.As<T, byte>(ref srcOffsetPtr);
- ref byte dstByte = ref Unsafe.As<T, byte>(ref dstOffsetPtr);
-
- if (_clrMemmove != null)
- {
- //Call sysinternal memmove
- _clrMemmove(ref dstByte, ref srcByte, byteCount);
- return true;
- }
- else if(byteCount < uint.MaxValue)
- {
- //Use safe 32bit block copy
- Unsafe.CopyBlock(ref dstByte, ref srcByte, (uint)byteCount);
- return true;
- }
- else
- {
- return false;
- }
- }
-
#endregion
#region Validation
@@ -1203,17 +1240,31 @@ namespace VNLib.Utils.Memory
/// <param name="elementOffset">The address offset</param>
/// <returns>A <see cref="MemoryHandle"/> that manages the pinning of the supplied array</returns>
/// <exception cref="IndexOutOfRangeException"></exception>
- public static MemoryHandle PinArrayAndGetHandle<T>(T[] array, int elementOffset)
+ public static MemoryHandle PinArrayAndGetHandle<T>(T[] array, nint elementOffset)
{
if(elementOffset < 0)
{
throw new ArgumentOutOfRangeException(nameof(elementOffset));
}
- _ = array ?? throw new ArgumentNullException(nameof(array));
+ return PinArrayAndGetHandle(array, (nuint)elementOffset);
+ }
+
+ /// <summary>
+ /// Pins the supplied array and gets the memory handle that controls
+ /// the pinning lifetime via GC handle
+ /// </summary>
+ /// <typeparam name="T"></typeparam>
+ /// <param name="array">The array to pin</param>
+ /// <param name="elementOffset">The address offset</param>
+ /// <returns>A <see cref="MemoryHandle"/> that manages the pinning of the supplied array</returns>
+ /// <exception cref="IndexOutOfRangeException"></exception>
+ public static MemoryHandle PinArrayAndGetHandle<T>(T[] array, nuint elementOffset)
+ {
+ ArgumentNullException.ThrowIfNull(array, nameof(array));
//Quick verify index exists, may be the very last index
- CheckBounds(array, (nuint)elementOffset, 1);
+ CheckBounds(array, elementOffset, 1);
//Pin the array
GCHandle arrHandle = GCHandle.Alloc(array, GCHandleType.Pinned);
@@ -1356,5 +1407,151 @@ namespace VNLib.Utils.Memory
//Round to nearest page (in bytes)
return NearestPage(elements * elSize) / elSize;
}
+
+
+ private static class CopyUtilCore
+ {
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining | MethodImplOptions.AggressiveOptimization)]
+ public static bool MemmoveByRef(ref byte srcByte, ref byte dstByte, nuint byteCount, bool forceAcceleration)
+ {
+ Debug.Assert(!Unsafe.IsNullRef(ref srcByte), "Null source reference passed to MemmoveByRef");
+ Debug.Assert(!Unsafe.IsNullRef(ref dstByte), "Null destination reference passed to MemmoveByRef");
+
+ if (IsAvx2Supported)
+ {
+ //If the data is aligned, always use the 32 byte copy
+ if (Is32ByteAligned(byteCount))
+ {
+ _avx32ByteCopy(ref srcByte, ref dstByte, byteCount);
+ return true;
+ }
+
+ //See if forcing acceleration is desired
+ if (forceAcceleration)
+ {
+ //not aligned, so we need to only copy the aligned portion
+ nuint remainder = byteCount % 0x20u;
+ nuint alignedCount = byteCount - remainder;
+
+ //Copy aligned portion
+ _avx32ByteCopy(ref srcByte, ref dstByte, alignedCount);
+
+ //Upshift references to the remainder addresses
+ ref byte srcRemainder = ref Unsafe.Add(ref srcByte, alignedCount);
+ ref byte dstRemainder = ref Unsafe.Add(ref dstByte, alignedCount);
+
+ //finish copying remaining data
+ bool success = _memmove(ref srcRemainder, ref dstRemainder, remainder);
+ Debug.Assert(success, "Memmove by ref call failed during a 32bit copy");
+
+ return true;
+ }
+ }
+
+ //fallback to memmove
+ return _memmove(ref srcByte, ref dstByte, byteCount);
+ }
+
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining | MethodImplOptions.AggressiveOptimization)]
+ private static bool _memmove(ref byte src, ref byte dst, nuint byteCount)
+ {
+ if (_clrMemmove != null)
+ {
+ //Call sysinternal memmove
+ _clrMemmove(ref dst, ref src, byteCount);
+ return true;
+ }
+ else if (byteCount < uint.MaxValue)
+ {
+ //Use safe 32bit block copy
+ Unsafe.CopyBlock(ref dst, ref src, (uint)byteCount);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining | MethodImplOptions.AggressiveOptimization)]
+ private static void _avx32ByteCopy(
+ ref byte src,
+ ref byte dst,
+ nuint count
+ )
+ {
+ Debug.Assert(Is32ByteAligned(count), "Byte count must be 32 byte aligned");
+ Debug.Assert(Avx2.IsSupported, "AVX2 is not supported on this platform");
+
+ //determine the number of loops
+ nuint loopCount = count / (nuint)Vector256<byte>.Count;
+
+ fixed (byte* srcPtr = &src, dstPtr = &dst)
+ {
+ //local mutable copies
+ byte* srcOffset = srcPtr;
+ byte* dstOffset = dstPtr;
+
+ for (nuint i = 0; i < loopCount; i++)
+ {
+ //avx vector load
+ Vector256<byte> srcVector = Avx.LoadVector256(srcOffset);
+ Avx.Store(dstOffset, srcVector);
+
+ //Upshift pointers
+ srcOffset += Vector256<byte>.Count;
+ dstOffset += Vector256<byte>.Count;
+ }
+ }
+ }
+
+
+ /// <summary>
+ /// Determines if the given size 32-byte aligned
+ /// </summary>
+ /// <param name="size">The block size to test</param>
+ /// <returns>A value that indicates if the block size is 32byte aligned</returns>
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static bool Is32ByteAligned(nuint size) => unchecked(size % 0x20u) == 0;
+ }
+
+ private static class Refs
+ {
+ public static ref byte AsByte<T>(ref T ptr, nuint elementOffset)
+ {
+ ref T offset = ref Unsafe.Add(ref ptr, (nint)elementOffset);
+ return ref Unsafe.As<T, byte>(ref offset);
+ }
+
+ public static ref byte AsByte<T>(T[] arr, nuint elementOffset)
+ {
+ ref T ptr = ref MemoryMarshal.GetArrayDataReference(arr);
+ ref T offset = ref Unsafe.Add(ref ptr, (nint)elementOffset);
+ return ref Unsafe.As<T, byte>(ref offset);
+ }
+
+ public static ref byte AsByte<T>(Span<T> span, nuint elementOffset)
+ {
+ ref T ptr = ref MemoryMarshal.GetReference(span);
+ ref T offset = ref Unsafe.Add(ref ptr, (nint)elementOffset);
+ return ref Unsafe.As<T, byte>(ref offset);
+ }
+
+ public static ref byte AsByte<T>(ReadOnlySpan<T> span, nuint elementOffset)
+ {
+ ref T ptr = ref MemoryMarshal.GetReference(span);
+ ref T offset = ref Unsafe.Add(ref ptr, (nint)elementOffset);
+ return ref Unsafe.As<T, byte>(ref offset);
+ }
+
+ public static ref byte AsByte<T>(IMemoryHandle<T> handle, nuint elementOffset)
+ {
+ ref T ptr = ref handle.GetReference();
+ ref T offset = ref Unsafe.Add(ref ptr, (nint)elementOffset);
+ return ref Unsafe.As<T, byte>(ref offset);
+ }
+ }
}
} \ No newline at end of file