aboutsummaryrefslogtreecommitdiff
path: root/lib/Net.Messaging.FBM/src/Server
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-11-02 01:49:02 -0400
committerLibravatar vnugent <public@vaughnnugent.com>2023-11-02 01:49:02 -0400
commit9e3dd9be0f0ec7aaef1a719f09f96425e66369df (patch)
tree59b8bd4ace8750327db80823fa1e5eccdf44bc74 /lib/Net.Messaging.FBM/src/Server
parenteafefadc4b858e5b5be481662a2b0c8e47a43bf4 (diff)
may have gottem carried away
Diffstat (limited to 'lib/Net.Messaging.FBM/src/Server')
-rw-r--r--lib/Net.Messaging.FBM/src/Server/FBMContext.cs9
-rw-r--r--lib/Net.Messaging.FBM/src/Server/FBMListener.cs108
-rw-r--r--lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs81
-rw-r--r--lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs4
-rw-r--r--lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs56
-rw-r--r--lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs82
-rw-r--r--lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs32
-rw-r--r--lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs4
-rw-r--r--lib/Net.Messaging.FBM/src/Server/IFBMServerErrorHandler.cs49
-rw-r--r--lib/Net.Messaging.FBM/src/Server/IFBMServerMessageHandler.cs43
10 files changed, 246 insertions, 222 deletions
diff --git a/lib/Net.Messaging.FBM/src/Server/FBMContext.cs b/lib/Net.Messaging.FBM/src/Server/FBMContext.cs
index 6d5f3bd..f2a2fea 100644
--- a/lib/Net.Messaging.FBM/src/Server/FBMContext.cs
+++ b/lib/Net.Messaging.FBM/src/Server/FBMContext.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Messaging.FBM
@@ -55,10 +55,11 @@ namespace VNLib.Net.Messaging.FBM.Server
/// <param name="requestHeaderBufferSize">The size in characters of the request header buffer</param>
/// <param name="responseBufferSize">The size in characters of the response header buffer</param>
/// <param name="headerEncoding">The message header encoding instance</param>
- public FBMContext(int requestHeaderBufferSize, int responseBufferSize, Encoding headerEncoding)
+ /// <param name="manager">The context memory manager</param>
+ public FBMContext(int requestHeaderBufferSize, int responseBufferSize, Encoding headerEncoding, IFBMMemoryManager manager)
{
- _request = Request = new(requestHeaderBufferSize);
- _response = Response = new(responseBufferSize, headerEncoding);
+ _request = Request = new(requestHeaderBufferSize, manager);
+ _response = Response = new(responseBufferSize, headerEncoding, manager);
_headerEncoding = headerEncoding;
}
diff --git a/lib/Net.Messaging.FBM/src/Server/FBMListener.cs b/lib/Net.Messaging.FBM/src/Server/FBMListener.cs
index 46ee160..30fa1ac 100644
--- a/lib/Net.Messaging.FBM/src/Server/FBMListener.cs
+++ b/lib/Net.Messaging.FBM/src/Server/FBMListener.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Messaging.FBM
@@ -24,7 +24,6 @@
using System;
using System.IO;
-using System.Buffers;
using System.Threading;
using System.Net.WebSockets;
using System.Threading.Tasks;
@@ -32,7 +31,6 @@ using System.Threading.Tasks;
using VNLib.Utils.IO;
using VNLib.Utils.Async;
using VNLib.Utils.Memory;
-using VNLib.Utils.Extensions;
using VNLib.Utils.Memory.Caching;
using VNLib.Plugins.Essentials;
@@ -40,16 +38,6 @@ namespace VNLib.Net.Messaging.FBM.Server
{
/// <summary>
- /// Method delegate for processing FBM messages from an <see cref="FBMListener"/>
- /// when messages are received
- /// </summary>
- /// <param name="context">The message/connection context</param>
- /// <param name="userState">The state parameter passed on client connected</param>
- /// <param name="cancellationToken">A token that reflects the state of the listener</param>
- /// <returns>A <see cref="Task"/> that resolves when processing is complete</returns>
- public delegate Task RequestHandler(FBMContext context, object? userState, CancellationToken cancellationToken);
-
- /// <summary>
/// A FBM protocol listener. Listens for messages on a <see cref="WebSocketSession"/>
/// and raises events on requests.
/// </summary>
@@ -58,22 +46,16 @@ namespace VNLib.Net.Messaging.FBM.Server
public const int SEND_SEMAPHORE_TIMEOUT_MS = 10 * 1000;
- private readonly IUnmangedHeap Heap;
-
- /// <summary>
- /// Raised when a response processing error occured
- /// </summary>
- public event EventHandler<Exception>? OnProcessError;
+ private readonly IFBMMemoryManager MemoryManger;
/// <summary>
/// Creates a new <see cref="FBMListener"/> instance ready for
/// processing connections
/// </summary>
/// <param name="heap">The heap to alloc buffers from</param>
- public FBMListener(IUnmangedHeap heap)
- {
- Heap = heap;
- }
+ /// <exception cref="ArgumentNullException"></exception>
+ public FBMListener(IFBMMemoryManager heap) => MemoryManger = heap ?? throw new ArgumentNullException(nameof(heap));
+
#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task
/// <summary>
@@ -83,31 +65,38 @@ namespace VNLib.Net.Messaging.FBM.Server
/// <param name="wss">The <see cref="WebSocketSession"/> to receive messages on</param>
/// <param name="handler">The callback method to handle incoming requests</param>
/// <param name="args">The arguments used to configured this listening session</param>
- /// <param name="userState">A state parameter</param>
/// <returns>A <see cref="Task"/> that completes when the connection closes</returns>
- public async Task ListenAsync(WebSocketSession wss, RequestHandler handler, FBMListenerSessionParams args, object? userState)
+ public async Task ListenAsync(WebSocketSession wss, IFBMServerMessageHandler handler, FBMListenerSessionParams args)
{
_ = wss ?? throw new ArgumentNullException(nameof(wss));
_ = handler ?? throw new ArgumentNullException(nameof(handler));
- ListeningSession session = new(wss, handler, in args, userState);
-
- //Alloc a recieve buffer
- using IMemoryOwner<byte> recvBuffer = Heap.DirectAlloc<byte>(args.RecvBufferSize);
+ ListeningSession session = new(wss, handler, in args, MemoryManger);
//Init new queue for dispatching work
AsyncQueue<VnMemoryStream> workQueue = new(true, true);
//Start a task to process the queue
Task queueWorker = QueueWorkerDoWork(workQueue, session);
-
+
+ //Alloc buffer
+ IFBMMemoryHandle memHandle = MemoryManger.InitHandle();
+ MemoryManger.AllocBuffer(memHandle, args.RecvBufferSize);
+
try
{
+ if(!MemoryManger.TryGetHeap(out IUnmangedHeap? heap))
+ {
+ throw new NotSupportedException("The memory manager must export an unmanaged heap");
+ }
+
+ Memory<byte> recvBuffer = memHandle.GetMemory();
+
//Listen for incoming messages
while (true)
{
//Receive a message
- ValueWebSocketReceiveResult result = await wss.ReceiveAsync(recvBuffer.Memory);
+ ValueWebSocketReceiveResult result = await wss.ReceiveAsync(recvBuffer);
//If a close message has been received, we can gracefully exit
if (result.MessageType == WebSocketMessageType.Close)
{
@@ -118,13 +107,13 @@ namespace VNLib.Net.Messaging.FBM.Server
}
//create buffer for storing data, pre alloc with initial data
- VnMemoryStream request = new(Heap, recvBuffer.Memory[..result.Count]);
+ VnMemoryStream request = new(heap, recvBuffer[..result.Count]);
//Streaming read
while (!result.EndOfMessage)
{
//Read more data
- result = await wss.ReceiveAsync(recvBuffer.Memory);
+ result = await wss.ReceiveAsync(recvBuffer);
//Make sure the request is small enough to buffer
if (request.Length + result.Count > args.MaxMessageSize)
{
@@ -135,8 +124,9 @@ namespace VNLib.Net.Messaging.FBM.Server
//break listen loop
goto Exit;
}
+
//write to buffer
- request.Write(recvBuffer.Memory.Span[..result.Count]);
+ request.Write(memHandle.GetSpan()[..result.Count]);
}
//Make sure data is available
if (request.Length == 0)
@@ -195,14 +185,19 @@ namespace VNLib.Net.Messaging.FBM.Server
if ((context.Request.ParseStatus & HeaderParseError.InvalidId) > 0)
{
- OnProcessError?.Invoke(this, new FBMException($"Invalid messageid {context.Request.MessageId}, message length {data.Length}"));
- return;
+ Exception cause = new FBMException($"Invalid messageid {context.Request.MessageId}, message length {data.Length}");
+ _ = session.Handler.OnInvalidMessage(context, cause);
+ return; //Cannot continue on invalid message id
}
//Check parse status flags
if ((context.Request.ParseStatus & HeaderParseError.HeaderOutOfMem) > 0)
{
- OnProcessError?.Invoke(this, new FBMException("Packet received with not enough space to store headers"));
+ Exception cause = new FBMException("Packet received with not enough space to store headers");
+ if(!session.Handler.OnInvalidMessage(context, cause))
+ {
+ return;
+ }
}
//Determine if request is an out-of-band message
else if (context.Request.MessageId == Helpers.CONTROL_FRAME_MID)
@@ -213,18 +208,15 @@ namespace VNLib.Net.Messaging.FBM.Server
else
{
//Invoke normal message handler
- await session.OnRecieved.Invoke(context, session.UserState, session.CancellationToken);
+ await session.Handler.HandleMessage(context, session.CancellationToken);
}
- //Get response data
-
- await using IAsyncMessageReader messageEnumerator = await context.Response.GetResponseDataAsync(session.CancellationToken);
-
+ //Get response data reader
+ await using IAsyncMessageReader messageEnumerator = context.Response.GetResponseData();
//Load inital segment
if (await messageEnumerator.MoveNextAsync() && !session.CancellationToken.IsCancellationRequested)
- {
- ValueTask sendTask;
+ {
//Syncrhonize access to send data because we may need to stream data to the client
await session.ResponseLock.WaitAsync(SEND_SEMAPHORE_TIMEOUT_MS);
@@ -233,10 +225,8 @@ namespace VNLib.Net.Messaging.FBM.Server
{
do
{
- bool eof = !messageEnumerator.DataRemaining;
-
- //Send first segment
- sendTask = session.Socket.SendAsync(messageEnumerator.Current, WebSocketMessageType.Binary, eof);
+ //Send current segment
+ await session.Socket.SendAsync(messageEnumerator.Current, WebSocketMessageType.Binary, !messageEnumerator.DataRemaining);
/*
* WARNING!
@@ -250,9 +240,6 @@ namespace VNLib.Net.Messaging.FBM.Server
{
break;
}
-
- //Await previous send
- await sendTask;
} while (true);
}
@@ -261,15 +248,13 @@ namespace VNLib.Net.Messaging.FBM.Server
//release semaphore
session.ResponseLock.Release();
}
-
- await sendTask;
}
//No data to send
}
catch (Exception ex)
{
- OnProcessError?.Invoke(this, ex);
+ session.Handler.OnProcessError(ex);
}
finally
{
@@ -295,25 +280,23 @@ namespace VNLib.Net.Messaging.FBM.Server
private readonly CancellationTokenSource Cancellation;
private readonly CancellationTokenRegistration Registration;
private readonly FBMListenerSessionParams Params;
+ private readonly IFBMMemoryManager MemManager;
- public readonly object? UserState;
-
public readonly SemaphoreSlim ResponseLock;
public readonly WebSocketSession Socket;
- public readonly RequestHandler OnRecieved;
+ public readonly IFBMServerMessageHandler Handler;
public CancellationToken CancellationToken => Cancellation.Token;
-
- public ListeningSession(WebSocketSession session, RequestHandler onRecieved, in FBMListenerSessionParams args, object? userState)
+ public ListeningSession(WebSocketSession session, IFBMServerMessageHandler handler, in FBMListenerSessionParams args, IFBMMemoryManager memManager)
{
Params = args;
Socket = session;
- UserState = userState;
- OnRecieved = onRecieved;
+ Handler = handler;
+ MemManager = memManager;
//Create cancellation and register for session close
Cancellation = new();
@@ -323,7 +306,7 @@ namespace VNLib.Net.Messaging.FBM.Server
CtxStore = ObjectRental.CreateReusable(ContextCtor);
}
- private FBMContext ContextCtor() => new(Params.MaxHeaderBufferSize, Params.ResponseBufferSize, Params.HeaderEncoding);
+ private FBMContext ContextCtor() => new(Params.MaxHeaderBufferSize, Params.ResponseBufferSize, Params.HeaderEncoding, MemManager);
/// <summary>
/// Cancels any pending opreations relating to the current session
@@ -358,7 +341,6 @@ namespace VNLib.Net.Messaging.FBM.Server
/// <exception cref="ObjectDisposedException"></exception>
public FBMContext RentContext()
{
-
if (Cancellation.IsCancellationRequested)
{
throw new ObjectDisposedException("The instance has been disposed");
diff --git a/lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs b/lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs
index 3e9fde2..71b1c8f 100644
--- a/lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs
+++ b/lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Messaging.FBM
@@ -27,67 +27,29 @@ using System.Threading;
using System.Threading.Tasks;
using VNLib.Utils.Logging;
-using VNLib.Utils.Memory;
using VNLib.Plugins.Essentials;
namespace VNLib.Net.Messaging.FBM.Server
{
+
/// <summary>
/// Provides a simple base class for an <see cref="FBMListener"/>
/// processor
/// </summary>
- public abstract class FBMListenerBase
+ public abstract class FBMListenerBase<T> : IFBMServerErrorHandler
{
/// <summary>
/// The initialzied listener
/// </summary>
- protected FBMListener? Listener { get; private set; }
+ protected abstract FBMListener Listener { get; }
+
/// <summary>
/// A provider to write log information to
/// </summary>
protected abstract ILogProvider Log { get; }
/// <summary>
- /// Initializes the <see cref="FBMListener"/>
- /// </summary>
- /// <param name="heap">The heap to alloc buffers from</param>
- protected void InitListener(IUnmangedHeap heap)
- {
- Listener = new(heap);
- //Attach service handler
- Listener.OnProcessError += Listener_OnProcessError;
- }
-
- /// <summary>
- /// A single event service routine for servicing errors that occur within
- /// the listener loop
- /// </summary>
- /// <param name="sender"></param>
- /// <param name="e">The exception that was raised</param>
- protected virtual void Listener_OnProcessError(object? sender, Exception e)
- {
- //Write the error to the log
- Log.Error(e);
- }
-
- private async Task OnReceivedAsync(FBMContext context, object? userState, CancellationToken token)
- {
- try
- {
- await ProcessAsync(context, userState, token);
- }
- catch (OperationCanceledException)
- {
- Log.Debug("Async operation cancelled");
- }
- catch(Exception ex)
- {
- Log.Error(ex);
- }
- }
-
- /// <summary>
/// Begins listening for requests on the current websocket until
/// a close message is received or an error occurs
/// </summary>
@@ -95,10 +57,13 @@ namespace VNLib.Net.Messaging.FBM.Server
/// <param name="args">The arguments used to configured this listening session</param>
/// <param name="userState">A state token to use for processing events for this connection</param>
/// <returns>A <see cref="Task"/> that completes when the connection closes</returns>
- public virtual async Task ListenAsync(WebSocketSession wss, FBMListenerSessionParams args, object? userState)
+ /// <exception cref="InvalidOperationException"></exception>
+ public virtual Task ListenAsync(WebSocketSession wss, T userState, FBMListenerSessionParams args)
{
_ = Listener ?? throw new InvalidOperationException("The listener has not been intialized");
- await Listener.ListenAsync(wss, OnReceivedAsync, args, userState);
+ //Initn new event handler
+ FBMEventHandler handler = new(userState, this);
+ return Listener.ListenAsync(wss, handler, args);
}
/// <summary>
@@ -108,6 +73,30 @@ namespace VNLib.Net.Messaging.FBM.Server
/// <param name="userState">A state token passed on client connected</param>
/// <param name="exitToken">A token that reflects the state of the listener</param>
/// <returns>A task that completes when the message has been serviced</returns>
- protected abstract Task ProcessAsync(FBMContext context, object? userState, CancellationToken exitToken);
+ protected abstract Task ProcessAsync(FBMContext context, T? userState, CancellationToken exitToken);
+
+ ///<inheritdoc/>
+ public virtual bool OnInvalidMessage(FBMContext context, Exception ex)
+ {
+ Log.Error("Invalid message received for session {ses}\n{ex}", context.Request.ConnectionId, ex);
+ //Invalid id should be captured already, so if oom, do not allow, but if a single header is invalid, it will be ignored by default
+ return !context.Request.ParseStatus.HasFlag(HeaderParseError.HeaderOutOfMem);
+ }
+
+ ///<inheritdoc/>
+ public virtual void OnProcessError(Exception ex) => Log.Error(ex);
+
+
+ private sealed record class FBMEventHandler(T State, FBMListenerBase<T> Lb) : IFBMServerMessageHandler
+ {
+ ///<inheritdoc/>
+ public Task HandleMessage(FBMContext context, CancellationToken cancellationToken) => Lb.ProcessAsync(context, State, cancellationToken);
+
+ ///<inheritdoc/>
+ public bool OnInvalidMessage(FBMContext context, Exception ex) => Lb.OnInvalidMessage(context, ex);
+
+ ///<inheritdoc/>
+ public void OnProcessError(Exception ex) => Lb.OnProcessError(ex);
+ }
}
}
diff --git a/lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs b/lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs
index ccf79db..0b4fa5b 100644
--- a/lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs
+++ b/lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Messaging.FBM
@@ -30,7 +30,7 @@ namespace VNLib.Net.Messaging.FBM.Server
/// Represents a configuration structure for an <see cref="FBMListener"/>
/// listening session
/// </summary>
- public readonly struct FBMListenerSessionParams
+ public readonly record struct FBMListenerSessionParams
{
/// <summary>
/// The size of the buffer to use while reading data from the websocket
diff --git a/lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs b/lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs
index db0655a..e9ff9f5 100644
--- a/lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs
+++ b/lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Messaging.FBM
@@ -24,13 +24,12 @@
using System;
using System.Text;
-using System.Buffers;
-using System.Text.Json;
using System.Collections.Generic;
+using System.Runtime.InteropServices;
using System.Runtime.CompilerServices;
using VNLib.Utils.IO;
-using VNLib.Utils.Extensions;
+using VNLib.Utils.Memory;
using VNLib.Utils.Memory.Caching;
namespace VNLib.Net.Messaging.FBM.Server
@@ -42,18 +41,21 @@ namespace VNLib.Net.Messaging.FBM.Server
{
private readonly List<FBMMessageHeader> _headers;
private readonly int HeaderBufferSize;
+ private readonly IFBMMemoryManager _memoryManager;
+ private readonly IFBMSpanOnlyMemoryHandle _memHandle;
/// <summary>
/// Creates a new resusable <see cref="FBMRequestMessage"/>
/// </summary>
/// <param name="headerBufferSize">The size of the buffer to alloc during initialization</param>
- internal FBMRequestMessage(int headerBufferSize)
+ /// <param name="manager">The memory manager to use for the message</param>
+ internal FBMRequestMessage(int headerBufferSize, IFBMMemoryManager manager)
{
HeaderBufferSize = headerBufferSize;
_headers = new();
+ _memoryManager = manager;
+ _memHandle = _memoryManager.InitSpanOnly();
}
-
- private char[]? _headerBuffer;
/// <summary>
/// The ID of the current message
@@ -115,35 +117,13 @@ namespace VNLib.Net.Messaging.FBM.Server
//Parse headers
ParseStatus = Helpers.ParseHeaders(vms, this, _headers, dataEncoding);
}
-
- /// <summary>
- /// Deserializes the request body into a new specified object type
- /// </summary>
- /// <typeparam name="T">The type of the object to deserialize</typeparam>
- /// <param name="jso">The <see cref="JsonSerializerOptions"/> to use while deserializing data</param>
- /// <returns>The deserialized object from the request body</returns>
- /// <exception cref="JsonException"></exception>
- public T? DeserializeBody<T>(JsonSerializerOptions? jso = default)
- {
- return BodyData.IsEmpty ? default : BodyData.AsJsonObject<T>(jso);
- }
-
- /// <summary>
- /// Gets a <see cref="JsonDocument"/> of the request body
- /// </summary>
- /// <returns>The parsed <see cref="JsonDocument"/> if parsed successfully, or null otherwise</returns>
- /// <exception cref="JsonException"></exception>
- public JsonDocument? GetBodyAsJson()
- {
- Utf8JsonReader reader = new(BodyData);
- return JsonDocument.TryParseValue(ref reader, out JsonDocument? jdoc) ? jdoc : default;
- }
+
void IReusable.Prepare()
{
ParseStatus = HeaderParseError.None;
//Alloc header buffer
- _headerBuffer = ArrayPool<char>.Shared.Rent(HeaderBufferSize);
+ _memoryManager.AllocBuffer(_memHandle, MemoryUtil.ByteCount<char>(HeaderBufferSize));
}
@@ -155,8 +135,7 @@ namespace VNLib.Net.Messaging.FBM.Server
//Clear headers before freeing buffer
_headers.Clear();
//Free header-buffer
- ArrayPool<char>.Shared.Return(_headerBuffer!);
- _headerBuffer = null;
+ _memoryManager.FreeBuffer(_memHandle);
ConnectionId = null;
MessageId = 0;
IsControlFrame = false;
@@ -165,11 +144,16 @@ namespace VNLib.Net.Messaging.FBM.Server
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- Span<char> IFBMHeaderBuffer.GetSpan(int offset, int count)
- => _headerBuffer != null ? _headerBuffer.AsSpan(offset, count) : throw new InvalidOperationException("The buffer is no longer available");
+ Span<char> IFBMHeaderBuffer.GetSpan(int offset, int count)
+ {
+ //Cast to char buffer
+ Span<char> chars = MemoryMarshal.Cast<byte, char>(_memHandle.GetSpan());
+ //Return the requested span
+ return chars.Slice(offset, count);
+ }
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- Span<char> IFBMHeaderBuffer.GetSpan() => _headerBuffer ?? throw new InvalidOperationException("The buffer is no longer available");
+ Span<char> IFBMHeaderBuffer.GetSpan() => MemoryMarshal.Cast<byte, char>(_memHandle.GetSpan());
}
}
diff --git a/lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs b/lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs
index 9ca6b4d..1e26140 100644
--- a/lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs
+++ b/lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Messaging.FBM
@@ -24,14 +24,12 @@
using System;
using System.Text;
-using System.Threading;
using System.Threading.Tasks;
using VNLib.Net.Http;
using VNLib.Utils.IO;
using VNLib.Utils.Extensions;
using VNLib.Utils.Memory.Caching;
-using VNLib.Net.Messaging.FBM.Client;
namespace VNLib.Net.Messaging.FBM.Server
{
@@ -41,9 +39,9 @@ namespace VNLib.Net.Messaging.FBM.Server
/// </summary>
public sealed class FBMResponseMessage : IReusable, IFBMMessage
{
- internal FBMResponseMessage(int internalBufferSize, Encoding headerEncoding)
+ internal FBMResponseMessage(int internalBufferSize, Encoding headerEncoding, IFBMMemoryManager manager)
{
- _headerAccumulator = new HeaderDataAccumulator(internalBufferSize);
+ _headerAccumulator = new HeaderDataAccumulator(internalBufferSize, manager);
_headerEncoding = headerEncoding;
_messageEnumerator = new(this);
}
@@ -134,92 +132,76 @@ namespace VNLib.Net.Messaging.FBM.Server
/// <summary>
/// Gets the internal message body enumerator and prepares the message for sending
/// </summary>
- /// <param name="cancellationToken">A cancellation token</param>
/// <returns>A value task that returns the message body enumerator</returns>
- internal async ValueTask<IAsyncMessageReader> GetResponseDataAsync(CancellationToken cancellationToken)
- {
- //try to buffer as much data in the header segment first
- if(MessageBody?.RemainingSize > 0 && _headerAccumulator.RemainingSize > 0)
- {
- //Read data from the message
- int read = await MessageBody.ReadAsync(_headerAccumulator.RemainingBuffer, cancellationToken);
-
- //Advance accumulator to the read bytes
- _headerAccumulator.Advance(read);
- }
- //return reusable enumerator
- return _messageEnumerator;
- }
+ internal IAsyncMessageReader GetResponseData() => _messageEnumerator;
private sealed class MessageSegmentEnumerator : IAsyncMessageReader
{
private readonly FBMResponseMessage _message;
+ private readonly ISlindingWindowBuffer<byte> _accumulator;
bool HeadersRead;
public MessageSegmentEnumerator(FBMResponseMessage message)
{
_message = message;
+ _accumulator = _message._headerAccumulator;
}
- public ReadOnlyMemory<byte> Current { get; private set; }
+ ///<inheritdoc/>
+ public ReadOnlyMemory<byte> Current => _accumulator.AccumulatedBuffer;
- public bool DataRemaining { get; private set; }
+ ///<inheritdoc/>
+ public bool DataRemaining => _message.MessageBody?.RemainingSize > 0;
+ ///<inheritdoc/>
public async ValueTask<bool> MoveNextAsync()
{
//Attempt to read header segment first
if (!HeadersRead)
{
- //Set the accumulated buffer
- Current = _message._headerAccumulator.AccumulatedBuffer;
+ /*
+ * If headers have not been read yet, we can attempt to buffer as much
+ * of the message body into the header accumulator buffer as possible. This will
+ * reduce message fragmentation.
+ */
+ if (DataRemaining && _accumulator.RemainingSize > 0)
+ {
+ int read = await _message.MessageBody.ReadAsync(_accumulator.RemainingBuffer).ConfigureAwait(false);
- //Update data remaining flag
- DataRemaining = _message.MessageBody?.RemainingSize > 0;
+ //Advance accumulator to the read bytes
+ _accumulator.Advance(read);
+ }
//Set headers read flag
HeadersRead = true;
return true;
}
- else if (_message.MessageBody?.RemainingSize > 0)
+ else if (DataRemaining)
{
- //Use the header buffer as the buffer for the message body
- Memory<byte> buffer = _message._headerAccumulator.Buffer;
+ //Reset the accumulator so we can read another segment
+ _accumulator.Reset();
//Read body segment
- int read = await _message.MessageBody.ReadAsync(buffer);
+ int read = await _message.MessageBody.ReadAsync(_accumulator.RemainingBuffer);
- //Update data remaining flag
- DataRemaining = _message.MessageBody.RemainingSize > 0;
+ //Advance accumulator to the read bytes
+ _accumulator.Advance(read);
- if (read > 0)
- {
- //Store the read segment
- Current = buffer[..read];
- return true;
- }
+ return read > 0;
}
return false;
}
+ ///<inheritdoc/>
public ValueTask DisposeAsync()
{
- //Clear current segment
- Current = default;
-
//Reset headers read flag
HeadersRead = false;
-
+
//Dispose the message body if set
- if (_message.MessageBody != null)
- {
- return _message.MessageBody.DisposeAsync();
- }
- else
- {
- return ValueTask.CompletedTask;
- }
+ return _message.MessageBody != null ? _message.MessageBody.DisposeAsync() : ValueTask.CompletedTask;
}
}
}
diff --git a/lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs b/lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs
index 423a26e..891c583 100644
--- a/lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs
+++ b/lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Messaging.FBM
@@ -23,25 +23,28 @@
*/
using System;
-using System.Buffers;
using VNLib.Utils.IO;
namespace VNLib.Net.Messaging.FBM.Server
{
+
/// <summary>
/// Reusable sliding window impl
/// </summary>
internal sealed class HeaderDataAccumulator : ISlindingWindowBuffer<byte>
{
- private readonly int BufferSize;
-
- private byte[]? _memHandle;
+ private readonly int _bufferSize;
+ private readonly IFBMMemoryManager _memManager;
+ private readonly IFBMMemoryHandle _handle;
+
- public HeaderDataAccumulator(int bufferSize)
+ public HeaderDataAccumulator(int bufferSize, IFBMMemoryManager memManager)
{
- BufferSize = bufferSize;
+ _bufferSize = bufferSize;
+ _memManager = memManager;
+ _handle = memManager.InitHandle();
}
///<inheritdoc/>
@@ -49,7 +52,7 @@ namespace VNLib.Net.Messaging.FBM.Server
///<inheritdoc/>
public int WindowEndPos { get; private set; }
///<inheritdoc/>
- public Memory<byte> Buffer => _memHandle.AsMemory();
+ public Memory<byte> Buffer => _handle.GetMemory();
///<inheritdoc/>
public void Advance(int count) => WindowEndPos += count;
@@ -67,22 +70,13 @@ namespace VNLib.Net.Messaging.FBM.Server
/// <summary>
/// Allocates the internal message buffer
/// </summary>
- public void Prepare()
- {
- _memHandle ??= ArrayPool<byte>.Shared.Rent(BufferSize);
- }
+ public void Prepare() => _memManager.AllocBuffer(_handle, _bufferSize);
///<inheritdoc/>
public void Close()
{
Reset();
-
- if (_memHandle != null)
- {
- //Return the buffer to the pool
- ArrayPool<byte>.Shared.Return(_memHandle);
- _memHandle = null;
- }
+ _memManager.FreeBuffer(_handle);
}
}
diff --git a/lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs b/lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs
index b2abe8d..abb3600 100644
--- a/lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs
+++ b/lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Net.Messaging.FBM
@@ -34,7 +34,7 @@ namespace VNLib.Net.Messaging.FBM.Server
internal interface IAsyncMessageReader : IAsyncEnumerator<ReadOnlyMemory<byte>>
{
/// <summary>
- /// A value that indicates if there is data remaining after a
+ /// A value that indicates if there is data remaining after a read
/// </summary>
bool DataRemaining { get; }
}
diff --git a/lib/Net.Messaging.FBM/src/Server/IFBMServerErrorHandler.cs b/lib/Net.Messaging.FBM/src/Server/IFBMServerErrorHandler.cs
new file mode 100644
index 0000000..caa4f96
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Server/IFBMServerErrorHandler.cs
@@ -0,0 +1,49 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: IFBMServerErrorHandler.cs
+*
+* IFBMServerErrorHandler.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+
+namespace VNLib.Net.Messaging.FBM.Server
+{
+ /// <summary>
+ /// An server side FBM protocol error handler abstraction
+ /// </summary>
+ public interface IFBMServerErrorHandler
+ {
+ /// <summary>
+ /// An exception handler for unhandled events that occur during a listening session
+ /// </summary>
+ /// <param name="ex">The exception that caused this handler to be invoked</param>
+ void OnProcessError(Exception ex);
+
+ /// <summary>
+ /// An exception handler for invalid messages that occur during a listening session.
+ /// NOTE: The context parameter is likely in an invlaid state and should be read carefully
+ /// </summary>
+ /// <param name="context">The context that the error occured while parsing on</param>
+ /// <param name="ex">The exception explaining the reason this handler was invoked</param>
+ /// <returns>A value that indicates if the server should continue processing</returns>
+ bool OnInvalidMessage(FBMContext context, Exception ex);
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Server/IFBMServerMessageHandler.cs b/lib/Net.Messaging.FBM/src/Server/IFBMServerMessageHandler.cs
new file mode 100644
index 0000000..532db5f
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Server/IFBMServerMessageHandler.cs
@@ -0,0 +1,43 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: IFBMServerMessageHandler.cs
+*
+* IFBMServerMessageHandler.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace VNLib.Net.Messaging.FBM.Server
+{
+ /// <summary>
+ /// A server side FBM protocol handler
+ /// </summary>
+ public interface IFBMServerMessageHandler : IFBMServerErrorHandler
+ {
+ /// <summary>
+ /// Handles processing of a normal incoming message
+ /// </summary>
+ /// <param name="context">The context to process for this new message</param>
+ /// <param name="cancellationToken">A token that signals the session has been cancelled</param>
+ /// <returns>A task representing the asynchronous work</returns>
+ Task HandleMessage(FBMContext context, CancellationToken cancellationToken);
+ }
+}