From cef1e794ba190292bcfc0274e9d91afe535adb7e Mon Sep 17 00:00:00 2001 From: vnugent Date: Sat, 11 Mar 2023 18:17:47 -0500 Subject: Patch cache server FBMClient default timeout, extened timeout paremters to public api --- lib/Net.Messaging.FBM/src/Client/FBMClient.cs | 108 ++++++++++++++++---------- 1 file changed, 69 insertions(+), 39 deletions(-) (limited to 'lib') diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClient.cs b/lib/Net.Messaging.FBM/src/Client/FBMClient.cs index 5f99d69..861811e 100644 --- a/lib/Net.Messaging.FBM/src/Client/FBMClient.cs +++ b/lib/Net.Messaging.FBM/src/Client/FBMClient.cs @@ -73,7 +73,6 @@ namespace VNLib.Net.Messaging.FBM.Client private readonly SemaphoreSlim SendLock; private readonly ConcurrentDictionary ActiveRequests; private readonly ReusableStore RequestRental; - private readonly FBMRequest _controlFrame; /// /// The configuration for the current client @@ -88,18 +87,7 @@ namespace VNLib.Net.Messaging.FBM.Client /// The to send/recieve message on /// public ManagedClientWebSocket ClientSocket { get; } - /// - /// Gets the shared control frame for the current instance. The request is reset when - /// this property is called. (Not thread safe) - /// - protected FBMRequest ControlFrame - { - get - { - _controlFrame.Reset(); - return _controlFrame; - } - } + /// /// Creates a new in a closed state @@ -113,8 +101,6 @@ namespace VNLib.Net.Messaging.FBM.Client ActiveRequests = new(Environment.ProcessorCount, 100); Config = config; - //Init control frame - _controlFrame = new (Helpers.CONTROL_FRAME_MID, in config); //Init the new client socket ClientSocket = new(config.RecvBufferSize, config.RecvBufferSize, config.KeepAliveInterval, config.SubProtocol); } @@ -169,6 +155,7 @@ namespace VNLib.Net.Messaging.FBM.Client /// /// The configured (rented or new) ready for use public FBMRequest RentRequest() => RequestRental.Rent(); + /// /// Stores (or returns) the reusable request in cache for use with /// @@ -180,13 +167,32 @@ namespace VNLib.Net.Messaging.FBM.Client /// Sends a to the connected server /// /// The request message to send to the server - /// + /// A token to cancel the operation + /// When awaited, yields the server response + /// + /// + /// + /// + public Task SendAsync(FBMRequest request, CancellationToken cancellationToken = default) + { + return SendAsync(request, Config.RequestTimeout, cancellationToken); + } + + /// + /// Sends a to the connected server + /// + /// The request message to send to the server + /// A token to cancel the async send operation + /// + /// A maximum time to wait for the operation to complete before the wait is cancelled. An infinite + /// timeout (-1) or 0 will disable the timer. + /// /// When awaited, yields the server response /// /// /// /// - public async Task SendAsync(FBMRequest request, CancellationToken cancellationToken = default) + public async Task SendAsync(FBMRequest request, TimeSpan timeout, CancellationToken cancellationToken = default) { Check(); @@ -211,7 +217,7 @@ namespace VNLib.Net.Messaging.FBM.Client } //wait for the response to be set - await request.Waiter.WaitAsync(Config.RequestTimeout, cancellationToken).ConfigureAwait(true); + await request.Waiter.WaitAsync(timeout, cancellationToken).ConfigureAwait(true); Debug("Received {size} bytes for message {id}", request.Response?.Length ?? 0, request.MessageId); @@ -235,12 +241,29 @@ namespace VNLib.Net.Messaging.FBM.Client /// The request message to send to the server /// Data to stream to the server /// The content type of the stream of data - /// + /// A token to cancel the operation /// A task that resolves when the data is sent and the resonse is received /// /// /// - public async Task StreamDataAsync(FBMRequest request, Stream payload, ContentType ct, CancellationToken cancellationToken = default) + public Task StreamDataAsync(FBMRequest request, Stream payload, ContentType ct, CancellationToken cancellationToken = default) + { + return StreamDataAsync(request, payload, ct, Config.RequestTimeout, cancellationToken); + } + + /// + /// Streams arbitrary binary data to the server with the initial request message + /// + /// The request message to send to the server + /// Data to stream to the server + /// The content type of the stream of data + /// A token to cancel the operation + /// A maxium wait timeout period. If -1 or 0 the timeout is disabled + /// A task that resolves when the data is sent and the resonse is received + /// + /// + /// + public async Task StreamDataAsync(FBMRequest request, Stream payload, ContentType ct, TimeSpan timeout, CancellationToken cancellationToken = default) { Check(); @@ -288,7 +311,7 @@ namespace VNLib.Net.Messaging.FBM.Client } //wait for the server to respond - await request.Waiter.WaitAsync(Config.RequestTimeout, cancellationToken).ConfigureAwait(true); + await request.Waiter.WaitAsync(timeout, cancellationToken).ConfigureAwait(true); Debug("Response recieved {size} bytes for message {id}", request.Response?.Length ?? 0, request.MessageId); } @@ -304,7 +327,25 @@ namespace VNLib.Net.Messaging.FBM.Client } } - + + /// + /// Closes the underlying and cancels all pending operations + /// + /// + /// + /// + public async Task DisconnectAsync(CancellationToken cancellationToken = default) + { + Check(); + //Close the connection + await ClientSocket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", cancellationToken); + } + + /// + public void CacheClear() => RequestRental.CacheClear(); + /// + public void CacheHardClear() => RequestRental.CacheHardClear(); + private void CheckOrEnqueue(FBMRequest request) { @@ -326,6 +367,11 @@ namespace VNLib.Net.Messaging.FBM.Client { _ = request ?? throw new ArgumentNullException(nameof(request)); + if(request.MessageId == 0) + { + throw new FBMInvalidRequestException("The request message id must NOT be 0"); + } + //Length of the request must contains at least 1 int and header byte if (request.Length < 1 + sizeof(int)) { @@ -485,19 +531,7 @@ namespace VNLib.Net.Messaging.FBM.Client { } - - /// - /// Closes the underlying and cancels all pending operations - /// - /// - /// - /// - public async Task DisconnectAsync(CancellationToken cancellationToken = default) - { - Check(); - //Close the connection - await ClientSocket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", cancellationToken); - } + /// protected override void Free() { @@ -508,9 +542,5 @@ namespace VNLib.Net.Messaging.FBM.Client SendLock.Dispose(); ConnectionStatusHandle.Dispose(); } - /// - public void CacheClear() => RequestRental.CacheClear(); - /// - public void CacheHardClear() => RequestRental.CacheHardClear(); } } -- cgit