aboutsummaryrefslogtreecommitdiff
path: root/lib/Net.Messaging.FBM
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-03-11 18:17:47 -0500
committerLibravatar vnugent <public@vaughnnugent.com>2023-03-11 18:17:47 -0500
commitcef1e794ba190292bcfc0274e9d91afe535adb7e (patch)
treeacc8522bd46e5a1754104089da72021479b6ed46 /lib/Net.Messaging.FBM
parentee7a6272742ce9b50e5b3f0dfc2f2623f36e9057 (diff)
Patch cache server FBMClient default timeout, extened timeout paremters to public api
Diffstat (limited to 'lib/Net.Messaging.FBM')
-rw-r--r--lib/Net.Messaging.FBM/src/Client/FBMClient.cs108
1 files changed, 69 insertions, 39 deletions
diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClient.cs b/lib/Net.Messaging.FBM/src/Client/FBMClient.cs
index 5f99d69..861811e 100644
--- a/lib/Net.Messaging.FBM/src/Client/FBMClient.cs
+++ b/lib/Net.Messaging.FBM/src/Client/FBMClient.cs
@@ -73,7 +73,6 @@ namespace VNLib.Net.Messaging.FBM.Client
private readonly SemaphoreSlim SendLock;
private readonly ConcurrentDictionary<int, FBMRequest> ActiveRequests;
private readonly ReusableStore<FBMRequest> RequestRental;
- private readonly FBMRequest _controlFrame;
/// <summary>
/// The configuration for the current client
@@ -88,18 +87,7 @@ namespace VNLib.Net.Messaging.FBM.Client
/// The <see cref="ClientWebSocket"/> to send/recieve message on
/// </summary>
public ManagedClientWebSocket ClientSocket { get; }
- /// <summary>
- /// Gets the shared control frame for the current instance. The request is reset when
- /// this property is called. (Not thread safe)
- /// </summary>
- protected FBMRequest ControlFrame
- {
- get
- {
- _controlFrame.Reset();
- return _controlFrame;
- }
- }
+
/// <summary>
/// Creates a new <see cref="FBMClient"/> in a closed state
@@ -113,8 +101,6 @@ namespace VNLib.Net.Messaging.FBM.Client
ActiveRequests = new(Environment.ProcessorCount, 100);
Config = config;
- //Init control frame
- _controlFrame = new (Helpers.CONTROL_FRAME_MID, in config);
//Init the new client socket
ClientSocket = new(config.RecvBufferSize, config.RecvBufferSize, config.KeepAliveInterval, config.SubProtocol);
}
@@ -169,6 +155,7 @@ namespace VNLib.Net.Messaging.FBM.Client
/// </summary>
/// <returns>The configured (rented or new) <see cref="FBMRequest"/> ready for use</returns>
public FBMRequest RentRequest() => RequestRental.Rent();
+
/// <summary>
/// Stores (or returns) the reusable request in cache for use with <see cref="RentRequest"/>
/// </summary>
@@ -180,13 +167,32 @@ namespace VNLib.Net.Messaging.FBM.Client
/// Sends a <see cref="FBMRequest"/> to the connected server
/// </summary>
/// <param name="request">The request message to send to the server</param>
- /// <param name="cancellationToken"></param>
+ /// <param name="cancellationToken">A token to cancel the operation</param>
+ /// <returns>When awaited, yields the server response</returns>
+ /// <exception cref="ArgumentException"></exception>
+ /// <exception cref="ObjectDisposedException"></exception>
+ /// <exception cref="InvalidOperationException"></exception>
+ /// <exception cref="FBMInvalidRequestException"></exception>
+ public Task<FBMResponse> SendAsync(FBMRequest request, CancellationToken cancellationToken = default)
+ {
+ return SendAsync(request, Config.RequestTimeout, cancellationToken);
+ }
+
+ /// <summary>
+ /// Sends a <see cref="FBMRequest"/> to the connected server
+ /// </summary>
+ /// <param name="request">The request message to send to the server</param>
+ /// <param name="cancellationToken">A token to cancel the async send operation</param>
+ /// <param name="timeout">
+ /// A maximum time to wait for the operation to complete before the wait is cancelled. An infinite
+ /// timeout (-1) or 0 will disable the timer.
+ /// </param>
/// <returns>When awaited, yields the server response</returns>
/// <exception cref="ArgumentException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
/// <exception cref="InvalidOperationException"></exception>
/// <exception cref="FBMInvalidRequestException"></exception>
- public async Task<FBMResponse> SendAsync(FBMRequest request, CancellationToken cancellationToken = default)
+ public async Task<FBMResponse> SendAsync(FBMRequest request, TimeSpan timeout, CancellationToken cancellationToken = default)
{
Check();
@@ -211,7 +217,7 @@ namespace VNLib.Net.Messaging.FBM.Client
}
//wait for the response to be set
- await request.Waiter.WaitAsync(Config.RequestTimeout, cancellationToken).ConfigureAwait(true);
+ await request.Waiter.WaitAsync(timeout, cancellationToken).ConfigureAwait(true);
Debug("Received {size} bytes for message {id}", request.Response?.Length ?? 0, request.MessageId);
@@ -235,12 +241,29 @@ namespace VNLib.Net.Messaging.FBM.Client
/// <param name="request">The request message to send to the server</param>
/// <param name="payload">Data to stream to the server</param>
/// <param name="ct">The content type of the stream of data</param>
- /// <param name="cancellationToken"></param>
+ /// <param name="cancellationToken">A token to cancel the operation</param>
/// <returns>A task that resolves when the data is sent and the resonse is received</returns>
/// <exception cref="ArgumentException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
/// <exception cref="InvalidOperationException"></exception>
- public async Task StreamDataAsync(FBMRequest request, Stream payload, ContentType ct, CancellationToken cancellationToken = default)
+ public Task StreamDataAsync(FBMRequest request, Stream payload, ContentType ct, CancellationToken cancellationToken = default)
+ {
+ return StreamDataAsync(request, payload, ct, Config.RequestTimeout, cancellationToken);
+ }
+
+ /// <summary>
+ /// Streams arbitrary binary data to the server with the initial request message
+ /// </summary>
+ /// <param name="request">The request message to send to the server</param>
+ /// <param name="payload">Data to stream to the server</param>
+ /// <param name="ct">The content type of the stream of data</param>
+ /// <param name="cancellationToken">A token to cancel the operation</param>
+ /// <param name="timeout">A maxium wait timeout period. If -1 or 0 the timeout is disabled</param>
+ /// <returns>A task that resolves when the data is sent and the resonse is received</returns>
+ /// <exception cref="ArgumentException"></exception>
+ /// <exception cref="ObjectDisposedException"></exception>
+ /// <exception cref="InvalidOperationException"></exception>
+ public async Task StreamDataAsync(FBMRequest request, Stream payload, ContentType ct, TimeSpan timeout, CancellationToken cancellationToken = default)
{
Check();
@@ -288,7 +311,7 @@ namespace VNLib.Net.Messaging.FBM.Client
}
//wait for the server to respond
- await request.Waiter.WaitAsync(Config.RequestTimeout, cancellationToken).ConfigureAwait(true);
+ await request.Waiter.WaitAsync(timeout, cancellationToken).ConfigureAwait(true);
Debug("Response recieved {size} bytes for message {id}", request.Response?.Length ?? 0, request.MessageId);
}
@@ -304,7 +327,25 @@ namespace VNLib.Net.Messaging.FBM.Client
}
}
-
+
+ /// <summary>
+ /// Closes the underlying <see cref="WebSocket"/> and cancels all pending operations
+ /// </summary>
+ /// <param name="cancellationToken"></param>
+ /// <returns></returns>
+ /// <exception cref="ObjectDisposedException"></exception>
+ public async Task DisconnectAsync(CancellationToken cancellationToken = default)
+ {
+ Check();
+ //Close the connection
+ await ClientSocket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", cancellationToken);
+ }
+
+ ///<inheritdoc/>
+ public void CacheClear() => RequestRental.CacheClear();
+ ///<inheritdoc/>
+ public void CacheHardClear() => RequestRental.CacheHardClear();
+
private void CheckOrEnqueue(FBMRequest request)
{
@@ -326,6 +367,11 @@ namespace VNLib.Net.Messaging.FBM.Client
{
_ = request ?? throw new ArgumentNullException(nameof(request));
+ if(request.MessageId == 0)
+ {
+ throw new FBMInvalidRequestException("The request message id must NOT be 0");
+ }
+
//Length of the request must contains at least 1 int and header byte
if (request.Length < 1 + sizeof(int))
{
@@ -485,19 +531,7 @@ namespace VNLib.Net.Messaging.FBM.Client
{
}
-
- /// <summary>
- /// Closes the underlying <see cref="WebSocket"/> and cancels all pending operations
- /// </summary>
- /// <param name="cancellationToken"></param>
- /// <returns></returns>
- /// <exception cref="ObjectDisposedException"></exception>
- public async Task DisconnectAsync(CancellationToken cancellationToken = default)
- {
- Check();
- //Close the connection
- await ClientSocket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", cancellationToken);
- }
+
///<inheritdoc/>
protected override void Free()
{
@@ -508,9 +542,5 @@ namespace VNLib.Net.Messaging.FBM.Client
SendLock.Dispose();
ConnectionStatusHandle.Dispose();
}
- ///<inheritdoc/>
- public void CacheClear() => RequestRental.CacheClear();
- ///<inheritdoc/>
- public void CacheHardClear() => RequestRental.CacheHardClear();
}
}