diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/Net.Messaging.FBM/src/Client/FBMClient.cs | 108 |
1 files changed, 69 insertions, 39 deletions
diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClient.cs b/lib/Net.Messaging.FBM/src/Client/FBMClient.cs index 5f99d69..861811e 100644 --- a/lib/Net.Messaging.FBM/src/Client/FBMClient.cs +++ b/lib/Net.Messaging.FBM/src/Client/FBMClient.cs @@ -73,7 +73,6 @@ namespace VNLib.Net.Messaging.FBM.Client private readonly SemaphoreSlim SendLock; private readonly ConcurrentDictionary<int, FBMRequest> ActiveRequests; private readonly ReusableStore<FBMRequest> RequestRental; - private readonly FBMRequest _controlFrame; /// <summary> /// The configuration for the current client @@ -88,18 +87,7 @@ namespace VNLib.Net.Messaging.FBM.Client /// The <see cref="ClientWebSocket"/> to send/recieve message on /// </summary> public ManagedClientWebSocket ClientSocket { get; } - /// <summary> - /// Gets the shared control frame for the current instance. The request is reset when - /// this property is called. (Not thread safe) - /// </summary> - protected FBMRequest ControlFrame - { - get - { - _controlFrame.Reset(); - return _controlFrame; - } - } + /// <summary> /// Creates a new <see cref="FBMClient"/> in a closed state @@ -113,8 +101,6 @@ namespace VNLib.Net.Messaging.FBM.Client ActiveRequests = new(Environment.ProcessorCount, 100); Config = config; - //Init control frame - _controlFrame = new (Helpers.CONTROL_FRAME_MID, in config); //Init the new client socket ClientSocket = new(config.RecvBufferSize, config.RecvBufferSize, config.KeepAliveInterval, config.SubProtocol); } @@ -169,6 +155,7 @@ namespace VNLib.Net.Messaging.FBM.Client /// </summary> /// <returns>The configured (rented or new) <see cref="FBMRequest"/> ready for use</returns> public FBMRequest RentRequest() => RequestRental.Rent(); + /// <summary> /// Stores (or returns) the reusable request in cache for use with <see cref="RentRequest"/> /// </summary> @@ -180,13 +167,32 @@ namespace VNLib.Net.Messaging.FBM.Client /// Sends a <see cref="FBMRequest"/> to the connected server /// </summary> /// <param name="request">The request message to send to the server</param> - /// <param name="cancellationToken"></param> + /// <param name="cancellationToken">A token to cancel the operation</param> + /// <returns>When awaited, yields the server response</returns> + /// <exception cref="ArgumentException"></exception> + /// <exception cref="ObjectDisposedException"></exception> + /// <exception cref="InvalidOperationException"></exception> + /// <exception cref="FBMInvalidRequestException"></exception> + public Task<FBMResponse> SendAsync(FBMRequest request, CancellationToken cancellationToken = default) + { + return SendAsync(request, Config.RequestTimeout, cancellationToken); + } + + /// <summary> + /// Sends a <see cref="FBMRequest"/> to the connected server + /// </summary> + /// <param name="request">The request message to send to the server</param> + /// <param name="cancellationToken">A token to cancel the async send operation</param> + /// <param name="timeout"> + /// A maximum time to wait for the operation to complete before the wait is cancelled. An infinite + /// timeout (-1) or 0 will disable the timer. + /// </param> /// <returns>When awaited, yields the server response</returns> /// <exception cref="ArgumentException"></exception> /// <exception cref="ObjectDisposedException"></exception> /// <exception cref="InvalidOperationException"></exception> /// <exception cref="FBMInvalidRequestException"></exception> - public async Task<FBMResponse> SendAsync(FBMRequest request, CancellationToken cancellationToken = default) + public async Task<FBMResponse> SendAsync(FBMRequest request, TimeSpan timeout, CancellationToken cancellationToken = default) { Check(); @@ -211,7 +217,7 @@ namespace VNLib.Net.Messaging.FBM.Client } //wait for the response to be set - await request.Waiter.WaitAsync(Config.RequestTimeout, cancellationToken).ConfigureAwait(true); + await request.Waiter.WaitAsync(timeout, cancellationToken).ConfigureAwait(true); Debug("Received {size} bytes for message {id}", request.Response?.Length ?? 0, request.MessageId); @@ -235,12 +241,29 @@ namespace VNLib.Net.Messaging.FBM.Client /// <param name="request">The request message to send to the server</param> /// <param name="payload">Data to stream to the server</param> /// <param name="ct">The content type of the stream of data</param> - /// <param name="cancellationToken"></param> + /// <param name="cancellationToken">A token to cancel the operation</param> /// <returns>A task that resolves when the data is sent and the resonse is received</returns> /// <exception cref="ArgumentException"></exception> /// <exception cref="ObjectDisposedException"></exception> /// <exception cref="InvalidOperationException"></exception> - public async Task StreamDataAsync(FBMRequest request, Stream payload, ContentType ct, CancellationToken cancellationToken = default) + public Task StreamDataAsync(FBMRequest request, Stream payload, ContentType ct, CancellationToken cancellationToken = default) + { + return StreamDataAsync(request, payload, ct, Config.RequestTimeout, cancellationToken); + } + + /// <summary> + /// Streams arbitrary binary data to the server with the initial request message + /// </summary> + /// <param name="request">The request message to send to the server</param> + /// <param name="payload">Data to stream to the server</param> + /// <param name="ct">The content type of the stream of data</param> + /// <param name="cancellationToken">A token to cancel the operation</param> + /// <param name="timeout">A maxium wait timeout period. If -1 or 0 the timeout is disabled</param> + /// <returns>A task that resolves when the data is sent and the resonse is received</returns> + /// <exception cref="ArgumentException"></exception> + /// <exception cref="ObjectDisposedException"></exception> + /// <exception cref="InvalidOperationException"></exception> + public async Task StreamDataAsync(FBMRequest request, Stream payload, ContentType ct, TimeSpan timeout, CancellationToken cancellationToken = default) { Check(); @@ -288,7 +311,7 @@ namespace VNLib.Net.Messaging.FBM.Client } //wait for the server to respond - await request.Waiter.WaitAsync(Config.RequestTimeout, cancellationToken).ConfigureAwait(true); + await request.Waiter.WaitAsync(timeout, cancellationToken).ConfigureAwait(true); Debug("Response recieved {size} bytes for message {id}", request.Response?.Length ?? 0, request.MessageId); } @@ -304,7 +327,25 @@ namespace VNLib.Net.Messaging.FBM.Client } } - + + /// <summary> + /// Closes the underlying <see cref="WebSocket"/> and cancels all pending operations + /// </summary> + /// <param name="cancellationToken"></param> + /// <returns></returns> + /// <exception cref="ObjectDisposedException"></exception> + public async Task DisconnectAsync(CancellationToken cancellationToken = default) + { + Check(); + //Close the connection + await ClientSocket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", cancellationToken); + } + + ///<inheritdoc/> + public void CacheClear() => RequestRental.CacheClear(); + ///<inheritdoc/> + public void CacheHardClear() => RequestRental.CacheHardClear(); + private void CheckOrEnqueue(FBMRequest request) { @@ -326,6 +367,11 @@ namespace VNLib.Net.Messaging.FBM.Client { _ = request ?? throw new ArgumentNullException(nameof(request)); + if(request.MessageId == 0) + { + throw new FBMInvalidRequestException("The request message id must NOT be 0"); + } + //Length of the request must contains at least 1 int and header byte if (request.Length < 1 + sizeof(int)) { @@ -485,19 +531,7 @@ namespace VNLib.Net.Messaging.FBM.Client { } - - /// <summary> - /// Closes the underlying <see cref="WebSocket"/> and cancels all pending operations - /// </summary> - /// <param name="cancellationToken"></param> - /// <returns></returns> - /// <exception cref="ObjectDisposedException"></exception> - public async Task DisconnectAsync(CancellationToken cancellationToken = default) - { - Check(); - //Close the connection - await ClientSocket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", cancellationToken); - } + ///<inheritdoc/> protected override void Free() { @@ -508,9 +542,5 @@ namespace VNLib.Net.Messaging.FBM.Client SendLock.Dispose(); ConnectionStatusHandle.Dispose(); } - ///<inheritdoc/> - public void CacheClear() => RequestRental.CacheClear(); - ///<inheritdoc/> - public void CacheHardClear() => RequestRental.CacheHardClear(); } } |