aboutsummaryrefslogtreecommitdiff
path: root/lib/Net.Messaging.FBM/src/Client
diff options
context:
space:
mode:
Diffstat (limited to 'lib/Net.Messaging.FBM/src/Client')
-rw-r--r--lib/Net.Messaging.FBM/src/Client/FBMBuffer.cs94
-rw-r--r--lib/Net.Messaging.FBM/src/Client/FBMClient.cs117
-rw-r--r--lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs3
-rw-r--r--lib/Net.Messaging.FBM/src/Client/FBMRequest.cs131
-rw-r--r--lib/Net.Messaging.FBM/src/Client/FBMResponse.cs9
-rw-r--r--lib/Net.Messaging.FBM/src/Client/IFBMMessage.cs62
-rw-r--r--lib/Net.Messaging.FBM/src/Client/IFBMMessageWaiter.cs6
-rw-r--r--lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs4
8 files changed, 201 insertions, 225 deletions
diff --git a/lib/Net.Messaging.FBM/src/Client/FBMBuffer.cs b/lib/Net.Messaging.FBM/src/Client/FBMBuffer.cs
index fac41a6..698a98b 100644
--- a/lib/Net.Messaging.FBM/src/Client/FBMBuffer.cs
+++ b/lib/Net.Messaging.FBM/src/Client/FBMBuffer.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Messaging.FBM
@@ -28,28 +28,50 @@ using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using VNLib.Utils.IO;
+using VNLib.Utils.Memory.Caching;
namespace VNLib.Net.Messaging.FBM.Client
{
/// <summary>
/// Represents a shared internal character and bianry buffer for
/// </summary>
- internal sealed class FBMBuffer : IFBMHeaderBuffer, IDisposable
+ internal sealed class FBMBuffer : IFBMHeaderBuffer, IDisposable, IReusable
{
- private readonly IMemoryOwner<byte> Handle;
+ private readonly IFBMMemoryHandle Handle;
+ private readonly IFBMMemoryManager _memoryManager;
+ private readonly int _size;
private readonly BufferWriter _writer;
private readonly BinaryRequestAccumulator _requestAccumulator;
- internal FBMBuffer(IMemoryOwner<byte> handle)
+ internal FBMBuffer(IFBMMemoryManager manager, int bufferSize)
{
- Handle = handle;
+ _memoryManager = manager;
+ Handle = manager.InitHandle();
_writer = new(this);
- _requestAccumulator = new(handle.Memory);
+ _size = bufferSize;
+ _requestAccumulator = new(this, bufferSize);
}
+ /*
+ * Reusable methods will alloc and free buffers during
+ * normal operation.
+ */
+
+ ///<inheritdoc/>
+ public void Prepare() => _memoryManager.AllocBuffer(Handle, _size);
+
+ ///<inheritdoc/>
+ public bool Release()
+ {
+ _memoryManager.FreeBuffer(Handle);
+ return true;
+ }
+
+ public void Dispose() => _ = Release();
+
/// <summary>
/// Gets the internal request data accumulator
/// </summary>
@@ -73,13 +95,6 @@ namespace VNLib.Net.Messaging.FBM.Client
//Return the internal writer
return _writer;
}
-
-
- public void Dispose()
- {
- //Dispose handle
- Handle.Dispose();
- }
/// <summary>
/// Resets the request accumulator and writes the initial message id
@@ -91,7 +106,7 @@ namespace VNLib.Net.Messaging.FBM.Client
_requestAccumulator.Reset();
//Write message id to accumulator, it should already be reset
- Helpers.WriteMessageid(RequestBuffer, messageId);
+ Helpers.WriteMessageid(_requestAccumulator, messageId);
}
///<inheritdoc/>
@@ -99,24 +114,24 @@ namespace VNLib.Net.Messaging.FBM.Client
Span<char> IFBMHeaderBuffer.GetSpan(int offset, int count)
{
//Get the character span
- Span<char> span = MemoryMarshal.Cast<byte, char>(Handle.Memory.Span);
+ Span<char> span = MemoryMarshal.Cast<byte, char>(Handle.GetSpan());
return span.Slice(offset, count);
}
///<inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- Span<char> IFBMHeaderBuffer.GetSpan() => MemoryMarshal.Cast<byte, char>(Handle.Memory.Span);
+ Span<char> IFBMHeaderBuffer.GetSpan() => MemoryMarshal.Cast<byte, char>(Handle.GetSpan());
private sealed class BinaryRequestAccumulator : IDataAccumulator<byte>
{
private readonly int Size;
- private readonly Memory<byte> Buffer;
+ private readonly FBMBuffer Buffer;
- internal BinaryRequestAccumulator(Memory<byte> buffer)
+ internal BinaryRequestAccumulator(FBMBuffer buffer, int size)
{
Buffer = buffer;
- Size = buffer.Length;
+ Size = size;
}
///<inheritdoc/>
@@ -126,52 +141,47 @@ namespace VNLib.Net.Messaging.FBM.Client
public int RemainingSize => Size - AccumulatedSize;
///<inheritdoc/>
- public Span<byte> Remaining => Buffer.Span.Slice(AccumulatedSize, RemainingSize);
+ public Span<byte> Remaining => Buffer.Handle.GetSpan().Slice(AccumulatedSize, RemainingSize);
+
///<inheritdoc/>
- public Span<byte> Accumulated => Buffer.Span[..AccumulatedSize];
+ public Span<byte> Accumulated => Buffer.Handle.GetSpan()[..AccumulatedSize];
/// <summary>
/// Gets the accumulated data as a memory segment
/// </summary>
- public Memory<byte> AccumulatedMemory => Buffer[..AccumulatedSize];
+ public Memory<byte> AccumulatedMemory => Buffer.Handle.GetMemory()[..AccumulatedSize];
/// <summary>
/// Gets the remaining buffer segment as a memory segment
/// </summary>
- public Memory<byte> RemainingMemory => Buffer.Slice(AccumulatedSize, RemainingSize);
+ public Memory<byte> RemainingMemory => Buffer.Handle.GetMemory().Slice(AccumulatedSize, RemainingSize);
///<inheritdoc/>
public void Advance(int count) => AccumulatedSize += count;
+
///<inheritdoc/>
public void Reset() => AccumulatedSize = 0;
}
+ /*
+ * A buffer writer that wraps the request accumulator to allow
+ * a finite amount of data to be written to the accumulator since
+ * it uses a fixed size internal buffer.
+ */
private sealed class BufferWriter : IBufferWriter<byte>
{
private readonly FBMBuffer Buffer;
- public BufferWriter(FBMBuffer buffer)
- {
- Buffer = buffer;
- }
+ public BufferWriter(FBMBuffer buffer) => Buffer = buffer;
- public void Advance(int count)
- {
- //Advance the writer
- Buffer.RequestBuffer.Advance(count);
- }
+ ///<inheritdoc/>
+ public void Advance(int count) => Buffer._requestAccumulator.Advance(count);
- public Memory<byte> GetMemory(int sizeHint = 0)
- {
- //Get the remaining memory segment
- return Buffer._requestAccumulator.RemainingMemory;
- }
+ ///<inheritdoc/>
+ public Memory<byte> GetMemory(int sizeHint = 0) => Buffer._requestAccumulator.RemainingMemory;
- public Span<byte> GetSpan(int sizeHint = 0)
- {
- //Get the remaining data segment
- return Buffer.RequestBuffer.Remaining;
- }
+ ///<inheritdoc/>
+ public Span<byte> GetSpan(int sizeHint = 0) => Buffer._requestAccumulator.Remaining;
}
}
}
diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClient.cs b/lib/Net.Messaging.FBM/src/Client/FBMClient.cs
index 5184c38..c8319fa 100644
--- a/lib/Net.Messaging.FBM/src/Client/FBMClient.cs
+++ b/lib/Net.Messaging.FBM/src/Client/FBMClient.cs
@@ -24,7 +24,6 @@
using System;
using System.IO;
-using System.Buffers;
using System.Threading;
using System.Net.WebSockets;
using System.Threading.Tasks;
@@ -34,9 +33,12 @@ using System.Collections.Concurrent;
using VNLib.Net.Http;
using VNLib.Utils;
using VNLib.Utils.IO;
+using VNLib.Utils.Memory;
+using VNLib.Utils.Memory.Caching;
using VNLib.Utils.Logging;
using VNLib.Utils.Extensions;
-using VNLib.Utils.Memory.Caching;
+
+#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task
namespace VNLib.Net.Messaging.FBM.Client
{
@@ -60,6 +62,8 @@ namespace VNLib.Net.Messaging.FBM.Client
/// </summary>
public const string REQ_MAX_MESS_QUERY_ARG = "mx";
+ public const int MAX_STREAM_BUFFER_SIZE = 128 * 1024;
+
/// <summary>
/// Raised when the websocket has been closed because an error occured.
/// You may inspect the event args to determine the cause of the error.
@@ -74,12 +78,14 @@ namespace VNLib.Net.Messaging.FBM.Client
private readonly SemaphoreSlim SendLock;
private readonly ConcurrentDictionary<int, FBMRequest> ActiveRequests;
private readonly ObjectRental<FBMRequest> RequestRental;
+ private readonly FBMClientConfig _config;
+ private readonly byte[] _streamBuffer;
/// <summary>
/// The configuration for the current client
/// </summary>
- public FBMClientConfig Config { get; }
-
+ public ref readonly FBMClientConfig Config => ref _config;
+
/// <summary>
/// A handle that is reset when a connection has been successfully set, and is set
/// when the connection exists
@@ -103,9 +109,13 @@ namespace VNLib.Net.Messaging.FBM.Client
ConnectionStatusHandle = new(true);
ActiveRequests = new(Environment.ProcessorCount, 100);
- Config = config;
+ _config = config;
//Init the new client socket
ClientSocket = new(config.RecvBufferSize, config.RecvBufferSize, config.KeepAliveInterval, config.SubProtocol);
+
+ //Init the stream buffer
+ int maxStrmBufSize = Math.Min(config.MaxMessageSize, MAX_STREAM_BUFFER_SIZE);
+ _streamBuffer = new byte[maxStrmBufSize];
}
private void Debug(string format, params string[] strings)
@@ -127,7 +137,7 @@ namespace VNLib.Net.Messaging.FBM.Client
/// Allocates and configures a new <see cref="FBMRequest"/> message object for use within the reusable store
/// </summary>
/// <returns>The configured <see cref="FBMRequest"/></returns>
- protected virtual FBMRequest ReuseableRequestConstructor() => new(Config);
+ protected virtual FBMRequest ReuseableRequestConstructor() => new(in _config);
/// <summary>
/// Asynchronously opens a websocket connection with the specifed remote server
@@ -180,10 +190,7 @@ namespace VNLib.Net.Messaging.FBM.Client
/// <exception cref="ObjectDisposedException"></exception>
/// <exception cref="InvalidOperationException"></exception>
/// <exception cref="FBMInvalidRequestException"></exception>
- public Task<FBMResponse> SendAsync(FBMRequest request, CancellationToken cancellationToken = default)
- {
- return SendAsync(request, Config.RequestTimeout, cancellationToken);
- }
+ public Task<FBMResponse> SendAsync(FBMRequest request, CancellationToken cancellationToken = default) => SendAsync(request, _config.RequestTimeout, cancellationToken);
/// <summary>
/// Sends a <see cref="FBMRequest"/> to the connected server
@@ -211,6 +218,8 @@ namespace VNLib.Net.Messaging.FBM.Client
try
{
+ FBMResponse response = new();
+
//Get the request data segment
ReadOnlyMemory<byte> requestData = request.GetRequestData();
@@ -224,22 +233,26 @@ namespace VNLib.Net.Messaging.FBM.Client
}
//wait for the response to be set
- await request.Waiter.WaitAsync(timeout, cancellationToken).ConfigureAwait(true);
+ await request.Waiter.GetTask(timeout, cancellationToken).ConfigureAwait(true);
Debug("Received {size} bytes for message {id}", request.Response?.Length ?? 0, request.MessageId);
- return request.GetResponse();
+ //Get the response data
+ request.GetResponse(ref response);
+
+ return response;
}
catch
{
//Remove the request since packet was never sent
ActiveRequests.Remove(request.MessageId, out _);
-
- //Cleanup waiter
- request.Waiter.OnEndRequest();
-
throw;
}
+ finally
+ {
+ //Always cleanup waiter
+ request.Waiter.OnEndRequest();
+ }
}
/// <summary>
@@ -247,30 +260,28 @@ namespace VNLib.Net.Messaging.FBM.Client
/// </summary>
/// <param name="request">The request message to send to the server</param>
/// <param name="payload">Data to stream to the server</param>
- /// <param name="ct">The content type of the stream of data</param>
+ /// <param name="contentType">The content type of the stream of data</param>
/// <param name="cancellationToken">A token to cancel the operation</param>
/// <returns>A task that resolves when the data is sent and the resonse is received</returns>
/// <exception cref="ArgumentException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
/// <exception cref="InvalidOperationException"></exception>
- public Task StreamDataAsync(FBMRequest request, Stream payload, ContentType ct, CancellationToken cancellationToken = default)
- {
- return StreamDataAsync(request, payload, ct, Config.RequestTimeout, cancellationToken);
- }
+ public Task<FBMResponse> StreamDataAsync(FBMRequest request, Stream payload, ContentType contentType, CancellationToken cancellationToken = default)
+ => StreamDataAsync(request, payload, contentType, _config.RequestTimeout, cancellationToken);
/// <summary>
/// Streams arbitrary binary data to the server with the initial request message
/// </summary>
/// <param name="request">The request message to send to the server</param>
/// <param name="payload">Data to stream to the server</param>
- /// <param name="ct">The content type of the stream of data</param>
+ /// <param name="contentType">The content type of the stream of data</param>
/// <param name="cancellationToken">A token to cancel the operation</param>
/// <param name="timeout">A maxium wait timeout period. If -1 or 0 the timeout is disabled</param>
/// <returns>A task that resolves when the data is sent and the resonse is received</returns>
/// <exception cref="ArgumentException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
/// <exception cref="InvalidOperationException"></exception>
- public async Task StreamDataAsync(FBMRequest request, Stream payload, ContentType ct, TimeSpan timeout, CancellationToken cancellationToken = default)
+ public async Task<FBMResponse> StreamDataAsync(FBMRequest request, Stream payload, ContentType contentType, TimeSpan timeout, CancellationToken cancellationToken = default)
{
Check();
@@ -282,19 +293,15 @@ namespace VNLib.Net.Messaging.FBM.Client
try
{
+ FBMResponse response = new();
+
//Get the request data segment
ReadOnlyMemory<byte> requestData = request.GetRequestData();
Debug("Streaming {bytes} with id {id}", requestData.Length, request.MessageId);
- //Write an empty body in the request
- request.WriteBody(ReadOnlySpan<byte>.Empty, ct);
-
- //Calc buffer size
- int bufSize = (int)Math.Clamp(payload.Length, Config.MessageBufferSize, Config.MaxMessageSize);
-
- //Alloc a streaming buffer
- using IMemoryOwner<byte> buffer = Config.BufferHeap.DirectAlloc<byte>(bufSize);
+ //Write an empty body in the request so a content type header is writen
+ request.WriteBody(ReadOnlySpan<byte>.Empty, contentType);
//Wait for send-lock
using (SemSlimReleaser releaser = await SendLock.GetReleaserAsync(cancellationToken))
@@ -306,7 +313,7 @@ namespace VNLib.Net.Messaging.FBM.Client
do
{
//Read data
- int read = await payload.ReadAsync(buffer.Memory, cancellationToken);
+ int read = await payload.ReadAsync(_streamBuffer, cancellationToken);
if (read == 0)
{
@@ -315,29 +322,32 @@ namespace VNLib.Net.Messaging.FBM.Client
}
//write message to socket, if the read data was smaller than the buffer, we can send the last packet
- await ClientSocket.SendAsync(buffer.Memory[..read], WebSocketMessageType.Binary, read < bufSize, cancellationToken);
+ await ClientSocket.SendAsync(_streamBuffer[..read], WebSocketMessageType.Binary, read < _streamBuffer.Length, cancellationToken);
} while (true);
}
//wait for the server to respond
- await request.Waiter.WaitAsync(timeout, cancellationToken).ConfigureAwait(true);
+ await request.Waiter.GetTask(timeout, cancellationToken).ConfigureAwait(true);
Debug("Response recieved {size} bytes for message {id}", request.Response?.Length ?? 0, request.MessageId);
+
+ request.GetResponse(ref response);
+ return response;
}
catch
{
//Remove the request since packet was never sent or cancelled
_ = ActiveRequests.TryRemove(request.MessageId, out _);
-
- //Cleanup request waiter
- request.Waiter.OnEndRequest();
-
throw;
}
+ finally
+ {
+ //Always cleanup waiter
+ request.Waiter.OnEndRequest();
+ }
}
-
/// <summary>
/// Closes the underlying <see cref="WebSocket"/> and cancels all pending operations
/// </summary>
@@ -399,14 +409,20 @@ namespace VNLib.Net.Messaging.FBM.Client
Debug("Begining receive loop");
//Alloc recv buffer
- IMemoryOwner<byte> recvBuffer = Config.BufferHeap.DirectAlloc<byte>(Config.RecvBufferSize);
+ IFBMMemoryHandle recvBuffer = _config.MemoryManager.InitHandle();
+ _config.MemoryManager.AllocBuffer(recvBuffer, _config.RecvBufferSize);
try
{
+ if(!_config.MemoryManager.TryGetHeap(out IUnmangedHeap? heap))
+ {
+ throw new NotSupportedException("The memory manager must support using IUnmanagedHeaps");
+ }
+
//Recv event loop
while (true)
{
//Listen for incoming packets with the intial data buffer
- ValueWebSocketReceiveResult result = await socket.ReceiveAsync(recvBuffer.Memory, CancellationToken.None);
+ ValueWebSocketReceiveResult result = await socket.ReceiveAsync(recvBuffer.GetMemory(), CancellationToken.None);
//If the message is a close message, its time to exit
if (result.MessageType == WebSocketMessageType.Close)
@@ -422,19 +438,19 @@ namespace VNLib.Net.Messaging.FBM.Client
}
//Alloc data buffer and write initial data
- VnMemoryStream responseBuffer = new(Config.BufferHeap);
+ VnMemoryStream responseBuffer = new(heap);
//Copy initial data
- responseBuffer.Write(recvBuffer.Memory.Span[..result.Count]);
+ responseBuffer.Write(recvBuffer.GetSpan()[..result.Count]);
//Receive packets until the EOF is reached
while (!result.EndOfMessage)
{
//recive more data
- result = await socket.ReceiveAsync(recvBuffer.Memory, CancellationToken.None);
+ result = await socket.ReceiveAsync(recvBuffer.GetMemory(), CancellationToken.None);
//Make sure the buffer is not too large
- if ((responseBuffer.Length + result.Count) > Config.MaxMessageSize)
+ if ((responseBuffer.Length + result.Count) > _config.MaxMessageSize)
{
//Dispose the buffer before exiting
responseBuffer.Dispose();
@@ -443,7 +459,7 @@ namespace VNLib.Net.Messaging.FBM.Client
}
//Copy continuous data
- responseBuffer.Write(recvBuffer.Memory.Span[..result.Count]);
+ responseBuffer.Write(recvBuffer.GetSpan()[..result.Count]);
}
//Reset the buffer stream position
@@ -472,7 +488,7 @@ namespace VNLib.Net.Messaging.FBM.Client
finally
{
//Dispose the recv buffer
- recvBuffer.Dispose();
+ _config.MemoryManager.FreeBuffer(recvBuffer);
//Cleanup the socket when exiting
ClientSocket.Cleanup();
//Set status handle as unset
@@ -522,7 +538,12 @@ namespace VNLib.Net.Messaging.FBM.Client
if (ActiveRequests.TryRemove(messageId, out FBMRequest? request))
{
//Set the new response message
- request.Waiter.Complete(responseMessage);
+ if (!request.Waiter.Complete(responseMessage))
+ {
+ //Falied to complete, dispose the message data
+ responseMessage.Dispose();
+ Debug("Failed to transition waiting request {id}. Message was dropped", messageId, 0);
+ }
}
else
{
diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs b/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs
index 30e9a95..735a0a8 100644
--- a/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs
+++ b/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs
@@ -25,7 +25,6 @@
using System;
using System.Text;
-using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
namespace VNLib.Net.Messaging.FBM.Client
@@ -59,7 +58,7 @@ namespace VNLib.Net.Messaging.FBM.Client
/// <summary>
/// The heap to allocate internal (and message) buffers from
/// </summary>
- public readonly IUnmangedHeap BufferHeap { get; init; }
+ public readonly IFBMMemoryManager MemoryManager { get; init; }
/// <summary>
/// The websocket keepalive interval to use (leaving this property default disables keepalives)
/// </summary>
diff --git a/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs b/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs
index c4fb493..418a9ec 100644
--- a/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs
+++ b/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs
@@ -26,6 +26,7 @@ using System;
using System.Text;
using System.Buffers;
using System.Threading;
+using System.Diagnostics;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
@@ -37,7 +38,6 @@ using VNLib.Utils.Memory;
using VNLib.Utils.Extensions;
using VNLib.Utils.Memory.Caching;
-
namespace VNLib.Net.Messaging.FBM.Client
{
/// <summary>
@@ -97,29 +97,29 @@ namespace VNLib.Net.Messaging.FBM.Client
/// <param name="messageId">The custom message id</param>
/// <param name="config">The fbm client config storing required config variables</param>
public FBMRequest(int messageId, in FBMClientConfig config)
- :this(messageId, config.BufferHeap, config.MessageBufferSize, config.HeaderEncoding)
+ :this(messageId, config.MemoryManager, config.MessageBufferSize, config.HeaderEncoding)
{ }
/// <summary>
/// Initializes a new <see cref="FBMRequest"/> with the sepcified message buffer size and a custom MessageId
/// </summary>
/// <param name="messageId">The custom message id</param>
- /// <param name="heap">The heap to allocate the internal buffer from</param>
+ /// <param name="manager">The memory manager used to allocate the internal buffers</param>
/// <param name="bufferSize">The size of the internal buffer</param>
/// <param name="headerEncoding">The encoding instance used for header character encoding</param>
- public FBMRequest(int messageId, IUnmangedHeap heap, int bufferSize, Encoding headerEncoding)
+ public FBMRequest(int messageId, IFBMMemoryManager manager, int bufferSize, Encoding headerEncoding)
{
MessageId = messageId;
- HeaderEncoding = headerEncoding;
+ HeaderEncoding = headerEncoding ?? throw new ArgumentNullException(nameof(headerEncoding));
+ _ = manager ?? throw new ArgumentNullException(nameof(manager));
//Configure waiter
Waiter = new FBMMessageWaiter(this);
-
- //Alloc the buffer as a memory owner so a memory buffer can be used
- IMemoryOwner<byte> HeapBuffer = heap.DirectAlloc<byte>(bufferSize);
- Buffer = new(HeapBuffer);
+
+ Buffer = new(manager, bufferSize);
//Prepare the message incase the request is fresh
+ Buffer.Prepare();
Reset();
}
@@ -152,19 +152,13 @@ namespace VNLib.Net.Messaging.FBM.Client
/// The request message packet, this may cause side effects
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- public ReadOnlyMemory<byte> GetRequestData()
- {
- return Buffer.RequestData;
- }
+ public ReadOnlyMemory<byte> GetRequestData() => Buffer.RequestData;
/// <summary>
/// Resets the internal buffer and allows for writing a new message with
/// the same message-id
/// </summary>
- public void Reset()
- {
- Buffer.Reset(MessageId);
- }
+ public void Reset() => Buffer.Reset(MessageId);
///<inheritdoc/>
protected override void Free()
@@ -175,7 +169,12 @@ namespace VNLib.Net.Messaging.FBM.Client
(Waiter as FBMMessageWaiter)!.Dispose();
}
- void IReusable.Prepare() => Reset();
+ void IReusable.Prepare()
+ {
+ //MUST BE CALLED FIRST!
+ Buffer.Prepare();
+ Reset();
+ }
bool IReusable.Release()
{
@@ -186,6 +185,9 @@ namespace VNLib.Net.Messaging.FBM.Client
Response?.Dispose();
Response = null;
+ //Free buffer
+ Buffer.Release();
+
return true;
}
@@ -195,7 +197,7 @@ namespace VNLib.Net.Messaging.FBM.Client
/// Gets the response of the sent message
/// </summary>
/// <returns>The response message for the current request</returns>
- internal FBMResponse GetResponse()
+ internal void GetResponse(ref FBMResponse response)
{
if (Response != null)
{
@@ -215,11 +217,7 @@ namespace VNLib.Net.Messaging.FBM.Client
HeaderParseError statusFlags = Helpers.ParseHeaders(Response, Buffer, ResponseHeaderList, HeaderEncoding);
//return response structure
- return new(Response, statusFlags, ResponseHeaderList, OnResponseDisposed);
- }
- else
- {
- return new();
+ response = new(Response, statusFlags, ResponseHeaderList, OnResponseDisposed);
}
}
@@ -272,12 +270,13 @@ namespace VNLib.Net.Messaging.FBM.Client
#endregion
#region waiter
- private sealed class FBMMessageWaiter : IFBMMessageWaiter, IDisposable
+ private sealed class FBMMessageWaiter : IFBMMessageWaiter, IDisposable, IThreadPoolWorkItem
{
private readonly Timer _timer;
private readonly FBMRequest _request;
private TaskCompletionSource? _tcs;
+ private CancellationTokenRegistration _token;
public FBMMessageWaiter(FBMRequest request)
{
@@ -298,79 +297,89 @@ namespace VNLib.Net.Messaging.FBM.Client
public void OnEndRequest()
{
//Cleanup tcs ref
- _ = Interlocked.Exchange(ref _tcs, null);
+ _tcs = null;
- //Stop timer if set
+ //Always stop timer if set
_timer.Stop();
+
+ //Cleanup cancellation token
+ _token.Dispose();
}
///<inheritdoc/>
- public void Complete(VnMemoryStream ms)
+ public bool Complete(VnMemoryStream ms)
{
- //Read the current state of the tcs
TaskCompletionSource? tcs = _tcs;
- if (tcs == null)
+ //Work is done/cancelled
+ if (tcs != null && tcs.Task.IsCompleted)
{
- //Work is done/cancelled, dispose the ms and leave
- ms.Dispose();
+ return false;
}
//Store response
_request.Response = ms;
- //Transition to completed state in background thread
- static void OnTpCallback(object? state)
- {
- _ = (state as TaskCompletionSource)!.TrySetResult();
- }
-
/*
* The calling thread may be a TP thread proccessing an async event loop.
* We do not want to block this worker thread.
*/
- ThreadPool.UnsafeQueueUserWorkItem(OnTpCallback, tcs);
+ return ThreadPool.UnsafeQueueUserWorkItem(this, true);
}
+ /*
+ * Called when scheduled on the TP thread pool
+ */
///<inheritdoc/>
- public async Task WaitAsync(TimeSpan timeout, CancellationToken cancellation)
+ public void Execute() => _tcs?.TrySetResult();
+
+
+ ///<inheritdoc/>
+ public Task GetTask(TimeSpan timeout, CancellationToken cancellation)
{
- if (timeout.Ticks > 0)
- {
- //Restart timer if timeout is configured
- _timer.Restart(timeout);
- }
+ TaskCompletionSource? tcs = _tcs;
- //Confim the token may be cancelled
- if (cancellation.CanBeCanceled)
- {
- //Register cancellation
- using CancellationTokenRegistration reg = cancellation.Register(OnCancelled, this, false);
+ Debug.Assert(tcs != null, "A call to GetTask was made outside of the request flow, the TaskCompletionSource was null");
- //await task that may be canclled
- await _tcs.Task.ConfigureAwait(false);
- }
- else
+ /*
+ * Get task will only be called after the message has been sent.
+ * The Complete method may have already scheduled a completion by
+ * the time this method is called, so we may avoid setting up the
+ * timer and cancellation if possible. Also since this mthod is
+ * called from the request side, we know the tcs cannot be null
+ */
+
+ if (!tcs.Task.IsCompleted)
{
- //await the task directly
- await _tcs.Task.ConfigureAwait(false);
+ if (timeout.Ticks > 0)
+ {
+ //Restart timer if timeout is configured
+ _timer.Restart(timeout);
+ }
+
+ if (cancellation.CanBeCanceled)
+ {
+ //Register cancellation
+ _token = cancellation.Register(OnCancelled, this);
+ }
}
+
+ return tcs.Task;
}
///<inheritdoc/>
public void ManualCancellation() => OnCancelled(this);
- private void OnCancelled(object? state)
- {
- //Set cancelled state if exists
- _ = _tcs?.TrySetCanceled();
- }
+ //Set cancelled state if exists, the task may have already completed
+ private void OnCancelled(object? state) => _tcs?.TrySetCanceled();
///<inheritdoc/>
public void Dispose()
{
_timer.Dispose();
+ _token.Dispose();
}
+
}
#endregion
diff --git a/lib/Net.Messaging.FBM/src/Client/FBMResponse.cs b/lib/Net.Messaging.FBM/src/Client/FBMResponse.cs
index 6f8fec4..f1148f1 100644
--- a/lib/Net.Messaging.FBM/src/Client/FBMResponse.cs
+++ b/lib/Net.Messaging.FBM/src/Client/FBMResponse.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Messaging.FBM
@@ -90,17 +90,16 @@ namespace VNLib.Net.Messaging.FBM.Client
/// <summary>
/// Releases any resources associated with the response message
/// </summary>
- public void Dispose() => _onDispose?.Invoke();
+ public readonly void Dispose() => _onDispose?.Invoke();
///<inheritdoc/>
public override bool Equals(object? obj) => obj is FBMResponse response && Equals(response);
///<inheritdoc/>
+ public bool Equals(FBMResponse other) => IsSet && other.IsSet && ReferenceEquals(MessagePacket, other.MessagePacket);
+ ///<inheritdoc/>
public override int GetHashCode() => IsSet ? MessagePacket!.GetHashCode() : 0;
///<inheritdoc/>
public static bool operator ==(FBMResponse left, FBMResponse right) => left.Equals(right);
///<inheritdoc/>
public static bool operator !=(FBMResponse left, FBMResponse right) => !(left == right);
- ///<inheritdoc/>
- public bool Equals(FBMResponse other) => IsSet && other.IsSet && MessagePacket == other.MessagePacket;
-
}
}
diff --git a/lib/Net.Messaging.FBM/src/Client/IFBMMessage.cs b/lib/Net.Messaging.FBM/src/Client/IFBMMessage.cs
deleted file mode 100644
index 18f19ec..0000000
--- a/lib/Net.Messaging.FBM/src/Client/IFBMMessage.cs
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
-* Copyright (c) 2022 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Net.Messaging.FBM
-* File: IFBMMessage.cs
-*
-* IFBMMessage.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-
-using VNLib.Net.Http;
-
-namespace VNLib.Net.Messaging.FBM.Client
-{
- /// <summary>
- /// Represents basic Fixed Buffer Message protocol operations
- /// </summary>
- public interface IFBMMessage
- {
- /// <summary>
- /// The unique id of the message (nonzero)
- /// </summary>
- int MessageId { get; }
- /// <summary>
- /// Writes a data body to the message of the specified content type
- /// </summary>
- /// <param name="body">The body of the message to copy</param>
- /// <param name="contentType">The content type of the message body</param>
- /// <exception cref="OutOfMemoryException"></exception>
- void WriteBody(ReadOnlySpan<byte> body, ContentType contentType = ContentType.Binary);
- /// <summary>
- /// Appends an arbitrary header to the current request buffer
- /// </summary>
- /// <param name="header">The header id</param>
- /// <param name="value">The value of the header</param>
- /// <exception cref="OutOfMemoryException"></exception>
- void WriteHeader(byte header, ReadOnlySpan<char> value);
- /// <summary>
- /// Appends an arbitrary header to the current request buffer
- /// </summary>
- /// <param name="header">The <see cref="HeaderCommand"/> of the header</param>
- /// <param name="value">The value of the header</param>
- /// <exception cref="OutOfMemoryException"></exception>
- void WriteHeader(HeaderCommand header, ReadOnlySpan<char> value);
- }
-} \ No newline at end of file
diff --git a/lib/Net.Messaging.FBM/src/Client/IFBMMessageWaiter.cs b/lib/Net.Messaging.FBM/src/Client/IFBMMessageWaiter.cs
index 5000711..cc8e1c4 100644
--- a/lib/Net.Messaging.FBM/src/Client/IFBMMessageWaiter.cs
+++ b/lib/Net.Messaging.FBM/src/Client/IFBMMessageWaiter.cs
@@ -48,9 +48,9 @@ namespace VNLib.Net.Messaging.FBM.Client
/// or a timeout
/// </summary>
/// <param name="timeout">The maxium time to wait for the server to respond (may be default/0)</param>
- /// <param name="cancellation">The cancellation token to observe</param>
+ /// <param name="cancellation">A token to cancel the wait task</param>
/// <returns>A task that completes when the server responds</returns>
- Task WaitAsync(TimeSpan timeout, CancellationToken cancellation);
+ Task GetTask(TimeSpan timeout, CancellationToken cancellation);
/// <summary>
/// Called by the client to cleanup the waiter when the request is completed
@@ -63,7 +63,7 @@ namespace VNLib.Net.Messaging.FBM.Client
/// Set by the client when the response has been successfully received by the client
/// </summary>
/// <param name="ms">The response data to pass to the response</param>
- void Complete(VnMemoryStream ms);
+ bool Complete(VnMemoryStream ms);
/// <summary>
/// Called to invoke a manual cancellation of a request waiter. This method should
diff --git a/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs b/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs
index d0352d3..fc2e417 100644
--- a/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs
+++ b/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs
@@ -43,7 +43,7 @@ namespace VNLib.Net.Messaging.FBM.Client
private readonly int TxBufferSize;
private readonly int RxBufferSize;
private readonly TimeSpan KeepAliveInterval;
- private readonly VnTempBuffer<byte> _dataBuffer;
+ private readonly ArrayPoolBuffer<byte> _dataBuffer;
private readonly string? _subProtocol;
/// <summary>
@@ -95,7 +95,7 @@ namespace VNLib.Net.Messaging.FBM.Client
try
{
//Set buffer
- _socket.Options.SetBuffer(RxBufferSize, TxBufferSize, _dataBuffer);
+ _socket.Options.SetBuffer(RxBufferSize, TxBufferSize, _dataBuffer.AsArraySegment());
//Set remaining stored options
_socket.Options.ClientCertificates = Certificates;
_socket.Options.KeepAliveInterval = KeepAliveInterval;