diff options
author | vnugent <public@vaughnnugent.com> | 2023-03-06 01:53:26 -0500 |
---|---|---|
committer | vnugent <public@vaughnnugent.com> | 2023-03-06 01:53:26 -0500 |
commit | dab71d5597fdfbe71f6ac310a240835716e952a5 (patch) | |
tree | 7f5d3b6052f6ff7c90442527050c236d08f57889 /lib | |
parent | b78036ed9a1030d619b8f9de4dceabbfaa07861f (diff) |
FBM and caching non-breaking updates
Diffstat (limited to 'lib')
-rw-r--r-- | lib/Net.Messaging.FBM/src/Client/FBMClient.cs | 124 | ||||
-rw-r--r-- | lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs | 7 | ||||
-rw-r--r-- | lib/Net.Messaging.FBM/src/Client/FBMRequest.cs | 148 | ||||
-rw-r--r-- | lib/Net.Messaging.FBM/src/Client/IFBMMessageWaiter.cs | 75 | ||||
-rw-r--r-- | lib/Plugins.Essentials/src/Extensions/ConnectionInfoExtensions.cs | 32 | ||||
-rw-r--r-- | lib/Plugins.Essentials/src/Extensions/HttpCookie.cs | 53 |
6 files changed, 367 insertions, 72 deletions
diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClient.cs b/lib/Net.Messaging.FBM/src/Client/FBMClient.cs index c4f9f54..5f99d69 100644 --- a/lib/Net.Messaging.FBM/src/Client/FBMClient.cs +++ b/lib/Net.Messaging.FBM/src/Client/FBMClient.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Messaging.FBM @@ -40,6 +40,7 @@ using VNLib.Utils.Memory.Caching; namespace VNLib.Net.Messaging.FBM.Client { + /// <summary> /// A Fixed Buffer Message Protocol client. Allows for high performance client-server messaging /// with minimal memory overhead. @@ -73,6 +74,7 @@ namespace VNLib.Net.Messaging.FBM.Client private readonly ConcurrentDictionary<int, FBMRequest> ActiveRequests; private readonly ReusableStore<FBMRequest> RequestRental; private readonly FBMRequest _controlFrame; + /// <summary> /// The configuration for the current client /// </summary> @@ -105,7 +107,7 @@ namespace VNLib.Net.Messaging.FBM.Client /// <param name="config">The client configuration</param> public FBMClient(FBMClientConfig config) { - RequestRental = ObjectRental.CreateReusable(ReuseableRequestConstructor); + RequestRental = ObjectRental.CreateReusable(ReuseableRequestConstructor, 200); SendLock = new(1); ConnectionStatusHandle = new(true); ActiveRequests = new(Environment.ProcessorCount, 100); @@ -187,25 +189,19 @@ namespace VNLib.Net.Messaging.FBM.Client public async Task<FBMResponse> SendAsync(FBMRequest request, CancellationToken cancellationToken = default) { Check(); - //Length of the request must contains at least 1 int and header byte - if (request.Length < 1 + sizeof(int)) - { - throw new FBMInvalidRequestException("Message is not initialized"); - } - //Store a null value in the request queue so the response can store a buffer - if (!ActiveRequests.TryAdd(request.MessageId, request)) - { - throw new ArgumentException("Message with the same ID is already being processed"); - } + + cancellationToken.ThrowIfCancellationRequested(); + + ValidateRequest(request); + + CheckOrEnqueue(request); + try { //Get the request data segment ReadOnlyMemory<byte> requestData = request.GetRequestData(); Debug("Sending {bytes} with id {id}", requestData.Length, request.MessageId); - - //Reset the wait handle - request.ResponseWaitEvent.Reset(); //Wait for send-lock using (SemSlimReleaser releaser = await SendLock.GetReleaserAsync(cancellationToken)) @@ -215,7 +211,7 @@ namespace VNLib.Net.Messaging.FBM.Client } //wait for the response to be set - await request.WaitForResponseAsync(cancellationToken); + await request.Waiter.WaitAsync(Config.RequestTimeout, cancellationToken).ConfigureAwait(true); Debug("Received {size} bytes for message {id}", request.Response?.Length ?? 0, request.MessageId); @@ -225,11 +221,14 @@ namespace VNLib.Net.Messaging.FBM.Client { //Remove the request since packet was never sent ActiveRequests.Remove(request.MessageId, out _); - //Clear waiting flag - request.ResponseWaitEvent.Set(); + + //Cleanup waiter + request.Waiter.OnEndRequest(); + throw; } } + /// <summary> /// Streams arbitrary binary data to the server with the initial request message /// </summary> @@ -237,23 +236,20 @@ namespace VNLib.Net.Messaging.FBM.Client /// <param name="payload">Data to stream to the server</param> /// <param name="ct">The content type of the stream of data</param> /// <param name="cancellationToken"></param> - /// <returns>When awaited, yields the server response</returns> + /// <returns>A task that resolves when the data is sent and the resonse is received</returns> /// <exception cref="ArgumentException"></exception> /// <exception cref="ObjectDisposedException"></exception> /// <exception cref="InvalidOperationException"></exception> public async Task StreamDataAsync(FBMRequest request, Stream payload, ContentType ct, CancellationToken cancellationToken = default) { Check(); - //Length of the request must contains at least 1 int and header byte - if(request.Length < 1 + sizeof(int)) - { - throw new FBMInvalidRequestException("Message is not initialized"); - } - //Store a null value in the request queue so the response can store a buffer - if (!ActiveRequests.TryAdd(request.MessageId, request)) - { - throw new ArgumentException("Message with the same ID is already being processed"); - } + + cancellationToken.ThrowIfCancellationRequested(); + + ValidateRequest(request); + + CheckOrEnqueue(request); + try { //Get the request data segment @@ -261,21 +257,20 @@ namespace VNLib.Net.Messaging.FBM.Client Debug("Streaming {bytes} with id {id}", requestData.Length, request.MessageId); - //Reset the wait handle - request.ResponseWaitEvent.Reset(); - //Write an empty body in the request request.WriteBody(ReadOnlySpan<byte>.Empty, ct); + //Calc buffer size + int bufSize = (int)Math.Clamp(payload.Length, Config.MessageBufferSize, Config.MaxMessageSize); + + //Alloc a streaming buffer + using IMemoryOwner<byte> buffer = Config.BufferHeap.DirectAlloc<byte>(bufSize); + //Wait for send-lock using (SemSlimReleaser releaser = await SendLock.GetReleaserAsync(cancellationToken)) { //Send the initial request packet await ClientSocket.SendAsync(requestData, WebSocketMessageType.Binary, false, cancellationToken); - //Calc buffer size - int bufSize = (int)Math.Clamp(payload.Length, Config.MessageBufferSize, Config.MaxMessageSize); - //Alloc a streaming buffer - using IMemoryOwner<byte> buffer = Config.BufferHeap.DirectAlloc<byte>(bufSize); //Stream mesage body do { @@ -291,21 +286,53 @@ namespace VNLib.Net.Messaging.FBM.Client } while (true); } + //wait for the server to respond - await request.WaitForResponseAsync(cancellationToken); + await request.Waiter.WaitAsync(Config.RequestTimeout, cancellationToken).ConfigureAwait(true); Debug("Response recieved {size} bytes for message {id}", request.Response?.Length ?? 0, request.MessageId); } catch { //Remove the request since packet was never sent or cancelled - ActiveRequests.Remove(request.MessageId, out _); - //Clear wait lock so the request state is reset - request.ResponseWaitEvent.Set(); + _ = ActiveRequests.TryRemove(request.MessageId, out _); + + //Cleanup request waiter + request.Waiter.OnEndRequest(); + throw; } } + + + private void CheckOrEnqueue(FBMRequest request) + { + /* + * We need to check that the request is not already queued because a wait may be pending + * and calling SetupAsyncRequest may overwite another wait and cause a deadlock + */ + + if (!ActiveRequests.TryAdd(request.MessageId, request)) + { + throw new ArgumentException("Message with the same ID is already being processed"); + } + + //Configure the request/response task + request.Waiter.OnBeginRequest(); + } + + private static void ValidateRequest(FBMRequest? request) + { + _ = request ?? throw new ArgumentNullException(nameof(request)); + + //Length of the request must contains at least 1 int and header byte + if (request.Length < 1 + sizeof(int)) + { + throw new FBMInvalidRequestException("Message is not initialized"); + } + } + /// <summary> /// Begins listening for messages from the server on the internal socket (must be connected), /// until the socket is closed, or canceled @@ -381,17 +408,20 @@ namespace VNLib.Net.Messaging.FBM.Client { //Dispose the recv buffer recvBuffer.Dispose(); + //Cleanup the socket when exiting + ClientSocket.Cleanup(); + //Set status handle as unset + ConnectionStatusHandle.Set(); + //Set all pending events foreach (FBMRequest request in ActiveRequests.Values) { - request.ResponseWaitEvent.Set(); + request.Waiter.ManualCancellation(); } + //Clear dict ActiveRequests.Clear(); - //Cleanup the socket when exiting - ClientSocket.Cleanup(); - //Set status handle as unset - ConnectionStatusHandle.Set(); + //Invoke connection closed ConnectionClosed?.Invoke(this, EventArgs.Empty); } @@ -427,7 +457,7 @@ namespace VNLib.Net.Messaging.FBM.Client if (ActiveRequests.TryRemove(messageId, out FBMRequest? request)) { //Set the new response message - request.SetResponse(responseMessage); + request.Waiter.Complete(responseMessage); } else { @@ -437,6 +467,7 @@ namespace VNLib.Net.Messaging.FBM.Client responseMessage.Dispose(); } } + /// <summary> /// Processes a control frame response from the server /// </summary> @@ -445,6 +476,7 @@ namespace VNLib.Net.Messaging.FBM.Client { vms.Dispose(); } + /// <summary> /// Processes a control frame response from the server /// </summary> diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs b/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs index 77278f1..fb5f32f 100644 --- a/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs +++ b/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Messaging.FBM @@ -64,6 +64,11 @@ namespace VNLib.Net.Messaging.FBM.Client /// The websocket keepalive interval to use (leaving this property default disables keepalives) /// </summary> public readonly TimeSpan KeepAliveInterval { get; init; } + + /// <summary> + /// If configured, configures a maximum request timout + /// </summary> + public readonly TimeSpan RequestTimeout { get; init; } /// <summary> /// The websocket sub-protocol to use /// </summary> diff --git a/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs b/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs index 01302a9..bffa1ca 100644 --- a/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs +++ b/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Messaging.FBM @@ -37,9 +37,9 @@ using VNLib.Utils.Memory; using VNLib.Utils.Extensions; using VNLib.Utils.Memory.Caching; + namespace VNLib.Net.Messaging.FBM.Client { - /// <summary> /// <para> /// A reusable Fixed Buffer Message request container. This class is not thread-safe @@ -55,8 +55,16 @@ namespace VNLib.Net.Messaging.FBM.Client #pragma warning restore CA2213 // Disposable fields should be disposed private readonly Encoding HeaderEncoding; + /* + * Local list stores processed headers for response messages + * which are structures and will be allocted in the list. + * FBMMessagesHeader's are essentially pointers to locations + * in the reused buffer (in response "mode") cast to a + * character buffer. + */ private readonly List<FBMMessageHeader> ResponseHeaderList = new(); + /// <summary> /// The size (in bytes) of the request message /// </summary> @@ -66,12 +74,12 @@ namespace VNLib.Net.Messaging.FBM.Client /// The id of the current request message /// </summary> public int MessageId { get; } - + /// <summary> - /// An <see cref="ManualResetEvent"/> to signal request/response - /// event completion + /// Gets the request message waiter /// </summary> - internal ManualResetEvent ResponseWaitEvent { get; } + internal IFBMMessageWaiter Waiter { get; } + internal VnMemoryStream? Response { get; private set; } @@ -92,7 +100,6 @@ namespace VNLib.Net.Messaging.FBM.Client :this(messageId, config.BufferHeap, config.MessageBufferSize, config.HeaderEncoding) { } - /// <summary> /// Initializes a new <see cref="FBMRequest"/> with the sepcified message buffer size and a custom MessageId /// </summary> @@ -105,13 +112,13 @@ namespace VNLib.Net.Messaging.FBM.Client MessageId = messageId; HeaderEncoding = headerEncoding; + //Configure waiter + Waiter = new FBMMessageWaiter(this); + //Alloc the buffer as a memory owner so a memory buffer can be used IMemoryOwner<byte> HeapBuffer = heap.DirectAlloc<byte>(bufferSize); Buffer = new(HeapBuffer); - //Setup response wait handle but make sure the contuation runs async - ResponseWaitEvent = new(true); - //Prepare the message incase the request is fresh Reset(); } @@ -139,7 +146,6 @@ namespace VNLib.Net.Messaging.FBM.Client [MethodImpl(MethodImplOptions.AggressiveInlining)] public IBufferWriter<byte> GetBodyWriter() => Buffer.GetBodyWriter(); - /// <summary> /// The request message packet, this may cause side effects /// </summary> @@ -158,25 +164,17 @@ namespace VNLib.Net.Messaging.FBM.Client Buffer.Reset(MessageId); } - internal void SetResponse(VnMemoryStream? vms) - { - Response = vms; - ResponseWaitEvent.Set(); - } - - internal Task WaitForResponseAsync(CancellationToken token) - { - return ResponseWaitEvent.WaitAsync().WaitAsync(token); - } - ///<inheritdoc/> protected override void Free() { Buffer.Dispose(); - ResponseWaitEvent.Dispose(); Response?.Dispose(); + //Dispose waiter + (Waiter as FBMMessageWaiter)!.Dispose(); } + void IReusable.Prepare() => Reset(); + bool IReusable.Release() { //Make sure response header list is clear @@ -270,5 +268,109 @@ namespace VNLib.Net.Messaging.FBM.Client public override string ToString() => Compile(); #endregion + + #region waiter + private sealed class FBMMessageWaiter : IFBMMessageWaiter, IDisposable + { + private readonly Timer _timer; + private readonly FBMRequest _request; + + private TaskCompletionSource? _tcs; + + public FBMMessageWaiter(FBMRequest request) + { + _request = request; + + //Configure timer + _timer = new(OnCancelled, this, Timeout.Infinite, Timeout.Infinite); + } + + ///<inheritdoc/> + public void OnBeginRequest() + { + //Configure new tcs + _tcs = new(TaskCreationOptions.None); + } + + ///<inheritdoc/> + public void OnEndRequest() + { + //Cleanup tcs ref + _ = Interlocked.Exchange(ref _tcs, null); + + //Stop timer if set + _timer.Stop(); + } + + ///<inheritdoc/> + public void Complete(VnMemoryStream ms) + { + //Read the current state of the tcs + TaskCompletionSource? tcs = _tcs; + + if (tcs == null) + { + //Work is done/cancelled, dispose the ms and leave + ms.Dispose(); + } + + //Store response + _request.Response = ms; + + //Transition to completed state in background thread + static void OnTpCallback(object? state) + { + _ = (state as TaskCompletionSource)!.TrySetResult(); + } + + /* + * The calling thread may be a TP thread proccessing an async event loop. + * We do not want to block this worker thread. + */ + ThreadPool.UnsafeQueueUserWorkItem(OnTpCallback, tcs); + } + + ///<inheritdoc/> + public async Task WaitAsync(TimeSpan timeout, CancellationToken cancellation) + { + if (timeout.Ticks > 0) + { + //Restart timer if timeout is configured + _timer.Restart(timeout); + } + + //Confim the token may be cancelled + if (cancellation.CanBeCanceled) + { + //Register cancellation + using CancellationTokenRegistration reg = cancellation.Register(OnCancelled, this, false); + + //await task that may be canclled + await _tcs.Task.ConfigureAwait(false); + } + else + { + //await the task directly + await _tcs.Task.ConfigureAwait(false); + } + } + + ///<inheritdoc/> + public void ManualCancellation() => OnCancelled(this); + + private void OnCancelled(object? state) + { + //Set cancelled state if exists + _ = _tcs?.TrySetCanceled(); + } + + ///<inheritdoc/> + public void Dispose() + { + _timer.Dispose(); + } + } + + #endregion } } diff --git a/lib/Net.Messaging.FBM/src/Client/IFBMMessageWaiter.cs b/lib/Net.Messaging.FBM/src/Client/IFBMMessageWaiter.cs new file mode 100644 index 0000000..5000711 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Client/IFBMMessageWaiter.cs @@ -0,0 +1,75 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: FBMRequest.cs +* +* FBMRequest.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Utils.IO; + + +namespace VNLib.Net.Messaging.FBM.Client +{ + /// <summary> + /// A data structure that exposes controls for the request/response + /// async control flow. + /// </summary> + internal interface IFBMMessageWaiter + { + /// <summary> + /// Called by the client to prepare the waiter before + /// sending the request to the server + /// </summary> + void OnBeginRequest(); + + /// <summary> + /// Asynchronously waits for the server to respond while observing a cancellation token + /// or a timeout + /// </summary> + /// <param name="timeout">The maxium time to wait for the server to respond (may be default/0)</param> + /// <param name="cancellation">The cancellation token to observe</param> + /// <returns>A task that completes when the server responds</returns> + Task WaitAsync(TimeSpan timeout, CancellationToken cancellation); + + /// <summary> + /// Called by the client to cleanup the waiter when the request is completed + /// or errored. This method is exposed incase an error happens before the wait is + /// entered. + /// </summary> + void OnEndRequest(); + + /// <summary> + /// Set by the client when the response has been successfully received by the client + /// </summary> + /// <param name="ms">The response data to pass to the response</param> + void Complete(VnMemoryStream ms); + + /// <summary> + /// Called to invoke a manual cancellation of a request waiter. This method should + /// be called from a different thread, it may yield to complete the cancellation + /// operation. + /// </summary> + void ManualCancellation(); + } +} diff --git a/lib/Plugins.Essentials/src/Extensions/ConnectionInfoExtensions.cs b/lib/Plugins.Essentials/src/Extensions/ConnectionInfoExtensions.cs index ba01132..a91f196 100644 --- a/lib/Plugins.Essentials/src/Extensions/ConnectionInfoExtensions.cs +++ b/lib/Plugins.Essentials/src/Extensions/ConnectionInfoExtensions.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Plugins.Essentials @@ -23,8 +23,8 @@ */ using System; -using System.Diagnostics.CodeAnalysis; using System.Net; +using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; using VNLib.Net.Http; @@ -33,6 +33,7 @@ using VNLib.Net.Http; namespace VNLib.Plugins.Essentials.Extensions { + /// <summary> /// Provides <see cref="ConnectionInfo"/> extension methods /// for common use cases @@ -234,6 +235,33 @@ namespace VNLib.Plugins.Essentials.Extensions server.SetCookie(name, value, domain, path, expires, sameSite, httpOnly, secure); } + + /// <summary> + /// Sets a cookie with an infinite (session life-span) + /// </summary> + /// <param name="server"></param> + /// <param name="cookie">The cookie to set for the server</param> + /// <exception cref="ArgumentException"></exception> + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static void SetCookie(this IConnectionInfo server, in HttpCookie cookie) + { + //Cookie name is required + if(string.IsNullOrWhiteSpace(cookie.Name)) + { + throw new ArgumentException("A nonn-null cookie name is required"); + } + + //Set the cookie + server.SetCookie(cookie.Name, + cookie.Value, + cookie.Domain, + cookie.Path, + cookie.ValidFor, + cookie.SameSite, + cookie.HttpOnly, + cookie.Secure); + } + /// <summary> /// Is the current connection a "browser" ? /// </summary> diff --git a/lib/Plugins.Essentials/src/Extensions/HttpCookie.cs b/lib/Plugins.Essentials/src/Extensions/HttpCookie.cs new file mode 100644 index 0000000..d7f73a3 --- /dev/null +++ b/lib/Plugins.Essentials/src/Extensions/HttpCookie.cs @@ -0,0 +1,53 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Plugins.Essentials +* File: HttpCookie.cs +* +* HttpCookie.cs is part of VNLib.Plugins.Essentials which is part of +* the larger VNLib collection of libraries and utilities. +* +* VNLib.Plugins.Essentials is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Plugins.Essentials is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; + +using VNLib.Net.Http; + +#nullable enable + +namespace VNLib.Plugins.Essentials.Extensions +{ + /// <summary> + /// A structure for defining an HTTP cookie + /// </summary> + /// <param name="Name">The cookie name</param> + /// <param name="Value">The cookie value</param> + public readonly record struct HttpCookie (string Name, string Value) + { + public readonly TimeSpan ValidFor { get; init; } = TimeSpan.MaxValue; + public readonly string Domain { get; init; } = ""; + public readonly string Path { get; init; } = "/"; + public readonly CookieSameSite SameSite { get; init; } = CookieSameSite.None; + public readonly bool HttpOnly { get; init; } = false; + public readonly bool Secure { get; init; } = false; + + /// <summary> + /// Configures the default <see cref="HttpCookie"/> + /// </summary> + public HttpCookie():this(string.Empty, string.Empty) + { } + } +}
\ No newline at end of file |