diff options
author | vnugent <public@vaughnnugent.com> | 2023-11-29 00:15:28 -0500 |
---|---|---|
committer | vnugent <public@vaughnnugent.com> | 2023-11-29 00:15:28 -0500 |
commit | 29371caa9c29fd6cfdfb238d98f53fda59e2e8a7 (patch) | |
tree | 47bb4d4726f2cafb1af41460d3356519b4074198 /lib | |
parent | 07824a130c7608337a36382dbfa40198a8c70297 (diff) |
immutable fbm clients, websocket abstractions, prep for monocypher/argon2 bindings
Diffstat (limited to 'lib')
19 files changed, 698 insertions, 527 deletions
diff --git a/lib/Hashing.Portable/src/Argon2/VnArgon2.cs b/lib/Hashing.Portable/src/Argon2/VnArgon2.cs index c07a02e..9b27194 100644 --- a/lib/Hashing.Portable/src/Argon2/VnArgon2.cs +++ b/lib/Hashing.Portable/src/Argon2/VnArgon2.cs @@ -133,7 +133,7 @@ namespace VNLib.Hashing using IMemoryHandle<byte> buffer = PwHeap.Alloc<byte>(saltbytes + passBytes); Span<byte> saltBuffer = buffer.AsSpan(0, saltbytes); - Span<byte> passBuffer = buffer.AsSpan(passBytes); + Span<byte> passBuffer = buffer.AsSpan(saltbytes, passBytes); //Encode salt with span the same size of the salt _ = LocEncoding.GetBytes(salt, saltBuffer); @@ -142,7 +142,12 @@ namespace VNLib.Hashing _ = LocEncoding.GetBytes(password, passBuffer); //Hash - return Hash2id(lib, passBuffer, saltBuffer, secret, in costParams, hashLen); + string result = Hash2id(lib, passBuffer, saltBuffer, secret, in costParams, hashLen); + + //Zero buffer + MemoryUtil.InitializeBlock(ref buffer.GetReference(), buffer.GetIntLength()); + + return result; } /// <summary> @@ -176,7 +181,12 @@ namespace VNLib.Hashing _ = LocEncoding.GetBytes(password, pwdHandle.Span); //Hash - return Hash2id(lib, pwdHandle.Span, salt, secret, in costParams, hashLen); + string result = Hash2id(lib, pwdHandle.Span, salt, secret, in costParams, hashLen); + + //Zero buffer + MemoryUtil.InitializeBlock(ref pwdHandle.GetReference(), pwdHandle.GetIntLength()); + + return result; } /// <summary> @@ -213,6 +223,9 @@ namespace VNLib.Hashing //encode salt salts = Convert.ToBase64String(salt); + //Zero buffer + MemoryUtil.InitializeBlock(ref hashHandle.GetReference(), hashHandle.GetIntLength()); + //Encode salt in base64 return $"${ID_MODE}$v={(int)Argon2Version.Version13},m={costParams.MemoryCost},t={costParams.TimeCost},p={costParams.Parallelism},s={salts}${hash}"; } @@ -347,7 +360,12 @@ namespace VNLib.Hashing //encode password bytes rawPassLen = LocEncoding.GetBytes(rawPass, rawPassBuf); //Verify password - return Verify2id(lib, rawPassBuf[..rawPassLen], saltBuf, secret, passBuf, in costParams); + bool result = Verify2id(lib, rawPassBuf[..rawPassLen], saltBuf, secret, passBuf, in costParams); + + //Zero buffer + MemoryUtil.InitializeBlock(ref rawBufferHandle.GetReference(), rawBufferHandle.GetIntLength()); + + return result; } /// <summary> @@ -410,12 +428,17 @@ namespace VNLib.Hashing context->outptr = outputPtr.Pointer; context->outlen = (uint)outputHandle.Length; //Hash - Argon2_ErrorCodes result = (Argon2_ErrorCodes)lib.Argon2Hash((IntPtr)context); + Argon2_ErrorCodes argResult = (Argon2_ErrorCodes)lib.Argon2Hash((IntPtr)context); //Throw an excpetion if an error ocurred - ThrowOnArgonErr(result); + ThrowOnArgonErr(argResult); } //Return the comparison - return CryptographicOperations.FixedTimeEquals(outputHandle.Span, hashBytes); + bool result = CryptographicOperations.FixedTimeEquals(outputHandle.Span, hashBytes); + + //Zero buffer + MemoryUtil.InitializeBlock(ref outputHandle.GetReference(), outputHandle.GetIntLength()); + + return result; } private static void ThrowOnArgonErr(Argon2_ErrorCodes result) diff --git a/lib/Net.Http/src/Core/TransportReader.cs b/lib/Net.Http/src/Core/TransportReader.cs index 8d605d1..a512331 100644 --- a/lib/Net.Http/src/Core/TransportReader.cs +++ b/lib/Net.Http/src/Core/TransportReader.cs @@ -41,14 +41,7 @@ namespace VNLib.Net.Http.Core /// </summary> internal readonly struct TransportReader : IVnTextReader { - /* - * To make this structure read-only we can store the - * mutable values in a private segment of the internal - * buffer. 8 bytes are reserved at the beining and an - * additional word is added for padding incase small/wild - * under/over run occurs. - */ - const int PrivateBufferOffset = 4 * sizeof(int); + private readonly static int BufferPosStructSize = Unsafe.SizeOf<BufferPosition>(); ///<inheritdoc/> public readonly Encoding Encoding { get; } @@ -58,26 +51,10 @@ namespace VNLib.Net.Http.Core ///<inheritdoc/> public readonly Stream BaseStream { get; } - - /* - * Store the window start/end in the begging of the - * data buffer. Then use a constant offset to get the - * start of the buffer - */ - private readonly int BufWindowStart - { - get => MemoryMarshal.Read<int>(Buffer.GetBinSpan()); - set => MemoryMarshal.Write(Buffer.GetBinSpan(), ref value); - } - - private readonly int BufWindowEnd - { - get => MemoryMarshal.Read<int>(Buffer.GetBinSpan()[sizeof(int)..]); - set => MemoryMarshal.Write(Buffer.GetBinSpan()[sizeof(int)..], ref value); - } + private readonly IHttpHeaderParseBuffer Buffer; - private readonly int MAxBufferSize; + private readonly uint MaxBufferSize; /// <summary> /// Initializes a new <see cref="TransportReader"/> for reading text lines from the transport stream @@ -92,78 +69,177 @@ namespace VNLib.Net.Http.Core BaseStream = transport; LineTermination = lineTermination; Buffer = buffer; - MAxBufferSize = buffer.BinSize - PrivateBufferOffset; + MaxBufferSize = (uint)(buffer.BinSize - BufferPosStructSize); + + //Assign an zeroed position + BufferPosition position = default; + SetPosition(ref position); - //Initialize the buffer window - SafeZeroPrivateSegments(Buffer); + AssertZeroPosition(); + } + + [Conditional("DEBUG")] + private void AssertZeroPosition() + { + BufferPosition position = default; + GetPosition(ref position); + Debug.Assert(position.WindowStart == 0); + Debug.Assert(position.WindowEnd == 0); + } - Debug.Assert(BufWindowEnd == 0 && BufWindowStart == 0); + /// <summary> + /// Reads the current position from the buffer segment + /// </summary> + /// <param name="position">A reference to the varable to write the position to</param> + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private readonly void GetPosition(ref BufferPosition position) + { + //Get the beining of the segment and read the position + Span<byte> span = Buffer.GetBinSpan(); + position = MemoryMarshal.Read<BufferPosition>(span); } /// <summary> - /// Clears the initial window start/end values with the - /// extra padding + /// Updates the current position in the buffer segment /// </summary> - /// <param name="buffer">The buffer segment to initialize</param> - private static void SafeZeroPrivateSegments(IHttpHeaderParseBuffer buffer) + /// <param name="position"></param> + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private readonly void SetPosition(ref BufferPosition position) { - ref byte start = ref MemoryMarshal.GetReference(buffer.GetBinSpan()); - Unsafe.InitBlock(ref start, 0, PrivateBufferOffset); + //Store the position at the beining of the segment + Span<byte> span = Buffer.GetBinSpan(); + MemoryMarshal.Write(span, ref position); } /// <summary> /// Gets the data segment of the buffer after the private segment /// </summary> /// <returns></returns> - private readonly Span<byte> GetDataSegment() => Buffer.GetBinSpan()[PrivateBufferOffset..]; + private readonly Span<byte> GetDataSegment() + { + //Get the beining of the segment + Span<byte> span = Buffer.GetBinSpan(); + //Return the segment after the private segment + return span[BufferPosStructSize..]; + } ///<inheritdoc/> - public readonly int Available => BufWindowEnd - BufWindowStart; + public readonly int Available + { + get + { + //Read position and return the window size + BufferPosition position = default; + GetPosition(ref position); + return (int)position.GetWindowSize(); + } + } ///<inheritdoc/> - public readonly Span<byte> BufferedDataWindow => GetDataSegment()[BufWindowStart..BufWindowEnd]; - + public readonly Span<byte> BufferedDataWindow + { + get + { + //Read current position and return the window + BufferPosition position = default; + GetPosition(ref position); + return GetDataSegment()[(int)position.WindowStart..(int)position.WindowEnd]; + } + } ///<inheritdoc/> - public readonly void Advance(int count) => BufWindowStart += count; + public readonly void Advance(int count) + { + if (count < 0) + { + throw new ArgumentOutOfRangeException(nameof(count), "Count must be positive"); + } + + //read the current position + BufferPosition position = default; + GetPosition(ref position); + + //Advance the window start by the count and set the position + position.AdvanceStart(count); + SetPosition(ref position); + } ///<inheritdoc/> public readonly void FillBuffer() { + //Read the current position + BufferPosition bufferPosition = default; + GetPosition(ref bufferPosition); + //Get a buffer from the end of the current window to the end of the buffer - Span<byte> bufferWindow = GetDataSegment()[BufWindowEnd..]; + Span<byte> bufferWindow = GetDataSegment()[(int)bufferPosition.WindowEnd..]; //Read from stream int read = BaseStream.Read(bufferWindow); + Debug.Assert(read > -1, "Read should never be negative"); //Update the end of the buffer window to the end of the read data - BufWindowEnd += read; + bufferPosition.AdvanceEnd(read); + SetPosition(ref bufferPosition); } ///<inheritdoc/> public readonly ERRNO CompactBufferWindow() { + //Read the current position + BufferPosition bufferPosition = default; + GetPosition(ref bufferPosition); + //No data to compact if window is not shifted away from start - if (BufWindowStart > 0) + if (bufferPosition.WindowStart > 0) { //Get span over engire buffer Span<byte> buffer = GetDataSegment(); //Get used data segment within window - Span<byte> usedData = buffer[BufWindowStart..BufWindowEnd]; + Span<byte> usedData = buffer[(int)bufferPosition.WindowStart..(int)bufferPosition.WindowEnd]; //Copy remaining to the begining of the buffer usedData.CopyTo(buffer); - - //Buffer window start is 0 - BufWindowStart = 0; - - //Buffer window end is now the remaining size - BufWindowEnd = usedData.Length; + + /* + * Now that data has been shifted, update the position to + * the new window and write the new position to the buffer + */ + bufferPosition.Set(0, usedData.Length); + SetPosition(ref bufferPosition); } //Return the number of bytes of available space from the end of the current window - return MAxBufferSize - BufWindowEnd; + return (nint)(MaxBufferSize - bufferPosition.WindowEnd); + } + + [StructLayout(LayoutKind.Sequential)] + private record struct BufferPosition + { + public uint WindowStart; + public uint WindowEnd; + + /// <summary> + /// Sets the the buffer window position + /// </summary> + /// <param name="start">Window start</param> + /// <param name="end">Window end</param> + public void Set(int start, int end) + { + //Verify that the start and end are not negative + Debug.Assert(start >= 0, "Negative internal value passed to http buffer window start"); + Debug.Assert(end >= 0, "Negative internal value passed to http buffer window end"); + + WindowStart = (uint)start; + WindowEnd = (uint)end; + } + + public readonly uint GetWindowSize() => WindowEnd - WindowStart; + + public void AdvanceEnd(int count) => WindowEnd += (uint)count; + + public void AdvanceStart(int count) => WindowStart += (uint)count; } } } diff --git a/lib/Net.Http/src/Helpers/HelperTypes.cs b/lib/Net.Http/src/Helpers/HelperTypes.cs index 7e7e068..ce13874 100644 --- a/lib/Net.Http/src/Helpers/HelperTypes.cs +++ b/lib/Net.Http/src/Helpers/HelperTypes.cs @@ -87,7 +87,11 @@ namespace VNLib.Net.Http /// <summary> /// Http UNLOCK request method /// </summary> - UNLOCK = 0x1000 + UNLOCK = 0x1000, + /// <summary> + /// Http LIST request method + /// </summary> + LIST = 0x2000 } /// <summary> diff --git a/lib/Net.Http/src/Helpers/HttpHelpers.cs b/lib/Net.Http/src/Helpers/HttpHelpers.cs index d5c471f..198396f 100644 --- a/lib/Net.Http/src/Helpers/HttpHelpers.cs +++ b/lib/Net.Http/src/Helpers/HttpHelpers.cs @@ -29,7 +29,6 @@ using System.Net.Sockets; using System.Collections.Generic; using System.Text.RegularExpressions; -using VNLib.Net.Http.Core; using VNLib.Utils.Memory; using VNLib.Utils.Extensions; @@ -66,7 +65,7 @@ namespace VNLib.Net.Http * an HttpMethod enum value, */ - private static readonly IReadOnlyDictionary<int, HttpMethod> MethodHashLookup; + private static readonly IReadOnlyDictionary<int, HttpMethod> MethodHashLookup = HashHttpMethods(); /* * Provides a constant lookup table from an MIME http request header string to a .NET @@ -128,8 +127,8 @@ namespace VNLib.Net.Http * during request parsing) * */ - private static readonly IReadOnlyDictionary<int, HttpRequestHeader> RequestHeaderHashLookup; - + private static readonly IReadOnlyDictionary<int, HttpRequestHeader> RequestHeaderHashLookup = HashRequestHeaders(); + /* * Provides a constant lookup table for http version hashcodes to an http * version enum value @@ -143,68 +142,63 @@ namespace VNLib.Net.Http }; - //Pre-compiled strings for all status codes for http 1, 1.1, and 2 - private static readonly IReadOnlyDictionary<HttpStatusCode, string> V1_STAUTS_CODES; - private static readonly IReadOnlyDictionary<HttpStatusCode, string> V1_1_STATUS_CODES; - private static readonly IReadOnlyDictionary<HttpStatusCode, string> V2_STAUTS_CODES; + //Pre-compiled strings for all status codes for http 0.9 1, 1.1 + private static readonly IReadOnlyDictionary<HttpStatusCode, string> V0_9_STATUS_CODES = GetStatusCodes("0.9"); + private static readonly IReadOnlyDictionary<HttpStatusCode, string> V1_STAUTS_CODES = GetStatusCodes("1.0"); + private static readonly IReadOnlyDictionary<HttpStatusCode, string> V1_1_STATUS_CODES = GetStatusCodes("1.1"); + private static readonly IReadOnlyDictionary<HttpStatusCode, string> V2_STATUS_CODES = GetStatusCodes("2.0"); - static HttpHelpers() + private static IReadOnlyDictionary<HttpStatusCode, string> GetStatusCodes(string version) { + //Setup status code dict + Dictionary<HttpStatusCode, string> statusCodes = new(); + //Get all status codes + foreach (HttpStatusCode code in Enum.GetValues<HttpStatusCode>()) { - //Setup status code dict - Dictionary<HttpStatusCode, string> v1status = new(); - Dictionary<HttpStatusCode, string> v11status = new(); - Dictionary<HttpStatusCode, string> v2status = new(); - //Get all status codes - foreach (HttpStatusCode code in Enum.GetValues<HttpStatusCode>()) - { - //Use a regex to write the status code value as a string - v1status[code] = $"HTTP/1.0 {(int)code} {HttpRequestBuilderRegex.Replace(code.ToString(), " $1")}"; - v11status[code] = $"HTTP/1.1 {(int)code} {HttpRequestBuilderRegex.Replace(code.ToString(), " $1")}"; - v2status[code] = $"HTTP/2.0 {(int)code} {HttpRequestBuilderRegex.Replace(code.ToString(), " $1")}"; - } - //Store as readonly - V1_STAUTS_CODES = v1status; - V1_1_STATUS_CODES = v11status; - V2_STAUTS_CODES = v2status; + //Use a regex to write the status code value as a string + statusCodes[code] = $"HTTP/{version} {(int)code} {HttpRequestBuilderRegex.Replace(code.ToString(), " $1")}"; } + return statusCodes; + } + + private static IReadOnlyDictionary<int, HttpMethod> HashHttpMethods() + { + /* + * Http methods are hashed at runtime using the HttpMethod enum + * values, purley for compatability and automation + */ + Dictionary<int, HttpMethod> methods = new(); + //Add all HTTP methods + foreach (HttpMethod method in Enum.GetValues<HttpMethod>()) { - /* - * Http methods are hashed at runtime using the HttpMethod enum - * values, purley for compatability and automation - */ - Dictionary<int, HttpMethod> methods = new(); - //Add all HTTP methods - foreach (HttpMethod method in Enum.GetValues<HttpMethod>()) + //Exclude the not supported method + if (method == HttpMethod.None) { - //Exclude the not supported method - if (method == HttpMethod.None) - { - continue; - } - //Store method string's hashcode for faster lookups - methods[string.GetHashCode(method.ToString(), StringComparison.OrdinalIgnoreCase)] = method; + continue; } - MethodHashLookup = methods; + //Store method string's hashcode for faster lookups + methods[string.GetHashCode(method.ToString(), StringComparison.OrdinalIgnoreCase)] = method; } - { - /* - * Pre-compute common headers - */ - Dictionary<int, HttpRequestHeader> requestHeaderHashes = new(); + return methods; + } - //Add all HTTP methods - foreach (string headerValue in RequestHeaderLookup.Keys) - { - //Compute the hashcode for the header value - int hashCode = string.GetHashCode(headerValue, StringComparison.OrdinalIgnoreCase); - //Store the http header enum value with the hash-code of the string of said header - requestHeaderHashes[hashCode] = RequestHeaderLookup[headerValue]; - } + private static IReadOnlyDictionary<int, HttpRequestHeader> HashRequestHeaders() + { + /* + * Pre-compute common headers + */ + Dictionary<int, HttpRequestHeader> requestHeaderHashes = new(); - RequestHeaderHashLookup = requestHeaderHashes; + //Add all HTTP methods + foreach (string headerValue in RequestHeaderLookup.Keys) + { + //Compute the hashcode for the header value + int hashCode = string.GetHashCode(headerValue, StringComparison.OrdinalIgnoreCase); + //Store the http header enum value with the hash-code of the string of said header + requestHeaderHashes[hashCode] = RequestHeaderLookup[headerValue]; } - } + return requestHeaderHashes; + } /// <summary> @@ -358,8 +352,10 @@ namespace VNLib.Net.Http { return version switch { + HttpVersion.Http09 => V0_9_STATUS_CODES[code], HttpVersion.Http1 => V1_STAUTS_CODES[code], - HttpVersion.Http2 => V2_STAUTS_CODES[code], + HttpVersion.Http2 => V2_STATUS_CODES[code], + //Default to HTTP/1.1 _ => V1_1_STATUS_CODES[code], }; } diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClient.cs b/lib/Net.Messaging.FBM/src/Client/FBMClient.cs index ee64d53..9628313 100644 --- a/lib/Net.Messaging.FBM/src/Client/FBMClient.cs +++ b/lib/Net.Messaging.FBM/src/Client/FBMClient.cs @@ -47,7 +47,7 @@ namespace VNLib.Net.Messaging.FBM.Client /// A Fixed Buffer Message Protocol client. Allows for high performance client-server messaging /// with minimal memory overhead. /// </summary> - public class FBMClient : VnDisposeable, IStatefulConnection, ICacheHolder + public class FBMClient : VnDisposeable, IStatefulConnection { /// <summary> /// The WS connection query arguments to specify a receive buffer size @@ -77,9 +77,12 @@ namespace VNLib.Net.Messaging.FBM.Client private readonly SemaphoreSlim SendLock; private readonly ConcurrentDictionary<int, FBMRequest> ActiveRequests; - private readonly ObjectRental<FBMRequest> RequestRental; + private readonly IFBMMemoryHandle _streamBuffer; + private readonly IFbmClientWebsocket _socket; private readonly FBMClientConfig _config; - private readonly byte[] _streamBuffer; + + private readonly IObjectRental<FBMRequest> _requestRental; + private readonly bool _ownsObjectRenal; /// <summary> /// The configuration for the current client @@ -90,34 +93,63 @@ namespace VNLib.Net.Messaging.FBM.Client /// A handle that is reset when a connection has been successfully set, and is set /// when the connection exists /// </summary> - public ManualResetEvent ConnectionStatusHandle { get; } - + public ManualResetEvent ConnectionStatusHandle { get; } + /// <summary> - /// The <see cref="ClientWebSocket"/> to send/recieve message on + /// The client's http header collection used when making connections /// </summary> - public ManagedClientWebSocket ClientSocket { get; } - + public VnWebHeaderCollection Headers { get; } /// <summary> - /// Creates a new <see cref="FBMClient"/> in a closed state + /// Creates an immutable FBMClient that wraps the supplied web socket using the + /// supplied config. /// </summary> - /// <param name="config">The client configuration</param> - public FBMClient(FBMClientConfig config) + /// <param name="config">The client config</param> + /// <param name="websocket">The websocket instance used to comunicate with an FBMServer</param> + public FBMClient(in FBMClientConfig config, IFbmClientWebsocket websocket) + :this(in config, websocket, null) { - RequestRental = ObjectRental.CreateReusable(ReuseableRequestConstructor, 200); + } + + internal FBMClient(in FBMClientConfig config, IFbmClientWebsocket websocket, IObjectRental<FBMRequest>? requestRental) + { + _config = config; + _socket = websocket ?? throw new ArgumentNullException(nameof(websocket)); + + _ = config.MemoryManager ?? throw new ArgumentException("FBM memory manager is required", nameof(config)); + + //Create new request rental if none supplied + if(requestRental is null) + { + _ownsObjectRenal = true; + _requestRental = ObjectRental.CreateReusable(ReuseableRequestConstructor, 100); + } + else + { + _requestRental = requestRental; + } + + Headers = new(); SendLock = new(1); ConnectionStatusHandle = new(true); ActiveRequests = new(Environment.ProcessorCount, 100); - _config = config; - //Init the new client socket - ClientSocket = new(config.RecvBufferSize, config.RecvBufferSize, config.KeepAliveInterval, config.SubProtocol); - - //Init the stream buffer + /* + * We can use the pool to allocate a single stream buffer that will be shared. + * This is because there is only 1 thread allowed to send/copy data at a time + * so it can be allocated once and shared + */ int maxStrmBufSize = Math.Min(config.MaxMessageSize, MAX_STREAM_BUFFER_SIZE); - _streamBuffer = new byte[maxStrmBufSize]; + _streamBuffer = config.MemoryManager.InitHandle(); + config.MemoryManager.AllocBuffer(_streamBuffer, maxStrmBufSize); } + /// <summary> + /// Allocates and configures a new <see cref="FBMRequest"/> message object for use within the reusable store + /// </summary> + /// <returns>The configured <see cref="FBMRequest"/></returns> + protected virtual FBMRequest ReuseableRequestConstructor() => new(in _config); + private void Debug(string format, params string[] strings) { if(Config.DebugLog != null) @@ -125,6 +157,7 @@ namespace VNLib.Net.Messaging.FBM.Client Config.DebugLog.Debug($"[DEBUG] FBM Client: {format}", strings); } } + private void Debug(string format, long value, long other) { if (Config.DebugLog != null) @@ -132,12 +165,7 @@ namespace VNLib.Net.Messaging.FBM.Client Config.DebugLog.Debug($"[DEBUG] FBM Client: {format}", value, other); } } - - /// <summary> - /// Allocates and configures a new <see cref="FBMRequest"/> message object for use within the reusable store - /// </summary> - /// <returns>The configured <see cref="FBMRequest"/></returns> - protected virtual FBMRequest ReuseableRequestConstructor() => new(in _config); + /// <summary> /// Asynchronously opens a websocket connection with the specifed remote server @@ -157,28 +185,28 @@ namespace VNLib.Net.Messaging.FBM.Client Debug("Connection string {con}", urib.Uri.ToString()); //Connect to server - await ClientSocket.ConnectAsync(urib.Uri, cancellationToken); + await _socket.ConnectAsync(urib.Uri, Headers, cancellationToken); //Reset wait handle before return ConnectionStatusHandle.Reset(); - //Begin listeing for requets in a background task - _ = Task.Run(() => ProcessContinuousRecvAsync(ClientSocket), cancellationToken); + //Begin listeing for requests in a background task + _ = Task.Run(ProcessContinuousRecvAsync, cancellationToken); } /// <summary> - /// Rents a new <see cref="FBMRequest"/> from the internal <see cref="ReusableStore{T}"/>. + /// Rents a new <see cref="FBMRequest"/> from the internal <see cref="ObjectRental{T}"/>. /// Use <see cref="ReturnRequest(FBMRequest)"/> when request is no longer in use /// </summary> /// <returns>The configured (rented or new) <see cref="FBMRequest"/> ready for use</returns> - public FBMRequest RentRequest() => RequestRental.Rent(); + public FBMRequest RentRequest() => _requestRental.Rent(); /// <summary> /// Stores (or returns) the reusable request in cache for use with <see cref="RentRequest"/> /// </summary> /// <param name="request">The request to return to the store</param> /// <exception cref="InvalidOperationException"></exception> - public void ReturnRequest(FBMRequest request) => RequestRental.Return(request); + public void ReturnRequest(FBMRequest request) => _requestRental.Return(request); /// <summary> /// Sends a <see cref="FBMRequest"/> to the connected server @@ -190,7 +218,7 @@ namespace VNLib.Net.Messaging.FBM.Client /// <exception cref="ObjectDisposedException"></exception> /// <exception cref="InvalidOperationException"></exception> /// <exception cref="FBMInvalidRequestException"></exception> - public Task<FBMResponse> SendAsync(FBMRequest request, CancellationToken cancellationToken = default) => SendAsync(request, _config.RequestTimeout, cancellationToken); + public Task<FBMResponse> SendAsync(FBMRequest request, CancellationToken cancellationToken = default) => SendAsync(request, Config.RequestTimeout, cancellationToken); /// <summary> /// Sends a <see cref="FBMRequest"/> to the connected server @@ -229,7 +257,7 @@ namespace VNLib.Net.Messaging.FBM.Client using (SemSlimReleaser releaser = await SendLock.GetReleaserAsync(cancellationToken)) { //Send the data to the server - await ClientSocket.SendAsync(requestData, WebSocketMessageType.Binary, true, cancellationToken); + await _socket.SendAsync(requestData, WebSocketMessageType.Binary, true, cancellationToken); } //wait for the response to be set @@ -267,7 +295,7 @@ namespace VNLib.Net.Messaging.FBM.Client /// <exception cref="ObjectDisposedException"></exception> /// <exception cref="InvalidOperationException"></exception> public Task<FBMResponse> StreamDataAsync(FBMRequest request, Stream payload, ContentType contentType, CancellationToken cancellationToken = default) - => StreamDataAsync(request, payload, contentType, _config.RequestTimeout, cancellationToken); + => StreamDataAsync(request, payload, contentType, Config.RequestTimeout, cancellationToken); /// <summary> /// Streams arbitrary binary data to the server with the initial request message @@ -283,6 +311,8 @@ namespace VNLib.Net.Messaging.FBM.Client /// <exception cref="InvalidOperationException"></exception> public async Task<FBMResponse> StreamDataAsync(FBMRequest request, Stream payload, ContentType contentType, TimeSpan timeout, CancellationToken cancellationToken = default) { + _ = payload ?? throw new ArgumentNullException(nameof(payload)); + Check(); cancellationToken.ThrowIfCancellationRequested(); @@ -303,17 +333,19 @@ namespace VNLib.Net.Messaging.FBM.Client //Write an empty body in the request so a content type header is writen request.WriteBody(ReadOnlySpan<byte>.Empty, contentType); + Memory<byte> bufferMemory = _streamBuffer.GetMemory(); + //Wait for send-lock using (SemSlimReleaser releaser = await SendLock.GetReleaserAsync(cancellationToken)) { //Send the initial request packet - await ClientSocket.SendAsync(requestData, WebSocketMessageType.Binary, false, cancellationToken); + await _socket.SendAsync(requestData, WebSocketMessageType.Binary, false, cancellationToken); //Stream mesage body do { //Read data - int read = await payload.ReadAsync(_streamBuffer, cancellationToken); + int read = await payload.ReadAsync(bufferMemory, cancellationToken); if (read == 0) { @@ -322,7 +354,7 @@ namespace VNLib.Net.Messaging.FBM.Client } //write message to socket, if the read data was smaller than the buffer, we can send the last packet - await ClientSocket.SendAsync(_streamBuffer[..read], WebSocketMessageType.Binary, read < _streamBuffer.Length, cancellationToken); + await _socket.SendAsync(bufferMemory[..read], WebSocketMessageType.Binary, read < bufferMemory.Length, cancellationToken); } while (true); } @@ -358,15 +390,9 @@ namespace VNLib.Net.Messaging.FBM.Client { Check(); //Close the connection - await ClientSocket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", cancellationToken); + await _socket.DisconnectAsync(WebSocketCloseStatus.NormalClosure, cancellationToken); } - ///<inheritdoc/> - public void CacheClear() => RequestRental.CacheClear(); - ///<inheritdoc/> - public void CacheHardClear() => RequestRental.CacheHardClear(); - - private void CheckOrEnqueue(FBMRequest request) { /* @@ -404,25 +430,27 @@ namespace VNLib.Net.Messaging.FBM.Client /// until the socket is closed, or canceled /// </summary> /// <returns></returns> - protected async Task ProcessContinuousRecvAsync(WebSocket socket) + protected async Task ProcessContinuousRecvAsync() { Debug("Begining receive loop"); //Alloc recv buffer - IFBMMemoryHandle recvBuffer = _config.MemoryManager.InitHandle(); - _config.MemoryManager.AllocBuffer(recvBuffer, _config.RecvBufferSize); + IFBMMemoryHandle recvBuffer = Config.MemoryManager.InitHandle(); + Config.MemoryManager.AllocBuffer(recvBuffer, Config.RecvBufferSize); try { - if(!_config.MemoryManager.TryGetHeap(out IUnmangedHeap? heap)) + if(!Config.MemoryManager.TryGetHeap(out IUnmangedHeap? heap)) { throw new NotSupportedException("The memory manager must support using IUnmanagedHeaps"); } + Memory<byte> rcvMemory = recvBuffer.GetMemory(); + //Recv event loop while (true) { //Listen for incoming packets with the intial data buffer - ValueWebSocketReceiveResult result = await socket.ReceiveAsync(recvBuffer.GetMemory(), CancellationToken.None); + ValueWebSocketReceiveResult result = await _socket.ReceiveAsync(rcvMemory, CancellationToken.None); //If the message is a close message, its time to exit if (result.MessageType == WebSocketMessageType.Close) @@ -444,10 +472,10 @@ namespace VNLib.Net.Messaging.FBM.Client while (!result.EndOfMessage) { //recive more data - result = await socket.ReceiveAsync(recvBuffer.GetMemory(), CancellationToken.None); + result = await _socket.ReceiveAsync(rcvMemory, CancellationToken.None); //Make sure the buffer is not too large - if ((responseBuffer.Length + result.Count) > _config.MaxMessageSize) + if ((responseBuffer.Length + result.Count) > Config.MaxMessageSize) { //Dispose the buffer before exiting responseBuffer.Dispose(); @@ -485,9 +513,7 @@ namespace VNLib.Net.Messaging.FBM.Client finally { //Dispose the recv buffer - _config.MemoryManager.FreeBuffer(recvBuffer); - //Cleanup the socket when exiting - ClientSocket.Cleanup(); + Config.MemoryManager.FreeBuffer(recvBuffer); //Set status handle as unset ConnectionStatusHandle.Set(); @@ -573,12 +599,19 @@ namespace VNLib.Net.Messaging.FBM.Client ///<inheritdoc/> protected override void Free() { - //Dispose socket - ClientSocket.Dispose(); + //Free stream buffer + Config.MemoryManager.FreeBuffer(_streamBuffer); + //Dispose client buffer - RequestRental.Dispose(); + _socket.Dispose(); SendLock.Dispose(); ConnectionStatusHandle.Dispose(); + + //Dispose object rental if we own it + if (_ownsObjectRenal && _requestRental is IDisposable disp) + { + disp.Dispose(); + } } } } diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClientFactory.cs b/lib/Net.Messaging.FBM/src/Client/FBMClientFactory.cs new file mode 100644 index 0000000..51d3768 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Client/FBMClientFactory.cs @@ -0,0 +1,85 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: FBMClientFactory.cs +* +* FBMClientFactory.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; + +using VNLib.Utils.Memory.Caching; + +namespace VNLib.Net.Messaging.FBM.Client +{ + /// <summary> + /// An FBMClient factory that creates immutable clients from fbm + /// websockets + /// </summary> + public sealed class FBMClientFactory: ICacheHolder + { + private readonly ObjectRental<FBMRequest> _internalRequestPool; + private readonly FBMClientConfig _config; + private readonly IFbmWebsocketFactory _socketMan; + + /// <summary> + /// Initlaizes a new client factory from the websocket manager + /// </summary> + /// <param name="config">The configuration state</param> + /// <param name="webSocketManager">The client websocket factory</param> + /// <exception cref="ArgumentNullException"></exception> + public FBMClientFactory(in FBMClientConfig config, IFbmWebsocketFactory webSocketManager) + { + _config = config; + _ = config.MemoryManager ?? throw new ArgumentException("The client memory manager must not be null", nameof(config)); + _socketMan = webSocketManager ?? throw new ArgumentNullException(nameof(webSocketManager)); + _internalRequestPool = ObjectRental.CreateReusable(ReuseableRequestConstructor, 1000); + } + + /// <summary> + /// The configuration for the current client + /// </summary> + public ref readonly FBMClientConfig Config => ref _config; + + /// <summary> + /// Allocates and configures a new <see cref="FBMRequest"/> message object for use within the reusable store + /// </summary> + /// <returns>The configured <see cref="FBMRequest"/></returns> + private FBMRequest ReuseableRequestConstructor() => new(in _config); + + /// <summary> + /// Initializes a new websocket and creates a new <see cref="FBMClient"/> instance + /// </summary> + /// <returns>The initialized FBM client instance</returns> + public FBMClient CreateClient() + { + //Init new socket + IFbmClientWebsocket socket = _socketMan.CreateWebsocket(in _config); + + //Create client wrapper + return new(in _config, socket, _internalRequestPool); + } + + ///<inheritdoc/> + public void CacheClear() => _internalRequestPool.CacheClear(); + + ///<inheritdoc/> + public void CacheHardClear() => _internalRequestPool.CacheHardClear(); + } +} diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClientWorkerBase.cs b/lib/Net.Messaging.FBM/src/Client/FBMClientWorkerBase.cs deleted file mode 100644 index b4056dc..0000000 --- a/lib/Net.Messaging.FBM/src/Client/FBMClientWorkerBase.cs +++ /dev/null @@ -1,125 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: FBMClientWorkerBase.cs -* -* FBMClientWorkerBase.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Threading; -using System.Threading.Tasks; - -using VNLib.Utils; - -namespace VNLib.Net.Messaging.FBM.Client -{ - /// <summary> - /// A base class for objects that implement <see cref="FBMClient"/> - /// operations - /// </summary> - public abstract class FBMClientWorkerBase : VnDisposeable, IStatefulConnection - { - /// <summary> - /// Allows configuration of websocket configuration options - /// </summary> - public ManagedClientWebSocket SocketOptions => Client.ClientSocket; - -#nullable disable - /// <summary> - /// The <see cref="FBMClient"/> to sent requests from - /// </summary> - public FBMClient Client { get; private set; } - - /// <summary> - /// Raised when the client has connected successfully - /// </summary> - public event Action<FBMClient, FBMClientWorkerBase> Connected; -#nullable enable - - ///<inheritdoc/> - public event EventHandler ConnectionClosed - { - add => Client.ConnectionClosed += value; - remove => Client.ConnectionClosed -= value; - } - - /// <summary> - /// Creates and initializes a the internal <see cref="FBMClient"/> - /// </summary> - /// <param name="config">The client config</param> - protected void InitClient(in FBMClientConfig config) - { - Client = new(config); - Client.ConnectionClosedOnError += Client_ConnectionClosedOnError; - Client.ConnectionClosed += Client_ConnectionClosed; - } - - private void Client_ConnectionClosed(object? sender, EventArgs e) => OnDisconnected(); - private void Client_ConnectionClosedOnError(object? sender, FMBClientErrorEventArgs e) => OnError(e); - - /// <summary> - /// Asynchronously connects to a remote server by the specified uri - /// </summary> - /// <param name="serverUri">The remote uri of a server to connect to</param> - /// <param name="cancellationToken">A token to cancel the connect operation</param> - /// <returns>A task that compeltes when the client has connected to the remote server</returns> - public virtual async Task ConnectAsync(Uri serverUri, CancellationToken cancellationToken = default) - { - //Connect to server - await Client.ConnectAsync(serverUri, cancellationToken).ConfigureAwait(true); - //Invoke child on-connected event - OnConnected(); - Connected?.Invoke(Client, this); - } - - /// <summary> - /// Asynchronously disonnects a client only if the client is currently connected, - /// returns otherwise - /// </summary> - /// <param name="cancellationToken"></param> - /// <returns>A task that compeltes when the client has disconnected</returns> - public virtual Task DisconnectAsync(CancellationToken cancellationToken = default) - { - return Client.DisconnectAsync(cancellationToken); - } - - /// <summary> - /// Invoked when a client has successfully connected to the remote server - /// </summary> - protected abstract void OnConnected(); - /// <summary> - /// Invoked when the client has disconnected cleanly - /// </summary> - protected abstract void OnDisconnected(); - /// <summary> - /// Invoked when the connected client is closed because of a connection error - /// </summary> - /// <param name="e">A <see cref="EventArgs"/> that contains the client error data</param> - protected abstract void OnError(FMBClientErrorEventArgs e); - - ///<inheritdoc/> - protected override void Free() - { - Client.ConnectionClosedOnError -= Client_ConnectionClosedOnError; - Client.ConnectionClosed -= Client_ConnectionClosed; - Client.Dispose(); - } - } -} diff --git a/lib/Net.Messaging.FBM/src/Client/FBMFallbackClientWsFactory.cs b/lib/Net.Messaging.FBM/src/Client/FBMFallbackClientWsFactory.cs new file mode 100644 index 0000000..5ee4142 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Client/FBMFallbackClientWsFactory.cs @@ -0,0 +1,117 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: FBMFallbackClientWsFactory.cs +* +* FBMFallbackClientWsFactory.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Buffers; +using System.Threading; +using System.Net.WebSockets; +using System.Threading.Tasks; + +using VNLib.Net.Http; + +namespace VNLib.Net.Messaging.FBM.Client +{ + /// <summary> + /// Creates a new <see cref="IFbmWebsocketFactory"/> that builds new client websockets + /// on demand using the <see cref="ClientWebSocket"/> .NET default implementation + /// </summary> + public class FBMFallbackClientWsFactory : IFbmWebsocketFactory + { + private readonly Action<ClientWebSocketOptions>? _onConfigure; + + /// <summary> + /// Initalizes a new <see cref="FBMFallbackClientWsFactory"/> instance + /// </summary> + /// <param name="onConfigureSocket">A callback function that allows users to configure sockets when created</param> + public FBMFallbackClientWsFactory(Action<ClientWebSocketOptions>? onConfigureSocket = null) + => _onConfigure = onConfigureSocket; + + ///<inheritdoc/> + public IFbmClientWebsocket CreateWebsocket(in FBMClientConfig clientConfig) + { + ClientWebSocket socket = new(); + + socket.Options.KeepAliveInterval = clientConfig.KeepAliveInterval; + + //Setup the socket receive buffer + byte[] poolBuffer = ArrayPool<byte>.Shared.Rent(clientConfig.MaxMessageSize); + socket.Options.SetBuffer(clientConfig.MaxMessageSize, clientConfig.MaxMessageSize, poolBuffer); + + //If config specifies a sub protocol, set it + if (!string.IsNullOrEmpty(clientConfig.SubProtocol)) + { + socket.Options.AddSubProtocol(clientConfig.SubProtocol); + } + + //invoke client configuration user callback + _onConfigure?.Invoke(socket.Options); + + return new FBMWebsocket(socket, poolBuffer); + } + + private sealed record class FBMWebsocket(ClientWebSocket Socket, byte[] Buffer) : IFbmClientWebsocket + { + ///<inheritdoc/> + public async Task ConnectAsync(Uri address, VnWebHeaderCollection headers, CancellationToken cancellation) + { + //Set headers + for (int i = 0; i < headers.Count; i++) + { + string name = headers.GetKey(i); + string? value = headers.Get(i); + //Set header + Socket.Options.SetRequestHeader(name, value); + } + + //Connect to server + await Socket.ConnectAsync(address, cancellation); + } + + ///<inheritdoc/> + public Task DisconnectAsync(WebSocketCloseStatus status, CancellationToken cancellation) + { + if (Socket?.State == WebSocketState.Open || Socket?.State == WebSocketState.CloseSent) + { + return Socket.CloseOutputAsync(status, "Socket disconnected", cancellation); + } + return Task.CompletedTask; + } + + ///<inheritdoc/> + public ValueTask<ValueWebSocketReceiveResult> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken) + => Socket.ReceiveAsync(buffer, cancellationToken); + + ///<inheritdoc/> + public ValueTask SendAsync(ReadOnlyMemory<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken) + => Socket.SendAsync(buffer, messageType, endOfMessage, cancellationToken); + + public void Dispose() + { + //Remove buffer refs and return to pool + Socket.Dispose(); + ArrayPool<byte>.Shared.Return(Buffer); + } + } + } +} diff --git a/lib/Net.Messaging.FBM/src/Client/IFbmClientWebsocket.cs b/lib/Net.Messaging.FBM/src/Client/IFbmClientWebsocket.cs new file mode 100644 index 0000000..bea9c28 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Client/IFbmClientWebsocket.cs @@ -0,0 +1,76 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: IFbmClientWebsocket.cs +* +* IFbmClientWebsocket.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Threading; +using System.Net.WebSockets; +using System.Threading.Tasks; + +using VNLib.Net.Http; + +namespace VNLib.Net.Messaging.FBM.Client +{ + /// <summary> + /// Represents a client websocket + /// </summary> + public interface IFbmClientWebsocket: IDisposable + { + /// <summary> + /// Connects the client to the remote server at the specified address, + /// with the supplied headers + /// </summary> + /// <param name="address">The server address to connect to</param> + /// <param name="headers">A header collection used when making the initial upgrade request to the server</param> + /// <param name="cancellation">A token to cancel the connect operation</param> + /// <returns>A task that completes when the socket connection has been established</returns> + Task ConnectAsync(Uri address, VnWebHeaderCollection headers, CancellationToken cancellation); + + /// <summary> + /// Cleanly disconnects the connected web socket from the + /// remote server. + /// </summary> + /// <param name="status">The websocket status to send on closure</param> + /// <param name="cancellation">A token to cancel the operation</param> + /// <returns>A task that completes when the operation complets</returns> + Task DisconnectAsync(WebSocketCloseStatus status, CancellationToken cancellation); + + /// <summary> + /// Sends the supplied memory segment to the connected server + /// </summary> + /// <param name="buffer">The data buffer to send to the server</param> + /// <param name="messageType">A websocket message type</param> + /// <param name="endOfMessage">A value that indicates if the segment is the last message in the sequence</param> + /// <param name="cancellationToken">A token to cancel the send operation</param> + /// <returns>A value task that resovles when the message has been sent</returns> + ValueTask SendAsync(ReadOnlyMemory<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken); + + /// <summary> + /// Receives data from the connected server and write data to the supplied buffer + /// </summary> + /// <param name="buffer">The buffer to write data to</param> + /// <param name="cancellationToken">A token to cancel the read operation</param> + /// <returns>A value task that completes with the receive result</returns> + ValueTask<ValueWebSocketReceiveResult> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken); + } +} diff --git a/lib/Net.Messaging.FBM/src/Client/IFbmWebsocketFactory.cs b/lib/Net.Messaging.FBM/src/Client/IFbmWebsocketFactory.cs new file mode 100644 index 0000000..24aff0c --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Client/IFbmWebsocketFactory.cs @@ -0,0 +1,40 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: IFbmWebsocketFactory.cs +* +* IFbmWebsocketFactory.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + + +namespace VNLib.Net.Messaging.FBM.Client +{ + /// <summary> + /// Represents a factory that creates new client websockets on demand + /// </summary> + public interface IFbmWebsocketFactory + { + /// <summary> + /// Creates a client websocket for new connections, may be used once and discarded + /// </summary> + /// <param name="clientConfig">The fbm client configuration requesting the new websocket</param> + /// <returns>The client websocket</returns> + IFbmClientWebsocket CreateWebsocket(in FBMClientConfig clientConfig); + } +} diff --git a/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs b/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs deleted file mode 100644 index fc2e417..0000000 --- a/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs +++ /dev/null @@ -1,203 +0,0 @@ -/* -* Copyright (c) 2023 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: ManagedClientWebSocket.cs -* -* ManagedClientWebSocket.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Net; -using System.Threading; -using System.Net.Security; -using System.Net.WebSockets; -using System.Threading.Tasks; -using System.Security.Cryptography.X509Certificates; - -using VNLib.Utils.Memory; - -namespace VNLib.Net.Messaging.FBM.Client -{ - - /// <summary> - /// A wrapper container to manage client websockets - /// </summary> - public class ManagedClientWebSocket : WebSocket - { - private readonly int TxBufferSize; - private readonly int RxBufferSize; - private readonly TimeSpan KeepAliveInterval; - private readonly ArrayPoolBuffer<byte> _dataBuffer; - private readonly string? _subProtocol; - - /// <summary> - /// A collection of headers to add to the client - /// </summary> - public WebHeaderCollection Headers { get; } - - public X509CertificateCollection Certificates { get; } - - public IWebProxy? Proxy { get; set; } - - public CookieContainer? Cookies { get; set; } - - public RemoteCertificateValidationCallback? RemoteCertificateValidationCallback { get; set; } - - - private ClientWebSocket? _socket; - - /// <summary> - /// Initiaizes a new <see cref="ManagedClientWebSocket"/> that accepts an optional sub-protocol for connections - /// </summary> - /// <param name="txBufferSize">The size (in bytes) of the send buffer size</param> - /// <param name="rxBufferSize">The size (in bytes) of the receive buffer size to use</param> - /// <param name="keepAlive">The WS keepalive interval</param> - /// <param name="subProtocol">The optional sub-protocol to use</param> - public ManagedClientWebSocket(int txBufferSize, int rxBufferSize, TimeSpan keepAlive, string? subProtocol) - { - //Init header collection - Headers = new(); - Certificates = new(); - //Alloc buffer - _dataBuffer = new(rxBufferSize); - TxBufferSize = txBufferSize; - RxBufferSize = rxBufferSize; - KeepAliveInterval = keepAlive; - _subProtocol = subProtocol; - } - - /// <summary> - /// Asyncrhonously prepares a new client web-socket and connects to the remote endpoint - /// </summary> - /// <param name="serverUri">The endpoint to connect to</param> - /// <param name="token">A token to cancel the connect operation</param> - /// <returns>A task that compeltes when the client has connected</returns> - public async Task ConnectAsync(Uri serverUri, CancellationToken token) - { - //Init new socket - _socket = new(); - try - { - //Set buffer - _socket.Options.SetBuffer(RxBufferSize, TxBufferSize, _dataBuffer.AsArraySegment()); - //Set remaining stored options - _socket.Options.ClientCertificates = Certificates; - _socket.Options.KeepAliveInterval = KeepAliveInterval; - _socket.Options.Cookies = Cookies; - _socket.Options.Proxy = Proxy; - _socket.Options.RemoteCertificateValidationCallback = RemoteCertificateValidationCallback; - //Specify sub-protocol - if (!string.IsNullOrEmpty(_subProtocol)) - { - _socket.Options.AddSubProtocol(_subProtocol); - } - //Set headers - for (int i = 0; i < Headers.Count; i++) - { - string name = Headers.GetKey(i); - string? value = Headers.Get(i); - //Set header - _socket.Options.SetRequestHeader(name, value); - } - //Connect to server - await _socket.ConnectAsync(serverUri, token); - } - catch - { - //Cleanup the socket - Cleanup(); - throw; - } - } - - /// <summary> - /// Cleans up internal resources to prepare for another connection - /// </summary> - public void Cleanup() - { - //Dispose old socket if set - _socket?.Dispose(); - _socket = null; - } - ///<inheritdoc/> - public override WebSocketCloseStatus? CloseStatus => _socket?.CloseStatus; - ///<inheritdoc/> - public override string CloseStatusDescription => _socket?.CloseStatusDescription ?? string.Empty; - ///<inheritdoc/> - public override WebSocketState State => _socket?.State ?? WebSocketState.Closed; - ///<inheritdoc/> - public override string SubProtocol => _subProtocol ?? string.Empty; - - - ///<inheritdoc/> - public override void Abort() - { - _socket?.Abort(); - } - ///<inheritdoc/> - public override Task CloseAsync(WebSocketCloseStatus closeStatus, string? statusDescription, CancellationToken cancellationToken) - { - return _socket?.CloseAsync(closeStatus, statusDescription, cancellationToken) ?? Task.CompletedTask; - } - ///<inheritdoc/> - public override Task CloseOutputAsync(WebSocketCloseStatus closeStatus, string? statusDescription, CancellationToken cancellationToken) - { - if (_socket?.State == WebSocketState.Open || _socket?.State == WebSocketState.CloseSent) - { - return _socket.CloseOutputAsync(closeStatus, statusDescription, cancellationToken); - } - return Task.CompletedTask; - } - ///<inheritdoc/> - public override ValueTask<ValueWebSocketReceiveResult> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken) - { - _ = _socket ?? throw new WebSocketException(WebSocketError.ConnectionClosedPrematurely, "The connected socket has been disconnected"); - - return _socket.ReceiveAsync(buffer, cancellationToken); - } - ///<inheritdoc/> - public override Task<WebSocketReceiveResult> ReceiveAsync(ArraySegment<byte> buffer, CancellationToken cancellationToken) - { - _ = _socket ?? throw new WebSocketException(WebSocketError.ConnectionClosedPrematurely, "The connected socket has been disconnected"); - - return _socket.ReceiveAsync(buffer, cancellationToken); - } - ///<inheritdoc/> - public override ValueTask SendAsync(ReadOnlyMemory<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken) - { - _ = _socket ?? throw new WebSocketException(WebSocketError.ConnectionClosedPrematurely, "The connected socket has been disconnected"); - return _socket.SendAsync(buffer, messageType, endOfMessage, cancellationToken); - } - ///<inheritdoc/> - public override Task SendAsync(ArraySegment<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken) - { - _ = _socket ?? throw new WebSocketException(WebSocketError.ConnectionClosedPrematurely, "The connected socket has been disconnected"); - return _socket.SendAsync(buffer, messageType, endOfMessage, cancellationToken); - } - - ///<inheritdoc/> - public override void Dispose() - { - //Free buffer - _dataBuffer.Dispose(); - _socket?.Dispose(); - GC.SuppressFinalize(this); - } - } -} diff --git a/lib/Plugins.Essentials/src/Accounts/PasswordHashing.cs b/lib/Plugins.Essentials/src/Accounts/PasswordHashing.cs index 509a8d0..d80ee06 100644 --- a/lib/Plugins.Essentials/src/Accounts/PasswordHashing.cs +++ b/lib/Plugins.Essentials/src/Accounts/PasswordHashing.cs @@ -184,7 +184,7 @@ namespace VNLib.Plugins.Essentials.Accounts } finally { - MemoryUtil.InitializeBlock(buffer.Span); + MemoryUtil.InitializeBlock(ref buffer.GetReference(),buffer.IntLength); } } diff --git a/lib/Utils.Memory/NativeHeapApi/src/NativeHeapApi.h b/lib/Utils.Memory/NativeHeapApi/src/NativeHeapApi.h index eec4a8a..c5d07b2 100644 --- a/lib/Utils.Memory/NativeHeapApi/src/NativeHeapApi.h +++ b/lib/Utils.Memory/NativeHeapApi/src/NativeHeapApi.h @@ -73,7 +73,11 @@ typedef enum HeapCreationFlags /// <summary> /// Specifies that the requested heap will be a shared heap for the process/library /// </summary> - HEAP_CREATION_IS_SHARED = 0x04 + HEAP_CREATION_IS_SHARED = 0x04, + /// <summary> + /// Specifies that the heap will support block reallocation + /// </summary> + HEAP_CREATION_SUPPORTS_REALLOC = 0x08, } HeapCreationFlags; /// <summary> diff --git a/lib/Utils/src/Extensions/MemoryExtensions.cs b/lib/Utils/src/Extensions/MemoryExtensions.cs index 7ff9936..58d3d23 100644 --- a/lib/Utils/src/Extensions/MemoryExtensions.cs +++ b/lib/Utils/src/Extensions/MemoryExtensions.cs @@ -450,7 +450,6 @@ namespace VNLib.Utils.Extensions /// <exception cref="ArgumentException"></exception> /// <exception cref="OutOfMemoryException"></exception> /// <exception cref="ObjectDisposedException"></exception> - [MethodImpl(MethodImplOptions.AggressiveInlining)] public static unsafe MemoryHandle<T> Alloc<T>(this IUnmangedHeap heap, nuint elements, bool zero = false) where T : unmanaged { _ = heap ?? throw new ArgumentNullException(nameof(heap)); @@ -821,7 +820,59 @@ namespace VNLib.Utils.Extensions /// <returns>The sub-sequence of the current handle</returns> /// <exception cref="ArgumentOutOfRangeException"></exception> [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static Span<T> AsSpan<T>(this IMemoryHandle<T> handle, int start) => handle.Span[start..]; + public static Span<T> AsSpan<T>(this IMemoryHandle<T> handle, nint start) + { + _ = handle ?? throw new ArgumentNullException(nameof(handle)); + //calculate a remaining count + int count = (int)(handle.Length - (nuint)start); + //call the other overload + return AsSpan(handle, start, count); + } + + /// <summary> + /// Creates a new sub-sequence over the target handle. (allows for convient sub span) + /// </summary> + /// <typeparam name="T"></typeparam> + /// <param name="handle"></param> + /// <param name="start">Intial offset into the handle</param> + /// <returns>The sub-sequence of the current handle</returns> + /// <exception cref="ArgumentOutOfRangeException"></exception> + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static Span<T> AsSpan<T>(this IMemoryHandle<T> handle, int start) => AsSpan(handle, (nint)start); + + /// <summary> + /// Creates a new sub-sequence over the target handle. (allows for convient sub span) + /// </summary> + /// <typeparam name="T"></typeparam> + /// <param name="handle"></param> + /// <param name="start">Intial offset into the handle</param> + /// <param name="count">The number of elements within the new sequence</param> + /// <returns>The sub-sequence of the current handle</returns> + /// <exception cref="ArgumentOutOfRangeException"></exception> + public static Span<T> AsSpan<T>(this IMemoryHandle<T> handle, nint start, int count) + { + _ = handle ?? throw new ArgumentNullException(nameof(handle)); + if(start < 0) + { + throw new ArgumentOutOfRangeException(nameof(start)); + } + if(count < 0) + { + throw new ArgumentOutOfRangeException(nameof(count)); + } + + //guard against buffer overrun + MemoryUtil.CheckBounds(handle, (nuint)start, (nuint)count); + + if(count == 0) + { + return Span<T>.Empty; + } + + //Get the offset ref and create a new span from the pointer + ref T asRef = ref handle.GetOffsetRef((nuint)start); + return MemoryMarshal.CreateSpan(ref asRef, count); + } /// <summary> /// Creates a new sub-sequence over the target handle. (allows for convient sub span) @@ -833,7 +884,7 @@ namespace VNLib.Utils.Extensions /// <returns>The sub-sequence of the current handle</returns> /// <exception cref="ArgumentOutOfRangeException"></exception> [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static Span<T> AsSpan<T>(this IMemoryHandle<T> handle, int start, int count) => handle.Span.Slice(start, count); + public static Span<T> AsSpan<T>(this IMemoryHandle<T> handle, int start, int count) => AsSpan(handle, (nint)start, count); /// <summary> /// Creates a new sub-sequence over the target handle. (allows for convient sub span) diff --git a/lib/Utils/src/Extensions/StringExtensions.cs b/lib/Utils/src/Extensions/StringExtensions.cs index f211b73..42b7b32 100644 --- a/lib/Utils/src/Extensions/StringExtensions.cs +++ b/lib/Utils/src/Extensions/StringExtensions.cs @@ -439,7 +439,7 @@ namespace VNLib.Utils.Extensions char t = data[end - 1]; //If character \r or \n slice it off - if (t != '\r' && t != '\n' && t != ' ') + if (t != '\r' && t != '\n' && t != ' ') { break; } diff --git a/lib/Utils/src/Memory/Caching/LRUCache.cs b/lib/Utils/src/Memory/Caching/LRUCache.cs index 0c8e169..4cadc9d 100644 --- a/lib/Utils/src/Memory/Caching/LRUCache.cs +++ b/lib/Utils/src/Memory/Caching/LRUCache.cs @@ -67,24 +67,24 @@ namespace VNLib.Utils.Memory.Caching //A record needs to be evicted before a new record can be added //Get the oldest node from the list to reuse its instance and remove the old value - LinkedListNode<KeyValuePair<TKey, TValue>> oldNode = List.First!; //not null because count is at max capacity so an item must be at the end of the list - //Store old node value field on the stack - ref KeyValuePair<TKey, TValue> oldRecord = ref oldNode.ValueRef; + LinkedListNode<KeyValuePair<TKey, TValue>> node = List.First!; //not null because count is at max capacity so an item must be at the end of the list //Remove from lookup - LookupTable.Remove(oldRecord.Key); + LookupTable.Remove(node.ValueRef.Key); //Remove the node List.RemoveFirst(); //Invoke evicted method - Evicted(ref oldRecord); + Evicted(ref node.ValueRef); //Reuse the old ll node - oldNode.Value = item; + node.Value = item; + //add lookup with new key - LookupTable.Add(item.Key, oldNode); + LookupTable.Add(item.Key, node); + //Add to end of list - List.AddLast(oldNode); + List.AddLast(node); } else { diff --git a/lib/Utils/src/Memory/Caching/ObjectRental.cs b/lib/Utils/src/Memory/Caching/ObjectRental.cs index 212aedb..86ea63a 100644 --- a/lib/Utils/src/Memory/Caching/ObjectRental.cs +++ b/lib/Utils/src/Memory/Caching/ObjectRental.cs @@ -36,11 +36,6 @@ namespace VNLib.Utils.Memory.Caching /// <typeparam name="T">The data type to reuse</typeparam> public class ObjectRental<T> : ObjectRental, IObjectRental<T>, ICacheHolder where T: class { - /// <summary> - /// The initial data-structure capacity if quota is not defined - /// </summary> - public const int INITIAL_STRUCTURE_SIZE = 50; - protected readonly ConcurrentStack<T> Storage; protected readonly Action<T>? ReturnAction; @@ -157,12 +152,8 @@ namespace VNLib.Utils.Memory.Caching /// while holding the lock /// </summary> /// <returns></returns> - protected T[] GetElementsWithLock() - { - //Enumerate all items to the array - return Storage.ToArray(); - } - + protected T[] GetElementsWithLock() => Storage.ToArray(); + /// <inheritdoc/> /// <exception cref="ObjectDisposedException"></exception> @@ -182,7 +173,9 @@ namespace VNLib.Utils.Memory.Caching if (IsDisposableType) { - T[] result = GetElementsWithLock(); + //Get all elements and clear the store + T[] result = Storage.ToArray(); + Storage.Clear(); //Dispose all elements foreach (T element in result) diff --git a/lib/Utils/src/Memory/MemoryUtil.cs b/lib/Utils/src/Memory/MemoryUtil.cs index 068a99a..75e2d4c 100644 --- a/lib/Utils/src/Memory/MemoryUtil.cs +++ b/lib/Utils/src/Memory/MemoryUtil.cs @@ -682,7 +682,7 @@ namespace VNLib.Utils.Memory /// <param name="target">A reference to the first byte of the memory location to copy the struct data to</param> /// <exception cref="ArgumentNullException"></exception> [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static void CopyStruct<T>(IntPtr source, ref byte target) where T : unmanaged => CopyStruct<T>(source.ToPointer(), ref target); + public static void CopyStruct<T>(IntPtr source, ref byte target) where T : unmanaged => CopyStruct(ref GetRef<T>(source), ref target); /// <summary> diff --git a/lib/Utils/src/Memory/PrivateString.cs b/lib/Utils/src/Memory/PrivateString.cs index 8300b97..3952084 100644 --- a/lib/Utils/src/Memory/PrivateString.cs +++ b/lib/Utils/src/Memory/PrivateString.cs @@ -142,6 +142,7 @@ namespace VNLib.Utils.Memory /// A nullable cast to a <see cref="PrivateString"/> /// </summary> /// <param name="data"></param> + [return:NotNullIfNotNull(nameof(data))] public static explicit operator PrivateString?(string? data) => ToPrivateString(data, true); /// <summary> |