aboutsummaryrefslogtreecommitdiff
path: root/lib/Net.Messaging.FBM/src/Server
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-01-08 16:01:54 -0500
committerLibravatar vnugent <public@vaughnnugent.com>2023-01-08 16:01:54 -0500
commitde94d788e9a47432a7630a8215896b8dd3628599 (patch)
tree666dec06eef861d101cb6948aff52a3d354c8d73 /lib/Net.Messaging.FBM/src/Server
parentbe6dc557a3b819248b014992eb96c1cb21f8112b (diff)
Reorder + analyzer cleanup
Diffstat (limited to 'lib/Net.Messaging.FBM/src/Server')
-rw-r--r--lib/Net.Messaging.FBM/src/Server/FBMContext.cs85
-rw-r--r--lib/Net.Messaging.FBM/src/Server/FBMListener.cs388
-rw-r--r--lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs113
-rw-r--r--lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs62
-rw-r--r--lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs196
-rw-r--r--lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs226
-rw-r--r--lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs89
-rw-r--r--lib/Net.Messaging.FBM/src/Server/IAsyncMessageBody.cs57
-rw-r--r--lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs42
-rw-r--r--lib/Net.Messaging.FBM/src/Server/readme.md35
10 files changed, 1293 insertions, 0 deletions
diff --git a/lib/Net.Messaging.FBM/src/Server/FBMContext.cs b/lib/Net.Messaging.FBM/src/Server/FBMContext.cs
new file mode 100644
index 0000000..fb39d1b
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Server/FBMContext.cs
@@ -0,0 +1,85 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: FBMContext.cs
+*
+* FBMContext.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Text;
+
+using VNLib.Utils.IO;
+using VNLib.Utils.Memory.Caching;
+
+namespace VNLib.Net.Messaging.FBM.Server
+{
+ /// <summary>
+ /// A request/response pair message context
+ /// </summary>
+ public sealed class FBMContext : IReusable
+ {
+ private readonly Encoding _headerEncoding;
+
+ /// <summary>
+ /// The request message to process
+ /// </summary>
+ public FBMRequestMessage Request { get; }
+ /// <summary>
+ /// The response message
+ /// </summary>
+ public FBMResponseMessage Response { get; }
+ /// <summary>
+ /// Creates a new reusable <see cref="FBMContext"/>
+ /// for use within a <see cref="ObjectRental{T}"/>
+ /// cache
+ /// </summary>
+ /// <param name="requestHeaderBufferSize">The size in characters of the request header buffer</param>
+ /// <param name="responseBufferSize">The size in characters of the response header buffer</param>
+ /// <param name="headerEncoding">The message header encoding instance</param>
+ public FBMContext(int requestHeaderBufferSize, int responseBufferSize, Encoding headerEncoding)
+ {
+ Request = new(requestHeaderBufferSize);
+ Response = new(responseBufferSize, headerEncoding);
+ _headerEncoding = headerEncoding;
+ }
+
+ /// <summary>
+ /// Initializes the context with the buffered request data
+ /// </summary>
+ /// <param name="requestData">The request data buffer positioned at the begining of the request data</param>
+ /// <param name="connectionId">The unique id of the connection</param>
+ internal void Prepare(VnMemoryStream requestData, string connectionId)
+ {
+ Request.Prepare(requestData, connectionId, _headerEncoding);
+ //Message id is set after the request parses the incoming message
+ Response.Prepare(Request.MessageId);
+ }
+
+ void IReusable.Prepare()
+ {
+ (Request as IReusable).Prepare();
+ (Response as IReusable).Prepare();
+ }
+
+ bool IReusable.Release()
+ {
+ return (Request as IReusable).Release() & (Response as IReusable).Release();
+ }
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Server/FBMListener.cs b/lib/Net.Messaging.FBM/src/Server/FBMListener.cs
new file mode 100644
index 0000000..6cca2a9
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Server/FBMListener.cs
@@ -0,0 +1,388 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: FBMListener.cs
+*
+* FBMListener.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.IO;
+using System.Buffers;
+using System.Threading;
+using System.Net.WebSockets;
+using System.Threading.Tasks;
+
+using VNLib.Utils.IO;
+using VNLib.Utils.Async;
+using VNLib.Utils.Memory;
+using VNLib.Utils.Extensions;
+using VNLib.Utils.Memory.Caching;
+using VNLib.Plugins.Essentials;
+
+namespace VNLib.Net.Messaging.FBM.Server
+{
+
+ /// <summary>
+ /// Method delegate for processing FBM messages from an <see cref="FBMListener"/>
+ /// when messages are received
+ /// </summary>
+ /// <param name="context">The message/connection context</param>
+ /// <param name="userState">The state parameter passed on client connected</param>
+ /// <param name="cancellationToken">A token that reflects the state of the listener</param>
+ /// <returns>A <see cref="Task"/> that resolves when processing is complete</returns>
+ public delegate Task RequestHandler(FBMContext context, object? userState, CancellationToken cancellationToken);
+
+ /// <summary>
+ /// A FBM protocol listener. Listens for messages on a <see cref="WebSocketSession"/>
+ /// and raises events on requests.
+ /// </summary>
+ public class FBMListener
+ {
+ private sealed class ListeningSession
+ {
+ private readonly ReusableStore<FBMContext> CtxStore;
+ private readonly CancellationTokenSource Cancellation;
+ private readonly CancellationTokenRegistration Registration;
+ private readonly FBMListenerSessionParams Params;
+
+
+ public readonly object? UserState;
+
+ public readonly SemaphoreSlim ResponseLock;
+
+ public readonly WebSocketSession Socket;
+
+ public readonly RequestHandler OnRecieved;
+
+ public CancellationToken CancellationToken => Cancellation.Token;
+
+
+ public ListeningSession(WebSocketSession session, RequestHandler onRecieved, in FBMListenerSessionParams args, object? userState)
+ {
+ Params = args;
+ Socket = session;
+ UserState = userState;
+ OnRecieved = onRecieved;
+
+ //Create cancellation and register for session close
+ Cancellation = new();
+ Registration = session.Token.Register(Cancellation.Cancel);
+
+
+ ResponseLock = new(1);
+ CtxStore = ObjectRental.CreateReusable(ContextCtor);
+ }
+
+ private FBMContext ContextCtor() => new(Params.MaxHeaderBufferSize, Params.ResponseBufferSize, Params.HeaderEncoding);
+
+ /// <summary>
+ /// Cancels any pending opreations relating to the current session
+ /// </summary>
+ public void CancelSession()
+ {
+ Cancellation.Cancel();
+
+ //If dispose happens without any outstanding requests, we can dispose the session
+ if (_counter == 0)
+ {
+ CleanupInternal();
+ }
+ }
+
+ private void CleanupInternal()
+ {
+ Registration.Dispose();
+ CtxStore.Dispose();
+ Cancellation.Dispose();
+ ResponseLock.Dispose();
+ }
+
+
+ private uint _counter;
+
+ /// <summary>
+ /// Rents a new <see cref="FBMContext"/> instance from the pool
+ /// and increments the counter
+ /// </summary>
+ /// <returns>The rented instance</returns>
+ /// <exception cref="ObjectDisposedException"></exception>
+ public FBMContext RentContext()
+ {
+
+ if (Cancellation.IsCancellationRequested)
+ {
+ throw new ObjectDisposedException("The instance has been disposed");
+ }
+
+ //Rent context
+ FBMContext ctx = CtxStore.Rent();
+ //Increment counter
+ Interlocked.Increment(ref _counter);
+
+ return ctx;
+ }
+
+ /// <summary>
+ /// Returns a previously rented context to the pool
+ /// and decrements the counter. If the session has been
+ /// cancelled, when the counter reaches 0, cleanup occurs
+ /// </summary>
+ /// <param name="ctx">The context to return</param>
+ public void ReturnContext(FBMContext ctx)
+ {
+ //Return the context
+ CtxStore.Return(ctx);
+
+ uint current = Interlocked.Decrement(ref _counter);
+
+ //No more contexts in use, dispose internals
+ if (Cancellation.IsCancellationRequested && current == 0)
+ {
+ ResponseLock.Dispose();
+ Cancellation.Dispose();
+ CtxStore.Dispose();
+ }
+ }
+ }
+
+ public const int SEND_SEMAPHORE_TIMEOUT_MS = 10 * 1000;
+
+ private readonly IUnmangedHeap Heap;
+
+ /// <summary>
+ /// Raised when a response processing error occured
+ /// </summary>
+ public event EventHandler<Exception>? OnProcessError;
+
+ /// <summary>
+ /// Creates a new <see cref="FBMListener"/> instance ready for
+ /// processing connections
+ /// </summary>
+ /// <param name="heap">The heap to alloc buffers from</param>
+ public FBMListener(IUnmangedHeap heap)
+ {
+ Heap = heap;
+ }
+
+ /// <summary>
+ /// Begins listening for requests on the current websocket until
+ /// a close message is received or an error occurs
+ /// </summary>
+ /// <param name="wss">The <see cref="WebSocketSession"/> to receive messages on</param>
+ /// <param name="handler">The callback method to handle incoming requests</param>
+ /// <param name="args">The arguments used to configured this listening session</param>
+ /// <param name="userState">A state parameter</param>
+ /// <returns>A <see cref="Task"/> that completes when the connection closes</returns>
+ public async Task ListenAsync(WebSocketSession wss, RequestHandler handler, FBMListenerSessionParams args, object? userState)
+ {
+ ListeningSession session = new(wss, handler, args, userState);
+ //Alloc a recieve buffer
+ using IMemoryOwner<byte> recvBuffer = Heap.DirectAlloc<byte>(args.RecvBufferSize);
+
+ //Init new queue for dispatching work
+ AsyncQueue<VnMemoryStream> workQueue = new(true, true);
+
+ //Start a task to process the queue
+ Task queueWorker = QueueWorkerDoWork(workQueue, session);
+
+ try
+ {
+ //Listen for incoming messages
+ while (true)
+ {
+ //Receive a message
+ ValueWebSocketReceiveResult result = await wss.ReceiveAsync(recvBuffer.Memory);
+ //If a close message has been received, we can gracefully exit
+ if (result.MessageType == WebSocketMessageType.Close)
+ {
+ //Return close message
+ await wss.CloseSocketAsync(WebSocketCloseStatus.NormalClosure, "Goodbye");
+ //break listen loop
+ break;
+ }
+ //create buffer for storing data
+ VnMemoryStream request = new(Heap);
+ //Copy initial data
+ request.Write(recvBuffer.Memory.Span[..result.Count]);
+ //Streaming read
+ while (!result.EndOfMessage)
+ {
+ //Read more data
+ result = await wss.ReceiveAsync(recvBuffer.Memory);
+ //Make sure the request is small enough to buffer
+ if (request.Length + result.Count > args.MaxMessageSize)
+ {
+ //dispose the buffer
+ request.Dispose();
+ //close the socket with a message too big
+ await wss.CloseSocketAsync(WebSocketCloseStatus.MessageTooBig, "Buffer space exceeded for message. Goodbye");
+ //break listen loop
+ goto Exit;
+ }
+ //write to buffer
+ request.Write(recvBuffer.Memory.Span[..result.Count]);
+ }
+ //Make sure data is available
+ if (request.Length == 0)
+ {
+ request.Dispose();
+ continue;
+ }
+ //reset buffer position
+ _ = request.Seek(0, SeekOrigin.Begin);
+ //Enqueue the request
+ await workQueue.EnqueueAsync(request);
+ }
+
+ Exit:
+ ;
+ }
+ finally
+ {
+ session.CancelSession();
+ await queueWorker.ConfigureAwait(false);
+ }
+ }
+
+ private async Task QueueWorkerDoWork(AsyncQueue<VnMemoryStream> queue, ListeningSession session)
+ {
+ try
+ {
+ while (true)
+ {
+ //Get work from queue
+ VnMemoryStream request = await queue.DequeueAsync(session.CancellationToken);
+ //Process request without waiting
+ _ = ProcessAsync(request, session).ConfigureAwait(false);
+ }
+ }
+ catch (OperationCanceledException)
+ { }
+ finally
+ {
+ //Cleanup any queued requests
+ while (queue.TryDequeue(out VnMemoryStream? stream))
+ {
+ stream.Dispose();
+ }
+ }
+ }
+
+ private async Task ProcessAsync(VnMemoryStream data, ListeningSession session)
+ {
+ //Rent a new request object
+ FBMContext context = session.RentContext();
+ try
+ {
+ //Prepare the request/response
+ context.Prepare(data, session.Socket.SocketID);
+
+ if ((context.Request.ParseStatus & HeaderParseError.InvalidId) > 0)
+ {
+ OnProcessError?.Invoke(this, new FBMException($"Invalid messageid {context.Request.MessageId}, message length {data.Length}"));
+ return;
+ }
+
+ //Check parse status flags
+ if ((context.Request.ParseStatus & HeaderParseError.HeaderOutOfMem) > 0)
+ {
+ OnProcessError?.Invoke(this, new FBMException("Packet received with not enough space to store headers"));
+ }
+ //Determine if request is an out-of-band message
+ else if (context.Request.MessageId == Helpers.CONTROL_FRAME_MID)
+ {
+ //Process control frame
+ await ProcessOOBAsync(context);
+ }
+ else
+ {
+ //Invoke normal message handler
+ await session.OnRecieved.Invoke(context, session.UserState, session.CancellationToken);
+ }
+
+ //Get response data
+ await using IAsyncMessageReader messageEnumerator = await context.Response.GetResponseDataAsync(session.CancellationToken);
+
+ //Load inital segment
+ if (await messageEnumerator.MoveNextAsync() && !session.CancellationToken.IsCancellationRequested)
+ {
+ ValueTask sendTask;
+
+ //Syncrhonize access to send data because we may need to stream data to the client
+ await session.ResponseLock.WaitAsync(SEND_SEMAPHORE_TIMEOUT_MS);
+ try
+ {
+ do
+ {
+ bool eof = !messageEnumerator.DataRemaining;
+
+ //Send first segment
+ sendTask = session.Socket.SendAsync(messageEnumerator.Current, WebSocketMessageType.Binary, eof);
+
+ /*
+ * WARNING!
+ * this code relies on the managed websocket impl that the websocket will read
+ * the entire buffer before returning. If this is not the case, this code will
+ * overwrite the memory buffer on the next call to move next.
+ */
+
+ //Move to next segment
+ if (!await messageEnumerator.MoveNextAsync())
+ {
+ break;
+ }
+
+ //Await previous send
+ await sendTask;
+
+ } while (true);
+ }
+ finally
+ {
+ //release semaphore
+ session.ResponseLock.Release();
+ }
+
+ await sendTask;
+ }
+
+ //No data to send
+ }
+ catch (Exception ex)
+ {
+ OnProcessError?.Invoke(this, ex);
+ }
+ finally
+ {
+ session.ReturnContext(context);
+ }
+ }
+
+ /// <summary>
+ /// Processes an out-of-band request message (internal communications)
+ /// </summary>
+ /// <param name="outOfBandContext">The <see cref="FBMContext"/> containing the OOB message</param>
+ /// <returns>A <see cref="Task"/> that completes when the operation completes</returns>
+ protected virtual Task ProcessOOBAsync(FBMContext outOfBandContext)
+ {
+ return Task.CompletedTask;
+ }
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs b/lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs
new file mode 100644
index 0000000..3e9fde2
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs
@@ -0,0 +1,113 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: FBMListenerBase.cs
+*
+* FBMListenerBase.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+using VNLib.Utils.Logging;
+using VNLib.Utils.Memory;
+using VNLib.Plugins.Essentials;
+
+namespace VNLib.Net.Messaging.FBM.Server
+{
+ /// <summary>
+ /// Provides a simple base class for an <see cref="FBMListener"/>
+ /// processor
+ /// </summary>
+ public abstract class FBMListenerBase
+ {
+
+ /// <summary>
+ /// The initialzied listener
+ /// </summary>
+ protected FBMListener? Listener { get; private set; }
+ /// <summary>
+ /// A provider to write log information to
+ /// </summary>
+ protected abstract ILogProvider Log { get; }
+
+ /// <summary>
+ /// Initializes the <see cref="FBMListener"/>
+ /// </summary>
+ /// <param name="heap">The heap to alloc buffers from</param>
+ protected void InitListener(IUnmangedHeap heap)
+ {
+ Listener = new(heap);
+ //Attach service handler
+ Listener.OnProcessError += Listener_OnProcessError;
+ }
+
+ /// <summary>
+ /// A single event service routine for servicing errors that occur within
+ /// the listener loop
+ /// </summary>
+ /// <param name="sender"></param>
+ /// <param name="e">The exception that was raised</param>
+ protected virtual void Listener_OnProcessError(object? sender, Exception e)
+ {
+ //Write the error to the log
+ Log.Error(e);
+ }
+
+ private async Task OnReceivedAsync(FBMContext context, object? userState, CancellationToken token)
+ {
+ try
+ {
+ await ProcessAsync(context, userState, token);
+ }
+ catch (OperationCanceledException)
+ {
+ Log.Debug("Async operation cancelled");
+ }
+ catch(Exception ex)
+ {
+ Log.Error(ex);
+ }
+ }
+
+ /// <summary>
+ /// Begins listening for requests on the current websocket until
+ /// a close message is received or an error occurs
+ /// </summary>
+ /// <param name="wss">The <see cref="WebSocketSession"/> to receive messages on</param>
+ /// <param name="args">The arguments used to configured this listening session</param>
+ /// <param name="userState">A state token to use for processing events for this connection</param>
+ /// <returns>A <see cref="Task"/> that completes when the connection closes</returns>
+ public virtual async Task ListenAsync(WebSocketSession wss, FBMListenerSessionParams args, object? userState)
+ {
+ _ = Listener ?? throw new InvalidOperationException("The listener has not been intialized");
+ await Listener.ListenAsync(wss, OnReceivedAsync, args, userState);
+ }
+
+ /// <summary>
+ /// A method to service an incoming message
+ /// </summary>
+ /// <param name="context">The context containing the message to be serviced</param>
+ /// <param name="userState">A state token passed on client connected</param>
+ /// <param name="exitToken">A token that reflects the state of the listener</param>
+ /// <returns>A task that completes when the message has been serviced</returns>
+ protected abstract Task ProcessAsync(FBMContext context, object? userState, CancellationToken exitToken);
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs b/lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs
new file mode 100644
index 0000000..c327475
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs
@@ -0,0 +1,62 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: FBMListenerSessionParams.cs
+*
+* FBMListenerSessionParams.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Text;
+
+namespace VNLib.Net.Messaging.FBM.Server
+{
+ /// <summary>
+ /// Represents a configuration structure for an <see cref="FBMListener"/>
+ /// listening session
+ /// </summary>
+ public readonly struct FBMListenerSessionParams
+ {
+ /// <summary>
+ /// The size of the buffer to use while reading data from the websocket
+ /// in the listener loop
+ /// </summary>
+ public readonly int RecvBufferSize { get; init; }
+ /// <summary>
+ /// The size of the character buffer to store FBMheader values in
+ /// the <see cref="FBMRequestMessage"/>
+ /// </summary>
+ public readonly int MaxHeaderBufferSize { get; init; }
+ /// <summary>
+ /// The size of the internal message response buffer when
+ /// not streaming
+ /// </summary>
+ public readonly int ResponseBufferSize { get; init; }
+ /// <summary>
+ /// The FMB message header character encoding
+ /// </summary>
+ public readonly Encoding HeaderEncoding { get; init; }
+
+ /// <summary>
+ /// The absolute maxium size (in bytes) message to process before
+ /// closing the websocket connection. This value should be negotiaed
+ /// by clients or hard-coded to avoid connection issues
+ /// </summary>
+ public readonly int MaxMessageSize { get; init; }
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs b/lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs
new file mode 100644
index 0000000..ed36571
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs
@@ -0,0 +1,196 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: FBMRequestMessage.cs
+*
+* FBMRequestMessage.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Text;
+using System.Buffers;
+using System.Text.Json;
+using System.Collections.Generic;
+
+using VNLib.Utils;
+using VNLib.Utils.IO;
+using VNLib.Utils.Memory;
+using VNLib.Utils.Extensions;
+using VNLib.Utils.Memory.Caching;
+
+namespace VNLib.Net.Messaging.FBM.Server
+{
+ /// <summary>
+ /// Represents a client request message to be serviced
+ /// </summary>
+ public sealed class FBMRequestMessage : IReusable
+ {
+ private readonly List<KeyValuePair<HeaderCommand, ReadOnlyMemory<char>>> _headers;
+ private readonly int HeaderCharBufferSize;
+ /// <summary>
+ /// Creates a new resusable <see cref="FBMRequestMessage"/>
+ /// </summary>
+ /// <param name="headerBufferSize">The size of the buffer to alloc during initialization</param>
+ internal FBMRequestMessage(int headerBufferSize)
+ {
+ HeaderCharBufferSize = headerBufferSize;
+ _headers = new();
+ }
+
+ private char[]? _headerBuffer;
+
+ /// <summary>
+ /// The ID of the current message
+ /// </summary>
+ public int MessageId { get; private set; }
+ /// <summary>
+ /// Gets the underlying socket-id fot the current connection
+ /// </summary>
+ public string? ConnectionId { get; private set; }
+ /// <summary>
+ /// The raw request message, positioned to the body section of the message data
+ /// </summary>
+ public VnMemoryStream? RequestBody { get; private set; }
+ /// <summary>
+ /// A collection of headers for the current request
+ /// </summary>
+ public IReadOnlyList<KeyValuePair<HeaderCommand, ReadOnlyMemory<char>>> Headers => _headers;
+ /// <summary>
+ /// Status flags set during the message parsing
+ /// </summary>
+ public HeaderParseError ParseStatus { get; private set; }
+ /// <summary>
+ /// The message body data as a <see cref="ReadOnlySpan{T}"/>
+ /// </summary>
+ public ReadOnlySpan<byte> BodyData => Helpers.GetRemainingData(RequestBody!);
+
+ /// <summary>
+ /// Determines if the current message is considered a control frame
+ /// </summary>
+ public bool IsControlFrame { get; private set; }
+
+ /// <summary>
+ /// Prepares the request to be serviced
+ /// </summary>
+ /// <param name="vms">The request data packet</param>
+ /// <param name="socketId">The unique id of the connection</param>
+ /// <param name="dataEncoding">The data encoding used to decode header values</param>
+ internal void Prepare(VnMemoryStream vms, string socketId, Encoding dataEncoding)
+ {
+ //Store request body
+ RequestBody = vms;
+ //Store message id
+ MessageId = Helpers.GetMessageId(Helpers.ReadLine(vms));
+ //Check mid for control frame
+ if(MessageId == Helpers.CONTROL_FRAME_MID)
+ {
+ IsControlFrame = true;
+ }
+ else if (MessageId < 1)
+ {
+ ParseStatus |= HeaderParseError.InvalidId;
+ return;
+ }
+
+ ConnectionId = socketId;
+
+ //sliding window over remaining data from internal buffer
+ ForwardOnlyMemoryWriter<char> writer = new(_headerBuffer);
+
+ //Accumulate headers
+ while (true)
+ {
+ //Read the next line from the current stream
+ ReadOnlySpan<byte> line = Helpers.ReadLine(vms);
+ if (line.IsEmpty)
+ {
+ //Done reading headers
+ break;
+ }
+ HeaderCommand cmd = Helpers.GetHeaderCommand(line);
+ //Get header value
+ ERRNO charsRead = Helpers.GetHeaderValue(line, writer.Remaining.Span, dataEncoding);
+ if (charsRead < 0)
+ {
+ //Out of buffer space
+ ParseStatus |= HeaderParseError.HeaderOutOfMem;
+ break;
+ }
+ else if (!charsRead)
+ {
+ //Invalid header
+ ParseStatus |= HeaderParseError.InvalidHeaderRead;
+ }
+ else
+ {
+ //Store header as a read-only sequence
+ _headers.Add(new(cmd, writer.Remaining[..(int)charsRead]));
+ //Shift buffer window
+ writer.Advance(charsRead);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Deserializes the request body into a new specified object type
+ /// </summary>
+ /// <typeparam name="T">The type of the object to deserialize</typeparam>
+ /// <param name="jso">The <see cref="JsonSerializerOptions"/> to use while deserializing data</param>
+ /// <returns>The deserialized object from the request body</returns>
+ /// <exception cref="JsonException"></exception>
+ public T? DeserializeBody<T>(JsonSerializerOptions? jso = default)
+ {
+ return BodyData.IsEmpty ? default : BodyData.AsJsonObject<T>(jso);
+ }
+ /// <summary>
+ /// Gets a <see cref="JsonDocument"/> of the request body
+ /// </summary>
+ /// <returns>The parsed <see cref="JsonDocument"/> if parsed successfully, or null otherwise</returns>
+ /// <exception cref="JsonException"></exception>
+ public JsonDocument? GetBodyAsJson()
+ {
+ Utf8JsonReader reader = new(BodyData);
+ return JsonDocument.TryParseValue(ref reader, out JsonDocument? jdoc) ? jdoc : default;
+ }
+
+ void IReusable.Prepare()
+ {
+ ParseStatus = HeaderParseError.None;
+ //Alloc header buffer
+ _headerBuffer = ArrayPool<char>.Shared.Rent(HeaderCharBufferSize);
+ }
+
+
+ bool IReusable.Release()
+ {
+ //Dispose the request message
+ RequestBody?.Dispose();
+ RequestBody = null;
+ //Clear headers before freeing buffer
+ _headers.Clear();
+ //Free header-buffer
+ ArrayPool<char>.Shared.Return(_headerBuffer!);
+ _headerBuffer = null;
+ ConnectionId = null;
+ MessageId = 0;
+ IsControlFrame = false;
+ return true;
+ }
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs b/lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs
new file mode 100644
index 0000000..ac34dda
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs
@@ -0,0 +1,226 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: FBMResponseMessage.cs
+*
+* FBMResponseMessage.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+using VNLib.Net.Http;
+using VNLib.Utils.IO;
+using VNLib.Utils.Extensions;
+using VNLib.Utils.Memory.Caching;
+using VNLib.Net.Messaging.FBM.Client;
+
+namespace VNLib.Net.Messaging.FBM.Server
+{
+
+ /// <summary>
+ /// Represents an FBM request response container.
+ /// </summary>
+ public sealed class FBMResponseMessage : IReusable, IFBMMessage
+ {
+ internal FBMResponseMessage(int internalBufferSize, Encoding headerEncoding)
+ {
+ _headerAccumulator = new HeaderDataAccumulator(internalBufferSize);
+ _headerEncoding = headerEncoding;
+ _messageEnumerator = new(this);
+ }
+
+ private readonly MessageSegmentEnumerator _messageEnumerator;
+ private readonly ISlindingWindowBuffer<byte> _headerAccumulator;
+ private readonly Encoding _headerEncoding;
+
+ private IAsyncMessageBody? _messageBody;
+
+ ///<inheritdoc/>
+ public int MessageId { get; private set; }
+
+ void IReusable.Prepare()
+ {
+ (_headerAccumulator as HeaderDataAccumulator)!.Prepare();
+ }
+
+ bool IReusable.Release()
+ {
+ //Release header accumulator
+ _headerAccumulator.Close();
+
+ _messageBody = null;
+
+ MessageId = 0;
+
+ return true;
+ }
+
+ /// <summary>
+ /// Initializes the response message with the specified message-id
+ /// to respond with
+ /// </summary>
+ /// <param name="messageId">The message id of the context to respond to</param>
+ internal void Prepare(int messageId)
+ {
+ //Reset accumulator when message id is written
+ _headerAccumulator.Reset();
+ //Write the messageid to the begining of the headers buffer
+ MessageId = messageId;
+ _headerAccumulator.Append((byte)HeaderCommand.MessageId);
+ _headerAccumulator.Append(messageId);
+ _headerAccumulator.WriteTermination();
+ }
+
+ ///<inheritdoc/>
+ public void WriteHeader(HeaderCommand header, ReadOnlySpan<char> value)
+ {
+ WriteHeader((byte)header, value);
+ }
+ ///<inheritdoc/>
+ public void WriteHeader(byte header, ReadOnlySpan<char> value)
+ {
+ _headerAccumulator.WriteHeader(header, value, _headerEncoding);
+ }
+
+ ///<inheritdoc/>
+ public void WriteBody(ReadOnlySpan<byte> body, ContentType contentType = ContentType.Binary)
+ {
+ //Append content type header
+ WriteHeader(HeaderCommand.ContentType, HttpHelpers.GetContentTypeString(contentType));
+ //end header segment
+ _headerAccumulator.WriteTermination();
+ //Write message body
+ _headerAccumulator.Append(body);
+ }
+
+ /// <summary>
+ /// Sets the response message body
+ /// </summary>
+ /// <param name="messageBody">The <see cref="IAsyncMessageBody"/> to stream data from</param>
+ /// <exception cref="InvalidOperationException"></exception>
+ public void AddMessageBody(IAsyncMessageBody messageBody)
+ {
+ if(_messageBody != null)
+ {
+ throw new InvalidOperationException("The message body is already set");
+ }
+
+ //Append message content type header
+ WriteHeader(HeaderCommand.ContentType, HttpHelpers.GetContentTypeString(messageBody.ContentType));
+
+ //end header segment
+ _headerAccumulator.WriteTermination();
+
+ //Store message body
+ _messageBody = messageBody;
+
+ }
+
+ /// <summary>
+ /// Gets the internal message body enumerator and prepares the message for sending
+ /// </summary>
+ /// <param name="cancellationToken">A cancellation token</param>
+ /// <returns>A value task that returns the message body enumerator</returns>
+ internal async ValueTask<IAsyncMessageReader> GetResponseDataAsync(CancellationToken cancellationToken)
+ {
+ //try to buffer as much data in the header segment first
+ if(_messageBody?.RemainingSize > 0 && _headerAccumulator.RemainingSize > 0)
+ {
+ //Read data from the message
+ int read = await _messageBody.ReadAsync(_headerAccumulator.RemainingBuffer, cancellationToken);
+ //Advance accumulator to the read bytes
+ _headerAccumulator.Advance(read);
+ }
+ //return reusable enumerator
+ return _messageEnumerator;
+ }
+
+ private sealed class MessageSegmentEnumerator : IAsyncMessageReader
+ {
+ private readonly FBMResponseMessage _message;
+
+ bool HeadersRead;
+
+ public MessageSegmentEnumerator(FBMResponseMessage message)
+ {
+ _message = message;
+ }
+
+ public ReadOnlyMemory<byte> Current { get; private set; }
+
+ public bool DataRemaining { get; private set; }
+
+ public async ValueTask<bool> MoveNextAsync()
+ {
+ //Attempt to read header segment first
+ if (!HeadersRead)
+ {
+ //Set the accumulated buffer
+ Current = _message._headerAccumulator.AccumulatedBuffer;
+
+ //Update data remaining flag
+ DataRemaining = _message._messageBody?.RemainingSize > 0;
+
+ //Set headers read flag
+ HeadersRead = true;
+
+ return true;
+ }
+ else if (_message._messageBody?.RemainingSize > 0)
+ {
+ //Use the header buffer as the buffer for the message body
+ Memory<byte> buffer = _message._headerAccumulator.Buffer;
+
+ //Read body segment
+ int read = await _message._messageBody.ReadAsync(buffer);
+
+ //Update data remaining flag
+ DataRemaining = _message._messageBody.RemainingSize > 0;
+
+ if (read > 0)
+ {
+ //Store the read segment
+ Current = buffer[..read];
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ //Clear current segment
+ Current = default;
+
+ //Reset headers read flag
+ HeadersRead = false;
+
+ //Dispose the message body if set
+ if (_message._messageBody != null)
+ {
+ await _message._messageBody.DisposeAsync();
+ }
+ }
+ }
+ }
+
+}
diff --git a/lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs b/lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs
new file mode 100644
index 0000000..423a26e
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs
@@ -0,0 +1,89 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: HeaderDataAccumulator.cs
+*
+* HeaderDataAccumulator.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Buffers;
+
+using VNLib.Utils.IO;
+
+
+namespace VNLib.Net.Messaging.FBM.Server
+{
+ /// <summary>
+ /// Reusable sliding window impl
+ /// </summary>
+ internal sealed class HeaderDataAccumulator : ISlindingWindowBuffer<byte>
+ {
+ private readonly int BufferSize;
+
+ private byte[]? _memHandle;
+
+ public HeaderDataAccumulator(int bufferSize)
+ {
+ BufferSize = bufferSize;
+ }
+
+ ///<inheritdoc/>
+ public int WindowStartPos { get; private set; }
+ ///<inheritdoc/>
+ public int WindowEndPos { get; private set; }
+ ///<inheritdoc/>
+ public Memory<byte> Buffer => _memHandle.AsMemory();
+
+ ///<inheritdoc/>
+ public void Advance(int count) => WindowEndPos += count;
+
+ ///<inheritdoc/>
+ public void AdvanceStart(int count) => WindowEndPos += count;
+
+ ///<inheritdoc/>
+ public void Reset()
+ {
+ WindowStartPos = 0;
+ WindowEndPos = 0;
+ }
+
+ /// <summary>
+ /// Allocates the internal message buffer
+ /// </summary>
+ public void Prepare()
+ {
+ _memHandle ??= ArrayPool<byte>.Shared.Rent(BufferSize);
+ }
+
+ ///<inheritdoc/>
+ public void Close()
+ {
+ Reset();
+
+ if (_memHandle != null)
+ {
+ //Return the buffer to the pool
+ ArrayPool<byte>.Shared.Return(_memHandle);
+ _memHandle = null;
+ }
+ }
+ }
+
+}
diff --git a/lib/Net.Messaging.FBM/src/Server/IAsyncMessageBody.cs b/lib/Net.Messaging.FBM/src/Server/IAsyncMessageBody.cs
new file mode 100644
index 0000000..5566520
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Server/IAsyncMessageBody.cs
@@ -0,0 +1,57 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: IAsyncMessageBody.cs
+*
+* IAsyncMessageBody.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+using VNLib.Net.Http;
+
+namespace VNLib.Net.Messaging.FBM
+{
+ /// <summary>
+ /// A disposable message body container for asynchronously reading a variable length message body
+ /// </summary>
+ public interface IAsyncMessageBody : IAsyncDisposable
+ {
+ /// <summary>
+ /// The message body content type
+ /// </summary>
+ ContentType ContentType { get; }
+
+ /// <summary>
+ /// The number of bytes remaining to be read from the message body
+ /// </summary>
+ int RemainingSize { get; }
+
+ /// <summary>
+ /// Reads the next chunk of data from the message body
+ /// </summary>
+ /// <param name="buffer">The buffer to copy output data to</param>
+ /// <param name="token">A token to cancel the operation</param>
+ /// <returns></returns>
+ ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken token = default);
+ }
+
+}
diff --git a/lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs b/lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs
new file mode 100644
index 0000000..b2abe8d
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs
@@ -0,0 +1,42 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: IAsyncMessageReader.cs
+*
+* IAsyncMessageReader.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Collections.Generic;
+
+
+namespace VNLib.Net.Messaging.FBM.Server
+{
+ /// <summary>
+ /// Internal message body reader/enumerator for FBM messages
+ /// </summary>
+ internal interface IAsyncMessageReader : IAsyncEnumerator<ReadOnlyMemory<byte>>
+ {
+ /// <summary>
+ /// A value that indicates if there is data remaining after a
+ /// </summary>
+ bool DataRemaining { get; }
+ }
+
+}
diff --git a/lib/Net.Messaging.FBM/src/Server/readme.md b/lib/Net.Messaging.FBM/src/Server/readme.md
new file mode 100644
index 0000000..489e58f
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Server/readme.md
@@ -0,0 +1,35 @@
+# VNLib.Net.Messaging.FBM.Server
+
+Fixed Buffer Messaging Protocol server library. High performance statful messaging
+protocol built on top of HTTP web-sockets. Low/no allocation, completely asynchronous
+while providing a TPL API. This library provides a simple asynchronous request/response
+architecture to web-sockets. This was initially designed to provide an alternative to
+complete HTTP request/response overhead, but allow a simple control flow for work
+across a network.
+
+Messages consist of a 4 byte message id, a collection of headers, and a message body.
+The first 4 bytes of a message is the ID (for normal messages a signed integer greater than 0),
+0 is reserved for error conditions, and negative numbers are reserved for internal
+messages. Headers are identified by a single byte, followed by a variable length UTF8
+encoded character sequence, followed by a termination of 0xFF, 0xF1 (may change).
+
+### Message structure
+ 4 byte positive (signed 32-bit integer) message id
+ 2 byte termination
+ 1 byte header-id
+ variable length UTF8 value
+ 2 byte termination
+ -- other headers --
+ 2 byte termination (extra termination, ie: empty header)
+ variable length payload
+ (end of message is the end of the payload)
+
+
+XML Documentation is or will be provided for almost all public exports. APIs are intended to
+be sensibly public and immutable to allow for easy extensability (via extension methods). I
+often use extension libraries to provide additional functionality. (See cache library)
+
+This library is likely a niche use case, and is probably not for everyone. Unless you care
+about reasonably efficient high frequency request/response messaging, this probably isnt
+for you. This library provides a reasonable building block for distributed lock mechanisms
+and small data caching. \ No newline at end of file