aboutsummaryrefslogtreecommitdiff
path: root/lib/Net.Messaging.FBM
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-11-02 01:49:02 -0400
committerLibravatar vnugent <public@vaughnnugent.com>2023-11-02 01:49:02 -0400
commit9e3dd9be0f0ec7aaef1a719f09f96425e66369df (patch)
tree59b8bd4ace8750327db80823fa1e5eccdf44bc74 /lib/Net.Messaging.FBM
parenteafefadc4b858e5b5be481662a2b0c8e47a43bf4 (diff)
may have gottem carried away
Diffstat (limited to 'lib/Net.Messaging.FBM')
-rw-r--r--lib/Net.Messaging.FBM/src/Client/FBMBuffer.cs94
-rw-r--r--lib/Net.Messaging.FBM/src/Client/FBMClient.cs117
-rw-r--r--lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs3
-rw-r--r--lib/Net.Messaging.FBM/src/Client/FBMRequest.cs131
-rw-r--r--lib/Net.Messaging.FBM/src/Client/FBMResponse.cs9
-rw-r--r--lib/Net.Messaging.FBM/src/Client/IFBMMessageWaiter.cs6
-rw-r--r--lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs4
-rw-r--r--lib/Net.Messaging.FBM/src/FBMMessageHeader.cs1
-rw-r--r--lib/Net.Messaging.FBM/src/FallbackFBMMemoryManager.cs140
-rw-r--r--lib/Net.Messaging.FBM/src/IFBMMemoryHandle.cs43
-rw-r--r--lib/Net.Messaging.FBM/src/IFBMMemoryManager.cs71
-rw-r--r--lib/Net.Messaging.FBM/src/IFBMMessage.cs (renamed from lib/Net.Messaging.FBM/src/Client/IFBMMessage.cs)4
-rw-r--r--lib/Net.Messaging.FBM/src/IFBMSpanOnlyMemoryHandle.cs42
-rw-r--r--lib/Net.Messaging.FBM/src/Server/FBMContext.cs9
-rw-r--r--lib/Net.Messaging.FBM/src/Server/FBMListener.cs108
-rw-r--r--lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs81
-rw-r--r--lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs4
-rw-r--r--lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs56
-rw-r--r--lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs82
-rw-r--r--lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs32
-rw-r--r--lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs4
-rw-r--r--lib/Net.Messaging.FBM/src/Server/IFBMServerErrorHandler.cs49
-rw-r--r--lib/Net.Messaging.FBM/src/Server/IFBMServerMessageHandler.cs43
-rw-r--r--lib/Net.Messaging.FBM/src/VNLib.Net.Messaging.FBM.csproj1
24 files changed, 746 insertions, 388 deletions
diff --git a/lib/Net.Messaging.FBM/src/Client/FBMBuffer.cs b/lib/Net.Messaging.FBM/src/Client/FBMBuffer.cs
index fac41a6..698a98b 100644
--- a/lib/Net.Messaging.FBM/src/Client/FBMBuffer.cs
+++ b/lib/Net.Messaging.FBM/src/Client/FBMBuffer.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Messaging.FBM
@@ -28,28 +28,50 @@ using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using VNLib.Utils.IO;
+using VNLib.Utils.Memory.Caching;
namespace VNLib.Net.Messaging.FBM.Client
{
/// <summary>
/// Represents a shared internal character and bianry buffer for
/// </summary>
- internal sealed class FBMBuffer : IFBMHeaderBuffer, IDisposable
+ internal sealed class FBMBuffer : IFBMHeaderBuffer, IDisposable, IReusable
{
- private readonly IMemoryOwner<byte> Handle;
+ private readonly IFBMMemoryHandle Handle;
+ private readonly IFBMMemoryManager _memoryManager;
+ private readonly int _size;
private readonly BufferWriter _writer;
private readonly BinaryRequestAccumulator _requestAccumulator;
- internal FBMBuffer(IMemoryOwner<byte> handle)
+ internal FBMBuffer(IFBMMemoryManager manager, int bufferSize)
{
- Handle = handle;
+ _memoryManager = manager;
+ Handle = manager.InitHandle();
_writer = new(this);
- _requestAccumulator = new(handle.Memory);
+ _size = bufferSize;
+ _requestAccumulator = new(this, bufferSize);
}
+ /*
+ * Reusable methods will alloc and free buffers during
+ * normal operation.
+ */
+
+ ///<inheritdoc/>
+ public void Prepare() => _memoryManager.AllocBuffer(Handle, _size);
+
+ ///<inheritdoc/>
+ public bool Release()
+ {
+ _memoryManager.FreeBuffer(Handle);
+ return true;
+ }
+
+ public void Dispose() => _ = Release();
+
/// <summary>
/// Gets the internal request data accumulator
/// </summary>
@@ -73,13 +95,6 @@ namespace VNLib.Net.Messaging.FBM.Client
//Return the internal writer
return _writer;
}
-
-
- public void Dispose()
- {
- //Dispose handle
- Handle.Dispose();
- }
/// <summary>
/// Resets the request accumulator and writes the initial message id
@@ -91,7 +106,7 @@ namespace VNLib.Net.Messaging.FBM.Client
_requestAccumulator.Reset();
//Write message id to accumulator, it should already be reset
- Helpers.WriteMessageid(RequestBuffer, messageId);
+ Helpers.WriteMessageid(_requestAccumulator, messageId);
}
///<inheritdoc/>
@@ -99,24 +114,24 @@ namespace VNLib.Net.Messaging.FBM.Client
Span<char> IFBMHeaderBuffer.GetSpan(int offset, int count)
{
//Get the character span
- Span<char> span = MemoryMarshal.Cast<byte, char>(Handle.Memory.Span);
+ Span<char> span = MemoryMarshal.Cast<byte, char>(Handle.GetSpan());
return span.Slice(offset, count);
}
///<inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- Span<char> IFBMHeaderBuffer.GetSpan() => MemoryMarshal.Cast<byte, char>(Handle.Memory.Span);
+ Span<char> IFBMHeaderBuffer.GetSpan() => MemoryMarshal.Cast<byte, char>(Handle.GetSpan());
private sealed class BinaryRequestAccumulator : IDataAccumulator<byte>
{
private readonly int Size;
- private readonly Memory<byte> Buffer;
+ private readonly FBMBuffer Buffer;
- internal BinaryRequestAccumulator(Memory<byte> buffer)
+ internal BinaryRequestAccumulator(FBMBuffer buffer, int size)
{
Buffer = buffer;
- Size = buffer.Length;
+ Size = size;
}
///<inheritdoc/>
@@ -126,52 +141,47 @@ namespace VNLib.Net.Messaging.FBM.Client
public int RemainingSize => Size - AccumulatedSize;
///<inheritdoc/>
- public Span<byte> Remaining => Buffer.Span.Slice(AccumulatedSize, RemainingSize);
+ public Span<byte> Remaining => Buffer.Handle.GetSpan().Slice(AccumulatedSize, RemainingSize);
+
///<inheritdoc/>
- public Span<byte> Accumulated => Buffer.Span[..AccumulatedSize];
+ public Span<byte> Accumulated => Buffer.Handle.GetSpan()[..AccumulatedSize];
/// <summary>
/// Gets the accumulated data as a memory segment
/// </summary>
- public Memory<byte> AccumulatedMemory => Buffer[..AccumulatedSize];
+ public Memory<byte> AccumulatedMemory => Buffer.Handle.GetMemory()[..AccumulatedSize];
/// <summary>
/// Gets the remaining buffer segment as a memory segment
/// </summary>
- public Memory<byte> RemainingMemory => Buffer.Slice(AccumulatedSize, RemainingSize);
+ public Memory<byte> RemainingMemory => Buffer.Handle.GetMemory().Slice(AccumulatedSize, RemainingSize);
///<inheritdoc/>
public void Advance(int count) => AccumulatedSize += count;
+
///<inheritdoc/>
public void Reset() => AccumulatedSize = 0;
}
+ /*
+ * A buffer writer that wraps the request accumulator to allow
+ * a finite amount of data to be written to the accumulator since
+ * it uses a fixed size internal buffer.
+ */
private sealed class BufferWriter : IBufferWriter<byte>
{
private readonly FBMBuffer Buffer;
- public BufferWriter(FBMBuffer buffer)
- {
- Buffer = buffer;
- }
+ public BufferWriter(FBMBuffer buffer) => Buffer = buffer;
- public void Advance(int count)
- {
- //Advance the writer
- Buffer.RequestBuffer.Advance(count);
- }
+ ///<inheritdoc/>
+ public void Advance(int count) => Buffer._requestAccumulator.Advance(count);
- public Memory<byte> GetMemory(int sizeHint = 0)
- {
- //Get the remaining memory segment
- return Buffer._requestAccumulator.RemainingMemory;
- }
+ ///<inheritdoc/>
+ public Memory<byte> GetMemory(int sizeHint = 0) => Buffer._requestAccumulator.RemainingMemory;
- public Span<byte> GetSpan(int sizeHint = 0)
- {
- //Get the remaining data segment
- return Buffer.RequestBuffer.Remaining;
- }
+ ///<inheritdoc/>
+ public Span<byte> GetSpan(int sizeHint = 0) => Buffer._requestAccumulator.Remaining;
}
}
}
diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClient.cs b/lib/Net.Messaging.FBM/src/Client/FBMClient.cs
index 5184c38..c8319fa 100644
--- a/lib/Net.Messaging.FBM/src/Client/FBMClient.cs
+++ b/lib/Net.Messaging.FBM/src/Client/FBMClient.cs
@@ -24,7 +24,6 @@
using System;
using System.IO;
-using System.Buffers;
using System.Threading;
using System.Net.WebSockets;
using System.Threading.Tasks;
@@ -34,9 +33,12 @@ using System.Collections.Concurrent;
using VNLib.Net.Http;
using VNLib.Utils;
using VNLib.Utils.IO;
+using VNLib.Utils.Memory;
+using VNLib.Utils.Memory.Caching;
using VNLib.Utils.Logging;
using VNLib.Utils.Extensions;
-using VNLib.Utils.Memory.Caching;
+
+#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task
namespace VNLib.Net.Messaging.FBM.Client
{
@@ -60,6 +62,8 @@ namespace VNLib.Net.Messaging.FBM.Client
/// </summary>
public const string REQ_MAX_MESS_QUERY_ARG = "mx";
+ public const int MAX_STREAM_BUFFER_SIZE = 128 * 1024;
+
/// <summary>
/// Raised when the websocket has been closed because an error occured.
/// You may inspect the event args to determine the cause of the error.
@@ -74,12 +78,14 @@ namespace VNLib.Net.Messaging.FBM.Client
private readonly SemaphoreSlim SendLock;
private readonly ConcurrentDictionary<int, FBMRequest> ActiveRequests;
private readonly ObjectRental<FBMRequest> RequestRental;
+ private readonly FBMClientConfig _config;
+ private readonly byte[] _streamBuffer;
/// <summary>
/// The configuration for the current client
/// </summary>
- public FBMClientConfig Config { get; }
-
+ public ref readonly FBMClientConfig Config => ref _config;
+
/// <summary>
/// A handle that is reset when a connection has been successfully set, and is set
/// when the connection exists
@@ -103,9 +109,13 @@ namespace VNLib.Net.Messaging.FBM.Client
ConnectionStatusHandle = new(true);
ActiveRequests = new(Environment.ProcessorCount, 100);
- Config = config;
+ _config = config;
//Init the new client socket
ClientSocket = new(config.RecvBufferSize, config.RecvBufferSize, config.KeepAliveInterval, config.SubProtocol);
+
+ //Init the stream buffer
+ int maxStrmBufSize = Math.Min(config.MaxMessageSize, MAX_STREAM_BUFFER_SIZE);
+ _streamBuffer = new byte[maxStrmBufSize];
}
private void Debug(string format, params string[] strings)
@@ -127,7 +137,7 @@ namespace VNLib.Net.Messaging.FBM.Client
/// Allocates and configures a new <see cref="FBMRequest"/> message object for use within the reusable store
/// </summary>
/// <returns>The configured <see cref="FBMRequest"/></returns>
- protected virtual FBMRequest ReuseableRequestConstructor() => new(Config);
+ protected virtual FBMRequest ReuseableRequestConstructor() => new(in _config);
/// <summary>
/// Asynchronously opens a websocket connection with the specifed remote server
@@ -180,10 +190,7 @@ namespace VNLib.Net.Messaging.FBM.Client
/// <exception cref="ObjectDisposedException"></exception>
/// <exception cref="InvalidOperationException"></exception>
/// <exception cref="FBMInvalidRequestException"></exception>
- public Task<FBMResponse> SendAsync(FBMRequest request, CancellationToken cancellationToken = default)
- {
- return SendAsync(request, Config.RequestTimeout, cancellationToken);
- }
+ public Task<FBMResponse> SendAsync(FBMRequest request, CancellationToken cancellationToken = default) => SendAsync(request, _config.RequestTimeout, cancellationToken);
/// <summary>
/// Sends a <see cref="FBMRequest"/> to the connected server
@@ -211,6 +218,8 @@ namespace VNLib.Net.Messaging.FBM.Client
try
{
+ FBMResponse response = new();
+
//Get the request data segment
ReadOnlyMemory<byte> requestData = request.GetRequestData();
@@ -224,22 +233,26 @@ namespace VNLib.Net.Messaging.FBM.Client
}
//wait for the response to be set
- await request.Waiter.WaitAsync(timeout, cancellationToken).ConfigureAwait(true);
+ await request.Waiter.GetTask(timeout, cancellationToken).ConfigureAwait(true);
Debug("Received {size} bytes for message {id}", request.Response?.Length ?? 0, request.MessageId);
- return request.GetResponse();
+ //Get the response data
+ request.GetResponse(ref response);
+
+ return response;
}
catch
{
//Remove the request since packet was never sent
ActiveRequests.Remove(request.MessageId, out _);
-
- //Cleanup waiter
- request.Waiter.OnEndRequest();
-
throw;
}
+ finally
+ {
+ //Always cleanup waiter
+ request.Waiter.OnEndRequest();
+ }
}
/// <summary>
@@ -247,30 +260,28 @@ namespace VNLib.Net.Messaging.FBM.Client
/// </summary>
/// <param name="request">The request message to send to the server</param>
/// <param name="payload">Data to stream to the server</param>
- /// <param name="ct">The content type of the stream of data</param>
+ /// <param name="contentType">The content type of the stream of data</param>
/// <param name="cancellationToken">A token to cancel the operation</param>
/// <returns>A task that resolves when the data is sent and the resonse is received</returns>
/// <exception cref="ArgumentException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
/// <exception cref="InvalidOperationException"></exception>
- public Task StreamDataAsync(FBMRequest request, Stream payload, ContentType ct, CancellationToken cancellationToken = default)
- {
- return StreamDataAsync(request, payload, ct, Config.RequestTimeout, cancellationToken);
- }
+ public Task<FBMResponse> StreamDataAsync(FBMRequest request, Stream payload, ContentType contentType, CancellationToken cancellationToken = default)
+ => StreamDataAsync(request, payload, contentType, _config.RequestTimeout, cancellationToken);
/// <summary>
/// Streams arbitrary binary data to the server with the initial request message
/// </summary>
/// <param name="request">The request message to send to the server</param>
/// <param name="payload">Data to stream to the server</param>
- /// <param name="ct">The content type of the stream of data</param>
+ /// <param name="contentType">The content type of the stream of data</param>
/// <param name="cancellationToken">A token to cancel the operation</param>
/// <param name="timeout">A maxium wait timeout period. If -1 or 0 the timeout is disabled</param>
/// <returns>A task that resolves when the data is sent and the resonse is received</returns>
/// <exception cref="ArgumentException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
/// <exception cref="InvalidOperationException"></exception>
- public async Task StreamDataAsync(FBMRequest request, Stream payload, ContentType ct, TimeSpan timeout, CancellationToken cancellationToken = default)
+ public async Task<FBMResponse> StreamDataAsync(FBMRequest request, Stream payload, ContentType contentType, TimeSpan timeout, CancellationToken cancellationToken = default)
{
Check();
@@ -282,19 +293,15 @@ namespace VNLib.Net.Messaging.FBM.Client
try
{
+ FBMResponse response = new();
+
//Get the request data segment
ReadOnlyMemory<byte> requestData = request.GetRequestData();
Debug("Streaming {bytes} with id {id}", requestData.Length, request.MessageId);
- //Write an empty body in the request
- request.WriteBody(ReadOnlySpan<byte>.Empty, ct);
-
- //Calc buffer size
- int bufSize = (int)Math.Clamp(payload.Length, Config.MessageBufferSize, Config.MaxMessageSize);
-
- //Alloc a streaming buffer
- using IMemoryOwner<byte> buffer = Config.BufferHeap.DirectAlloc<byte>(bufSize);
+ //Write an empty body in the request so a content type header is writen
+ request.WriteBody(ReadOnlySpan<byte>.Empty, contentType);
//Wait for send-lock
using (SemSlimReleaser releaser = await SendLock.GetReleaserAsync(cancellationToken))
@@ -306,7 +313,7 @@ namespace VNLib.Net.Messaging.FBM.Client
do
{
//Read data
- int read = await payload.ReadAsync(buffer.Memory, cancellationToken);
+ int read = await payload.ReadAsync(_streamBuffer, cancellationToken);
if (read == 0)
{
@@ -315,29 +322,32 @@ namespace VNLib.Net.Messaging.FBM.Client
}
//write message to socket, if the read data was smaller than the buffer, we can send the last packet
- await ClientSocket.SendAsync(buffer.Memory[..read], WebSocketMessageType.Binary, read < bufSize, cancellationToken);
+ await ClientSocket.SendAsync(_streamBuffer[..read], WebSocketMessageType.Binary, read < _streamBuffer.Length, cancellationToken);
} while (true);
}
//wait for the server to respond
- await request.Waiter.WaitAsync(timeout, cancellationToken).ConfigureAwait(true);
+ await request.Waiter.GetTask(timeout, cancellationToken).ConfigureAwait(true);
Debug("Response recieved {size} bytes for message {id}", request.Response?.Length ?? 0, request.MessageId);
+
+ request.GetResponse(ref response);
+ return response;
}
catch
{
//Remove the request since packet was never sent or cancelled
_ = ActiveRequests.TryRemove(request.MessageId, out _);
-
- //Cleanup request waiter
- request.Waiter.OnEndRequest();
-
throw;
}
+ finally
+ {
+ //Always cleanup waiter
+ request.Waiter.OnEndRequest();
+ }
}
-
/// <summary>
/// Closes the underlying <see cref="WebSocket"/> and cancels all pending operations
/// </summary>
@@ -399,14 +409,20 @@ namespace VNLib.Net.Messaging.FBM.Client
Debug("Begining receive loop");
//Alloc recv buffer
- IMemoryOwner<byte> recvBuffer = Config.BufferHeap.DirectAlloc<byte>(Config.RecvBufferSize);
+ IFBMMemoryHandle recvBuffer = _config.MemoryManager.InitHandle();
+ _config.MemoryManager.AllocBuffer(recvBuffer, _config.RecvBufferSize);
try
{
+ if(!_config.MemoryManager.TryGetHeap(out IUnmangedHeap? heap))
+ {
+ throw new NotSupportedException("The memory manager must support using IUnmanagedHeaps");
+ }
+
//Recv event loop
while (true)
{
//Listen for incoming packets with the intial data buffer
- ValueWebSocketReceiveResult result = await socket.ReceiveAsync(recvBuffer.Memory, CancellationToken.None);
+ ValueWebSocketReceiveResult result = await socket.ReceiveAsync(recvBuffer.GetMemory(), CancellationToken.None);
//If the message is a close message, its time to exit
if (result.MessageType == WebSocketMessageType.Close)
@@ -422,19 +438,19 @@ namespace VNLib.Net.Messaging.FBM.Client
}
//Alloc data buffer and write initial data
- VnMemoryStream responseBuffer = new(Config.BufferHeap);
+ VnMemoryStream responseBuffer = new(heap);
//Copy initial data
- responseBuffer.Write(recvBuffer.Memory.Span[..result.Count]);
+ responseBuffer.Write(recvBuffer.GetSpan()[..result.Count]);
//Receive packets until the EOF is reached
while (!result.EndOfMessage)
{
//recive more data
- result = await socket.ReceiveAsync(recvBuffer.Memory, CancellationToken.None);
+ result = await socket.ReceiveAsync(recvBuffer.GetMemory(), CancellationToken.None);
//Make sure the buffer is not too large
- if ((responseBuffer.Length + result.Count) > Config.MaxMessageSize)
+ if ((responseBuffer.Length + result.Count) > _config.MaxMessageSize)
{
//Dispose the buffer before exiting
responseBuffer.Dispose();
@@ -443,7 +459,7 @@ namespace VNLib.Net.Messaging.FBM.Client
}
//Copy continuous data
- responseBuffer.Write(recvBuffer.Memory.Span[..result.Count]);
+ responseBuffer.Write(recvBuffer.GetSpan()[..result.Count]);
}
//Reset the buffer stream position
@@ -472,7 +488,7 @@ namespace VNLib.Net.Messaging.FBM.Client
finally
{
//Dispose the recv buffer
- recvBuffer.Dispose();
+ _config.MemoryManager.FreeBuffer(recvBuffer);
//Cleanup the socket when exiting
ClientSocket.Cleanup();
//Set status handle as unset
@@ -522,7 +538,12 @@ namespace VNLib.Net.Messaging.FBM.Client
if (ActiveRequests.TryRemove(messageId, out FBMRequest? request))
{
//Set the new response message
- request.Waiter.Complete(responseMessage);
+ if (!request.Waiter.Complete(responseMessage))
+ {
+ //Falied to complete, dispose the message data
+ responseMessage.Dispose();
+ Debug("Failed to transition waiting request {id}. Message was dropped", messageId, 0);
+ }
}
else
{
diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs b/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs
index 30e9a95..735a0a8 100644
--- a/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs
+++ b/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs
@@ -25,7 +25,6 @@
using System;
using System.Text;
-using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
namespace VNLib.Net.Messaging.FBM.Client
@@ -59,7 +58,7 @@ namespace VNLib.Net.Messaging.FBM.Client
/// <summary>
/// The heap to allocate internal (and message) buffers from
/// </summary>
- public readonly IUnmangedHeap BufferHeap { get; init; }
+ public readonly IFBMMemoryManager MemoryManager { get; init; }
/// <summary>
/// The websocket keepalive interval to use (leaving this property default disables keepalives)
/// </summary>
diff --git a/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs b/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs
index c4fb493..418a9ec 100644
--- a/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs
+++ b/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs
@@ -26,6 +26,7 @@ using System;
using System.Text;
using System.Buffers;
using System.Threading;
+using System.Diagnostics;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
@@ -37,7 +38,6 @@ using VNLib.Utils.Memory;
using VNLib.Utils.Extensions;
using VNLib.Utils.Memory.Caching;
-
namespace VNLib.Net.Messaging.FBM.Client
{
/// <summary>
@@ -97,29 +97,29 @@ namespace VNLib.Net.Messaging.FBM.Client
/// <param name="messageId">The custom message id</param>
/// <param name="config">The fbm client config storing required config variables</param>
public FBMRequest(int messageId, in FBMClientConfig config)
- :this(messageId, config.BufferHeap, config.MessageBufferSize, config.HeaderEncoding)
+ :this(messageId, config.MemoryManager, config.MessageBufferSize, config.HeaderEncoding)
{ }
/// <summary>
/// Initializes a new <see cref="FBMRequest"/> with the sepcified message buffer size and a custom MessageId
/// </summary>
/// <param name="messageId">The custom message id</param>
- /// <param name="heap">The heap to allocate the internal buffer from</param>
+ /// <param name="manager">The memory manager used to allocate the internal buffers</param>
/// <param name="bufferSize">The size of the internal buffer</param>
/// <param name="headerEncoding">The encoding instance used for header character encoding</param>
- public FBMRequest(int messageId, IUnmangedHeap heap, int bufferSize, Encoding headerEncoding)
+ public FBMRequest(int messageId, IFBMMemoryManager manager, int bufferSize, Encoding headerEncoding)
{
MessageId = messageId;
- HeaderEncoding = headerEncoding;
+ HeaderEncoding = headerEncoding ?? throw new ArgumentNullException(nameof(headerEncoding));
+ _ = manager ?? throw new ArgumentNullException(nameof(manager));
//Configure waiter
Waiter = new FBMMessageWaiter(this);
-
- //Alloc the buffer as a memory owner so a memory buffer can be used
- IMemoryOwner<byte> HeapBuffer = heap.DirectAlloc<byte>(bufferSize);
- Buffer = new(HeapBuffer);
+
+ Buffer = new(manager, bufferSize);
//Prepare the message incase the request is fresh
+ Buffer.Prepare();
Reset();
}
@@ -152,19 +152,13 @@ namespace VNLib.Net.Messaging.FBM.Client
/// The request message packet, this may cause side effects
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- public ReadOnlyMemory<byte> GetRequestData()
- {
- return Buffer.RequestData;
- }
+ public ReadOnlyMemory<byte> GetRequestData() => Buffer.RequestData;
/// <summary>
/// Resets the internal buffer and allows for writing a new message with
/// the same message-id
/// </summary>
- public void Reset()
- {
- Buffer.Reset(MessageId);
- }
+ public void Reset() => Buffer.Reset(MessageId);
///<inheritdoc/>
protected override void Free()
@@ -175,7 +169,12 @@ namespace VNLib.Net.Messaging.FBM.Client
(Waiter as FBMMessageWaiter)!.Dispose();
}
- void IReusable.Prepare() => Reset();
+ void IReusable.Prepare()
+ {
+ //MUST BE CALLED FIRST!
+ Buffer.Prepare();
+ Reset();
+ }
bool IReusable.Release()
{
@@ -186,6 +185,9 @@ namespace VNLib.Net.Messaging.FBM.Client
Response?.Dispose();
Response = null;
+ //Free buffer
+ Buffer.Release();
+
return true;
}
@@ -195,7 +197,7 @@ namespace VNLib.Net.Messaging.FBM.Client
/// Gets the response of the sent message
/// </summary>
/// <returns>The response message for the current request</returns>
- internal FBMResponse GetResponse()
+ internal void GetResponse(ref FBMResponse response)
{
if (Response != null)
{
@@ -215,11 +217,7 @@ namespace VNLib.Net.Messaging.FBM.Client
HeaderParseError statusFlags = Helpers.ParseHeaders(Response, Buffer, ResponseHeaderList, HeaderEncoding);
//return response structure
- return new(Response, statusFlags, ResponseHeaderList, OnResponseDisposed);
- }
- else
- {
- return new();
+ response = new(Response, statusFlags, ResponseHeaderList, OnResponseDisposed);
}
}
@@ -272,12 +270,13 @@ namespace VNLib.Net.Messaging.FBM.Client
#endregion
#region waiter
- private sealed class FBMMessageWaiter : IFBMMessageWaiter, IDisposable
+ private sealed class FBMMessageWaiter : IFBMMessageWaiter, IDisposable, IThreadPoolWorkItem
{
private readonly Timer _timer;
private readonly FBMRequest _request;
private TaskCompletionSource? _tcs;
+ private CancellationTokenRegistration _token;
public FBMMessageWaiter(FBMRequest request)
{
@@ -298,79 +297,89 @@ namespace VNLib.Net.Messaging.FBM.Client
public void OnEndRequest()
{
//Cleanup tcs ref
- _ = Interlocked.Exchange(ref _tcs, null);
+ _tcs = null;
- //Stop timer if set
+ //Always stop timer if set
_timer.Stop();
+
+ //Cleanup cancellation token
+ _token.Dispose();
}
///<inheritdoc/>
- public void Complete(VnMemoryStream ms)
+ public bool Complete(VnMemoryStream ms)
{
- //Read the current state of the tcs
TaskCompletionSource? tcs = _tcs;
- if (tcs == null)
+ //Work is done/cancelled
+ if (tcs != null && tcs.Task.IsCompleted)
{
- //Work is done/cancelled, dispose the ms and leave
- ms.Dispose();
+ return false;
}
//Store response
_request.Response = ms;
- //Transition to completed state in background thread
- static void OnTpCallback(object? state)
- {
- _ = (state as TaskCompletionSource)!.TrySetResult();
- }
-
/*
* The calling thread may be a TP thread proccessing an async event loop.
* We do not want to block this worker thread.
*/
- ThreadPool.UnsafeQueueUserWorkItem(OnTpCallback, tcs);
+ return ThreadPool.UnsafeQueueUserWorkItem(this, true);
}
+ /*
+ * Called when scheduled on the TP thread pool
+ */
///<inheritdoc/>
- public async Task WaitAsync(TimeSpan timeout, CancellationToken cancellation)
+ public void Execute() => _tcs?.TrySetResult();
+
+
+ ///<inheritdoc/>
+ public Task GetTask(TimeSpan timeout, CancellationToken cancellation)
{
- if (timeout.Ticks > 0)
- {
- //Restart timer if timeout is configured
- _timer.Restart(timeout);
- }
+ TaskCompletionSource? tcs = _tcs;
- //Confim the token may be cancelled
- if (cancellation.CanBeCanceled)
- {
- //Register cancellation
- using CancellationTokenRegistration reg = cancellation.Register(OnCancelled, this, false);
+ Debug.Assert(tcs != null, "A call to GetTask was made outside of the request flow, the TaskCompletionSource was null");
- //await task that may be canclled
- await _tcs.Task.ConfigureAwait(false);
- }
- else
+ /*
+ * Get task will only be called after the message has been sent.
+ * The Complete method may have already scheduled a completion by
+ * the time this method is called, so we may avoid setting up the
+ * timer and cancellation if possible. Also since this mthod is
+ * called from the request side, we know the tcs cannot be null
+ */
+
+ if (!tcs.Task.IsCompleted)
{
- //await the task directly
- await _tcs.Task.ConfigureAwait(false);
+ if (timeout.Ticks > 0)
+ {
+ //Restart timer if timeout is configured
+ _timer.Restart(timeout);
+ }
+
+ if (cancellation.CanBeCanceled)
+ {
+ //Register cancellation
+ _token = cancellation.Register(OnCancelled, this);
+ }
}
+
+ return tcs.Task;
}
///<inheritdoc/>
public void ManualCancellation() => OnCancelled(this);
- private void OnCancelled(object? state)
- {
- //Set cancelled state if exists
- _ = _tcs?.TrySetCanceled();
- }
+ //Set cancelled state if exists, the task may have already completed
+ private void OnCancelled(object? state) => _tcs?.TrySetCanceled();
///<inheritdoc/>
public void Dispose()
{
_timer.Dispose();
+ _token.Dispose();
}
+
}
#endregion
diff --git a/lib/Net.Messaging.FBM/src/Client/FBMResponse.cs b/lib/Net.Messaging.FBM/src/Client/FBMResponse.cs
index 6f8fec4..f1148f1 100644
--- a/lib/Net.Messaging.FBM/src/Client/FBMResponse.cs
+++ b/lib/Net.Messaging.FBM/src/Client/FBMResponse.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Messaging.FBM
@@ -90,17 +90,16 @@ namespace VNLib.Net.Messaging.FBM.Client
/// <summary>
/// Releases any resources associated with the response message
/// </summary>
- public void Dispose() => _onDispose?.Invoke();
+ public readonly void Dispose() => _onDispose?.Invoke();
///<inheritdoc/>
public override bool Equals(object? obj) => obj is FBMResponse response && Equals(response);
///<inheritdoc/>
+ public bool Equals(FBMResponse other) => IsSet && other.IsSet && ReferenceEquals(MessagePacket, other.MessagePacket);
+ ///<inheritdoc/>
public override int GetHashCode() => IsSet ? MessagePacket!.GetHashCode() : 0;
///<inheritdoc/>
public static bool operator ==(FBMResponse left, FBMResponse right) => left.Equals(right);
///<inheritdoc/>
public static bool operator !=(FBMResponse left, FBMResponse right) => !(left == right);
- ///<inheritdoc/>
- public bool Equals(FBMResponse other) => IsSet && other.IsSet && MessagePacket == other.MessagePacket;
-
}
}
diff --git a/lib/Net.Messaging.FBM/src/Client/IFBMMessageWaiter.cs b/lib/Net.Messaging.FBM/src/Client/IFBMMessageWaiter.cs
index 5000711..cc8e1c4 100644
--- a/lib/Net.Messaging.FBM/src/Client/IFBMMessageWaiter.cs
+++ b/lib/Net.Messaging.FBM/src/Client/IFBMMessageWaiter.cs
@@ -48,9 +48,9 @@ namespace VNLib.Net.Messaging.FBM.Client
/// or a timeout
/// </summary>
/// <param name="timeout">The maxium time to wait for the server to respond (may be default/0)</param>
- /// <param name="cancellation">The cancellation token to observe</param>
+ /// <param name="cancellation">A token to cancel the wait task</param>
/// <returns>A task that completes when the server responds</returns>
- Task WaitAsync(TimeSpan timeout, CancellationToken cancellation);
+ Task GetTask(TimeSpan timeout, CancellationToken cancellation);
/// <summary>
/// Called by the client to cleanup the waiter when the request is completed
@@ -63,7 +63,7 @@ namespace VNLib.Net.Messaging.FBM.Client
/// Set by the client when the response has been successfully received by the client
/// </summary>
/// <param name="ms">The response data to pass to the response</param>
- void Complete(VnMemoryStream ms);
+ bool Complete(VnMemoryStream ms);
/// <summary>
/// Called to invoke a manual cancellation of a request waiter. This method should
diff --git a/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs b/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs
index d0352d3..fc2e417 100644
--- a/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs
+++ b/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs
@@ -43,7 +43,7 @@ namespace VNLib.Net.Messaging.FBM.Client
private readonly int TxBufferSize;
private readonly int RxBufferSize;
private readonly TimeSpan KeepAliveInterval;
- private readonly VnTempBuffer<byte> _dataBuffer;
+ private readonly ArrayPoolBuffer<byte> _dataBuffer;
private readonly string? _subProtocol;
/// <summary>
@@ -95,7 +95,7 @@ namespace VNLib.Net.Messaging.FBM.Client
try
{
//Set buffer
- _socket.Options.SetBuffer(RxBufferSize, TxBufferSize, _dataBuffer);
+ _socket.Options.SetBuffer(RxBufferSize, TxBufferSize, _dataBuffer.AsArraySegment());
//Set remaining stored options
_socket.Options.ClientCertificates = Certificates;
_socket.Options.KeepAliveInterval = KeepAliveInterval;
diff --git a/lib/Net.Messaging.FBM/src/FBMMessageHeader.cs b/lib/Net.Messaging.FBM/src/FBMMessageHeader.cs
index d1f4f1c..180ce7d 100644
--- a/lib/Net.Messaging.FBM/src/FBMMessageHeader.cs
+++ b/lib/Net.Messaging.FBM/src/FBMMessageHeader.cs
@@ -87,6 +87,7 @@ namespace VNLib.Net.Messaging.FBM
///<inheritdoc/>
public static bool operator ==(FBMMessageHeader left, FBMMessageHeader right) => left.Equals(right);
+
///<inheritdoc/>
public static bool operator !=(FBMMessageHeader left, FBMMessageHeader right) => !(left == right);
diff --git a/lib/Net.Messaging.FBM/src/FallbackFBMMemoryManager.cs b/lib/Net.Messaging.FBM/src/FallbackFBMMemoryManager.cs
new file mode 100644
index 0000000..260cbd6
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/FallbackFBMMemoryManager.cs
@@ -0,0 +1,140 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: FallbackFBMMemoryManager.cs
+*
+* FallbackFBMMemoryManager.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Buffers;
+using System.Diagnostics.CodeAnalysis;
+
+using VNLib.Utils.Memory;
+using VNLib.Utils.Extensions;
+
+namespace VNLib.Net.Messaging.FBM
+{
+ /// <summary>
+ /// A default/fallback implementation of a <see cref="IFBMMemoryManager"/> that
+ /// uses an <see cref="IUnmangedHeap"/> to allocate buffers from
+ /// </summary>
+ public sealed class FallbackFBMMemoryManager : IFBMMemoryManager
+ {
+ private readonly IUnmangedHeap _heap;
+
+ /// <summary>
+ /// Initializes a new instance of <see cref="FallbackFBMMemoryManager"/> allocationg
+ /// memory from the specified <see cref="IUnmangedHeap"/>
+ /// </summary>
+ /// <param name="heap">The heap to allocate memory from</param>
+ /// <exception cref="ArgumentNullException"></exception>
+ public FallbackFBMMemoryManager(IUnmangedHeap heap) => _heap = heap ?? throw new ArgumentNullException(nameof(heap));
+
+ ///<inheritdoc/>
+ public void AllocBuffer(IFBMSpanOnlyMemoryHandle state, int size)
+ {
+ _ = state ?? throw new ArgumentNullException(nameof(state));
+ (state as IFBMBufferHolder)!.AllocBuffer(size);
+ }
+
+ ///<inheritdoc/>
+ public void FreeBuffer(IFBMSpanOnlyMemoryHandle state)
+ {
+ _ = state ?? throw new ArgumentNullException(nameof(state));
+ (state as IFBMBufferHolder)!.FreeBuffer();
+ }
+
+ ///<inheritdoc/>
+ public IFBMMemoryHandle InitHandle() => new FBMMemHandle(_heap);
+
+ ///<inheritdoc/>
+ public IFBMSpanOnlyMemoryHandle InitSpanOnly() => new FBMSpanOnlyMemHandle(_heap);
+
+ ///<inheritdoc/>
+ public bool TryGetHeap([NotNullWhen(true)] out IUnmangedHeap? heap)
+ {
+ heap = _heap;
+ return true;
+ }
+
+ interface IFBMBufferHolder
+ {
+ void AllocBuffer(int size);
+
+ void FreeBuffer();
+ }
+
+ private sealed record class FBMMemHandle(IUnmangedHeap Heap) : IFBMMemoryHandle, IFBMBufferHolder
+ {
+ private MemoryHandle<byte>? _handle;
+ private IMemoryOwner<byte>? _memHandle;
+
+ ///<inheritdoc/>
+ public Memory<byte> GetMemory()
+ {
+ _ = _memHandle ?? throw new InvalidOperationException("Buffer has not allocated");
+ return _memHandle.Memory;
+ }
+
+ ///<inheritdoc/>
+ public Span<byte> GetSpan()
+ {
+ _ = _handle ?? throw new InvalidOperationException("Buffer has not allocated");
+ return _handle.Span;
+ }
+
+ ///<inheritdoc/>
+ public void AllocBuffer(int size)
+ {
+ //Alloc buffer and memory manager to wrap it
+ _handle = Heap.Alloc<byte>(size, false);
+ _memHandle = _handle.ToMemoryManager(false);
+ }
+
+ ///<inheritdoc/>
+ public void FreeBuffer()
+ {
+ _handle?.Dispose();
+ _memHandle?.Dispose();
+
+ _handle = null;
+ _memHandle = null;
+ }
+ }
+
+ private sealed record class FBMSpanOnlyMemHandle(IUnmangedHeap Heap) : IFBMSpanOnlyMemoryHandle, IFBMBufferHolder
+ {
+ private MemoryHandle<byte>? _handle;
+
+ ///<inheritdoc/>
+ public void AllocBuffer(int size) => _handle = Heap.Alloc<byte>(size, false);
+
+ ///<inheritdoc/>
+ public void FreeBuffer() => _handle?.Dispose();
+
+ ///<inheritdoc/>
+ public Span<byte> GetSpan()
+ {
+ _ = _handle ?? throw new InvalidOperationException("Buffer has not allocated");
+ return _handle.Span;
+ }
+ }
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/IFBMMemoryHandle.cs b/lib/Net.Messaging.FBM/src/IFBMMemoryHandle.cs
new file mode 100644
index 0000000..97a41b4
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/IFBMMemoryHandle.cs
@@ -0,0 +1,43 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: IFBMMemoryHandle.cs
+*
+* IFBMMemoryHandle.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+
+
+namespace VNLib.Net.Messaging.FBM
+{
+ /// <summary>
+ /// Represents a configurable handle to a memory block
+ /// </summary>
+ public interface IFBMMemoryHandle : IFBMSpanOnlyMemoryHandle
+ {
+ /// <summary>
+ /// Gets the block as a <see cref="Memory{T}"/>
+ /// structure
+ /// </summary>
+ /// <returns>The memory structure wrapping the underlying memory block</returns>
+ Memory<byte> GetMemory();
+ }
+
+}
diff --git a/lib/Net.Messaging.FBM/src/IFBMMemoryManager.cs b/lib/Net.Messaging.FBM/src/IFBMMemoryManager.cs
new file mode 100644
index 0000000..9342993
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/IFBMMemoryManager.cs
@@ -0,0 +1,71 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: IFBMMemoryManager.cs
+*
+* IFBMMemoryManager.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Diagnostics.CodeAnalysis;
+
+using VNLib.Utils.Memory;
+
+namespace VNLib.Net.Messaging.FBM
+{
+ /// <summary>
+ /// Manages memory blocks required by the FBM server messages
+ /// </summary>
+ public interface IFBMMemoryManager
+ {
+ /// <summary>
+ /// Initializes a new <see cref="IFBMMemoryHandle"/>
+ /// </summary>
+ /// <returns>The initialized handle</returns>
+ IFBMMemoryHandle InitHandle();
+
+ /// <summary>
+ /// Initializes a new <see cref="IFBMSpanOnlyMemoryHandle"/>
+ /// </summary>
+ /// <returns>The initialized handle</returns>
+ IFBMSpanOnlyMemoryHandle InitSpanOnly();
+
+ /// <summary>
+ /// Allocates the <see cref="IFBMMemoryHandle"/> internal buffer
+ /// for use
+ /// </summary>
+ /// <param name="state">The memory handle to allocate the buffer for</param>
+ /// <param name="size">The size of the buffer required</param>
+ void AllocBuffer(IFBMSpanOnlyMemoryHandle state, int size);
+
+ /// <summary>
+ /// Frees the <see cref="IFBMSpanOnlyMemoryHandle"/> internal buffer
+ /// </summary>
+ /// <param name="state">The buffer handle holding the memory to free</param>
+ void FreeBuffer(IFBMSpanOnlyMemoryHandle state);
+
+ /// <summary>
+ /// Tries to get the internal <see cref="IUnmangedHeap"/> to allocate internal
+ /// buffers from
+ /// </summary>
+ /// <param name="heap">The internal heap</param>
+ /// <returns>A value that indicates if a backing heap is supported and can be recovered</returns>
+ bool TryGetHeap([NotNullWhen(true)]out IUnmangedHeap? heap);
+ }
+
+}
diff --git a/lib/Net.Messaging.FBM/src/Client/IFBMMessage.cs b/lib/Net.Messaging.FBM/src/IFBMMessage.cs
index 18f19ec..dba605d 100644
--- a/lib/Net.Messaging.FBM/src/Client/IFBMMessage.cs
+++ b/lib/Net.Messaging.FBM/src/IFBMMessage.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Messaging.FBM
@@ -26,7 +26,7 @@ using System;
using VNLib.Net.Http;
-namespace VNLib.Net.Messaging.FBM.Client
+namespace VNLib.Net.Messaging.FBM
{
/// <summary>
/// Represents basic Fixed Buffer Message protocol operations
diff --git a/lib/Net.Messaging.FBM/src/IFBMSpanOnlyMemoryHandle.cs b/lib/Net.Messaging.FBM/src/IFBMSpanOnlyMemoryHandle.cs
new file mode 100644
index 0000000..0078357
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/IFBMSpanOnlyMemoryHandle.cs
@@ -0,0 +1,42 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: IFBMSpanOnlyMemoryHandle.cs
+*
+* IFBMSpanOnlyMemoryHandle.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+
+
+namespace VNLib.Net.Messaging.FBM
+{
+ /// <summary>
+ /// Represents a configurable handle to a memory block
+ /// </summary>
+ public interface IFBMSpanOnlyMemoryHandle
+ {
+ /// <summary>
+ /// Gets the block as a <see cref="Span{T}"/>
+ /// </summary>
+ /// <returns>The memory block as a span</returns>
+ Span<byte> GetSpan();
+ }
+
+}
diff --git a/lib/Net.Messaging.FBM/src/Server/FBMContext.cs b/lib/Net.Messaging.FBM/src/Server/FBMContext.cs
index 6d5f3bd..f2a2fea 100644
--- a/lib/Net.Messaging.FBM/src/Server/FBMContext.cs
+++ b/lib/Net.Messaging.FBM/src/Server/FBMContext.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Messaging.FBM
@@ -55,10 +55,11 @@ namespace VNLib.Net.Messaging.FBM.Server
/// <param name="requestHeaderBufferSize">The size in characters of the request header buffer</param>
/// <param name="responseBufferSize">The size in characters of the response header buffer</param>
/// <param name="headerEncoding">The message header encoding instance</param>
- public FBMContext(int requestHeaderBufferSize, int responseBufferSize, Encoding headerEncoding)
+ /// <param name="manager">The context memory manager</param>
+ public FBMContext(int requestHeaderBufferSize, int responseBufferSize, Encoding headerEncoding, IFBMMemoryManager manager)
{
- _request = Request = new(requestHeaderBufferSize);
- _response = Response = new(responseBufferSize, headerEncoding);
+ _request = Request = new(requestHeaderBufferSize, manager);
+ _response = Response = new(responseBufferSize, headerEncoding, manager);
_headerEncoding = headerEncoding;
}
diff --git a/lib/Net.Messaging.FBM/src/Server/FBMListener.cs b/lib/Net.Messaging.FBM/src/Server/FBMListener.cs
index 46ee160..30fa1ac 100644
--- a/lib/Net.Messaging.FBM/src/Server/FBMListener.cs
+++ b/lib/Net.Messaging.FBM/src/Server/FBMListener.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Messaging.FBM
@@ -24,7 +24,6 @@
using System;
using System.IO;
-using System.Buffers;
using System.Threading;
using System.Net.WebSockets;
using System.Threading.Tasks;
@@ -32,7 +31,6 @@ using System.Threading.Tasks;
using VNLib.Utils.IO;
using VNLib.Utils.Async;
using VNLib.Utils.Memory;
-using VNLib.Utils.Extensions;
using VNLib.Utils.Memory.Caching;
using VNLib.Plugins.Essentials;
@@ -40,16 +38,6 @@ namespace VNLib.Net.Messaging.FBM.Server
{
/// <summary>
- /// Method delegate for processing FBM messages from an <see cref="FBMListener"/>
- /// when messages are received
- /// </summary>
- /// <param name="context">The message/connection context</param>
- /// <param name="userState">The state parameter passed on client connected</param>
- /// <param name="cancellationToken">A token that reflects the state of the listener</param>
- /// <returns>A <see cref="Task"/> that resolves when processing is complete</returns>
- public delegate Task RequestHandler(FBMContext context, object? userState, CancellationToken cancellationToken);
-
- /// <summary>
/// A FBM protocol listener. Listens for messages on a <see cref="WebSocketSession"/>
/// and raises events on requests.
/// </summary>
@@ -58,22 +46,16 @@ namespace VNLib.Net.Messaging.FBM.Server
public const int SEND_SEMAPHORE_TIMEOUT_MS = 10 * 1000;
- private readonly IUnmangedHeap Heap;
-
- /// <summary>
- /// Raised when a response processing error occured
- /// </summary>
- public event EventHandler<Exception>? OnProcessError;
+ private readonly IFBMMemoryManager MemoryManger;
/// <summary>
/// Creates a new <see cref="FBMListener"/> instance ready for
/// processing connections
/// </summary>
/// <param name="heap">The heap to alloc buffers from</param>
- public FBMListener(IUnmangedHeap heap)
- {
- Heap = heap;
- }
+ /// <exception cref="ArgumentNullException"></exception>
+ public FBMListener(IFBMMemoryManager heap) => MemoryManger = heap ?? throw new ArgumentNullException(nameof(heap));
+
#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task
/// <summary>
@@ -83,31 +65,38 @@ namespace VNLib.Net.Messaging.FBM.Server
/// <param name="wss">The <see cref="WebSocketSession"/> to receive messages on</param>
/// <param name="handler">The callback method to handle incoming requests</param>
/// <param name="args">The arguments used to configured this listening session</param>
- /// <param name="userState">A state parameter</param>
/// <returns>A <see cref="Task"/> that completes when the connection closes</returns>
- public async Task ListenAsync(WebSocketSession wss, RequestHandler handler, FBMListenerSessionParams args, object? userState)
+ public async Task ListenAsync(WebSocketSession wss, IFBMServerMessageHandler handler, FBMListenerSessionParams args)
{
_ = wss ?? throw new ArgumentNullException(nameof(wss));
_ = handler ?? throw new ArgumentNullException(nameof(handler));
- ListeningSession session = new(wss, handler, in args, userState);
-
- //Alloc a recieve buffer
- using IMemoryOwner<byte> recvBuffer = Heap.DirectAlloc<byte>(args.RecvBufferSize);
+ ListeningSession session = new(wss, handler, in args, MemoryManger);
//Init new queue for dispatching work
AsyncQueue<VnMemoryStream> workQueue = new(true, true);
//Start a task to process the queue
Task queueWorker = QueueWorkerDoWork(workQueue, session);
-
+
+ //Alloc buffer
+ IFBMMemoryHandle memHandle = MemoryManger.InitHandle();
+ MemoryManger.AllocBuffer(memHandle, args.RecvBufferSize);
+
try
{
+ if(!MemoryManger.TryGetHeap(out IUnmangedHeap? heap))
+ {
+ throw new NotSupportedException("The memory manager must export an unmanaged heap");
+ }
+
+ Memory<byte> recvBuffer = memHandle.GetMemory();
+
//Listen for incoming messages
while (true)
{
//Receive a message
- ValueWebSocketReceiveResult result = await wss.ReceiveAsync(recvBuffer.Memory);
+ ValueWebSocketReceiveResult result = await wss.ReceiveAsync(recvBuffer);
//If a close message has been received, we can gracefully exit
if (result.MessageType == WebSocketMessageType.Close)
{
@@ -118,13 +107,13 @@ namespace VNLib.Net.Messaging.FBM.Server
}
//create buffer for storing data, pre alloc with initial data
- VnMemoryStream request = new(Heap, recvBuffer.Memory[..result.Count]);
+ VnMemoryStream request = new(heap, recvBuffer[..result.Count]);
//Streaming read
while (!result.EndOfMessage)
{
//Read more data
- result = await wss.ReceiveAsync(recvBuffer.Memory);
+ result = await wss.ReceiveAsync(recvBuffer);
//Make sure the request is small enough to buffer
if (request.Length + result.Count > args.MaxMessageSize)
{
@@ -135,8 +124,9 @@ namespace VNLib.Net.Messaging.FBM.Server
//break listen loop
goto Exit;
}
+
//write to buffer
- request.Write(recvBuffer.Memory.Span[..result.Count]);
+ request.Write(memHandle.GetSpan()[..result.Count]);
}
//Make sure data is available
if (request.Length == 0)
@@ -195,14 +185,19 @@ namespace VNLib.Net.Messaging.FBM.Server
if ((context.Request.ParseStatus & HeaderParseError.InvalidId) > 0)
{
- OnProcessError?.Invoke(this, new FBMException($"Invalid messageid {context.Request.MessageId}, message length {data.Length}"));
- return;
+ Exception cause = new FBMException($"Invalid messageid {context.Request.MessageId}, message length {data.Length}");
+ _ = session.Handler.OnInvalidMessage(context, cause);
+ return; //Cannot continue on invalid message id
}
//Check parse status flags
if ((context.Request.ParseStatus & HeaderParseError.HeaderOutOfMem) > 0)
{
- OnProcessError?.Invoke(this, new FBMException("Packet received with not enough space to store headers"));
+ Exception cause = new FBMException("Packet received with not enough space to store headers");
+ if(!session.Handler.OnInvalidMessage(context, cause))
+ {
+ return;
+ }
}
//Determine if request is an out-of-band message
else if (context.Request.MessageId == Helpers.CONTROL_FRAME_MID)
@@ -213,18 +208,15 @@ namespace VNLib.Net.Messaging.FBM.Server
else
{
//Invoke normal message handler
- await session.OnRecieved.Invoke(context, session.UserState, session.CancellationToken);
+ await session.Handler.HandleMessage(context, session.CancellationToken);
}
- //Get response data
-
- await using IAsyncMessageReader messageEnumerator = await context.Response.GetResponseDataAsync(session.CancellationToken);
-
+ //Get response data reader
+ await using IAsyncMessageReader messageEnumerator = context.Response.GetResponseData();
//Load inital segment
if (await messageEnumerator.MoveNextAsync() && !session.CancellationToken.IsCancellationRequested)
- {
- ValueTask sendTask;
+ {
//Syncrhonize access to send data because we may need to stream data to the client
await session.ResponseLock.WaitAsync(SEND_SEMAPHORE_TIMEOUT_MS);
@@ -233,10 +225,8 @@ namespace VNLib.Net.Messaging.FBM.Server
{
do
{
- bool eof = !messageEnumerator.DataRemaining;
-
- //Send first segment
- sendTask = session.Socket.SendAsync(messageEnumerator.Current, WebSocketMessageType.Binary, eof);
+ //Send current segment
+ await session.Socket.SendAsync(messageEnumerator.Current, WebSocketMessageType.Binary, !messageEnumerator.DataRemaining);
/*
* WARNING!
@@ -250,9 +240,6 @@ namespace VNLib.Net.Messaging.FBM.Server
{
break;
}
-
- //Await previous send
- await sendTask;
} while (true);
}
@@ -261,15 +248,13 @@ namespace VNLib.Net.Messaging.FBM.Server
//release semaphore
session.ResponseLock.Release();
}
-
- await sendTask;
}
//No data to send
}
catch (Exception ex)
{
- OnProcessError?.Invoke(this, ex);
+ session.Handler.OnProcessError(ex);
}
finally
{
@@ -295,25 +280,23 @@ namespace VNLib.Net.Messaging.FBM.Server
private readonly CancellationTokenSource Cancellation;
private readonly CancellationTokenRegistration Registration;
private readonly FBMListenerSessionParams Params;
+ private readonly IFBMMemoryManager MemManager;
- public readonly object? UserState;
-
public readonly SemaphoreSlim ResponseLock;
public readonly WebSocketSession Socket;
- public readonly RequestHandler OnRecieved;
+ public readonly IFBMServerMessageHandler Handler;
public CancellationToken CancellationToken => Cancellation.Token;
-
- public ListeningSession(WebSocketSession session, RequestHandler onRecieved, in FBMListenerSessionParams args, object? userState)
+ public ListeningSession(WebSocketSession session, IFBMServerMessageHandler handler, in FBMListenerSessionParams args, IFBMMemoryManager memManager)
{
Params = args;
Socket = session;
- UserState = userState;
- OnRecieved = onRecieved;
+ Handler = handler;
+ MemManager = memManager;
//Create cancellation and register for session close
Cancellation = new();
@@ -323,7 +306,7 @@ namespace VNLib.Net.Messaging.FBM.Server
CtxStore = ObjectRental.CreateReusable(ContextCtor);
}
- private FBMContext ContextCtor() => new(Params.MaxHeaderBufferSize, Params.ResponseBufferSize, Params.HeaderEncoding);
+ private FBMContext ContextCtor() => new(Params.MaxHeaderBufferSize, Params.ResponseBufferSize, Params.HeaderEncoding, MemManager);
/// <summary>
/// Cancels any pending opreations relating to the current session
@@ -358,7 +341,6 @@ namespace VNLib.Net.Messaging.FBM.Server
/// <exception cref="ObjectDisposedException"></exception>
public FBMContext RentContext()
{
-
if (Cancellation.IsCancellationRequested)
{
throw new ObjectDisposedException("The instance has been disposed");
diff --git a/lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs b/lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs
index 3e9fde2..71b1c8f 100644
--- a/lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs
+++ b/lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Messaging.FBM
@@ -27,67 +27,29 @@ using System.Threading;
using System.Threading.Tasks;
using VNLib.Utils.Logging;
-using VNLib.Utils.Memory;
using VNLib.Plugins.Essentials;
namespace VNLib.Net.Messaging.FBM.Server
{
+
/// <summary>
/// Provides a simple base class for an <see cref="FBMListener"/>
/// processor
/// </summary>
- public abstract class FBMListenerBase
+ public abstract class FBMListenerBase<T> : IFBMServerErrorHandler
{
/// <summary>
/// The initialzied listener
/// </summary>
- protected FBMListener? Listener { get; private set; }
+ protected abstract FBMListener Listener { get; }
+
/// <summary>
/// A provider to write log information to
/// </summary>
protected abstract ILogProvider Log { get; }
/// <summary>
- /// Initializes the <see cref="FBMListener"/>
- /// </summary>
- /// <param name="heap">The heap to alloc buffers from</param>
- protected void InitListener(IUnmangedHeap heap)
- {
- Listener = new(heap);
- //Attach service handler
- Listener.OnProcessError += Listener_OnProcessError;
- }
-
- /// <summary>
- /// A single event service routine for servicing errors that occur within
- /// the listener loop
- /// </summary>
- /// <param name="sender"></param>
- /// <param name="e">The exception that was raised</param>
- protected virtual void Listener_OnProcessError(object? sender, Exception e)
- {
- //Write the error to the log
- Log.Error(e);
- }
-
- private async Task OnReceivedAsync(FBMContext context, object? userState, CancellationToken token)
- {
- try
- {
- await ProcessAsync(context, userState, token);
- }
- catch (OperationCanceledException)
- {
- Log.Debug("Async operation cancelled");
- }
- catch(Exception ex)
- {
- Log.Error(ex);
- }
- }
-
- /// <summary>
/// Begins listening for requests on the current websocket until
/// a close message is received or an error occurs
/// </summary>
@@ -95,10 +57,13 @@ namespace VNLib.Net.Messaging.FBM.Server
/// <param name="args">The arguments used to configured this listening session</param>
/// <param name="userState">A state token to use for processing events for this connection</param>
/// <returns>A <see cref="Task"/> that completes when the connection closes</returns>
- public virtual async Task ListenAsync(WebSocketSession wss, FBMListenerSessionParams args, object? userState)
+ /// <exception cref="InvalidOperationException"></exception>
+ public virtual Task ListenAsync(WebSocketSession wss, T userState, FBMListenerSessionParams args)
{
_ = Listener ?? throw new InvalidOperationException("The listener has not been intialized");
- await Listener.ListenAsync(wss, OnReceivedAsync, args, userState);
+ //Initn new event handler
+ FBMEventHandler handler = new(userState, this);
+ return Listener.ListenAsync(wss, handler, args);
}
/// <summary>
@@ -108,6 +73,30 @@ namespace VNLib.Net.Messaging.FBM.Server
/// <param name="userState">A state token passed on client connected</param>
/// <param name="exitToken">A token that reflects the state of the listener</param>
/// <returns>A task that completes when the message has been serviced</returns>
- protected abstract Task ProcessAsync(FBMContext context, object? userState, CancellationToken exitToken);
+ protected abstract Task ProcessAsync(FBMContext context, T? userState, CancellationToken exitToken);
+
+ ///<inheritdoc/>
+ public virtual bool OnInvalidMessage(FBMContext context, Exception ex)
+ {
+ Log.Error("Invalid message received for session {ses}\n{ex}", context.Request.ConnectionId, ex);
+ //Invalid id should be captured already, so if oom, do not allow, but if a single header is invalid, it will be ignored by default
+ return !context.Request.ParseStatus.HasFlag(HeaderParseError.HeaderOutOfMem);
+ }
+
+ ///<inheritdoc/>
+ public virtual void OnProcessError(Exception ex) => Log.Error(ex);
+
+
+ private sealed record class FBMEventHandler(T State, FBMListenerBase<T> Lb) : IFBMServerMessageHandler
+ {
+ ///<inheritdoc/>
+ public Task HandleMessage(FBMContext context, CancellationToken cancellationToken) => Lb.ProcessAsync(context, State, cancellationToken);
+
+ ///<inheritdoc/>
+ public bool OnInvalidMessage(FBMContext context, Exception ex) => Lb.OnInvalidMessage(context, ex);
+
+ ///<inheritdoc/>
+ public void OnProcessError(Exception ex) => Lb.OnProcessError(ex);
+ }
}
}
diff --git a/lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs b/lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs
index ccf79db..0b4fa5b 100644
--- a/lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs
+++ b/lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Messaging.FBM
@@ -30,7 +30,7 @@ namespace VNLib.Net.Messaging.FBM.Server
/// Represents a configuration structure for an <see cref="FBMListener"/>
/// listening session
/// </summary>
- public readonly struct FBMListenerSessionParams
+ public readonly record struct FBMListenerSessionParams
{
/// <summary>
/// The size of the buffer to use while reading data from the websocket
diff --git a/lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs b/lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs
index db0655a..e9ff9f5 100644
--- a/lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs
+++ b/lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Messaging.FBM
@@ -24,13 +24,12 @@
using System;
using System.Text;
-using System.Buffers;
-using System.Text.Json;
using System.Collections.Generic;
+using System.Runtime.InteropServices;
using System.Runtime.CompilerServices;
using VNLib.Utils.IO;
-using VNLib.Utils.Extensions;
+using VNLib.Utils.Memory;
using VNLib.Utils.Memory.Caching;
namespace VNLib.Net.Messaging.FBM.Server
@@ -42,18 +41,21 @@ namespace VNLib.Net.Messaging.FBM.Server
{
private readonly List<FBMMessageHeader> _headers;
private readonly int HeaderBufferSize;
+ private readonly IFBMMemoryManager _memoryManager;
+ private readonly IFBMSpanOnlyMemoryHandle _memHandle;
/// <summary>
/// Creates a new resusable <see cref="FBMRequestMessage"/>
/// </summary>
/// <param name="headerBufferSize">The size of the buffer to alloc during initialization</param>
- internal FBMRequestMessage(int headerBufferSize)
+ /// <param name="manager">The memory manager to use for the message</param>
+ internal FBMRequestMessage(int headerBufferSize, IFBMMemoryManager manager)
{
HeaderBufferSize = headerBufferSize;
_headers = new();
+ _memoryManager = manager;
+ _memHandle = _memoryManager.InitSpanOnly();
}
-
- private char[]? _headerBuffer;
/// <summary>
/// The ID of the current message
@@ -115,35 +117,13 @@ namespace VNLib.Net.Messaging.FBM.Server
//Parse headers
ParseStatus = Helpers.ParseHeaders(vms, this, _headers, dataEncoding);
}
-
- /// <summary>
- /// Deserializes the request body into a new specified object type
- /// </summary>
- /// <typeparam name="T">The type of the object to deserialize</typeparam>
- /// <param name="jso">The <see cref="JsonSerializerOptions"/> to use while deserializing data</param>
- /// <returns>The deserialized object from the request body</returns>
- /// <exception cref="JsonException"></exception>
- public T? DeserializeBody<T>(JsonSerializerOptions? jso = default)
- {
- return BodyData.IsEmpty ? default : BodyData.AsJsonObject<T>(jso);
- }
-
- /// <summary>
- /// Gets a <see cref="JsonDocument"/> of the request body
- /// </summary>
- /// <returns>The parsed <see cref="JsonDocument"/> if parsed successfully, or null otherwise</returns>
- /// <exception cref="JsonException"></exception>
- public JsonDocument? GetBodyAsJson()
- {
- Utf8JsonReader reader = new(BodyData);
- return JsonDocument.TryParseValue(ref reader, out JsonDocument? jdoc) ? jdoc : default;
- }
+
void IReusable.Prepare()
{
ParseStatus = HeaderParseError.None;
//Alloc header buffer
- _headerBuffer = ArrayPool<char>.Shared.Rent(HeaderBufferSize);
+ _memoryManager.AllocBuffer(_memHandle, MemoryUtil.ByteCount<char>(HeaderBufferSize));
}
@@ -155,8 +135,7 @@ namespace VNLib.Net.Messaging.FBM.Server
//Clear headers before freeing buffer
_headers.Clear();
//Free header-buffer
- ArrayPool<char>.Shared.Return(_headerBuffer!);
- _headerBuffer = null;
+ _memoryManager.FreeBuffer(_memHandle);
ConnectionId = null;
MessageId = 0;
IsControlFrame = false;
@@ -165,11 +144,16 @@ namespace VNLib.Net.Messaging.FBM.Server
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- Span<char> IFBMHeaderBuffer.GetSpan(int offset, int count)
- => _headerBuffer != null ? _headerBuffer.AsSpan(offset, count) : throw new InvalidOperationException("The buffer is no longer available");
+ Span<char> IFBMHeaderBuffer.GetSpan(int offset, int count)
+ {
+ //Cast to char buffer
+ Span<char> chars = MemoryMarshal.Cast<byte, char>(_memHandle.GetSpan());
+ //Return the requested span
+ return chars.Slice(offset, count);
+ }
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- Span<char> IFBMHeaderBuffer.GetSpan() => _headerBuffer ?? throw new InvalidOperationException("The buffer is no longer available");
+ Span<char> IFBMHeaderBuffer.GetSpan() => MemoryMarshal.Cast<byte, char>(_memHandle.GetSpan());
}
}
diff --git a/lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs b/lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs
index 9ca6b4d..1e26140 100644
--- a/lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs
+++ b/lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Messaging.FBM
@@ -24,14 +24,12 @@
using System;
using System.Text;
-using System.Threading;
using System.Threading.Tasks;
using VNLib.Net.Http;
using VNLib.Utils.IO;
using VNLib.Utils.Extensions;
using VNLib.Utils.Memory.Caching;
-using VNLib.Net.Messaging.FBM.Client;
namespace VNLib.Net.Messaging.FBM.Server
{
@@ -41,9 +39,9 @@ namespace VNLib.Net.Messaging.FBM.Server
/// </summary>
public sealed class FBMResponseMessage : IReusable, IFBMMessage
{
- internal FBMResponseMessage(int internalBufferSize, Encoding headerEncoding)
+ internal FBMResponseMessage(int internalBufferSize, Encoding headerEncoding, IFBMMemoryManager manager)
{
- _headerAccumulator = new HeaderDataAccumulator(internalBufferSize);
+ _headerAccumulator = new HeaderDataAccumulator(internalBufferSize, manager);
_headerEncoding = headerEncoding;
_messageEnumerator = new(this);
}
@@ -134,92 +132,76 @@ namespace VNLib.Net.Messaging.FBM.Server
/// <summary>
/// Gets the internal message body enumerator and prepares the message for sending
/// </summary>
- /// <param name="cancellationToken">A cancellation token</param>
/// <returns>A value task that returns the message body enumerator</returns>
- internal async ValueTask<IAsyncMessageReader> GetResponseDataAsync(CancellationToken cancellationToken)
- {
- //try to buffer as much data in the header segment first
- if(MessageBody?.RemainingSize > 0 && _headerAccumulator.RemainingSize > 0)
- {
- //Read data from the message
- int read = await MessageBody.ReadAsync(_headerAccumulator.RemainingBuffer, cancellationToken);
-
- //Advance accumulator to the read bytes
- _headerAccumulator.Advance(read);
- }
- //return reusable enumerator
- return _messageEnumerator;
- }
+ internal IAsyncMessageReader GetResponseData() => _messageEnumerator;
private sealed class MessageSegmentEnumerator : IAsyncMessageReader
{
private readonly FBMResponseMessage _message;
+ private readonly ISlindingWindowBuffer<byte> _accumulator;
bool HeadersRead;
public MessageSegmentEnumerator(FBMResponseMessage message)
{
_message = message;
+ _accumulator = _message._headerAccumulator;
}
- public ReadOnlyMemory<byte> Current { get; private set; }
+ ///<inheritdoc/>
+ public ReadOnlyMemory<byte> Current => _accumulator.AccumulatedBuffer;
- public bool DataRemaining { get; private set; }
+ ///<inheritdoc/>
+ public bool DataRemaining => _message.MessageBody?.RemainingSize > 0;
+ ///<inheritdoc/>
public async ValueTask<bool> MoveNextAsync()
{
//Attempt to read header segment first
if (!HeadersRead)
{
- //Set the accumulated buffer
- Current = _message._headerAccumulator.AccumulatedBuffer;
+ /*
+ * If headers have not been read yet, we can attempt to buffer as much
+ * of the message body into the header accumulator buffer as possible. This will
+ * reduce message fragmentation.
+ */
+ if (DataRemaining && _accumulator.RemainingSize > 0)
+ {
+ int read = await _message.MessageBody.ReadAsync(_accumulator.RemainingBuffer).ConfigureAwait(false);
- //Update data remaining flag
- DataRemaining = _message.MessageBody?.RemainingSize > 0;
+ //Advance accumulator to the read bytes
+ _accumulator.Advance(read);
+ }
//Set headers read flag
HeadersRead = true;
return true;
}
- else if (_message.MessageBody?.RemainingSize > 0)
+ else if (DataRemaining)
{
- //Use the header buffer as the buffer for the message body
- Memory<byte> buffer = _message._headerAccumulator.Buffer;
+ //Reset the accumulator so we can read another segment
+ _accumulator.Reset();
//Read body segment
- int read = await _message.MessageBody.ReadAsync(buffer);
+ int read = await _message.MessageBody.ReadAsync(_accumulator.RemainingBuffer);
- //Update data remaining flag
- DataRemaining = _message.MessageBody.RemainingSize > 0;
+ //Advance accumulator to the read bytes
+ _accumulator.Advance(read);
- if (read > 0)
- {
- //Store the read segment
- Current = buffer[..read];
- return true;
- }
+ return read > 0;
}
return false;
}
+ ///<inheritdoc/>
public ValueTask DisposeAsync()
{
- //Clear current segment
- Current = default;
-
//Reset headers read flag
HeadersRead = false;
-
+
//Dispose the message body if set
- if (_message.MessageBody != null)
- {
- return _message.MessageBody.DisposeAsync();
- }
- else
- {
- return ValueTask.CompletedTask;
- }
+ return _message.MessageBody != null ? _message.MessageBody.DisposeAsync() : ValueTask.CompletedTask;
}
}
}
diff --git a/lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs b/lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs
index 423a26e..891c583 100644
--- a/lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs
+++ b/lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Messaging.FBM
@@ -23,25 +23,28 @@
*/
using System;
-using System.Buffers;
using VNLib.Utils.IO;
namespace VNLib.Net.Messaging.FBM.Server
{
+
/// <summary>
/// Reusable sliding window impl
/// </summary>
internal sealed class HeaderDataAccumulator : ISlindingWindowBuffer<byte>
{
- private readonly int BufferSize;
-
- private byte[]? _memHandle;
+ private readonly int _bufferSize;
+ private readonly IFBMMemoryManager _memManager;
+ private readonly IFBMMemoryHandle _handle;
+
- public HeaderDataAccumulator(int bufferSize)
+ public HeaderDataAccumulator(int bufferSize, IFBMMemoryManager memManager)
{
- BufferSize = bufferSize;
+ _bufferSize = bufferSize;
+ _memManager = memManager;
+ _handle = memManager.InitHandle();
}
///<inheritdoc/>
@@ -49,7 +52,7 @@ namespace VNLib.Net.Messaging.FBM.Server
///<inheritdoc/>
public int WindowEndPos { get; private set; }
///<inheritdoc/>
- public Memory<byte> Buffer => _memHandle.AsMemory();
+ public Memory<byte> Buffer => _handle.GetMemory();
///<inheritdoc/>
public void Advance(int count) => WindowEndPos += count;
@@ -67,22 +70,13 @@ namespace VNLib.Net.Messaging.FBM.Server
/// <summary>
/// Allocates the internal message buffer
/// </summary>
- public void Prepare()
- {
- _memHandle ??= ArrayPool<byte>.Shared.Rent(BufferSize);
- }
+ public void Prepare() => _memManager.AllocBuffer(_handle, _bufferSize);
///<inheritdoc/>
public void Close()
{
Reset();
-
- if (_memHandle != null)
- {
- //Return the buffer to the pool
- ArrayPool<byte>.Shared.Return(_memHandle);
- _memHandle = null;
- }
+ _memManager.FreeBuffer(_handle);
}
}
diff --git a/lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs b/lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs
index b2abe8d..abb3600 100644
--- a/lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs
+++ b/lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Messaging.FBM
@@ -34,7 +34,7 @@ namespace VNLib.Net.Messaging.FBM.Server
internal interface IAsyncMessageReader : IAsyncEnumerator<ReadOnlyMemory<byte>>
{
/// <summary>
- /// A value that indicates if there is data remaining after a
+ /// A value that indicates if there is data remaining after a read
/// </summary>
bool DataRemaining { get; }
}
diff --git a/lib/Net.Messaging.FBM/src/Server/IFBMServerErrorHandler.cs b/lib/Net.Messaging.FBM/src/Server/IFBMServerErrorHandler.cs
new file mode 100644
index 0000000..caa4f96
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Server/IFBMServerErrorHandler.cs
@@ -0,0 +1,49 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: IFBMServerErrorHandler.cs
+*
+* IFBMServerErrorHandler.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+
+namespace VNLib.Net.Messaging.FBM.Server
+{
+ /// <summary>
+ /// An server side FBM protocol error handler abstraction
+ /// </summary>
+ public interface IFBMServerErrorHandler
+ {
+ /// <summary>
+ /// An exception handler for unhandled events that occur during a listening session
+ /// </summary>
+ /// <param name="ex">The exception that caused this handler to be invoked</param>
+ void OnProcessError(Exception ex);
+
+ /// <summary>
+ /// An exception handler for invalid messages that occur during a listening session.
+ /// NOTE: The context parameter is likely in an invlaid state and should be read carefully
+ /// </summary>
+ /// <param name="context">The context that the error occured while parsing on</param>
+ /// <param name="ex">The exception explaining the reason this handler was invoked</param>
+ /// <returns>A value that indicates if the server should continue processing</returns>
+ bool OnInvalidMessage(FBMContext context, Exception ex);
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Server/IFBMServerMessageHandler.cs b/lib/Net.Messaging.FBM/src/Server/IFBMServerMessageHandler.cs
new file mode 100644
index 0000000..532db5f
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Server/IFBMServerMessageHandler.cs
@@ -0,0 +1,43 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: IFBMServerMessageHandler.cs
+*
+* IFBMServerMessageHandler.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace VNLib.Net.Messaging.FBM.Server
+{
+ /// <summary>
+ /// A server side FBM protocol handler
+ /// </summary>
+ public interface IFBMServerMessageHandler : IFBMServerErrorHandler
+ {
+ /// <summary>
+ /// Handles processing of a normal incoming message
+ /// </summary>
+ /// <param name="context">The context to process for this new message</param>
+ /// <param name="cancellationToken">A token that signals the session has been cancelled</param>
+ /// <returns>A task representing the asynchronous work</returns>
+ Task HandleMessage(FBMContext context, CancellationToken cancellationToken);
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/VNLib.Net.Messaging.FBM.csproj b/lib/Net.Messaging.FBM/src/VNLib.Net.Messaging.FBM.csproj
index 7fade2c..70a640d 100644
--- a/lib/Net.Messaging.FBM/src/VNLib.Net.Messaging.FBM.csproj
+++ b/lib/Net.Messaging.FBM/src/VNLib.Net.Messaging.FBM.csproj
@@ -35,7 +35,6 @@
</ItemGroup>
<ItemGroup>
- <ProjectReference Include="..\..\Net.Http\src\VNLib.Net.Http.csproj" />
<ProjectReference Include="..\..\Plugins.Essentials\src\VNLib.Plugins.Essentials.csproj" />
<ProjectReference Include="..\..\Utils\src\VNLib.Utils.csproj" />
</ItemGroup>