aboutsummaryrefslogtreecommitdiff
path: root/lib/Net.Messaging.FBM/src
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-01-08 16:01:54 -0500
committerLibravatar vnugent <public@vaughnnugent.com>2023-01-08 16:01:54 -0500
commitde94d788e9a47432a7630a8215896b8dd3628599 (patch)
tree666dec06eef861d101cb6948aff52a3d354c8d73 /lib/Net.Messaging.FBM/src
parentbe6dc557a3b819248b014992eb96c1cb21f8112b (diff)
Reorder + analyzer cleanup
Diffstat (limited to 'lib/Net.Messaging.FBM/src')
-rw-r--r--lib/Net.Messaging.FBM/src/Client/ClientExtensions.cs69
-rw-r--r--lib/Net.Messaging.FBM/src/Client/FBMClient.cs475
-rw-r--r--lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs81
-rw-r--r--lib/Net.Messaging.FBM/src/Client/FBMClientWorkerBase.cs125
-rw-r--r--lib/Net.Messaging.FBM/src/Client/FBMRequest.cs302
-rw-r--r--lib/Net.Messaging.FBM/src/Client/FBMResponse.cs106
-rw-r--r--lib/Net.Messaging.FBM/src/Client/FMBClientErrorEventArgs.cs46
-rw-r--r--lib/Net.Messaging.FBM/src/Client/HeaderCommand.cs57
-rw-r--r--lib/Net.Messaging.FBM/src/Client/HeaderParseStatus.cs40
-rw-r--r--lib/Net.Messaging.FBM/src/Client/Helpers.cs272
-rw-r--r--lib/Net.Messaging.FBM/src/Client/IFBMMessage.cs62
-rw-r--r--lib/Net.Messaging.FBM/src/Client/IStatefulConnection.cs54
-rw-r--r--lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs201
-rw-r--r--lib/Net.Messaging.FBM/src/Client/README.md169
-rw-r--r--lib/Net.Messaging.FBM/src/Exceptions/FBMException.cs52
-rw-r--r--lib/Net.Messaging.FBM/src/Exceptions/FBMInvalidRequestException.cs47
-rw-r--r--lib/Net.Messaging.FBM/src/Exceptions/InvalidResponseException.cs52
-rw-r--r--lib/Net.Messaging.FBM/src/Server/FBMContext.cs85
-rw-r--r--lib/Net.Messaging.FBM/src/Server/FBMListener.cs388
-rw-r--r--lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs113
-rw-r--r--lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs62
-rw-r--r--lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs196
-rw-r--r--lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs226
-rw-r--r--lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs89
-rw-r--r--lib/Net.Messaging.FBM/src/Server/IAsyncMessageBody.cs57
-rw-r--r--lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs42
-rw-r--r--lib/Net.Messaging.FBM/src/Server/readme.md35
-rw-r--r--lib/Net.Messaging.FBM/src/VNLib.Net.Messaging.FBM.csproj33
28 files changed, 3536 insertions, 0 deletions
diff --git a/lib/Net.Messaging.FBM/src/Client/ClientExtensions.cs b/lib/Net.Messaging.FBM/src/Client/ClientExtensions.cs
new file mode 100644
index 0000000..102b6c9
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Client/ClientExtensions.cs
@@ -0,0 +1,69 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: ClientExtensions.cs
+*
+* ClientExtensions.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Runtime.CompilerServices;
+
+namespace VNLib.Net.Messaging.FBM.Client
+{
+
+ public static class ClientExtensions
+ {
+ /// <summary>
+ /// Writes the location header of the requested resource
+ /// </summary>
+ /// <param name="request"></param>
+ /// <param name="location">The location address</param>
+ /// <exception cref="OutOfMemoryException"></exception>
+ public static void WriteLocation(this FBMRequest request, ReadOnlySpan<char> location)
+ {
+ request.WriteHeader(HeaderCommand.Location, location);
+ }
+
+ /// <summary>
+ /// Writes the location header of the requested resource
+ /// </summary>
+ /// <param name="request"></param>
+ /// <param name="location">The location address</param>
+ /// <exception cref="OutOfMemoryException"></exception>
+ public static void WriteLocation(this FBMRequest request, Uri location)
+ {
+ request.WriteHeader(HeaderCommand.Location, location.ToString());
+ }
+
+ /// <summary>
+ /// If the <see cref="FBMResponse.IsSet"/> property is false, raises an <see cref="InvalidResponseException"/>
+ /// </summary>
+ /// <param name="response"></param>
+ /// <exception cref="InvalidResponseException"></exception>
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static void ThrowIfNotSet(this in FBMResponse response)
+ {
+ if (!response.IsSet)
+ {
+ throw new InvalidResponseException("The response state is undefined (no response received)");
+ }
+ }
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClient.cs b/lib/Net.Messaging.FBM/src/Client/FBMClient.cs
new file mode 100644
index 0000000..db52c03
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Client/FBMClient.cs
@@ -0,0 +1,475 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: FBMClient.cs
+*
+* FBMClient.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.IO;
+using System.Buffers;
+using System.Threading;
+using System.Net.WebSockets;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+using VNLib.Net.Http;
+using VNLib.Utils;
+using VNLib.Utils.IO;
+using VNLib.Utils.Logging;
+using VNLib.Utils.Extensions;
+using VNLib.Utils.Memory.Caching;
+
+namespace VNLib.Net.Messaging.FBM.Client
+{
+ /// <summary>
+ /// A Fixed Buffer Message Protocol client. Allows for high performance client-server messaging
+ /// with minimal memory overhead.
+ /// </summary>
+ public class FBMClient : VnDisposeable, IStatefulConnection, ICacheHolder
+ {
+ /// <summary>
+ /// The WS connection query arguments to specify a receive buffer size
+ /// </summary>
+ public const string REQ_RECV_BUF_QUERY_ARG = "b";
+ /// <summary>
+ /// The WS connection query argument to suggest a maximum response header buffer size
+ /// </summary>
+ public const string REQ_HEAD_BUF_QUERY_ARG = "hb";
+ /// <summary>
+ /// The WS connection query argument to suggest a maximum message size
+ /// </summary>
+ public const string REQ_MAX_MESS_QUERY_ARG = "mx";
+
+ /// <summary>
+ /// Raised when the websocket has been closed because an error occured.
+ /// You may inspect the event args to determine the cause of the error.
+ /// </summary>
+ public event EventHandler<FMBClientErrorEventArgs>? ConnectionClosedOnError;
+ /// <summary>
+ /// Raised when the client listener operaiton has completed as a normal closure
+ /// </summary>
+ public event EventHandler? ConnectionClosed;
+
+ private readonly SemaphoreSlim SendLock;
+ private readonly ConcurrentDictionary<int, FBMRequest> ActiveRequests;
+ private readonly ReusableStore<FBMRequest> RequestRental;
+ private readonly FBMRequest _controlFrame;
+ /// <summary>
+ /// The configuration for the current client
+ /// </summary>
+ public FBMClientConfig Config { get; }
+ /// <summary>
+ /// A handle that is reset when a connection has been successfully set, and is set
+ /// when the connection exists
+ /// </summary>
+ public ManualResetEvent ConnectionStatusHandle { get; }
+ /// <summary>
+ /// The <see cref="ClientWebSocket"/> to send/recieve message on
+ /// </summary>
+ public ManagedClientWebSocket ClientSocket { get; }
+ /// <summary>
+ /// Gets the shared control frame for the current instance. The request is reset when
+ /// this property is called. (Not thread safe)
+ /// </summary>
+ protected FBMRequest ControlFrame
+ {
+ get
+ {
+ _controlFrame.Reset();
+ return _controlFrame;
+ }
+ }
+
+ /// <summary>
+ /// Creates a new <see cref="FBMClient"/> in a closed state
+ /// </summary>
+ /// <param name="config">The client configuration</param>
+ public FBMClient(FBMClientConfig config)
+ {
+ RequestRental = ObjectRental.CreateReusable(ReuseableRequestConstructor);
+ SendLock = new(1);
+ ConnectionStatusHandle = new(true);
+ ActiveRequests = new(Environment.ProcessorCount, 100);
+
+ Config = config;
+ //Init control frame
+ _controlFrame = new (Helpers.CONTROL_FRAME_MID, in config);
+ //Init the new client socket
+ ClientSocket = new(config.RecvBufferSize, config.RecvBufferSize, config.KeepAliveInterval, config.SubProtocol);
+ }
+
+ private void Debug(string format, params string[] strings)
+ {
+ if(Config.DebugLog != null)
+ {
+ Config.DebugLog.Debug($"[DEBUG] FBM Client: {format}", strings);
+ }
+ }
+ private void Debug(string format, long value, long other)
+ {
+ if (Config.DebugLog != null)
+ {
+ Config.DebugLog.Debug($"[DEBUG] FBM Client: {format}", value, other);
+ }
+ }
+
+ /// <summary>
+ /// Allocates and configures a new <see cref="FBMRequest"/> message object for use within the reusable store
+ /// </summary>
+ /// <returns>The configured <see cref="FBMRequest"/></returns>
+ protected virtual FBMRequest ReuseableRequestConstructor() => new(Config);
+
+ /// <summary>
+ /// Asynchronously opens a websocket connection with the specifed remote server
+ /// </summary>
+ /// <param name="serverUri">The address of the server to connect to</param>
+ /// <param name="cancellationToken">A cancellation token</param>
+ /// <returns></returns>
+ public async Task ConnectAsync(Uri serverUri, CancellationToken cancellationToken = default)
+ {
+ //Uribuilder to send config parameters to the server
+ UriBuilder urib = new(serverUri);
+ urib.Query +=
+ $"{REQ_RECV_BUF_QUERY_ARG}={Config.RecvBufferSize}" +
+ $"&{REQ_HEAD_BUF_QUERY_ARG}={Config.MaxHeaderBufferSize}" +
+ $"&{REQ_MAX_MESS_QUERY_ARG}={Config.MaxMessageSize}";
+ Debug("Connection string {con}", urib.Uri.ToString());
+ //Connect to server
+ await ClientSocket.ConnectAsync(urib.Uri, cancellationToken);
+ //Reset wait handle before return
+ ConnectionStatusHandle.Reset();
+ //Begin listeing for requets in a background task
+ _ = Task.Run(ProcessContinuousRecvAsync, cancellationToken);
+ }
+
+ /// <summary>
+ /// Rents a new <see cref="FBMRequest"/> from the internal <see cref="ReusableStore{T}"/>.
+ /// Use <see cref="ReturnRequest(FBMRequest)"/> when request is no longer in use
+ /// </summary>
+ /// <returns>The configured (rented or new) <see cref="FBMRequest"/> ready for use</returns>
+ public FBMRequest RentRequest() => RequestRental.Rent();
+ /// <summary>
+ /// Stores (or returns) the reusable request in cache for use with <see cref="RentRequest"/>
+ /// </summary>
+ /// <param name="request">The request to return to the store</param>
+ /// <exception cref="InvalidOperationException"></exception>
+ public void ReturnRequest(FBMRequest request) => RequestRental.Return(request);
+
+ /// <summary>
+ /// Sends a <see cref="FBMRequest"/> to the connected server
+ /// </summary>
+ /// <param name="request">The request message to send to the server</param>
+ /// <param name="cancellationToken"></param>
+ /// <returns>When awaited, yields the server response</returns>
+ /// <exception cref="ArgumentException"></exception>
+ /// <exception cref="ObjectDisposedException"></exception>
+ /// <exception cref="InvalidOperationException"></exception>
+ /// <exception cref="FBMInvalidRequestException"></exception>
+ public async Task<FBMResponse> SendAsync(FBMRequest request, CancellationToken cancellationToken = default)
+ {
+ Check();
+ //Length of the request must contains at least 1 int and header byte
+ if (request.Length < 1 + sizeof(int))
+ {
+ throw new FBMInvalidRequestException("Message is not initialized");
+ }
+ //Store a null value in the request queue so the response can store a buffer
+ if (!ActiveRequests.TryAdd(request.MessageId, request))
+ {
+ throw new ArgumentException("Message with the same ID is already being processed");
+ }
+ try
+ {
+ Debug("Sending {bytes} with id {id}", request.RequestData.Length, request.MessageId);
+
+ //Reset the wait handle
+ request.ResponseWaitEvent.Reset();
+
+ //Wait for send-lock
+ using (SemSlimReleaser releaser = await SendLock.GetReleaserAsync(cancellationToken))
+ {
+ //Send the data to the server
+ await ClientSocket.SendAsync(request.RequestData, WebSocketMessageType.Binary, true, cancellationToken);
+ }
+
+ //wait for the response to be set
+ await request.WaitForResponseAsync(cancellationToken);
+
+ Debug("Received {size} bytes for message {id}", request.Response?.Length ?? 0, request.MessageId);
+
+ return request.GetResponse();
+ }
+ catch
+ {
+ //Remove the request since packet was never sent
+ ActiveRequests.Remove(request.MessageId, out _);
+ //Clear waiting flag
+ request.ResponseWaitEvent.Set();
+ throw;
+ }
+ }
+ /// <summary>
+ /// Streams arbitrary binary data to the server with the initial request message
+ /// </summary>
+ /// <param name="request">The request message to send to the server</param>
+ /// <param name="payload">Data to stream to the server</param>
+ /// <param name="ct">The content type of the stream of data</param>
+ /// <param name="cancellationToken"></param>
+ /// <returns>When awaited, yields the server response</returns>
+ /// <exception cref="ArgumentException"></exception>
+ /// <exception cref="ObjectDisposedException"></exception>
+ /// <exception cref="InvalidOperationException"></exception>
+ public async Task StreamDataAsync(FBMRequest request, Stream payload, ContentType ct, CancellationToken cancellationToken = default)
+ {
+ Check();
+ //Length of the request must contains at least 1 int and header byte
+ if(request.Length < 1 + sizeof(int))
+ {
+ throw new FBMInvalidRequestException("Message is not initialized");
+ }
+ //Store a null value in the request queue so the response can store a buffer
+ if (!ActiveRequests.TryAdd(request.MessageId, request))
+ {
+ throw new ArgumentException("Message with the same ID is already being processed");
+ }
+ try
+ {
+ Debug("Streaming {bytes} with id {id}", request.RequestData.Length, request.MessageId);
+ //Reset the wait handle
+ request.ResponseWaitEvent.Reset();
+ //Write an empty body in the request
+ request.WriteBody(ReadOnlySpan<byte>.Empty, ct);
+ //Wait for send-lock
+ using (SemSlimReleaser releaser = await SendLock.GetReleaserAsync(cancellationToken))
+ {
+ //Send the initial request packet
+ await ClientSocket.SendAsync(request.RequestData, WebSocketMessageType.Binary, false, cancellationToken);
+ //Calc buffer size
+ int bufSize = (int)Math.Clamp(payload.Length, Config.MessageBufferSize, Config.MaxMessageSize);
+ //Alloc a streaming buffer
+ using IMemoryOwner<byte> buffer = Config.BufferHeap.DirectAlloc<byte>(bufSize);
+ //Stream mesage body
+ do
+ {
+ //Read data
+ int read = await payload.ReadAsync(buffer.Memory, cancellationToken);
+ if (read == 0)
+ {
+ //No more data avialable
+ break;
+ }
+ //write message to socket, if the read data was smaller than the buffer, we can send the last packet
+ await ClientSocket.SendAsync(buffer.Memory[..read], WebSocketMessageType.Binary, read < bufSize, cancellationToken);
+
+ } while (true);
+ }
+ //wait for the server to respond
+ await request.WaitForResponseAsync(cancellationToken);
+
+ Debug("Response recieved {size} bytes for message {id}", request.Response?.Length ?? 0, request.MessageId);
+ }
+ catch
+ {
+ //Remove the request since packet was never sent or cancelled
+ ActiveRequests.Remove(request.MessageId, out _);
+ //Clear wait lock so the request state is reset
+ request.ResponseWaitEvent.Set();
+ throw;
+ }
+ }
+
+ /// <summary>
+ /// Begins listening for messages from the server on the internal socket (must be connected),
+ /// until the socket is closed, or canceled
+ /// </summary>
+ /// <returns></returns>
+ protected async Task ProcessContinuousRecvAsync()
+ {
+ Debug("Begining receive loop");
+ //Alloc recv buffer
+ IMemoryOwner<byte> recvBuffer = Config.BufferHeap.DirectAlloc<byte>(Config.RecvBufferSize);
+ try
+ {
+ //Recv event loop
+ while (true)
+ {
+ //Listen for incoming packets with the intial data buffer
+ ValueWebSocketReceiveResult result = await ClientSocket.ReceiveAsync(recvBuffer.Memory, CancellationToken.None);
+ //If the message is a close message, its time to exit
+ if (result.MessageType == WebSocketMessageType.Close)
+ {
+ //Notify the event handler that the connection was closed
+ ConnectionClosed?.Invoke(this, EventArgs.Empty);
+ break;
+ }
+ if (result.Count <= 4)
+ {
+ Debug("Empty message recieved from server");
+ continue;
+ }
+ //Alloc data buffer and write initial data
+ VnMemoryStream responseBuffer = new(Config.BufferHeap);
+ //Copy initial data
+ responseBuffer.Write(recvBuffer.Memory.Span[..result.Count]);
+ //Receive packets until the EOF is reached
+ while (!result.EndOfMessage)
+ {
+ //recive more data
+ result = await ClientSocket.ReceiveAsync(recvBuffer.Memory, CancellationToken.None);
+ //Make sure the buffer is not too large
+ if ((responseBuffer.Length + result.Count) > Config.MaxMessageSize)
+ {
+ //Dispose the buffer before exiting
+ responseBuffer.Dispose();
+ Debug("Recieved a message that was too large, skipped");
+ goto Skip;
+ }
+ //Copy continuous data
+ responseBuffer.Write(recvBuffer.Memory.Span[..result.Count]);
+ }
+ //Reset the buffer stream position
+ _ = responseBuffer.Seek(0, SeekOrigin.Begin);
+ ProcessResponse(responseBuffer);
+ //Goto skip statment to cleanup resources
+ Skip:;
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ //Normal closeure, do nothing
+ }
+ catch (Exception ex)
+ {
+ //Error event args
+ FMBClientErrorEventArgs wsEventArgs = new()
+ {
+ Cause = ex,
+ ErrorClient = this
+ };
+ //Invoke error handler
+ ConnectionClosedOnError?.Invoke(this, wsEventArgs);
+ }
+ finally
+ {
+ //Dispose the recv buffer
+ recvBuffer.Dispose();
+ //Set all pending events
+ foreach (FBMRequest request in ActiveRequests.Values)
+ {
+ request.ResponseWaitEvent.Set();
+ }
+ //Clear dict
+ ActiveRequests.Clear();
+ //Cleanup the socket when exiting
+ ClientSocket.Cleanup();
+ //Set status handle as unset
+ ConnectionStatusHandle.Set();
+ //Invoke connection closed
+ ConnectionClosed?.Invoke(this, EventArgs.Empty);
+ }
+ Debug("Receive loop exited");
+ }
+
+ /// <summary>
+ /// Syncrhonously processes a buffered response packet
+ /// </summary>
+ /// <param name="responseMessage">The buffered response body recieved from the server</param>
+ /// <remarks>This method blocks the listening task. So operations should be quick</remarks>
+ protected virtual void ProcessResponse(VnMemoryStream responseMessage)
+ {
+ //read first response line
+ ReadOnlySpan<byte> line = Helpers.ReadLine(responseMessage);
+ //get the id of the message
+ int messageId = Helpers.GetMessageId(line);
+ //Finalze control frame
+ if(messageId == Helpers.CONTROL_FRAME_MID)
+ {
+ Debug("Control frame received");
+ ProcessControlFrame(responseMessage);
+ return;
+ }
+ else if (messageId < 0)
+ {
+ //Cannot process request
+ responseMessage.Dispose();
+ Debug("Invalid messageid");
+ return;
+ }
+ //Search for the request that has the same id
+ if (ActiveRequests.TryRemove(messageId, out FBMRequest? request))
+ {
+ //Set the new response message
+ request.SetResponse(responseMessage);
+ }
+ else
+ {
+ Debug("Message {id} was not found in the waiting message queue", messageId, 0);
+
+ //Cleanup no request was waiting
+ responseMessage.Dispose();
+ }
+ }
+ /// <summary>
+ /// Processes a control frame response from the server
+ /// </summary>
+ /// <param name="vms">The raw response packet from the server</param>
+ private void ProcessControlFrame(VnMemoryStream vms)
+ {
+ vms.Dispose();
+ }
+ /// <summary>
+ /// Processes a control frame response from the server
+ /// </summary>
+ /// <param name="response">The parsed response-packet</param>
+ protected virtual void ProcessControlFrame(in FBMResponse response)
+ {
+
+ }
+
+ /// <summary>
+ /// Closes the underlying <see cref="WebSocket"/> and cancels all pending operations
+ /// </summary>
+ /// <param name="cancellationToken"></param>
+ /// <returns></returns>
+ /// <exception cref="ObjectDisposedException"></exception>
+ public async Task DisconnectAsync(CancellationToken cancellationToken = default)
+ {
+ Check();
+ //Close the connection
+ await ClientSocket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", cancellationToken);
+ }
+ ///<inheritdoc/>
+ protected override void Free()
+ {
+ //Dispose socket
+ ClientSocket.Dispose();
+ //Dispose client buffer
+ RequestRental.Dispose();
+ SendLock.Dispose();
+ ConnectionStatusHandle.Dispose();
+ }
+ ///<inheritdoc/>
+ public void CacheClear() => RequestRental.CacheClear();
+ ///<inheritdoc/>
+ public void CacheHardClear() => RequestRental.CacheHardClear();
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs b/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs
new file mode 100644
index 0000000..229eb76
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs
@@ -0,0 +1,81 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: FBMClientConfig.cs
+*
+* FBMClientConfig.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Text;
+
+using VNLib.Utils.Memory;
+using VNLib.Utils.Logging;
+
+namespace VNLib.Net.Messaging.FBM.Client
+{
+ /// <summary>
+ /// A structure that defines readonly constants for the <see cref="FBMClient"/> to use
+ /// </summary>
+ public readonly struct FBMClientConfig
+ {
+ /// <summary>
+ /// The size (in bytes) of the internal buffer to use when receiving messages from the server
+ /// </summary>
+ public readonly int RecvBufferSize { get; init; }
+ /// <summary>
+ /// The size (in bytes) of the <see cref="FBMRequest"/> internal buffer size, when requests are rented from the client
+ /// </summary>
+ /// <remarks>
+ /// This is the entire size of the request buffer including headers and payload data, unless
+ /// data is streamed to the server
+ /// </remarks>
+ public readonly int MessageBufferSize { get; init; }
+ /// <summary>
+ /// The size (in chars) of the client/server message header buffer
+ /// </summary>
+ public readonly int MaxHeaderBufferSize { get; init; }
+ /// <summary>
+ /// The maximum size (in bytes) of messages sent or recieved from the server
+ /// </summary>
+ public readonly int MaxMessageSize { get; init; }
+ /// <summary>
+ /// The heap to allocate interal (and message) buffers from
+ /// </summary>
+ public readonly IUnmangedHeap BufferHeap { get; init; }
+ /// <summary>
+ /// The websocket keepalive interval to use (leaving this property default disables keepalives)
+ /// </summary>
+ public readonly TimeSpan KeepAliveInterval { get; init; }
+ /// <summary>
+ /// The websocket sub-protocol to use
+ /// </summary>
+ public readonly string? SubProtocol { get; init; }
+ /// <summary>
+ /// The encoding instance used to encode header values
+ /// </summary>
+ public readonly Encoding HeaderEncoding { get; init; }
+
+ /// <summary>
+ /// An optional log provider to write debug logs to. If this propery is not null,
+ /// debugging information will be logged with the debug log-level
+ /// </summary>
+ public readonly ILogProvider? DebugLog { get; init; }
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClientWorkerBase.cs b/lib/Net.Messaging.FBM/src/Client/FBMClientWorkerBase.cs
new file mode 100644
index 0000000..b4056dc
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Client/FBMClientWorkerBase.cs
@@ -0,0 +1,125 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: FBMClientWorkerBase.cs
+*
+* FBMClientWorkerBase.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+using VNLib.Utils;
+
+namespace VNLib.Net.Messaging.FBM.Client
+{
+ /// <summary>
+ /// A base class for objects that implement <see cref="FBMClient"/>
+ /// operations
+ /// </summary>
+ public abstract class FBMClientWorkerBase : VnDisposeable, IStatefulConnection
+ {
+ /// <summary>
+ /// Allows configuration of websocket configuration options
+ /// </summary>
+ public ManagedClientWebSocket SocketOptions => Client.ClientSocket;
+
+#nullable disable
+ /// <summary>
+ /// The <see cref="FBMClient"/> to sent requests from
+ /// </summary>
+ public FBMClient Client { get; private set; }
+
+ /// <summary>
+ /// Raised when the client has connected successfully
+ /// </summary>
+ public event Action<FBMClient, FBMClientWorkerBase> Connected;
+#nullable enable
+
+ ///<inheritdoc/>
+ public event EventHandler ConnectionClosed
+ {
+ add => Client.ConnectionClosed += value;
+ remove => Client.ConnectionClosed -= value;
+ }
+
+ /// <summary>
+ /// Creates and initializes a the internal <see cref="FBMClient"/>
+ /// </summary>
+ /// <param name="config">The client config</param>
+ protected void InitClient(in FBMClientConfig config)
+ {
+ Client = new(config);
+ Client.ConnectionClosedOnError += Client_ConnectionClosedOnError;
+ Client.ConnectionClosed += Client_ConnectionClosed;
+ }
+
+ private void Client_ConnectionClosed(object? sender, EventArgs e) => OnDisconnected();
+ private void Client_ConnectionClosedOnError(object? sender, FMBClientErrorEventArgs e) => OnError(e);
+
+ /// <summary>
+ /// Asynchronously connects to a remote server by the specified uri
+ /// </summary>
+ /// <param name="serverUri">The remote uri of a server to connect to</param>
+ /// <param name="cancellationToken">A token to cancel the connect operation</param>
+ /// <returns>A task that compeltes when the client has connected to the remote server</returns>
+ public virtual async Task ConnectAsync(Uri serverUri, CancellationToken cancellationToken = default)
+ {
+ //Connect to server
+ await Client.ConnectAsync(serverUri, cancellationToken).ConfigureAwait(true);
+ //Invoke child on-connected event
+ OnConnected();
+ Connected?.Invoke(Client, this);
+ }
+
+ /// <summary>
+ /// Asynchronously disonnects a client only if the client is currently connected,
+ /// returns otherwise
+ /// </summary>
+ /// <param name="cancellationToken"></param>
+ /// <returns>A task that compeltes when the client has disconnected</returns>
+ public virtual Task DisconnectAsync(CancellationToken cancellationToken = default)
+ {
+ return Client.DisconnectAsync(cancellationToken);
+ }
+
+ /// <summary>
+ /// Invoked when a client has successfully connected to the remote server
+ /// </summary>
+ protected abstract void OnConnected();
+ /// <summary>
+ /// Invoked when the client has disconnected cleanly
+ /// </summary>
+ protected abstract void OnDisconnected();
+ /// <summary>
+ /// Invoked when the connected client is closed because of a connection error
+ /// </summary>
+ /// <param name="e">A <see cref="EventArgs"/> that contains the client error data</param>
+ protected abstract void OnError(FMBClientErrorEventArgs e);
+
+ ///<inheritdoc/>
+ protected override void Free()
+ {
+ Client.ConnectionClosedOnError -= Client_ConnectionClosedOnError;
+ Client.ConnectionClosed -= Client_ConnectionClosed;
+ Client.Dispose();
+ }
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs b/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs
new file mode 100644
index 0000000..f02724a
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs
@@ -0,0 +1,302 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: FBMRequest.cs
+*
+* FBMRequest.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Text;
+using System.Buffers;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+
+using VNLib.Net.Http;
+using VNLib.Utils;
+using VNLib.Utils.IO;
+using VNLib.Utils.Memory;
+using VNLib.Utils.Extensions;
+using VNLib.Utils.Memory.Caching;
+
+namespace VNLib.Net.Messaging.FBM.Client
+{
+ /// <summary>
+ /// A reusable Fixed Buffer Message request container. This class is not thread-safe
+ /// </summary>
+ public sealed class FBMRequest : VnDisposeable, IReusable, IFBMMessage, IStringSerializeable
+ {
+ private sealed class BufferWriter : IBufferWriter<byte>
+ {
+ private readonly FBMRequest _request;
+
+ public BufferWriter(FBMRequest request)
+ {
+ _request = request;
+ }
+
+ public void Advance(int count)
+ {
+ _request.Position += count;
+ }
+
+ public Memory<byte> GetMemory(int sizeHint = 0)
+ {
+ return sizeHint > 0 ? _request.RemainingBuffer[0..sizeHint] : _request.RemainingBuffer;
+ }
+
+ public Span<byte> GetSpan(int sizeHint = 0)
+ {
+ return sizeHint > 0 ? _request.RemainingBuffer.Span[0..sizeHint] : _request.RemainingBuffer.Span;
+ }
+ }
+
+ private readonly IMemoryOwner<byte> HeapBuffer;
+
+
+ private readonly BufferWriter _writer;
+ private int Position;
+
+ private readonly Encoding HeaderEncoding;
+ private readonly int ResponseHeaderBufferSize;
+ private readonly List<KeyValuePair<HeaderCommand, ReadOnlyMemory<char>>> ResponseHeaderList = new();
+ private char[]? ResponseHeaderBuffer;
+
+ /// <summary>
+ /// The size (in bytes) of the request message
+ /// </summary>
+ public int Length => Position;
+ private Memory<byte> RemainingBuffer => HeapBuffer.Memory[Position..];
+
+ /// <summary>
+ /// The id of the current request message
+ /// </summary>
+ public int MessageId { get; }
+ /// <summary>
+ /// The request message packet
+ /// </summary>
+ public ReadOnlyMemory<byte> RequestData => HeapBuffer.Memory[..Position];
+ /// <summary>
+ /// An <see cref="ManualResetEvent"/> to signal request/response
+ /// event completion
+ /// </summary>
+ internal ManualResetEvent ResponseWaitEvent { get; }
+
+ internal VnMemoryStream? Response { get; private set; }
+ /// <summary>
+ /// Initializes a new <see cref="FBMRequest"/> with the sepcified message buffer size,
+ /// and a random messageid
+ /// </summary>
+ /// <param name="config">The fbm client config storing required config variables</param>
+ public FBMRequest(in FBMClientConfig config) : this(Helpers.RandomMessageId, in config)
+ { }
+ /// <summary>
+ /// Initializes a new <see cref="FBMRequest"/> with the sepcified message buffer size and a custom MessageId
+ /// </summary>
+ /// <param name="messageId">The custom message id</param>
+ /// <param name="config">The fbm client config storing required config variables</param>
+ public FBMRequest(int messageId, in FBMClientConfig config)
+ {
+ //Setup response wait handle but make sure the contuation runs async
+ ResponseWaitEvent = new(true);
+
+ //Alloc the buffer as a memory owner so a memory buffer can be used
+ HeapBuffer = config.BufferHeap.DirectAlloc<byte>(config.MessageBufferSize);
+
+ MessageId = messageId;
+
+ HeaderEncoding = config.HeaderEncoding;
+ ResponseHeaderBufferSize = config.MaxHeaderBufferSize;
+
+ WriteMessageId();
+ _writer = new(this);
+ }
+
+ /// <summary>
+ /// Resets the internal buffer and writes the message-id header to the begining
+ /// of the buffer
+ /// </summary>
+ private void WriteMessageId()
+ {
+ //Get writer over buffer
+ ForwardOnlyWriter<byte> buffer = new(HeapBuffer.Memory.Span);
+ //write messageid header to the buffer
+ buffer.Append((byte)HeaderCommand.MessageId);
+ buffer.Append(MessageId);
+ buffer.WriteTermination();
+ //Store intial position
+ Position = buffer.Written;
+ }
+
+ ///<inheritdoc/>
+ public void WriteHeader(HeaderCommand header, ReadOnlySpan<char> value) => WriteHeader((byte)header, value);
+ ///<inheritdoc/>
+ public void WriteHeader(byte header, ReadOnlySpan<char> value)
+ {
+ ForwardOnlyWriter<byte> buffer = new(RemainingBuffer.Span);
+ buffer.WriteHeader(header, value, Helpers.DefaultEncoding);
+ //Update position
+ Position += buffer.Written;
+ }
+ ///<inheritdoc/>
+ public void WriteBody(ReadOnlySpan<byte> body, ContentType contentType = ContentType.Binary)
+ {
+ //Write content type header
+ WriteHeader(HeaderCommand.ContentType, HttpHelpers.GetContentTypeString(contentType));
+ //Get writer over buffer
+ ForwardOnlyWriter<byte> buffer = new(RemainingBuffer.Span);
+ //Now safe to write body
+ buffer.WriteBody(body);
+ //Update position
+ Position += buffer.Written;
+ }
+ /// <summary>
+ /// Returns buffer writer for writing the body data to the internal message buffer
+ /// </summary>
+ /// <returns>A <see cref="BufferWriter"/> to write message body to</returns>
+ public IBufferWriter<byte> GetBodyWriter()
+ {
+ //Write body termination
+ Helpers.Termination.CopyTo(RemainingBuffer);
+ Position += Helpers.Termination.Length;
+ //Return buffer writer
+ return _writer;
+ }
+
+ /// <summary>
+ /// Resets the internal buffer and allows for writing a new message with
+ /// the same message-id
+ /// </summary>
+ public void Reset()
+ {
+ //Re-writing the message-id will reset the buffer
+ WriteMessageId();
+ }
+
+ internal void SetResponse(VnMemoryStream? vms)
+ {
+ Response = vms;
+ ResponseWaitEvent.Set();
+ }
+
+ internal Task WaitForResponseAsync(CancellationToken token)
+ {
+ return ResponseWaitEvent.WaitAsync().WaitAsync(token);
+ }
+
+ ///<inheritdoc/>
+ protected override void Free()
+ {
+ HeapBuffer.Dispose();
+ ResponseWaitEvent.Dispose();
+ OnResponseDisposed();
+ }
+ void IReusable.Prepare() => Reset();
+ bool IReusable.Release()
+ {
+ //Clear old response data if error occured
+ Response?.Dispose();
+ Response = null;
+
+ return true;
+ }
+
+ /// <summary>
+ /// Gets the response of the sent message
+ /// </summary>
+ /// <returns>The response message for the current request</returns>
+ internal FBMResponse GetResponse()
+ {
+ if (Response != null)
+ {
+ /*
+ * NOTICE
+ *
+ * The FBM Client will position the response stream to the start
+ * of the header section (missing the message-id header)
+ *
+ * The message id belongs to this request so it cannot be mismatched
+ *
+ * The headers are read into a list of key-value pairs and the stream
+ * is positioned to the start of the message body
+ */
+
+
+ //Alloc rseponse buffer
+ ResponseHeaderBuffer ??= ArrayPool<char>.Shared.Rent(ResponseHeaderBufferSize);
+
+ //Parse message headers
+ HeaderParseError statusFlags = Helpers.ParseHeaders(Response, ResponseHeaderBuffer, ResponseHeaderList, HeaderEncoding);
+
+ //return response structure
+ return new(Response, statusFlags, ResponseHeaderList, OnResponseDisposed);
+ }
+ else
+ {
+ return new();
+ }
+ }
+
+ //Called when a response message is disposed to cleanup resources held by the response
+ private void OnResponseDisposed()
+ {
+ //Clear response header list
+ ResponseHeaderList.Clear();
+
+ //Clear old response
+ Response?.Dispose();
+ Response = null;
+
+ if (ResponseHeaderBuffer != null)
+ {
+ //Free response buffer
+ ArrayPool<char>.Shared.Return(ResponseHeaderBuffer!);
+ ResponseHeaderBuffer = null;
+ }
+ }
+
+ ///<inheritdoc/>
+ public string Compile()
+ {
+ int charSize = Helpers.DefaultEncoding.GetCharCount(RequestData.Span);
+ using UnsafeMemoryHandle<char> buffer = Memory.UnsafeAlloc<char>(charSize + 128);
+ ERRNO count = Compile(buffer.Span);
+ return buffer.AsSpan(0, count).ToString();
+ }
+ ///<inheritdoc/>
+ public void Compile(ref ForwardOnlyWriter<char> writer)
+ {
+ writer.Append("Message ID:");
+ writer.Append(MessageId);
+ writer.Append(Environment.NewLine);
+ Helpers.DefaultEncoding.GetChars(RequestData.Span, ref writer);
+ }
+ ///<inheritdoc/>
+ public ERRNO Compile(in Span<char> buffer)
+ {
+ ForwardOnlyWriter<char> writer = new(buffer);
+ Compile(ref writer);
+ return writer.Written;
+ }
+ ///<inheritdoc/>
+ public override string ToString() => Compile();
+
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Client/FBMResponse.cs b/lib/Net.Messaging.FBM/src/Client/FBMResponse.cs
new file mode 100644
index 0000000..da36956
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Client/FBMResponse.cs
@@ -0,0 +1,106 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: FBMResponse.cs
+*
+* FBMResponse.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Collections.Generic;
+
+using VNLib.Utils.IO;
+
+namespace VNLib.Net.Messaging.FBM.Client
+{
+ /// <summary>
+ /// A Fixed Buffer Message client response linked to the request that generated it.
+ /// Once the request is disposed or returned this message state is invalid
+ /// </summary>
+ public readonly struct FBMResponse : IDisposable, IEquatable<FBMResponse>
+ {
+ private readonly Action? _onDispose;
+
+ /// <summary>
+ /// True when a response body was recieved and properly parsed
+ /// </summary>
+ public readonly bool IsSet { get; }
+ /// <summary>
+ /// The raw response message packet
+ /// </summary>
+ public readonly VnMemoryStream? MessagePacket { get; }
+ /// <summary>
+ /// A collection of response message headers
+ /// </summary>
+ public readonly IReadOnlyList<KeyValuePair<HeaderCommand, ReadOnlyMemory<char>>> Headers { get; }
+ /// <summary>
+ /// Status flags of the message parse operation
+ /// </summary>
+ public readonly HeaderParseError StatusFlags { get; }
+ /// <summary>
+ /// The body segment of the response message
+ /// </summary>
+ public readonly ReadOnlySpan<byte> ResponseBody => IsSet ? Helpers.GetRemainingData(MessagePacket!) : ReadOnlySpan<byte>.Empty;
+
+ /// <summary>
+ /// Initailzies a response message structure and parses response
+ /// packet structure
+ /// </summary>
+ /// <param name="vms">The message buffer (message packet)</param>
+ /// <param name="status">The size of the buffer to alloc for header value storage</param>
+ /// <param name="headerList">The collection of headerse</param>
+ /// <param name="onDispose">A method that will be invoked when the message response body is disposed</param>
+ public FBMResponse(VnMemoryStream? vms, HeaderParseError status, IReadOnlyList<KeyValuePair<HeaderCommand, ReadOnlyMemory<char>>> headerList, Action onDispose)
+ {
+ MessagePacket = vms;
+ StatusFlags = status;
+ Headers = headerList;
+ IsSet = true;
+ _onDispose = onDispose;
+ }
+
+ /// <summary>
+ /// Creates an unset response structure
+ /// </summary>
+ public FBMResponse()
+ {
+ MessagePacket = null;
+ StatusFlags = HeaderParseError.InvalidHeaderRead;
+ Headers = Array.Empty<KeyValuePair<HeaderCommand, ReadOnlyMemory<char>>>();
+ IsSet = false;
+ _onDispose = null;
+ }
+
+ /// <summary>
+ /// Releases any resources associated with the response message
+ /// </summary>
+ public void Dispose() => _onDispose?.Invoke();
+ ///<inheritdoc/>
+ public override bool Equals(object? obj) => obj is FBMResponse response && Equals(response);
+ ///<inheritdoc/>
+ public override int GetHashCode() => IsSet ? MessagePacket!.GetHashCode() : 0;
+ ///<inheritdoc/>
+ public static bool operator ==(FBMResponse left, FBMResponse right) => left.Equals(right);
+ ///<inheritdoc/>
+ public static bool operator !=(FBMResponse left, FBMResponse right) => !(left == right);
+ ///<inheritdoc/>
+ public bool Equals(FBMResponse other) => IsSet && other.IsSet && MessagePacket == other.MessagePacket;
+
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Client/FMBClientErrorEventArgs.cs b/lib/Net.Messaging.FBM/src/Client/FMBClientErrorEventArgs.cs
new file mode 100644
index 0000000..96e9414
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Client/FMBClientErrorEventArgs.cs
@@ -0,0 +1,46 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: FMBClientErrorEventArgs.cs
+*
+* FMBClientErrorEventArgs.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+
+#nullable disable
+
+namespace VNLib.Net.Messaging.FBM.Client
+{
+ /// <summary>
+ /// <see cref="EventArgs"/> that is raised when an error occurs
+ /// in the background listener loop
+ /// </summary>
+ public class FMBClientErrorEventArgs : EventArgs
+ {
+ /// <summary>
+ /// The client that the exception was raised from
+ /// </summary>
+ public FBMClient ErrorClient { get; init; }
+ /// <summary>
+ /// The exception that was raised
+ /// </summary>
+ public Exception Cause { get; init; }
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Client/HeaderCommand.cs b/lib/Net.Messaging.FBM/src/Client/HeaderCommand.cs
new file mode 100644
index 0000000..5a57d85
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Client/HeaderCommand.cs
@@ -0,0 +1,57 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: HeaderCommand.cs
+*
+* HeaderCommand.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+namespace VNLib.Net.Messaging.FBM
+{
+ /// <summary>
+ /// A Fixed Buffer Message header command value
+ /// </summary>
+ public enum HeaderCommand : byte
+ {
+ /// <summary>
+ /// Default, do not use
+ /// </summary>
+ NOT_USED,
+ /// <summary>
+ /// Specifies the header for a message-id
+ /// </summary>
+ MessageId,
+ /// <summary>
+ /// Specifies a resource location
+ /// </summary>
+ Location,
+ /// <summary>
+ /// Specifies a standard MIME content type header
+ /// </summary>
+ ContentType,
+ /// <summary>
+ /// Specifies an action on a request
+ /// </summary>
+ Action,
+ /// <summary>
+ /// Specifies a status header
+ /// </summary>
+ Status
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Client/HeaderParseStatus.cs b/lib/Net.Messaging.FBM/src/Client/HeaderParseStatus.cs
new file mode 100644
index 0000000..d38df26
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Client/HeaderParseStatus.cs
@@ -0,0 +1,40 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: HeaderParseStatus.cs
+*
+* HeaderParseStatus.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+
+namespace VNLib.Net.Messaging.FBM
+{
+ /// <summary>
+ /// Specifies the results of a response parsing operation
+ /// </summary>
+ [Flags]
+ public enum HeaderParseError
+ {
+ None = 0,
+ InvalidId = 1,
+ HeaderOutOfMem = 2,
+ InvalidHeaderRead = 4
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Client/Helpers.cs b/lib/Net.Messaging.FBM/src/Client/Helpers.cs
new file mode 100644
index 0000000..8f895fa
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Client/Helpers.cs
@@ -0,0 +1,272 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: Helpers.cs
+*
+* Helpers.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.IO;
+using System.Text;
+using System.Collections.Generic;
+using System.Security.Cryptography;
+
+using VNLib.Utils;
+using VNLib.Utils.IO;
+using VNLib.Utils.Memory;
+using VNLib.Utils.Extensions;
+
+
+namespace VNLib.Net.Messaging.FBM
+{
+ /// <summary>
+ /// Contains FBM library helper methods
+ /// </summary>
+ public static class Helpers
+ {
+ /// <summary>
+ /// The message-id of a connection control frame / out of band message
+ /// </summary>
+ public const int CONTROL_FRAME_MID = -500;
+
+ public static readonly Encoding DefaultEncoding = Encoding.UTF8;
+ public static readonly ReadOnlyMemory<byte> Termination = new byte[] { 0xFF, 0xF1 };
+
+ /// <summary>
+ /// Parses the header line for a message-id
+ /// </summary>
+ /// <param name="line">A sequence of bytes that make up a header line</param>
+ /// <returns>The message-id if parsed, -1 if message-id is not valid</returns>
+ public static int GetMessageId(ReadOnlySpan<byte> line)
+ {
+ //Make sure the message line is large enough to contain a message-id
+ if (line.Length < 1 + sizeof(int))
+ {
+ return -1;
+ }
+ //The first byte should be the header id
+ HeaderCommand headerId = (HeaderCommand)line[0];
+ //Make sure the headerid is set
+ if (headerId != HeaderCommand.MessageId)
+ {
+ return -2;
+ }
+ //Get the messageid after the header byte
+ ReadOnlySpan<byte> messageIdSegment = line.Slice(1, sizeof(int));
+ //get the messageid from the messageid segment
+ return BitConverter.ToInt32(messageIdSegment);
+ }
+
+ /// <summary>
+ /// Alloctes a random integer to use as a message id
+ /// </summary>
+ public static int RandomMessageId => RandomNumberGenerator.GetInt32(1, int.MaxValue);
+ /// <summary>
+ /// Gets the remaining data after the current position of the stream.
+ /// </summary>
+ /// <param name="response">The stream to segment</param>
+ /// <returns>The remaining data segment</returns>
+ public static ReadOnlySpan<byte> GetRemainingData(VnMemoryStream response)
+ {
+ return response.AsSpan()[(int)response.Position..];
+ }
+
+ /// <summary>
+ /// Reads the next available line from the response message
+ /// </summary>
+ /// <param name="response"></param>
+ /// <returns>The read line</returns>
+ public static ReadOnlySpan<byte> ReadLine(VnMemoryStream response)
+ {
+ //Get the span from the current stream position to end of the stream
+ ReadOnlySpan<byte> line = GetRemainingData(response);
+ //Search for next line termination
+ int index = line.IndexOf(Termination.Span);
+ if (index == -1)
+ {
+ return ReadOnlySpan<byte>.Empty;
+ }
+ //Update stream position to end of termination
+ response.Seek(index + Termination.Length, SeekOrigin.Current);
+ //slice up line and exclude the termination
+ return line[..index];
+ }
+ /// <summary>
+ /// Parses headers from the request stream, stores headers from the buffer into the
+ /// header collection
+ /// </summary>
+ /// <param name="vms">The FBM packet buffer</param>
+ /// <param name="buffer">The header character buffer to write headers to</param>
+ /// <param name="headers">The collection to store headers in</param>
+ /// <param name="encoding">The encoding type used to deocde header values</param>
+ /// <returns>The results of the parse operation</returns>
+ public static HeaderParseError ParseHeaders(VnMemoryStream vms, char[] buffer, ICollection<KeyValuePair<HeaderCommand, ReadOnlyMemory<char>>> headers, Encoding encoding)
+ {
+ HeaderParseError status = HeaderParseError.None;
+ //sliding window
+ Memory<char> currentWindow = buffer;
+ //Accumulate headers
+ while (true)
+ {
+ //Read the next line from the current stream
+ ReadOnlySpan<byte> line = ReadLine(vms);
+ if (line.IsEmpty)
+ {
+ //Done reading headers
+ break;
+ }
+ HeaderCommand cmd = GetHeaderCommand(line);
+ //Get header value
+ ERRNO charsRead = GetHeaderValue(line, currentWindow.Span, encoding);
+ if (charsRead < 0)
+ {
+ //Out of buffer space
+ status |= HeaderParseError.HeaderOutOfMem;
+ break;
+ }
+ else if (!charsRead)
+ {
+ //Invalid header
+ status |= HeaderParseError.InvalidHeaderRead;
+ }
+ else
+ {
+ //Store header as a read-only sequence
+ headers.Add(new(cmd, currentWindow[..(int)charsRead]));
+ //Shift buffer window
+ currentWindow = currentWindow[(int)charsRead..];
+ }
+ }
+ return status;
+ }
+
+ /// <summary>
+ /// Gets a <see cref="HeaderCommand"/> enum from the first byte of the message
+ /// </summary>
+ /// <param name="line"></param>
+ /// <returns>The <see cref="HeaderCommand"/> enum value from hte first byte of the message</returns>
+ public static HeaderCommand GetHeaderCommand(ReadOnlySpan<byte> line)
+ {
+ return (HeaderCommand)line[0];
+ }
+ /// <summary>
+ /// Gets the value of the header following the colon bytes in the specifed
+ /// data message data line
+ /// </summary>
+ /// <param name="line">The message header line to get the value of</param>
+ /// <param name="output">The output character buffer to write characters to</param>
+ /// <param name="encoding">The encoding to decode the specified data with</param>
+ /// <returns>The number of characters encoded</returns>
+ public static ERRNO GetHeaderValue(ReadOnlySpan<byte> line, Span<char> output, Encoding encoding)
+ {
+ //Get the data following the header byte
+ ReadOnlySpan<byte> value = line[1..];
+ //Calculate the character account
+ int charCount = encoding.GetCharCount(value);
+ //Determine if the output buffer is large enough
+ if (charCount > output.Length)
+ {
+ return -1;
+ }
+ //Decode the characters and return the char count
+ _ = encoding.GetChars(value, output);
+ return charCount;
+ }
+
+ /// <summary>
+ /// Appends an arbitrary header to the current request buffer
+ /// </summary>
+ /// <param name="buffer"></param>
+ /// <param name="header">The <see cref="HeaderCommand"/> of the header</param>
+ /// <param name="value">The value of the header</param>
+ /// <param name="encoding">Encoding to use when writing character message</param>
+ /// <exception cref="OutOfMemoryException"></exception>
+ public static void WriteHeader(ref this ForwardOnlyWriter<byte> buffer, byte header, ReadOnlySpan<char> value, Encoding encoding)
+ {
+ //get char count
+ int byteCount = encoding.GetByteCount(value);
+ //make sure there is enough room in the buffer
+ if (buffer.RemainingSize < byteCount)
+ {
+ throw new OutOfMemoryException();
+ }
+ //Write header command enum value
+ buffer.Append(header);
+ //Convert the characters to binary and write to the buffer
+ encoding.GetBytes(value, ref buffer);
+ //Write termination (0)
+ buffer.WriteTermination();
+ }
+
+ /// <summary>
+ /// Ends the header section of the request and appends the message body to
+ /// the end of the request
+ /// </summary>
+ /// <param name="buffer"></param>
+ /// <param name="body">The message body to send with request</param>
+ /// <exception cref="OutOfMemoryException"></exception>
+ public static void WriteBody(ref this ForwardOnlyWriter<byte> buffer, ReadOnlySpan<byte> body)
+ {
+ //start with termination
+ buffer.WriteTermination();
+ //Write the body
+ buffer.Append(body);
+ }
+ /// <summary>
+ /// Writes a line termination to the message buffer
+ /// </summary>
+ /// <param name="buffer"></param>
+ public static void WriteTermination(ref this ForwardOnlyWriter<byte> buffer)
+ {
+ //write termination
+ buffer.Append(Termination.Span);
+ }
+
+ /// <summary>
+ /// Writes a line termination to the message buffer
+ /// </summary>
+ /// <param name="buffer"></param>
+ public static void WriteTermination(this IDataAccumulator<byte> buffer)
+ {
+ //write termination
+ buffer.Append(Termination.Span);
+ }
+
+ /// <summary>
+ /// Appends an arbitrary header to the current request buffer
+ /// </summary>
+ /// <param name="buffer"></param>
+ /// <param name="header">The <see cref="HeaderCommand"/> of the header</param>
+ /// <param name="value">The value of the header</param>
+ /// <param name="encoding">Encoding to use when writing character message</param>
+ /// <exception cref="ArgumentException"></exception>
+ public static void WriteHeader(this IDataAccumulator<byte> buffer, byte header, ReadOnlySpan<char> value, Encoding encoding)
+ {
+ //Write header command enum value
+ buffer.Append(header);
+ //Convert the characters to binary and write to the buffer
+ int written = encoding.GetBytes(value, buffer.Remaining);
+ //Advance the buffer
+ buffer.Advance(written);
+ //Write termination (0)
+ buffer.WriteTermination();
+ }
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Client/IFBMMessage.cs b/lib/Net.Messaging.FBM/src/Client/IFBMMessage.cs
new file mode 100644
index 0000000..18f19ec
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Client/IFBMMessage.cs
@@ -0,0 +1,62 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: IFBMMessage.cs
+*
+* IFBMMessage.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+
+using VNLib.Net.Http;
+
+namespace VNLib.Net.Messaging.FBM.Client
+{
+ /// <summary>
+ /// Represents basic Fixed Buffer Message protocol operations
+ /// </summary>
+ public interface IFBMMessage
+ {
+ /// <summary>
+ /// The unique id of the message (nonzero)
+ /// </summary>
+ int MessageId { get; }
+ /// <summary>
+ /// Writes a data body to the message of the specified content type
+ /// </summary>
+ /// <param name="body">The body of the message to copy</param>
+ /// <param name="contentType">The content type of the message body</param>
+ /// <exception cref="OutOfMemoryException"></exception>
+ void WriteBody(ReadOnlySpan<byte> body, ContentType contentType = ContentType.Binary);
+ /// <summary>
+ /// Appends an arbitrary header to the current request buffer
+ /// </summary>
+ /// <param name="header">The header id</param>
+ /// <param name="value">The value of the header</param>
+ /// <exception cref="OutOfMemoryException"></exception>
+ void WriteHeader(byte header, ReadOnlySpan<char> value);
+ /// <summary>
+ /// Appends an arbitrary header to the current request buffer
+ /// </summary>
+ /// <param name="header">The <see cref="HeaderCommand"/> of the header</param>
+ /// <param name="value">The value of the header</param>
+ /// <exception cref="OutOfMemoryException"></exception>
+ void WriteHeader(HeaderCommand header, ReadOnlySpan<char> value);
+ }
+} \ No newline at end of file
diff --git a/lib/Net.Messaging.FBM/src/Client/IStatefulConnection.cs b/lib/Net.Messaging.FBM/src/Client/IStatefulConnection.cs
new file mode 100644
index 0000000..3b9dd3b
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Client/IStatefulConnection.cs
@@ -0,0 +1,54 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: IStatefulConnection.cs
+*
+* IStatefulConnection.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace VNLib.Net.Messaging.FBM.Client
+{
+ /// <summary>
+ /// An abstraction for a stateful connection client that reports its status
+ /// </summary>
+ public interface IStatefulConnection
+ {
+ /// <summary>
+ /// An event that is raised when the connection state has transition from connected to disconnected
+ /// </summary>
+ event EventHandler ConnectionClosed;
+ /// <summary>
+ /// Connects the client to the remote resource
+ /// </summary>
+ /// <param name="serverUri">The resource location to connect to</param>
+ /// <param name="cancellationToken">A token to cancel the connect opreation</param>
+ /// <returns>A task that compeltes when the connection has succedded</returns>
+ Task ConnectAsync(Uri serverUri, CancellationToken cancellationToken = default);
+ /// <summary>
+ /// Gracefully disconnects the client from the remote resource
+ /// </summary>
+ /// <param name="cancellationToken">A token to cancel the disconnect operation</param>
+ /// <returns>A task that completes when the client has been disconnected</returns>
+ Task DisconnectAsync(CancellationToken cancellationToken = default);
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs b/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs
new file mode 100644
index 0000000..acac369
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs
@@ -0,0 +1,201 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: ManagedClientWebSocket.cs
+*
+* ManagedClientWebSocket.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Net;
+using System.Threading;
+using System.Net.Security;
+using System.Net.WebSockets;
+using System.Threading.Tasks;
+using System.Security.Cryptography.X509Certificates;
+
+using VNLib.Utils.Memory;
+
+#nullable enable
+
+namespace VNLib.Net.Messaging.FBM.Client
+{
+
+ /// <summary>
+ /// A wrapper container to manage client websockets
+ /// </summary>
+ public class ManagedClientWebSocket : WebSocket
+ {
+ private readonly int TxBufferSize;
+ private readonly int RxBufferSize;
+ private readonly TimeSpan KeepAliveInterval;
+ private readonly VnTempBuffer<byte> _dataBuffer;
+ private readonly string? _subProtocol;
+
+ /// <summary>
+ /// A collection of headers to add to the client
+ /// </summary>
+ public WebHeaderCollection Headers { get; }
+ public X509CertificateCollection Certificates { get; }
+ public IWebProxy? Proxy { get; set; }
+ public CookieContainer? Cookies { get; set; }
+ public RemoteCertificateValidationCallback? RemoteCertificateValidationCallback { get; set; }
+
+
+ private ClientWebSocket? _socket;
+
+ /// <summary>
+ /// Initiaizes a new <see cref="ManagedClientWebSocket"/> that accepts an optional sub-protocol for connections
+ /// </summary>
+ /// <param name="txBufferSize">The size (in bytes) of the send buffer size</param>
+ /// <param name="rxBufferSize">The size (in bytes) of the receive buffer size to use</param>
+ /// <param name="keepAlive">The WS keepalive interval</param>
+ /// <param name="subProtocol">The optional sub-protocol to use</param>
+ public ManagedClientWebSocket(int txBufferSize, int rxBufferSize, TimeSpan keepAlive, string? subProtocol)
+ {
+ //Init header collection
+ Headers = new();
+ Certificates = new();
+ //Alloc buffer
+ _dataBuffer = new(rxBufferSize);
+ TxBufferSize = txBufferSize;
+ RxBufferSize = rxBufferSize;
+ KeepAliveInterval = keepAlive;
+ _subProtocol = subProtocol;
+ }
+
+ /// <summary>
+ /// Asyncrhonously prepares a new client web-socket and connects to the remote endpoint
+ /// </summary>
+ /// <param name="serverUri">The endpoint to connect to</param>
+ /// <param name="token">A token to cancel the connect operation</param>
+ /// <returns>A task that compeltes when the client has connected</returns>
+ public async Task ConnectAsync(Uri serverUri, CancellationToken token)
+ {
+ //Init new socket
+ _socket = new();
+ try
+ {
+ //Set buffer
+ _socket.Options.SetBuffer(RxBufferSize, TxBufferSize, _dataBuffer);
+ //Set remaining stored options
+ _socket.Options.ClientCertificates = Certificates;
+ _socket.Options.KeepAliveInterval = KeepAliveInterval;
+ _socket.Options.Cookies = Cookies;
+ _socket.Options.Proxy = Proxy;
+ _socket.Options.RemoteCertificateValidationCallback = RemoteCertificateValidationCallback;
+ //Specify sub-protocol
+ if (!string.IsNullOrEmpty(_subProtocol))
+ {
+ _socket.Options.AddSubProtocol(_subProtocol);
+ }
+ //Set headers
+ for (int i = 0; i < Headers.Count; i++)
+ {
+ string name = Headers.GetKey(i);
+ string? value = Headers.Get(i);
+ //Set header
+ _socket.Options.SetRequestHeader(name, value);
+ }
+ //Connect to server
+ await _socket.ConnectAsync(serverUri, token);
+ }
+ catch
+ {
+ //Cleanup the socket
+ Cleanup();
+ throw;
+ }
+ }
+
+ /// <summary>
+ /// Cleans up internal resources to prepare for another connection
+ /// </summary>
+ public void Cleanup()
+ {
+ //Dispose old socket if set
+ _socket?.Dispose();
+ _socket = null;
+ }
+ ///<inheritdoc/>
+ public override WebSocketCloseStatus? CloseStatus => _socket?.CloseStatus;
+ ///<inheritdoc/>
+ public override string CloseStatusDescription => _socket?.CloseStatusDescription ?? string.Empty;
+ ///<inheritdoc/>
+ public override WebSocketState State => _socket?.State ?? WebSocketState.Closed;
+ ///<inheritdoc/>
+ public override string SubProtocol => _subProtocol ?? string.Empty;
+
+
+ ///<inheritdoc/>
+ public override void Abort()
+ {
+ _socket?.Abort();
+ }
+ ///<inheritdoc/>
+ public override Task CloseAsync(WebSocketCloseStatus closeStatus, string? statusDescription, CancellationToken cancellationToken)
+ {
+ return _socket?.CloseAsync(closeStatus, statusDescription, cancellationToken) ?? Task.CompletedTask;
+ }
+ ///<inheritdoc/>
+ public override Task CloseOutputAsync(WebSocketCloseStatus closeStatus, string? statusDescription, CancellationToken cancellationToken)
+ {
+ if (_socket?.State == WebSocketState.Open || _socket?.State == WebSocketState.CloseSent)
+ {
+ return _socket.CloseOutputAsync(closeStatus, statusDescription, cancellationToken);
+ }
+ return Task.CompletedTask;
+ }
+ ///<inheritdoc/>
+ public override ValueTask<ValueWebSocketReceiveResult> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken)
+ {
+ _ = _socket ?? throw new WebSocketException(WebSocketError.ConnectionClosedPrematurely, "The connected socket has been disconnected");
+
+ return _socket.ReceiveAsync(buffer, cancellationToken);
+ }
+ ///<inheritdoc/>
+ public override Task<WebSocketReceiveResult> ReceiveAsync(ArraySegment<byte> buffer, CancellationToken cancellationToken)
+ {
+ _ = _socket ?? throw new WebSocketException(WebSocketError.ConnectionClosedPrematurely, "The connected socket has been disconnected");
+
+ return _socket.ReceiveAsync(buffer, cancellationToken);
+ }
+ ///<inheritdoc/>
+ public override ValueTask SendAsync(ReadOnlyMemory<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken)
+ {
+ _ = _socket ?? throw new WebSocketException(WebSocketError.ConnectionClosedPrematurely, "The connected socket has been disconnected");
+ return _socket.SendAsync(buffer, messageType, endOfMessage, cancellationToken);
+ }
+ ///<inheritdoc/>
+ public override Task SendAsync(ArraySegment<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken)
+ {
+ _ = _socket ?? throw new WebSocketException(WebSocketError.ConnectionClosedPrematurely, "The connected socket has been disconnected");
+ return _socket.SendAsync(buffer, messageType, endOfMessage, cancellationToken);
+ }
+
+ ///<inheritdoc/>
+ public override void Dispose()
+ {
+ //Free buffer
+ _dataBuffer.Dispose();
+ _socket?.Dispose();
+ GC.SuppressFinalize(this);
+ }
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Client/README.md b/lib/Net.Messaging.FBM/src/Client/README.md
new file mode 100644
index 0000000..5aa8e76
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Client/README.md
@@ -0,0 +1,169 @@
+# VNLib.Net.Messaging.FBM.Client
+
+Fixed Buffer Messaging Protocol client library. High performance statful messaging
+protocol built on top of HTTP web-sockets. Low/no allocation, completely asynchronous
+while providing a TPL API. This library provides a simple asynchronous request/response
+architecture to web-sockets. This was initially designed to provide an alternative to
+complete HTTP request/response overhead, but allow a simple control flow for work
+across a network.
+
+The base of the library relies on creating message objects that allocate fixed size
+buffers are configured when the IFBMMessageis constructed. All data is written to the
+internal buffer adhering to the format below.
+
+Messages consist of a 4 byte message id, a collection of headers, and a message body.
+The first 4 bytes of a message is the ID (for normal messages a signed integer greater than 0),
+0 is reserved for error conditions, and negative numbers are reserved for internal
+messages. Headers are identified by a single byte, followed by a variable length UTF8
+encoded character sequence, followed by a termination of 0xFF, 0xF1 (may change).
+
+### Message structure
+ 4 byte positive (signed 32-bit integer) message id
+ 2 byte termination
+ 1 byte header-id
+ variable length UTF8 value
+ 2 byte termination
+ -- other headers --
+ 2 byte termination (extra termination, ie: empty header)
+ variable length payload
+ (end of message is the end of the payload)
+
+Buffer sizes are generally negotiated on initial web-socket upgrade, so to buffer entire messages
+in a single read/write from the web-socket. Received messages are read into memory until
+the web-socket has no more data available. The message is then parsed and passed for processing
+on the server side, or complete a pending request on the client side. Servers may drop the
+web-socket connection and return an error if messages exceed the size of the pre-negotiated
+buffer. Servers should validate buffer sizes before accepting a connection.
+
+This client library allows for messages to be streamed to the server, however this library
+is optimized for fixed buffers, so streaming will not be the most efficient, and will likely
+cause slow-downs in message transmission. However, since FBM relies on a streaming protocol,
+so it was silly not to provide it. Streams add overhead of additional buffer allocation,
+additional copy, and message fragmentation (multiple writes to the web-socket). Since frames
+written to the web-socket must be synchronized, a mutex is held during transmission, which
+means the more message overhead, the longer the blocking period on new messages. Mutex
+acquisition will wait asynchronously when necessary.
+
+The goal of the FBM protocol for is to provide efficient use of resources (memory, network,
+and minimize GC load) to transfer small messages truly asynchronously, at wire speeds, with
+only web-socket and transport overhead. Using web-sockets simplifies implementation, and allows
+comparability across platforms, languages, and versions.
+
+## fundamentals
+
+The main implementation is the FBMClient class. This class provides the means for creating
+the stateful connection to the remote server. It also provides an internal FBMRequest message
+rental (object cache) that created initialized FBMRequest messages. This class may be derrived
+to provide additional functionality, such as handling control frames that may dynamically
+alter the state of the connection (negotiation etc). A mechanism to do so is provided.
+
+### FBMClient layout
+
+```
+ public class FBMClient : VnDisposeable, IStatefulConnection, ICacheHolder
+ {
+ //Raised when an error occurs during receiving or parsing
+ public event EventHandler<FMBClientErrorEventArgs>? ConnectionClosedOnError;
+
+ //Raised when connection is closed, regardless of the cause
+ public event EventHandler? ConnectionClosed;
+
+ //Connects to the remote server at the specified websocket address (ws:// or wss://)
+ public async Task ConnectAsync(Uri address, CancellationToken cancellation = default);
+
+ //When connected, sends the specified message to the remote server
+ public async Task<FBMResponse> SendAsync(FBMRequest request, CancellationToken cancellationToken = default);
+
+ //When connected, streams a message to the remote server, * the message payload must not be configured *
+ public async Task<FBMResponse> StreamDataAsync(FBMRequest request, Stream payload, ContentType ct, CancellationToken cancellationToken = default);
+
+ //Disconnects from the remote server
+ public async Task DisconnectAsync(CancellationToken cancellationToken = default);
+
+ //Releases all held resourses
+ public void Dispose(); //Inherrited from VnDisposeable
+
+ ICacheHolder.CacheClear(); //Inherited member, clears cached FBMRequest objects
+ ICacheHolder.CacheHardClear(); //Inherited member, clears cached FBMRequest objects
+ }
+```
+
+### Example usage
+```
+ FBMClientConfig config = new()
+ {
+ //The size (in bytes) of the internal buffer to use when receiving messages from the server
+ RecvBufferSize = 1024,
+
+ //FBMRequest buffer size (expected size of buffers, required for negotiation)
+ RequestBufferSize = 1024,
+
+ //The size (in chars) of headers the FBMResponse should expect to buffer from the server
+ ResponseHeaderBufSize = 1024,
+
+ //The absolute maximum message size to buffer from the server
+ MaxMessageSize = 10 * 1024 * 1024, //10KiB
+
+ //The unmanaged heap the allocate buffers from
+ BufferHeap = Memory.Shared,
+
+ //Web-socket keepalive frame interval
+ KeepAliveInterval = TimeSpan.FromSeconds(30),
+
+ //Web-socket sub-protocol header value
+ SubProtocol = null
+ };
+
+ //Craete client from the config
+ using (FBMClient client = new(config))
+ {
+ //Usually set some type of authentication headers before connecting
+
+ /*
+ client.ClientSocket.SetHeader("Authorization", "Authorization token");
+ */
+
+ //Connect to server
+ Uri address = new Uri("wss://localhost:8080/some/fbm/endpoint");
+ await client.ConnectAsync(address, CancellationToken.None);
+
+ do
+ {
+ //Rent request message
+ FBMRequest request = client.RentRequest();
+ //Some arbitrary header value (or preconfigured header)
+ request.WriteHeader(0x10, "Hello");
+ //Some arbitrary payload
+ request.WriteBody(new byte[] { 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A }, ContentType.Binary);
+ //Send request message
+ using (FBMResponse response = await client.SendAsync(request, CancellationToken.None))
+ {
+ //Extension method to raise exception if an invalid response was received (also use the response.IsSet flag)
+ response.ThrowIfNotSet();
+
+ //Check headers (using Linq to get first header)
+ string header1 = response.Headers.First().Value.ToString();
+
+ //Get payload, data is valid until the response is disposed
+ ReadOnlySpan<byte> body = response.ResponseBody;
+ }
+ //Return request
+ client.ReturnRequest(request);
+ //request.Dispose(); //Alternativly dispose message
+
+ await Task.Delay(1000);
+ }
+ while(true);
+ }
+```
+
+## Final Notes
+
+XML Documentation is or will be provided for almost all public exports. APIs are intended to
+be sensibly public and immutable to allow for easy extensability (via extension methods). I
+often use extension libraries to provide additional functionality. (See cache library)
+
+This library is likely a niche use case, and is probably not for everyone. Unless you care
+about reasonably efficient high frequency request/response messaging, this probably isnt
+for you. This library provides a reasonable building block for distributed lock mechanisms
+and small data caching. \ No newline at end of file
diff --git a/lib/Net.Messaging.FBM/src/Exceptions/FBMException.cs b/lib/Net.Messaging.FBM/src/Exceptions/FBMException.cs
new file mode 100644
index 0000000..1d5c7db
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Exceptions/FBMException.cs
@@ -0,0 +1,52 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: FBMException.cs
+*
+* FBMException.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Runtime.Serialization;
+
+namespace VNLib.Net.Messaging.FBM
+{
+ /// <summary>
+ /// A base exception class for all FBM Library exceptions
+ /// </summary>
+ public class FBMException : Exception
+ {
+ ///<inheritdoc/>
+ public FBMException()
+ {
+ }
+ ///<inheritdoc/>
+ public FBMException(string message) : base(message)
+ {
+ }
+ ///<inheritdoc/>
+ public FBMException(string message, Exception innerException) : base(message, innerException)
+ {
+ }
+ ///<inheritdoc/>
+ protected FBMException(SerializationInfo info, StreamingContext context) : base(info, context)
+ {
+ }
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Exceptions/FBMInvalidRequestException.cs b/lib/Net.Messaging.FBM/src/Exceptions/FBMInvalidRequestException.cs
new file mode 100644
index 0000000..ae42797
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Exceptions/FBMInvalidRequestException.cs
@@ -0,0 +1,47 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: FBMInvalidRequestException.cs
+*
+* FBMInvalidRequestException.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+
+
+namespace VNLib.Net.Messaging.FBM
+{
+ /// <summary>
+ /// Raised when a request message is not in a valid state and cannot be sent
+ /// </summary>
+ public class FBMInvalidRequestException : FBMException
+ {
+ public FBMInvalidRequestException()
+ {
+ }
+
+ public FBMInvalidRequestException(string message) : base(message)
+ {
+ }
+
+ public FBMInvalidRequestException(string message, Exception innerException) : base(message, innerException)
+ {
+ }
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Exceptions/InvalidResponseException.cs b/lib/Net.Messaging.FBM/src/Exceptions/InvalidResponseException.cs
new file mode 100644
index 0000000..3f0b970
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Exceptions/InvalidResponseException.cs
@@ -0,0 +1,52 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: InvalidResponseException.cs
+*
+* InvalidResponseException.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Runtime.Serialization;
+
+namespace VNLib.Net.Messaging.FBM
+{
+ /// <summary>
+ /// Raised when a response to an FBM request is not in a valid state
+ /// </summary>
+ public class InvalidResponseException : FBMException
+ {
+ ///<inheritdoc/>
+ public InvalidResponseException()
+ {
+ }
+ ///<inheritdoc/>
+ public InvalidResponseException(string message) : base(message)
+ {
+ }
+ ///<inheritdoc/>
+ public InvalidResponseException(string message, Exception innerException) : base(message, innerException)
+ {
+ }
+ ///<inheritdoc/>
+ protected InvalidResponseException(SerializationInfo info, StreamingContext context) : base(info, context)
+ {
+ }
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Server/FBMContext.cs b/lib/Net.Messaging.FBM/src/Server/FBMContext.cs
new file mode 100644
index 0000000..fb39d1b
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Server/FBMContext.cs
@@ -0,0 +1,85 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: FBMContext.cs
+*
+* FBMContext.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Text;
+
+using VNLib.Utils.IO;
+using VNLib.Utils.Memory.Caching;
+
+namespace VNLib.Net.Messaging.FBM.Server
+{
+ /// <summary>
+ /// A request/response pair message context
+ /// </summary>
+ public sealed class FBMContext : IReusable
+ {
+ private readonly Encoding _headerEncoding;
+
+ /// <summary>
+ /// The request message to process
+ /// </summary>
+ public FBMRequestMessage Request { get; }
+ /// <summary>
+ /// The response message
+ /// </summary>
+ public FBMResponseMessage Response { get; }
+ /// <summary>
+ /// Creates a new reusable <see cref="FBMContext"/>
+ /// for use within a <see cref="ObjectRental{T}"/>
+ /// cache
+ /// </summary>
+ /// <param name="requestHeaderBufferSize">The size in characters of the request header buffer</param>
+ /// <param name="responseBufferSize">The size in characters of the response header buffer</param>
+ /// <param name="headerEncoding">The message header encoding instance</param>
+ public FBMContext(int requestHeaderBufferSize, int responseBufferSize, Encoding headerEncoding)
+ {
+ Request = new(requestHeaderBufferSize);
+ Response = new(responseBufferSize, headerEncoding);
+ _headerEncoding = headerEncoding;
+ }
+
+ /// <summary>
+ /// Initializes the context with the buffered request data
+ /// </summary>
+ /// <param name="requestData">The request data buffer positioned at the begining of the request data</param>
+ /// <param name="connectionId">The unique id of the connection</param>
+ internal void Prepare(VnMemoryStream requestData, string connectionId)
+ {
+ Request.Prepare(requestData, connectionId, _headerEncoding);
+ //Message id is set after the request parses the incoming message
+ Response.Prepare(Request.MessageId);
+ }
+
+ void IReusable.Prepare()
+ {
+ (Request as IReusable).Prepare();
+ (Response as IReusable).Prepare();
+ }
+
+ bool IReusable.Release()
+ {
+ return (Request as IReusable).Release() & (Response as IReusable).Release();
+ }
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Server/FBMListener.cs b/lib/Net.Messaging.FBM/src/Server/FBMListener.cs
new file mode 100644
index 0000000..6cca2a9
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Server/FBMListener.cs
@@ -0,0 +1,388 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: FBMListener.cs
+*
+* FBMListener.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.IO;
+using System.Buffers;
+using System.Threading;
+using System.Net.WebSockets;
+using System.Threading.Tasks;
+
+using VNLib.Utils.IO;
+using VNLib.Utils.Async;
+using VNLib.Utils.Memory;
+using VNLib.Utils.Extensions;
+using VNLib.Utils.Memory.Caching;
+using VNLib.Plugins.Essentials;
+
+namespace VNLib.Net.Messaging.FBM.Server
+{
+
+ /// <summary>
+ /// Method delegate for processing FBM messages from an <see cref="FBMListener"/>
+ /// when messages are received
+ /// </summary>
+ /// <param name="context">The message/connection context</param>
+ /// <param name="userState">The state parameter passed on client connected</param>
+ /// <param name="cancellationToken">A token that reflects the state of the listener</param>
+ /// <returns>A <see cref="Task"/> that resolves when processing is complete</returns>
+ public delegate Task RequestHandler(FBMContext context, object? userState, CancellationToken cancellationToken);
+
+ /// <summary>
+ /// A FBM protocol listener. Listens for messages on a <see cref="WebSocketSession"/>
+ /// and raises events on requests.
+ /// </summary>
+ public class FBMListener
+ {
+ private sealed class ListeningSession
+ {
+ private readonly ReusableStore<FBMContext> CtxStore;
+ private readonly CancellationTokenSource Cancellation;
+ private readonly CancellationTokenRegistration Registration;
+ private readonly FBMListenerSessionParams Params;
+
+
+ public readonly object? UserState;
+
+ public readonly SemaphoreSlim ResponseLock;
+
+ public readonly WebSocketSession Socket;
+
+ public readonly RequestHandler OnRecieved;
+
+ public CancellationToken CancellationToken => Cancellation.Token;
+
+
+ public ListeningSession(WebSocketSession session, RequestHandler onRecieved, in FBMListenerSessionParams args, object? userState)
+ {
+ Params = args;
+ Socket = session;
+ UserState = userState;
+ OnRecieved = onRecieved;
+
+ //Create cancellation and register for session close
+ Cancellation = new();
+ Registration = session.Token.Register(Cancellation.Cancel);
+
+
+ ResponseLock = new(1);
+ CtxStore = ObjectRental.CreateReusable(ContextCtor);
+ }
+
+ private FBMContext ContextCtor() => new(Params.MaxHeaderBufferSize, Params.ResponseBufferSize, Params.HeaderEncoding);
+
+ /// <summary>
+ /// Cancels any pending opreations relating to the current session
+ /// </summary>
+ public void CancelSession()
+ {
+ Cancellation.Cancel();
+
+ //If dispose happens without any outstanding requests, we can dispose the session
+ if (_counter == 0)
+ {
+ CleanupInternal();
+ }
+ }
+
+ private void CleanupInternal()
+ {
+ Registration.Dispose();
+ CtxStore.Dispose();
+ Cancellation.Dispose();
+ ResponseLock.Dispose();
+ }
+
+
+ private uint _counter;
+
+ /// <summary>
+ /// Rents a new <see cref="FBMContext"/> instance from the pool
+ /// and increments the counter
+ /// </summary>
+ /// <returns>The rented instance</returns>
+ /// <exception cref="ObjectDisposedException"></exception>
+ public FBMContext RentContext()
+ {
+
+ if (Cancellation.IsCancellationRequested)
+ {
+ throw new ObjectDisposedException("The instance has been disposed");
+ }
+
+ //Rent context
+ FBMContext ctx = CtxStore.Rent();
+ //Increment counter
+ Interlocked.Increment(ref _counter);
+
+ return ctx;
+ }
+
+ /// <summary>
+ /// Returns a previously rented context to the pool
+ /// and decrements the counter. If the session has been
+ /// cancelled, when the counter reaches 0, cleanup occurs
+ /// </summary>
+ /// <param name="ctx">The context to return</param>
+ public void ReturnContext(FBMContext ctx)
+ {
+ //Return the context
+ CtxStore.Return(ctx);
+
+ uint current = Interlocked.Decrement(ref _counter);
+
+ //No more contexts in use, dispose internals
+ if (Cancellation.IsCancellationRequested && current == 0)
+ {
+ ResponseLock.Dispose();
+ Cancellation.Dispose();
+ CtxStore.Dispose();
+ }
+ }
+ }
+
+ public const int SEND_SEMAPHORE_TIMEOUT_MS = 10 * 1000;
+
+ private readonly IUnmangedHeap Heap;
+
+ /// <summary>
+ /// Raised when a response processing error occured
+ /// </summary>
+ public event EventHandler<Exception>? OnProcessError;
+
+ /// <summary>
+ /// Creates a new <see cref="FBMListener"/> instance ready for
+ /// processing connections
+ /// </summary>
+ /// <param name="heap">The heap to alloc buffers from</param>
+ public FBMListener(IUnmangedHeap heap)
+ {
+ Heap = heap;
+ }
+
+ /// <summary>
+ /// Begins listening for requests on the current websocket until
+ /// a close message is received or an error occurs
+ /// </summary>
+ /// <param name="wss">The <see cref="WebSocketSession"/> to receive messages on</param>
+ /// <param name="handler">The callback method to handle incoming requests</param>
+ /// <param name="args">The arguments used to configured this listening session</param>
+ /// <param name="userState">A state parameter</param>
+ /// <returns>A <see cref="Task"/> that completes when the connection closes</returns>
+ public async Task ListenAsync(WebSocketSession wss, RequestHandler handler, FBMListenerSessionParams args, object? userState)
+ {
+ ListeningSession session = new(wss, handler, args, userState);
+ //Alloc a recieve buffer
+ using IMemoryOwner<byte> recvBuffer = Heap.DirectAlloc<byte>(args.RecvBufferSize);
+
+ //Init new queue for dispatching work
+ AsyncQueue<VnMemoryStream> workQueue = new(true, true);
+
+ //Start a task to process the queue
+ Task queueWorker = QueueWorkerDoWork(workQueue, session);
+
+ try
+ {
+ //Listen for incoming messages
+ while (true)
+ {
+ //Receive a message
+ ValueWebSocketReceiveResult result = await wss.ReceiveAsync(recvBuffer.Memory);
+ //If a close message has been received, we can gracefully exit
+ if (result.MessageType == WebSocketMessageType.Close)
+ {
+ //Return close message
+ await wss.CloseSocketAsync(WebSocketCloseStatus.NormalClosure, "Goodbye");
+ //break listen loop
+ break;
+ }
+ //create buffer for storing data
+ VnMemoryStream request = new(Heap);
+ //Copy initial data
+ request.Write(recvBuffer.Memory.Span[..result.Count]);
+ //Streaming read
+ while (!result.EndOfMessage)
+ {
+ //Read more data
+ result = await wss.ReceiveAsync(recvBuffer.Memory);
+ //Make sure the request is small enough to buffer
+ if (request.Length + result.Count > args.MaxMessageSize)
+ {
+ //dispose the buffer
+ request.Dispose();
+ //close the socket with a message too big
+ await wss.CloseSocketAsync(WebSocketCloseStatus.MessageTooBig, "Buffer space exceeded for message. Goodbye");
+ //break listen loop
+ goto Exit;
+ }
+ //write to buffer
+ request.Write(recvBuffer.Memory.Span[..result.Count]);
+ }
+ //Make sure data is available
+ if (request.Length == 0)
+ {
+ request.Dispose();
+ continue;
+ }
+ //reset buffer position
+ _ = request.Seek(0, SeekOrigin.Begin);
+ //Enqueue the request
+ await workQueue.EnqueueAsync(request);
+ }
+
+ Exit:
+ ;
+ }
+ finally
+ {
+ session.CancelSession();
+ await queueWorker.ConfigureAwait(false);
+ }
+ }
+
+ private async Task QueueWorkerDoWork(AsyncQueue<VnMemoryStream> queue, ListeningSession session)
+ {
+ try
+ {
+ while (true)
+ {
+ //Get work from queue
+ VnMemoryStream request = await queue.DequeueAsync(session.CancellationToken);
+ //Process request without waiting
+ _ = ProcessAsync(request, session).ConfigureAwait(false);
+ }
+ }
+ catch (OperationCanceledException)
+ { }
+ finally
+ {
+ //Cleanup any queued requests
+ while (queue.TryDequeue(out VnMemoryStream? stream))
+ {
+ stream.Dispose();
+ }
+ }
+ }
+
+ private async Task ProcessAsync(VnMemoryStream data, ListeningSession session)
+ {
+ //Rent a new request object
+ FBMContext context = session.RentContext();
+ try
+ {
+ //Prepare the request/response
+ context.Prepare(data, session.Socket.SocketID);
+
+ if ((context.Request.ParseStatus & HeaderParseError.InvalidId) > 0)
+ {
+ OnProcessError?.Invoke(this, new FBMException($"Invalid messageid {context.Request.MessageId}, message length {data.Length}"));
+ return;
+ }
+
+ //Check parse status flags
+ if ((context.Request.ParseStatus & HeaderParseError.HeaderOutOfMem) > 0)
+ {
+ OnProcessError?.Invoke(this, new FBMException("Packet received with not enough space to store headers"));
+ }
+ //Determine if request is an out-of-band message
+ else if (context.Request.MessageId == Helpers.CONTROL_FRAME_MID)
+ {
+ //Process control frame
+ await ProcessOOBAsync(context);
+ }
+ else
+ {
+ //Invoke normal message handler
+ await session.OnRecieved.Invoke(context, session.UserState, session.CancellationToken);
+ }
+
+ //Get response data
+ await using IAsyncMessageReader messageEnumerator = await context.Response.GetResponseDataAsync(session.CancellationToken);
+
+ //Load inital segment
+ if (await messageEnumerator.MoveNextAsync() && !session.CancellationToken.IsCancellationRequested)
+ {
+ ValueTask sendTask;
+
+ //Syncrhonize access to send data because we may need to stream data to the client
+ await session.ResponseLock.WaitAsync(SEND_SEMAPHORE_TIMEOUT_MS);
+ try
+ {
+ do
+ {
+ bool eof = !messageEnumerator.DataRemaining;
+
+ //Send first segment
+ sendTask = session.Socket.SendAsync(messageEnumerator.Current, WebSocketMessageType.Binary, eof);
+
+ /*
+ * WARNING!
+ * this code relies on the managed websocket impl that the websocket will read
+ * the entire buffer before returning. If this is not the case, this code will
+ * overwrite the memory buffer on the next call to move next.
+ */
+
+ //Move to next segment
+ if (!await messageEnumerator.MoveNextAsync())
+ {
+ break;
+ }
+
+ //Await previous send
+ await sendTask;
+
+ } while (true);
+ }
+ finally
+ {
+ //release semaphore
+ session.ResponseLock.Release();
+ }
+
+ await sendTask;
+ }
+
+ //No data to send
+ }
+ catch (Exception ex)
+ {
+ OnProcessError?.Invoke(this, ex);
+ }
+ finally
+ {
+ session.ReturnContext(context);
+ }
+ }
+
+ /// <summary>
+ /// Processes an out-of-band request message (internal communications)
+ /// </summary>
+ /// <param name="outOfBandContext">The <see cref="FBMContext"/> containing the OOB message</param>
+ /// <returns>A <see cref="Task"/> that completes when the operation completes</returns>
+ protected virtual Task ProcessOOBAsync(FBMContext outOfBandContext)
+ {
+ return Task.CompletedTask;
+ }
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs b/lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs
new file mode 100644
index 0000000..3e9fde2
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs
@@ -0,0 +1,113 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: FBMListenerBase.cs
+*
+* FBMListenerBase.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+using VNLib.Utils.Logging;
+using VNLib.Utils.Memory;
+using VNLib.Plugins.Essentials;
+
+namespace VNLib.Net.Messaging.FBM.Server
+{
+ /// <summary>
+ /// Provides a simple base class for an <see cref="FBMListener"/>
+ /// processor
+ /// </summary>
+ public abstract class FBMListenerBase
+ {
+
+ /// <summary>
+ /// The initialzied listener
+ /// </summary>
+ protected FBMListener? Listener { get; private set; }
+ /// <summary>
+ /// A provider to write log information to
+ /// </summary>
+ protected abstract ILogProvider Log { get; }
+
+ /// <summary>
+ /// Initializes the <see cref="FBMListener"/>
+ /// </summary>
+ /// <param name="heap">The heap to alloc buffers from</param>
+ protected void InitListener(IUnmangedHeap heap)
+ {
+ Listener = new(heap);
+ //Attach service handler
+ Listener.OnProcessError += Listener_OnProcessError;
+ }
+
+ /// <summary>
+ /// A single event service routine for servicing errors that occur within
+ /// the listener loop
+ /// </summary>
+ /// <param name="sender"></param>
+ /// <param name="e">The exception that was raised</param>
+ protected virtual void Listener_OnProcessError(object? sender, Exception e)
+ {
+ //Write the error to the log
+ Log.Error(e);
+ }
+
+ private async Task OnReceivedAsync(FBMContext context, object? userState, CancellationToken token)
+ {
+ try
+ {
+ await ProcessAsync(context, userState, token);
+ }
+ catch (OperationCanceledException)
+ {
+ Log.Debug("Async operation cancelled");
+ }
+ catch(Exception ex)
+ {
+ Log.Error(ex);
+ }
+ }
+
+ /// <summary>
+ /// Begins listening for requests on the current websocket until
+ /// a close message is received or an error occurs
+ /// </summary>
+ /// <param name="wss">The <see cref="WebSocketSession"/> to receive messages on</param>
+ /// <param name="args">The arguments used to configured this listening session</param>
+ /// <param name="userState">A state token to use for processing events for this connection</param>
+ /// <returns>A <see cref="Task"/> that completes when the connection closes</returns>
+ public virtual async Task ListenAsync(WebSocketSession wss, FBMListenerSessionParams args, object? userState)
+ {
+ _ = Listener ?? throw new InvalidOperationException("The listener has not been intialized");
+ await Listener.ListenAsync(wss, OnReceivedAsync, args, userState);
+ }
+
+ /// <summary>
+ /// A method to service an incoming message
+ /// </summary>
+ /// <param name="context">The context containing the message to be serviced</param>
+ /// <param name="userState">A state token passed on client connected</param>
+ /// <param name="exitToken">A token that reflects the state of the listener</param>
+ /// <returns>A task that completes when the message has been serviced</returns>
+ protected abstract Task ProcessAsync(FBMContext context, object? userState, CancellationToken exitToken);
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs b/lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs
new file mode 100644
index 0000000..c327475
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs
@@ -0,0 +1,62 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: FBMListenerSessionParams.cs
+*
+* FBMListenerSessionParams.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Text;
+
+namespace VNLib.Net.Messaging.FBM.Server
+{
+ /// <summary>
+ /// Represents a configuration structure for an <see cref="FBMListener"/>
+ /// listening session
+ /// </summary>
+ public readonly struct FBMListenerSessionParams
+ {
+ /// <summary>
+ /// The size of the buffer to use while reading data from the websocket
+ /// in the listener loop
+ /// </summary>
+ public readonly int RecvBufferSize { get; init; }
+ /// <summary>
+ /// The size of the character buffer to store FBMheader values in
+ /// the <see cref="FBMRequestMessage"/>
+ /// </summary>
+ public readonly int MaxHeaderBufferSize { get; init; }
+ /// <summary>
+ /// The size of the internal message response buffer when
+ /// not streaming
+ /// </summary>
+ public readonly int ResponseBufferSize { get; init; }
+ /// <summary>
+ /// The FMB message header character encoding
+ /// </summary>
+ public readonly Encoding HeaderEncoding { get; init; }
+
+ /// <summary>
+ /// The absolute maxium size (in bytes) message to process before
+ /// closing the websocket connection. This value should be negotiaed
+ /// by clients or hard-coded to avoid connection issues
+ /// </summary>
+ public readonly int MaxMessageSize { get; init; }
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs b/lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs
new file mode 100644
index 0000000..ed36571
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs
@@ -0,0 +1,196 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: FBMRequestMessage.cs
+*
+* FBMRequestMessage.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Text;
+using System.Buffers;
+using System.Text.Json;
+using System.Collections.Generic;
+
+using VNLib.Utils;
+using VNLib.Utils.IO;
+using VNLib.Utils.Memory;
+using VNLib.Utils.Extensions;
+using VNLib.Utils.Memory.Caching;
+
+namespace VNLib.Net.Messaging.FBM.Server
+{
+ /// <summary>
+ /// Represents a client request message to be serviced
+ /// </summary>
+ public sealed class FBMRequestMessage : IReusable
+ {
+ private readonly List<KeyValuePair<HeaderCommand, ReadOnlyMemory<char>>> _headers;
+ private readonly int HeaderCharBufferSize;
+ /// <summary>
+ /// Creates a new resusable <see cref="FBMRequestMessage"/>
+ /// </summary>
+ /// <param name="headerBufferSize">The size of the buffer to alloc during initialization</param>
+ internal FBMRequestMessage(int headerBufferSize)
+ {
+ HeaderCharBufferSize = headerBufferSize;
+ _headers = new();
+ }
+
+ private char[]? _headerBuffer;
+
+ /// <summary>
+ /// The ID of the current message
+ /// </summary>
+ public int MessageId { get; private set; }
+ /// <summary>
+ /// Gets the underlying socket-id fot the current connection
+ /// </summary>
+ public string? ConnectionId { get; private set; }
+ /// <summary>
+ /// The raw request message, positioned to the body section of the message data
+ /// </summary>
+ public VnMemoryStream? RequestBody { get; private set; }
+ /// <summary>
+ /// A collection of headers for the current request
+ /// </summary>
+ public IReadOnlyList<KeyValuePair<HeaderCommand, ReadOnlyMemory<char>>> Headers => _headers;
+ /// <summary>
+ /// Status flags set during the message parsing
+ /// </summary>
+ public HeaderParseError ParseStatus { get; private set; }
+ /// <summary>
+ /// The message body data as a <see cref="ReadOnlySpan{T}"/>
+ /// </summary>
+ public ReadOnlySpan<byte> BodyData => Helpers.GetRemainingData(RequestBody!);
+
+ /// <summary>
+ /// Determines if the current message is considered a control frame
+ /// </summary>
+ public bool IsControlFrame { get; private set; }
+
+ /// <summary>
+ /// Prepares the request to be serviced
+ /// </summary>
+ /// <param name="vms">The request data packet</param>
+ /// <param name="socketId">The unique id of the connection</param>
+ /// <param name="dataEncoding">The data encoding used to decode header values</param>
+ internal void Prepare(VnMemoryStream vms, string socketId, Encoding dataEncoding)
+ {
+ //Store request body
+ RequestBody = vms;
+ //Store message id
+ MessageId = Helpers.GetMessageId(Helpers.ReadLine(vms));
+ //Check mid for control frame
+ if(MessageId == Helpers.CONTROL_FRAME_MID)
+ {
+ IsControlFrame = true;
+ }
+ else if (MessageId < 1)
+ {
+ ParseStatus |= HeaderParseError.InvalidId;
+ return;
+ }
+
+ ConnectionId = socketId;
+
+ //sliding window over remaining data from internal buffer
+ ForwardOnlyMemoryWriter<char> writer = new(_headerBuffer);
+
+ //Accumulate headers
+ while (true)
+ {
+ //Read the next line from the current stream
+ ReadOnlySpan<byte> line = Helpers.ReadLine(vms);
+ if (line.IsEmpty)
+ {
+ //Done reading headers
+ break;
+ }
+ HeaderCommand cmd = Helpers.GetHeaderCommand(line);
+ //Get header value
+ ERRNO charsRead = Helpers.GetHeaderValue(line, writer.Remaining.Span, dataEncoding);
+ if (charsRead < 0)
+ {
+ //Out of buffer space
+ ParseStatus |= HeaderParseError.HeaderOutOfMem;
+ break;
+ }
+ else if (!charsRead)
+ {
+ //Invalid header
+ ParseStatus |= HeaderParseError.InvalidHeaderRead;
+ }
+ else
+ {
+ //Store header as a read-only sequence
+ _headers.Add(new(cmd, writer.Remaining[..(int)charsRead]));
+ //Shift buffer window
+ writer.Advance(charsRead);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Deserializes the request body into a new specified object type
+ /// </summary>
+ /// <typeparam name="T">The type of the object to deserialize</typeparam>
+ /// <param name="jso">The <see cref="JsonSerializerOptions"/> to use while deserializing data</param>
+ /// <returns>The deserialized object from the request body</returns>
+ /// <exception cref="JsonException"></exception>
+ public T? DeserializeBody<T>(JsonSerializerOptions? jso = default)
+ {
+ return BodyData.IsEmpty ? default : BodyData.AsJsonObject<T>(jso);
+ }
+ /// <summary>
+ /// Gets a <see cref="JsonDocument"/> of the request body
+ /// </summary>
+ /// <returns>The parsed <see cref="JsonDocument"/> if parsed successfully, or null otherwise</returns>
+ /// <exception cref="JsonException"></exception>
+ public JsonDocument? GetBodyAsJson()
+ {
+ Utf8JsonReader reader = new(BodyData);
+ return JsonDocument.TryParseValue(ref reader, out JsonDocument? jdoc) ? jdoc : default;
+ }
+
+ void IReusable.Prepare()
+ {
+ ParseStatus = HeaderParseError.None;
+ //Alloc header buffer
+ _headerBuffer = ArrayPool<char>.Shared.Rent(HeaderCharBufferSize);
+ }
+
+
+ bool IReusable.Release()
+ {
+ //Dispose the request message
+ RequestBody?.Dispose();
+ RequestBody = null;
+ //Clear headers before freeing buffer
+ _headers.Clear();
+ //Free header-buffer
+ ArrayPool<char>.Shared.Return(_headerBuffer!);
+ _headerBuffer = null;
+ ConnectionId = null;
+ MessageId = 0;
+ IsControlFrame = false;
+ return true;
+ }
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs b/lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs
new file mode 100644
index 0000000..ac34dda
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs
@@ -0,0 +1,226 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: FBMResponseMessage.cs
+*
+* FBMResponseMessage.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+using VNLib.Net.Http;
+using VNLib.Utils.IO;
+using VNLib.Utils.Extensions;
+using VNLib.Utils.Memory.Caching;
+using VNLib.Net.Messaging.FBM.Client;
+
+namespace VNLib.Net.Messaging.FBM.Server
+{
+
+ /// <summary>
+ /// Represents an FBM request response container.
+ /// </summary>
+ public sealed class FBMResponseMessage : IReusable, IFBMMessage
+ {
+ internal FBMResponseMessage(int internalBufferSize, Encoding headerEncoding)
+ {
+ _headerAccumulator = new HeaderDataAccumulator(internalBufferSize);
+ _headerEncoding = headerEncoding;
+ _messageEnumerator = new(this);
+ }
+
+ private readonly MessageSegmentEnumerator _messageEnumerator;
+ private readonly ISlindingWindowBuffer<byte> _headerAccumulator;
+ private readonly Encoding _headerEncoding;
+
+ private IAsyncMessageBody? _messageBody;
+
+ ///<inheritdoc/>
+ public int MessageId { get; private set; }
+
+ void IReusable.Prepare()
+ {
+ (_headerAccumulator as HeaderDataAccumulator)!.Prepare();
+ }
+
+ bool IReusable.Release()
+ {
+ //Release header accumulator
+ _headerAccumulator.Close();
+
+ _messageBody = null;
+
+ MessageId = 0;
+
+ return true;
+ }
+
+ /// <summary>
+ /// Initializes the response message with the specified message-id
+ /// to respond with
+ /// </summary>
+ /// <param name="messageId">The message id of the context to respond to</param>
+ internal void Prepare(int messageId)
+ {
+ //Reset accumulator when message id is written
+ _headerAccumulator.Reset();
+ //Write the messageid to the begining of the headers buffer
+ MessageId = messageId;
+ _headerAccumulator.Append((byte)HeaderCommand.MessageId);
+ _headerAccumulator.Append(messageId);
+ _headerAccumulator.WriteTermination();
+ }
+
+ ///<inheritdoc/>
+ public void WriteHeader(HeaderCommand header, ReadOnlySpan<char> value)
+ {
+ WriteHeader((byte)header, value);
+ }
+ ///<inheritdoc/>
+ public void WriteHeader(byte header, ReadOnlySpan<char> value)
+ {
+ _headerAccumulator.WriteHeader(header, value, _headerEncoding);
+ }
+
+ ///<inheritdoc/>
+ public void WriteBody(ReadOnlySpan<byte> body, ContentType contentType = ContentType.Binary)
+ {
+ //Append content type header
+ WriteHeader(HeaderCommand.ContentType, HttpHelpers.GetContentTypeString(contentType));
+ //end header segment
+ _headerAccumulator.WriteTermination();
+ //Write message body
+ _headerAccumulator.Append(body);
+ }
+
+ /// <summary>
+ /// Sets the response message body
+ /// </summary>
+ /// <param name="messageBody">The <see cref="IAsyncMessageBody"/> to stream data from</param>
+ /// <exception cref="InvalidOperationException"></exception>
+ public void AddMessageBody(IAsyncMessageBody messageBody)
+ {
+ if(_messageBody != null)
+ {
+ throw new InvalidOperationException("The message body is already set");
+ }
+
+ //Append message content type header
+ WriteHeader(HeaderCommand.ContentType, HttpHelpers.GetContentTypeString(messageBody.ContentType));
+
+ //end header segment
+ _headerAccumulator.WriteTermination();
+
+ //Store message body
+ _messageBody = messageBody;
+
+ }
+
+ /// <summary>
+ /// Gets the internal message body enumerator and prepares the message for sending
+ /// </summary>
+ /// <param name="cancellationToken">A cancellation token</param>
+ /// <returns>A value task that returns the message body enumerator</returns>
+ internal async ValueTask<IAsyncMessageReader> GetResponseDataAsync(CancellationToken cancellationToken)
+ {
+ //try to buffer as much data in the header segment first
+ if(_messageBody?.RemainingSize > 0 && _headerAccumulator.RemainingSize > 0)
+ {
+ //Read data from the message
+ int read = await _messageBody.ReadAsync(_headerAccumulator.RemainingBuffer, cancellationToken);
+ //Advance accumulator to the read bytes
+ _headerAccumulator.Advance(read);
+ }
+ //return reusable enumerator
+ return _messageEnumerator;
+ }
+
+ private sealed class MessageSegmentEnumerator : IAsyncMessageReader
+ {
+ private readonly FBMResponseMessage _message;
+
+ bool HeadersRead;
+
+ public MessageSegmentEnumerator(FBMResponseMessage message)
+ {
+ _message = message;
+ }
+
+ public ReadOnlyMemory<byte> Current { get; private set; }
+
+ public bool DataRemaining { get; private set; }
+
+ public async ValueTask<bool> MoveNextAsync()
+ {
+ //Attempt to read header segment first
+ if (!HeadersRead)
+ {
+ //Set the accumulated buffer
+ Current = _message._headerAccumulator.AccumulatedBuffer;
+
+ //Update data remaining flag
+ DataRemaining = _message._messageBody?.RemainingSize > 0;
+
+ //Set headers read flag
+ HeadersRead = true;
+
+ return true;
+ }
+ else if (_message._messageBody?.RemainingSize > 0)
+ {
+ //Use the header buffer as the buffer for the message body
+ Memory<byte> buffer = _message._headerAccumulator.Buffer;
+
+ //Read body segment
+ int read = await _message._messageBody.ReadAsync(buffer);
+
+ //Update data remaining flag
+ DataRemaining = _message._messageBody.RemainingSize > 0;
+
+ if (read > 0)
+ {
+ //Store the read segment
+ Current = buffer[..read];
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ //Clear current segment
+ Current = default;
+
+ //Reset headers read flag
+ HeadersRead = false;
+
+ //Dispose the message body if set
+ if (_message._messageBody != null)
+ {
+ await _message._messageBody.DisposeAsync();
+ }
+ }
+ }
+ }
+
+}
diff --git a/lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs b/lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs
new file mode 100644
index 0000000..423a26e
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs
@@ -0,0 +1,89 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: HeaderDataAccumulator.cs
+*
+* HeaderDataAccumulator.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Buffers;
+
+using VNLib.Utils.IO;
+
+
+namespace VNLib.Net.Messaging.FBM.Server
+{
+ /// <summary>
+ /// Reusable sliding window impl
+ /// </summary>
+ internal sealed class HeaderDataAccumulator : ISlindingWindowBuffer<byte>
+ {
+ private readonly int BufferSize;
+
+ private byte[]? _memHandle;
+
+ public HeaderDataAccumulator(int bufferSize)
+ {
+ BufferSize = bufferSize;
+ }
+
+ ///<inheritdoc/>
+ public int WindowStartPos { get; private set; }
+ ///<inheritdoc/>
+ public int WindowEndPos { get; private set; }
+ ///<inheritdoc/>
+ public Memory<byte> Buffer => _memHandle.AsMemory();
+
+ ///<inheritdoc/>
+ public void Advance(int count) => WindowEndPos += count;
+
+ ///<inheritdoc/>
+ public void AdvanceStart(int count) => WindowEndPos += count;
+
+ ///<inheritdoc/>
+ public void Reset()
+ {
+ WindowStartPos = 0;
+ WindowEndPos = 0;
+ }
+
+ /// <summary>
+ /// Allocates the internal message buffer
+ /// </summary>
+ public void Prepare()
+ {
+ _memHandle ??= ArrayPool<byte>.Shared.Rent(BufferSize);
+ }
+
+ ///<inheritdoc/>
+ public void Close()
+ {
+ Reset();
+
+ if (_memHandle != null)
+ {
+ //Return the buffer to the pool
+ ArrayPool<byte>.Shared.Return(_memHandle);
+ _memHandle = null;
+ }
+ }
+ }
+
+}
diff --git a/lib/Net.Messaging.FBM/src/Server/IAsyncMessageBody.cs b/lib/Net.Messaging.FBM/src/Server/IAsyncMessageBody.cs
new file mode 100644
index 0000000..5566520
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Server/IAsyncMessageBody.cs
@@ -0,0 +1,57 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: IAsyncMessageBody.cs
+*
+* IAsyncMessageBody.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+using VNLib.Net.Http;
+
+namespace VNLib.Net.Messaging.FBM
+{
+ /// <summary>
+ /// A disposable message body container for asynchronously reading a variable length message body
+ /// </summary>
+ public interface IAsyncMessageBody : IAsyncDisposable
+ {
+ /// <summary>
+ /// The message body content type
+ /// </summary>
+ ContentType ContentType { get; }
+
+ /// <summary>
+ /// The number of bytes remaining to be read from the message body
+ /// </summary>
+ int RemainingSize { get; }
+
+ /// <summary>
+ /// Reads the next chunk of data from the message body
+ /// </summary>
+ /// <param name="buffer">The buffer to copy output data to</param>
+ /// <param name="token">A token to cancel the operation</param>
+ /// <returns></returns>
+ ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken token = default);
+ }
+
+}
diff --git a/lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs b/lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs
new file mode 100644
index 0000000..b2abe8d
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs
@@ -0,0 +1,42 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: IAsyncMessageReader.cs
+*
+* IAsyncMessageReader.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Collections.Generic;
+
+
+namespace VNLib.Net.Messaging.FBM.Server
+{
+ /// <summary>
+ /// Internal message body reader/enumerator for FBM messages
+ /// </summary>
+ internal interface IAsyncMessageReader : IAsyncEnumerator<ReadOnlyMemory<byte>>
+ {
+ /// <summary>
+ /// A value that indicates if there is data remaining after a
+ /// </summary>
+ bool DataRemaining { get; }
+ }
+
+}
diff --git a/lib/Net.Messaging.FBM/src/Server/readme.md b/lib/Net.Messaging.FBM/src/Server/readme.md
new file mode 100644
index 0000000..489e58f
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Server/readme.md
@@ -0,0 +1,35 @@
+# VNLib.Net.Messaging.FBM.Server
+
+Fixed Buffer Messaging Protocol server library. High performance statful messaging
+protocol built on top of HTTP web-sockets. Low/no allocation, completely asynchronous
+while providing a TPL API. This library provides a simple asynchronous request/response
+architecture to web-sockets. This was initially designed to provide an alternative to
+complete HTTP request/response overhead, but allow a simple control flow for work
+across a network.
+
+Messages consist of a 4 byte message id, a collection of headers, and a message body.
+The first 4 bytes of a message is the ID (for normal messages a signed integer greater than 0),
+0 is reserved for error conditions, and negative numbers are reserved for internal
+messages. Headers are identified by a single byte, followed by a variable length UTF8
+encoded character sequence, followed by a termination of 0xFF, 0xF1 (may change).
+
+### Message structure
+ 4 byte positive (signed 32-bit integer) message id
+ 2 byte termination
+ 1 byte header-id
+ variable length UTF8 value
+ 2 byte termination
+ -- other headers --
+ 2 byte termination (extra termination, ie: empty header)
+ variable length payload
+ (end of message is the end of the payload)
+
+
+XML Documentation is or will be provided for almost all public exports. APIs are intended to
+be sensibly public and immutable to allow for easy extensability (via extension methods). I
+often use extension libraries to provide additional functionality. (See cache library)
+
+This library is likely a niche use case, and is probably not for everyone. Unless you care
+about reasonably efficient high frequency request/response messaging, this probably isnt
+for you. This library provides a reasonable building block for distributed lock mechanisms
+and small data caching. \ No newline at end of file
diff --git a/lib/Net.Messaging.FBM/src/VNLib.Net.Messaging.FBM.csproj b/lib/Net.Messaging.FBM/src/VNLib.Net.Messaging.FBM.csproj
new file mode 100644
index 0000000..d91fb0a
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/VNLib.Net.Messaging.FBM.csproj
@@ -0,0 +1,33 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+ <PropertyGroup>
+ <TargetFramework>net6.0</TargetFramework>
+ <Authors>Vaughn Nugent</Authors>
+ <Version>1.0.1.1</Version>
+ <Copyright>Copyright © 2022 Vaughn Nugent</Copyright>
+ <Nullable>enable</Nullable>
+ <PackageProjectUrl>www.vaughnnugent.com/resources</PackageProjectUrl>
+ <AnalysisLevel>latest-all</AnalysisLevel>
+ <SignAssembly>True</SignAssembly>
+ <AssemblyOriginatorKeyFile>\\vaughnnugent.com\Internal\Folder Redirection\vman\Documents\Programming\Software\StrongNameingKey.snk</AssemblyOriginatorKeyFile>
+ </PropertyGroup>
+
+
+ <ItemGroup>
+ <PackageReference Include="ErrorProne.NET.CoreAnalyzers" Version="0.1.2">
+ <PrivateAssets>all</PrivateAssets>
+ <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
+ </PackageReference>
+ <PackageReference Include="ErrorProne.NET.Structs" Version="0.1.2">
+ <PrivateAssets>all</PrivateAssets>
+ <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
+ </PackageReference>
+ </ItemGroup>
+
+ <ItemGroup>
+ <ProjectReference Include="..\..\Net.Http\src\VNLib.Net.Http.csproj" />
+ <ProjectReference Include="..\..\Plugins.Essentials\src\VNLib.Plugins.Essentials.csproj" />
+ <ProjectReference Include="..\..\Utils\src\VNLib.Utils.csproj" />
+ </ItemGroup>
+
+</Project>