From dab71d5597fdfbe71f6ac310a240835716e952a5 Mon Sep 17 00:00:00 2001 From: vnugent Date: Mon, 6 Mar 2023 01:53:26 -0500 Subject: FBM and caching non-breaking updates --- lib/Net.Messaging.FBM/src/Client/FBMClient.cs | 124 ++++++++++------- .../src/Client/FBMClientConfig.cs | 7 +- lib/Net.Messaging.FBM/src/Client/FBMRequest.cs | 148 +++++++++++++++++---- .../src/Client/IFBMMessageWaiter.cs | 75 +++++++++++ 4 files changed, 284 insertions(+), 70 deletions(-) create mode 100644 lib/Net.Messaging.FBM/src/Client/IFBMMessageWaiter.cs (limited to 'lib/Net.Messaging.FBM') diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClient.cs b/lib/Net.Messaging.FBM/src/Client/FBMClient.cs index c4f9f54..5f99d69 100644 --- a/lib/Net.Messaging.FBM/src/Client/FBMClient.cs +++ b/lib/Net.Messaging.FBM/src/Client/FBMClient.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Messaging.FBM @@ -40,6 +40,7 @@ using VNLib.Utils.Memory.Caching; namespace VNLib.Net.Messaging.FBM.Client { + /// /// A Fixed Buffer Message Protocol client. Allows for high performance client-server messaging /// with minimal memory overhead. @@ -73,6 +74,7 @@ namespace VNLib.Net.Messaging.FBM.Client private readonly ConcurrentDictionary ActiveRequests; private readonly ReusableStore RequestRental; private readonly FBMRequest _controlFrame; + /// /// The configuration for the current client /// @@ -105,7 +107,7 @@ namespace VNLib.Net.Messaging.FBM.Client /// The client configuration public FBMClient(FBMClientConfig config) { - RequestRental = ObjectRental.CreateReusable(ReuseableRequestConstructor); + RequestRental = ObjectRental.CreateReusable(ReuseableRequestConstructor, 200); SendLock = new(1); ConnectionStatusHandle = new(true); ActiveRequests = new(Environment.ProcessorCount, 100); @@ -187,25 +189,19 @@ namespace VNLib.Net.Messaging.FBM.Client public async Task SendAsync(FBMRequest request, CancellationToken cancellationToken = default) { Check(); - //Length of the request must contains at least 1 int and header byte - if (request.Length < 1 + sizeof(int)) - { - throw new FBMInvalidRequestException("Message is not initialized"); - } - //Store a null value in the request queue so the response can store a buffer - if (!ActiveRequests.TryAdd(request.MessageId, request)) - { - throw new ArgumentException("Message with the same ID is already being processed"); - } + + cancellationToken.ThrowIfCancellationRequested(); + + ValidateRequest(request); + + CheckOrEnqueue(request); + try { //Get the request data segment ReadOnlyMemory requestData = request.GetRequestData(); Debug("Sending {bytes} with id {id}", requestData.Length, request.MessageId); - - //Reset the wait handle - request.ResponseWaitEvent.Reset(); //Wait for send-lock using (SemSlimReleaser releaser = await SendLock.GetReleaserAsync(cancellationToken)) @@ -215,7 +211,7 @@ namespace VNLib.Net.Messaging.FBM.Client } //wait for the response to be set - await request.WaitForResponseAsync(cancellationToken); + await request.Waiter.WaitAsync(Config.RequestTimeout, cancellationToken).ConfigureAwait(true); Debug("Received {size} bytes for message {id}", request.Response?.Length ?? 0, request.MessageId); @@ -225,11 +221,14 @@ namespace VNLib.Net.Messaging.FBM.Client { //Remove the request since packet was never sent ActiveRequests.Remove(request.MessageId, out _); - //Clear waiting flag - request.ResponseWaitEvent.Set(); + + //Cleanup waiter + request.Waiter.OnEndRequest(); + throw; } } + /// /// Streams arbitrary binary data to the server with the initial request message /// @@ -237,23 +236,20 @@ namespace VNLib.Net.Messaging.FBM.Client /// Data to stream to the server /// The content type of the stream of data /// - /// When awaited, yields the server response + /// A task that resolves when the data is sent and the resonse is received /// /// /// public async Task StreamDataAsync(FBMRequest request, Stream payload, ContentType ct, CancellationToken cancellationToken = default) { Check(); - //Length of the request must contains at least 1 int and header byte - if(request.Length < 1 + sizeof(int)) - { - throw new FBMInvalidRequestException("Message is not initialized"); - } - //Store a null value in the request queue so the response can store a buffer - if (!ActiveRequests.TryAdd(request.MessageId, request)) - { - throw new ArgumentException("Message with the same ID is already being processed"); - } + + cancellationToken.ThrowIfCancellationRequested(); + + ValidateRequest(request); + + CheckOrEnqueue(request); + try { //Get the request data segment @@ -261,21 +257,20 @@ namespace VNLib.Net.Messaging.FBM.Client Debug("Streaming {bytes} with id {id}", requestData.Length, request.MessageId); - //Reset the wait handle - request.ResponseWaitEvent.Reset(); - //Write an empty body in the request request.WriteBody(ReadOnlySpan.Empty, ct); + //Calc buffer size + int bufSize = (int)Math.Clamp(payload.Length, Config.MessageBufferSize, Config.MaxMessageSize); + + //Alloc a streaming buffer + using IMemoryOwner buffer = Config.BufferHeap.DirectAlloc(bufSize); + //Wait for send-lock using (SemSlimReleaser releaser = await SendLock.GetReleaserAsync(cancellationToken)) { //Send the initial request packet await ClientSocket.SendAsync(requestData, WebSocketMessageType.Binary, false, cancellationToken); - //Calc buffer size - int bufSize = (int)Math.Clamp(payload.Length, Config.MessageBufferSize, Config.MaxMessageSize); - //Alloc a streaming buffer - using IMemoryOwner buffer = Config.BufferHeap.DirectAlloc(bufSize); //Stream mesage body do { @@ -291,21 +286,53 @@ namespace VNLib.Net.Messaging.FBM.Client } while (true); } + //wait for the server to respond - await request.WaitForResponseAsync(cancellationToken); + await request.Waiter.WaitAsync(Config.RequestTimeout, cancellationToken).ConfigureAwait(true); Debug("Response recieved {size} bytes for message {id}", request.Response?.Length ?? 0, request.MessageId); } catch { //Remove the request since packet was never sent or cancelled - ActiveRequests.Remove(request.MessageId, out _); - //Clear wait lock so the request state is reset - request.ResponseWaitEvent.Set(); + _ = ActiveRequests.TryRemove(request.MessageId, out _); + + //Cleanup request waiter + request.Waiter.OnEndRequest(); + throw; } } + + + private void CheckOrEnqueue(FBMRequest request) + { + /* + * We need to check that the request is not already queued because a wait may be pending + * and calling SetupAsyncRequest may overwite another wait and cause a deadlock + */ + + if (!ActiveRequests.TryAdd(request.MessageId, request)) + { + throw new ArgumentException("Message with the same ID is already being processed"); + } + + //Configure the request/response task + request.Waiter.OnBeginRequest(); + } + + private static void ValidateRequest(FBMRequest? request) + { + _ = request ?? throw new ArgumentNullException(nameof(request)); + + //Length of the request must contains at least 1 int and header byte + if (request.Length < 1 + sizeof(int)) + { + throw new FBMInvalidRequestException("Message is not initialized"); + } + } + /// /// Begins listening for messages from the server on the internal socket (must be connected), /// until the socket is closed, or canceled @@ -381,17 +408,20 @@ namespace VNLib.Net.Messaging.FBM.Client { //Dispose the recv buffer recvBuffer.Dispose(); + //Cleanup the socket when exiting + ClientSocket.Cleanup(); + //Set status handle as unset + ConnectionStatusHandle.Set(); + //Set all pending events foreach (FBMRequest request in ActiveRequests.Values) { - request.ResponseWaitEvent.Set(); + request.Waiter.ManualCancellation(); } + //Clear dict ActiveRequests.Clear(); - //Cleanup the socket when exiting - ClientSocket.Cleanup(); - //Set status handle as unset - ConnectionStatusHandle.Set(); + //Invoke connection closed ConnectionClosed?.Invoke(this, EventArgs.Empty); } @@ -427,7 +457,7 @@ namespace VNLib.Net.Messaging.FBM.Client if (ActiveRequests.TryRemove(messageId, out FBMRequest? request)) { //Set the new response message - request.SetResponse(responseMessage); + request.Waiter.Complete(responseMessage); } else { @@ -437,6 +467,7 @@ namespace VNLib.Net.Messaging.FBM.Client responseMessage.Dispose(); } } + /// /// Processes a control frame response from the server /// @@ -445,6 +476,7 @@ namespace VNLib.Net.Messaging.FBM.Client { vms.Dispose(); } + /// /// Processes a control frame response from the server /// diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs b/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs index 77278f1..fb5f32f 100644 --- a/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs +++ b/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Messaging.FBM @@ -64,6 +64,11 @@ namespace VNLib.Net.Messaging.FBM.Client /// The websocket keepalive interval to use (leaving this property default disables keepalives) /// public readonly TimeSpan KeepAliveInterval { get; init; } + + /// + /// If configured, configures a maximum request timout + /// + public readonly TimeSpan RequestTimeout { get; init; } /// /// The websocket sub-protocol to use /// diff --git a/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs b/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs index 01302a9..bffa1ca 100644 --- a/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs +++ b/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Messaging.FBM @@ -37,9 +37,9 @@ using VNLib.Utils.Memory; using VNLib.Utils.Extensions; using VNLib.Utils.Memory.Caching; + namespace VNLib.Net.Messaging.FBM.Client { - /// /// /// A reusable Fixed Buffer Message request container. This class is not thread-safe @@ -55,8 +55,16 @@ namespace VNLib.Net.Messaging.FBM.Client #pragma warning restore CA2213 // Disposable fields should be disposed private readonly Encoding HeaderEncoding; + /* + * Local list stores processed headers for response messages + * which are structures and will be allocted in the list. + * FBMMessagesHeader's are essentially pointers to locations + * in the reused buffer (in response "mode") cast to a + * character buffer. + */ private readonly List ResponseHeaderList = new(); + /// /// The size (in bytes) of the request message /// @@ -66,12 +74,12 @@ namespace VNLib.Net.Messaging.FBM.Client /// The id of the current request message /// public int MessageId { get; } - + /// - /// An to signal request/response - /// event completion + /// Gets the request message waiter /// - internal ManualResetEvent ResponseWaitEvent { get; } + internal IFBMMessageWaiter Waiter { get; } + internal VnMemoryStream? Response { get; private set; } @@ -92,7 +100,6 @@ namespace VNLib.Net.Messaging.FBM.Client :this(messageId, config.BufferHeap, config.MessageBufferSize, config.HeaderEncoding) { } - /// /// Initializes a new with the sepcified message buffer size and a custom MessageId /// @@ -105,13 +112,13 @@ namespace VNLib.Net.Messaging.FBM.Client MessageId = messageId; HeaderEncoding = headerEncoding; + //Configure waiter + Waiter = new FBMMessageWaiter(this); + //Alloc the buffer as a memory owner so a memory buffer can be used IMemoryOwner HeapBuffer = heap.DirectAlloc(bufferSize); Buffer = new(HeapBuffer); - //Setup response wait handle but make sure the contuation runs async - ResponseWaitEvent = new(true); - //Prepare the message incase the request is fresh Reset(); } @@ -139,7 +146,6 @@ namespace VNLib.Net.Messaging.FBM.Client [MethodImpl(MethodImplOptions.AggressiveInlining)] public IBufferWriter GetBodyWriter() => Buffer.GetBodyWriter(); - /// /// The request message packet, this may cause side effects /// @@ -158,25 +164,17 @@ namespace VNLib.Net.Messaging.FBM.Client Buffer.Reset(MessageId); } - internal void SetResponse(VnMemoryStream? vms) - { - Response = vms; - ResponseWaitEvent.Set(); - } - - internal Task WaitForResponseAsync(CancellationToken token) - { - return ResponseWaitEvent.WaitAsync().WaitAsync(token); - } - /// protected override void Free() { Buffer.Dispose(); - ResponseWaitEvent.Dispose(); Response?.Dispose(); + //Dispose waiter + (Waiter as FBMMessageWaiter)!.Dispose(); } + void IReusable.Prepare() => Reset(); + bool IReusable.Release() { //Make sure response header list is clear @@ -270,5 +268,109 @@ namespace VNLib.Net.Messaging.FBM.Client public override string ToString() => Compile(); #endregion + + #region waiter + private sealed class FBMMessageWaiter : IFBMMessageWaiter, IDisposable + { + private readonly Timer _timer; + private readonly FBMRequest _request; + + private TaskCompletionSource? _tcs; + + public FBMMessageWaiter(FBMRequest request) + { + _request = request; + + //Configure timer + _timer = new(OnCancelled, this, Timeout.Infinite, Timeout.Infinite); + } + + /// + public void OnBeginRequest() + { + //Configure new tcs + _tcs = new(TaskCreationOptions.None); + } + + /// + public void OnEndRequest() + { + //Cleanup tcs ref + _ = Interlocked.Exchange(ref _tcs, null); + + //Stop timer if set + _timer.Stop(); + } + + /// + public void Complete(VnMemoryStream ms) + { + //Read the current state of the tcs + TaskCompletionSource? tcs = _tcs; + + if (tcs == null) + { + //Work is done/cancelled, dispose the ms and leave + ms.Dispose(); + } + + //Store response + _request.Response = ms; + + //Transition to completed state in background thread + static void OnTpCallback(object? state) + { + _ = (state as TaskCompletionSource)!.TrySetResult(); + } + + /* + * The calling thread may be a TP thread proccessing an async event loop. + * We do not want to block this worker thread. + */ + ThreadPool.UnsafeQueueUserWorkItem(OnTpCallback, tcs); + } + + /// + public async Task WaitAsync(TimeSpan timeout, CancellationToken cancellation) + { + if (timeout.Ticks > 0) + { + //Restart timer if timeout is configured + _timer.Restart(timeout); + } + + //Confim the token may be cancelled + if (cancellation.CanBeCanceled) + { + //Register cancellation + using CancellationTokenRegistration reg = cancellation.Register(OnCancelled, this, false); + + //await task that may be canclled + await _tcs.Task.ConfigureAwait(false); + } + else + { + //await the task directly + await _tcs.Task.ConfigureAwait(false); + } + } + + /// + public void ManualCancellation() => OnCancelled(this); + + private void OnCancelled(object? state) + { + //Set cancelled state if exists + _ = _tcs?.TrySetCanceled(); + } + + /// + public void Dispose() + { + _timer.Dispose(); + } + } + + #endregion } } diff --git a/lib/Net.Messaging.FBM/src/Client/IFBMMessageWaiter.cs b/lib/Net.Messaging.FBM/src/Client/IFBMMessageWaiter.cs new file mode 100644 index 0000000..5000711 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Client/IFBMMessageWaiter.cs @@ -0,0 +1,75 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: FBMRequest.cs +* +* FBMRequest.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Utils.IO; + + +namespace VNLib.Net.Messaging.FBM.Client +{ + /// + /// A data structure that exposes controls for the request/response + /// async control flow. + /// + internal interface IFBMMessageWaiter + { + /// + /// Called by the client to prepare the waiter before + /// sending the request to the server + /// + void OnBeginRequest(); + + /// + /// Asynchronously waits for the server to respond while observing a cancellation token + /// or a timeout + /// + /// The maxium time to wait for the server to respond (may be default/0) + /// The cancellation token to observe + /// A task that completes when the server responds + Task WaitAsync(TimeSpan timeout, CancellationToken cancellation); + + /// + /// Called by the client to cleanup the waiter when the request is completed + /// or errored. This method is exposed incase an error happens before the wait is + /// entered. + /// + void OnEndRequest(); + + /// + /// Set by the client when the response has been successfully received by the client + /// + /// The response data to pass to the response + void Complete(VnMemoryStream ms); + + /// + /// Called to invoke a manual cancellation of a request waiter. This method should + /// be called from a different thread, it may yield to complete the cancellation + /// operation. + /// + void ManualCancellation(); + } +} -- cgit