aboutsummaryrefslogtreecommitdiff
path: root/Net.Messaging.FBM/src/Server
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-01-08 16:01:54 -0500
committerLibravatar vnugent <public@vaughnnugent.com>2023-01-08 16:01:54 -0500
commitde94d788e9a47432a7630a8215896b8dd3628599 (patch)
tree666dec06eef861d101cb6948aff52a3d354c8d73 /Net.Messaging.FBM/src/Server
parentbe6dc557a3b819248b014992eb96c1cb21f8112b (diff)
Reorder + analyzer cleanup
Diffstat (limited to 'Net.Messaging.FBM/src/Server')
-rw-r--r--Net.Messaging.FBM/src/Server/FBMContext.cs85
-rw-r--r--Net.Messaging.FBM/src/Server/FBMListener.cs389
-rw-r--r--Net.Messaging.FBM/src/Server/FBMListenerBase.cs113
-rw-r--r--Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs62
-rw-r--r--Net.Messaging.FBM/src/Server/FBMRequestMessage.cs196
-rw-r--r--Net.Messaging.FBM/src/Server/FBMResponseMessage.cs226
-rw-r--r--Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs89
-rw-r--r--Net.Messaging.FBM/src/Server/IAsyncMessageBody.cs57
-rw-r--r--Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs42
-rw-r--r--Net.Messaging.FBM/src/Server/readme.md35
10 files changed, 0 insertions, 1294 deletions
diff --git a/Net.Messaging.FBM/src/Server/FBMContext.cs b/Net.Messaging.FBM/src/Server/FBMContext.cs
deleted file mode 100644
index fb39d1b..0000000
--- a/Net.Messaging.FBM/src/Server/FBMContext.cs
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
-* Copyright (c) 2022 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Net.Messaging.FBM
-* File: FBMContext.cs
-*
-* FBMContext.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System.Text;
-
-using VNLib.Utils.IO;
-using VNLib.Utils.Memory.Caching;
-
-namespace VNLib.Net.Messaging.FBM.Server
-{
- /// <summary>
- /// A request/response pair message context
- /// </summary>
- public sealed class FBMContext : IReusable
- {
- private readonly Encoding _headerEncoding;
-
- /// <summary>
- /// The request message to process
- /// </summary>
- public FBMRequestMessage Request { get; }
- /// <summary>
- /// The response message
- /// </summary>
- public FBMResponseMessage Response { get; }
- /// <summary>
- /// Creates a new reusable <see cref="FBMContext"/>
- /// for use within a <see cref="ObjectRental{T}"/>
- /// cache
- /// </summary>
- /// <param name="requestHeaderBufferSize">The size in characters of the request header buffer</param>
- /// <param name="responseBufferSize">The size in characters of the response header buffer</param>
- /// <param name="headerEncoding">The message header encoding instance</param>
- public FBMContext(int requestHeaderBufferSize, int responseBufferSize, Encoding headerEncoding)
- {
- Request = new(requestHeaderBufferSize);
- Response = new(responseBufferSize, headerEncoding);
- _headerEncoding = headerEncoding;
- }
-
- /// <summary>
- /// Initializes the context with the buffered request data
- /// </summary>
- /// <param name="requestData">The request data buffer positioned at the begining of the request data</param>
- /// <param name="connectionId">The unique id of the connection</param>
- internal void Prepare(VnMemoryStream requestData, string connectionId)
- {
- Request.Prepare(requestData, connectionId, _headerEncoding);
- //Message id is set after the request parses the incoming message
- Response.Prepare(Request.MessageId);
- }
-
- void IReusable.Prepare()
- {
- (Request as IReusable).Prepare();
- (Response as IReusable).Prepare();
- }
-
- bool IReusable.Release()
- {
- return (Request as IReusable).Release() & (Response as IReusable).Release();
- }
- }
-}
diff --git a/Net.Messaging.FBM/src/Server/FBMListener.cs b/Net.Messaging.FBM/src/Server/FBMListener.cs
deleted file mode 100644
index 1d50953..0000000
--- a/Net.Messaging.FBM/src/Server/FBMListener.cs
+++ /dev/null
@@ -1,389 +0,0 @@
-/*
-* Copyright (c) 2022 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Net.Messaging.FBM
-* File: FBMListener.cs
-*
-* FBMListener.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.IO;
-using System.Buffers;
-using System.Threading;
-using System.Net.WebSockets;
-using System.Threading.Tasks;
-
-using VNLib.Utils;
-using VNLib.Utils.IO;
-using VNLib.Utils.Async;
-using VNLib.Utils.Memory;
-using VNLib.Utils.Extensions;
-using VNLib.Utils.Memory.Caching;
-using VNLib.Plugins.Essentials;
-
-namespace VNLib.Net.Messaging.FBM.Server
-{
-
- /// <summary>
- /// Method delegate for processing FBM messages from an <see cref="FBMListener"/>
- /// when messages are received
- /// </summary>
- /// <param name="context">The message/connection context</param>
- /// <param name="userState">The state parameter passed on client connected</param>
- /// <param name="cancellationToken">A token that reflects the state of the listener</param>
- /// <returns>A <see cref="Task"/> that resolves when processing is complete</returns>
- public delegate Task RequestHandler(FBMContext context, object? userState, CancellationToken cancellationToken);
-
- /// <summary>
- /// A FBM protocol listener. Listens for messages on a <see cref="WebSocketSession"/>
- /// and raises events on requests.
- /// </summary>
- public class FBMListener
- {
- class ListeningSession
- {
- private readonly ReusableStore<FBMContext> CtxStore;
- private readonly CancellationTokenSource Cancellation;
- private readonly CancellationTokenRegistration Registration;
- private readonly FBMListenerSessionParams Params;
-
-
- public readonly object? UserState;
-
- public readonly SemaphoreSlim ResponseLock;
-
- public readonly WebSocketSession Socket;
-
- public readonly RequestHandler OnRecieved;
-
- public CancellationToken CancellationToken => Cancellation.Token;
-
-
- public ListeningSession(WebSocketSession session, RequestHandler onRecieved, in FBMListenerSessionParams args, object? userState)
- {
- Params = args;
- Socket = session;
- UserState = userState;
- OnRecieved = onRecieved;
-
- //Create cancellation and register for session close
- Cancellation = new();
- Registration = session.Token.Register(Cancellation.Cancel);
-
-
- ResponseLock = new(1);
- CtxStore = ObjectRental.CreateReusable(ContextCtor);
- }
-
- private FBMContext ContextCtor() => new(Params.MaxHeaderBufferSize, Params.ResponseBufferSize, Params.HeaderEncoding);
-
- /// <summary>
- /// Cancels any pending opreations relating to the current session
- /// </summary>
- public void CancelSession()
- {
- Cancellation.Cancel();
-
- //If dispose happens without any outstanding requests, we can dispose the session
- if (_counter == 0)
- {
- CleanupInternal();
- }
- }
-
- private void CleanupInternal()
- {
- Registration.Dispose();
- CtxStore.Dispose();
- Cancellation.Dispose();
- ResponseLock.Dispose();
- }
-
-
- private uint _counter;
-
- /// <summary>
- /// Rents a new <see cref="FBMContext"/> instance from the pool
- /// and increments the counter
- /// </summary>
- /// <returns>The rented instance</returns>
- /// <exception cref="ObjectDisposedException"></exception>
- public FBMContext RentContext()
- {
-
- if (Cancellation.IsCancellationRequested)
- {
- throw new ObjectDisposedException("The instance has been disposed");
- }
-
- //Rent context
- FBMContext ctx = CtxStore.Rent();
- //Increment counter
- Interlocked.Increment(ref _counter);
-
- return ctx;
- }
-
- /// <summary>
- /// Returns a previously rented context to the pool
- /// and decrements the counter. If the session has been
- /// cancelled, when the counter reaches 0, cleanup occurs
- /// </summary>
- /// <param name="ctx">The context to return</param>
- public void ReturnContext(FBMContext ctx)
- {
- //Return the context
- CtxStore.Return(ctx);
-
- uint current = Interlocked.Decrement(ref _counter);
-
- //No more contexts in use, dispose internals
- if (Cancellation.IsCancellationRequested && current == 0)
- {
- ResponseLock.Dispose();
- Cancellation.Dispose();
- CtxStore.Dispose();
- }
- }
- }
-
- public const int SEND_SEMAPHORE_TIMEOUT_MS = 10 * 1000;
-
- private readonly IUnmangedHeap Heap;
-
- /// <summary>
- /// Raised when a response processing error occured
- /// </summary>
- public event EventHandler<Exception>? OnProcessError;
-
- /// <summary>
- /// Creates a new <see cref="FBMListener"/> instance ready for
- /// processing connections
- /// </summary>
- /// <param name="heap">The heap to alloc buffers from</param>
- public FBMListener(IUnmangedHeap heap)
- {
- Heap = heap;
- }
-
- /// <summary>
- /// Begins listening for requests on the current websocket until
- /// a close message is received or an error occurs
- /// </summary>
- /// <param name="wss">The <see cref="WebSocketSession"/> to receive messages on</param>
- /// <param name="handler">The callback method to handle incoming requests</param>
- /// <param name="args">The arguments used to configured this listening session</param>
- /// <param name="userState">A state parameter</param>
- /// <returns>A <see cref="Task"/> that completes when the connection closes</returns>
- public async Task ListenAsync(WebSocketSession wss, RequestHandler handler, FBMListenerSessionParams args, object? userState)
- {
- ListeningSession session = new(wss, handler, args, userState);
- //Alloc a recieve buffer
- using IMemoryOwner<byte> recvBuffer = Heap.DirectAlloc<byte>(args.RecvBufferSize);
-
- //Init new queue for dispatching work
- AsyncQueue<VnMemoryStream> workQueue = new(true, true);
-
- //Start a task to process the queue
- Task queueWorker = QueueWorkerDoWork(workQueue, session);
-
- try
- {
- //Listen for incoming messages
- while (true)
- {
- //Receive a message
- ValueWebSocketReceiveResult result = await wss.ReceiveAsync(recvBuffer.Memory);
- //If a close message has been received, we can gracefully exit
- if (result.MessageType == WebSocketMessageType.Close)
- {
- //Return close message
- await wss.CloseSocketAsync(WebSocketCloseStatus.NormalClosure, "Goodbye");
- //break listen loop
- break;
- }
- //create buffer for storing data
- VnMemoryStream request = new(Heap);
- //Copy initial data
- request.Write(recvBuffer.Memory.Span[..result.Count]);
- //Streaming read
- while (!result.EndOfMessage)
- {
- //Read more data
- result = await wss.ReceiveAsync(recvBuffer.Memory);
- //Make sure the request is small enough to buffer
- if (request.Length + result.Count > args.MaxMessageSize)
- {
- //dispose the buffer
- request.Dispose();
- //close the socket with a message too big
- await wss.CloseSocketAsync(WebSocketCloseStatus.MessageTooBig, "Buffer space exceeded for message. Goodbye");
- //break listen loop
- goto Exit;
- }
- //write to buffer
- request.Write(recvBuffer.Memory.Span[..result.Count]);
- }
- //Make sure data is available
- if (request.Length == 0)
- {
- request.Dispose();
- continue;
- }
- //reset buffer position
- _ = request.Seek(0, SeekOrigin.Begin);
- //Enqueue the request
- await workQueue.EnqueueAsync(request);
- }
-
- Exit:
- ;
- }
- finally
- {
- session.CancelSession();
- await queueWorker.ConfigureAwait(false);
- }
- }
-
- private async Task QueueWorkerDoWork(AsyncQueue<VnMemoryStream> queue, ListeningSession session)
- {
- try
- {
- while (true)
- {
- //Get work from queue
- VnMemoryStream request = await queue.DequeueAsync(session.CancellationToken);
- //Process request without waiting
- _ = ProcessAsync(request, session).ConfigureAwait(false);
- }
- }
- catch (OperationCanceledException)
- { }
- finally
- {
- //Cleanup any queued requests
- while (queue.TryDequeue(out VnMemoryStream? stream))
- {
- stream.Dispose();
- }
- }
- }
-
- private async Task ProcessAsync(VnMemoryStream data, ListeningSession session)
- {
- //Rent a new request object
- FBMContext context = session.RentContext();
- try
- {
- //Prepare the request/response
- context.Prepare(data, session.Socket.SocketID);
-
- if ((context.Request.ParseStatus & HeaderParseError.InvalidId) > 0)
- {
- OnProcessError?.Invoke(this, new FBMException($"Invalid messageid {context.Request.MessageId}, message length {data.Length}"));
- return;
- }
-
- //Check parse status flags
- if ((context.Request.ParseStatus & HeaderParseError.HeaderOutOfMem) > 0)
- {
- OnProcessError?.Invoke(this, new OutOfMemoryException("Packet received with not enough space to store headers"));
- }
- //Determine if request is an out-of-band message
- else if (context.Request.MessageId == Helpers.CONTROL_FRAME_MID)
- {
- //Process control frame
- await ProcessOOBAsync(context);
- }
- else
- {
- //Invoke normal message handler
- await session.OnRecieved.Invoke(context, session.UserState, session.CancellationToken);
- }
-
- //Get response data
- await using IAsyncMessageReader messageEnumerator = await context.Response.GetResponseDataAsync(session.CancellationToken);
-
- //Load inital segment
- if (await messageEnumerator.MoveNextAsync() && !session.CancellationToken.IsCancellationRequested)
- {
- ValueTask sendTask;
-
- //Syncrhonize access to send data because we may need to stream data to the client
- await session.ResponseLock.WaitAsync(SEND_SEMAPHORE_TIMEOUT_MS);
- try
- {
- do
- {
- bool eof = !messageEnumerator.DataRemaining;
-
- //Send first segment
- sendTask = session.Socket.SendAsync(messageEnumerator.Current, WebSocketMessageType.Binary, eof);
-
- /*
- * WARNING!
- * this code relies on the managed websocket impl that the websocket will read
- * the entire buffer before returning. If this is not the case, this code will
- * overwrite the memory buffer on the next call to move next.
- */
-
- //Move to next segment
- if (!await messageEnumerator.MoveNextAsync())
- {
- break;
- }
-
- //Await previous send
- await sendTask;
-
- } while (true);
- }
- finally
- {
- //release semaphore
- session.ResponseLock.Release();
- }
-
- await sendTask;
- }
-
- //No data to send
- }
- catch (Exception ex)
- {
- OnProcessError?.Invoke(this, ex);
- }
- finally
- {
- session.ReturnContext(context);
- }
- }
-
- /// <summary>
- /// Processes an out-of-band request message (internal communications)
- /// </summary>
- /// <param name="outOfBandContext">The <see cref="FBMContext"/> containing the OOB message</param>
- /// <returns>A <see cref="Task"/> that completes when the operation completes</returns>
- protected virtual Task ProcessOOBAsync(FBMContext outOfBandContext)
- {
- return Task.CompletedTask;
- }
- }
-}
diff --git a/Net.Messaging.FBM/src/Server/FBMListenerBase.cs b/Net.Messaging.FBM/src/Server/FBMListenerBase.cs
deleted file mode 100644
index 3e9fde2..0000000
--- a/Net.Messaging.FBM/src/Server/FBMListenerBase.cs
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
-* Copyright (c) 2022 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Net.Messaging.FBM
-* File: FBMListenerBase.cs
-*
-* FBMListenerBase.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
-using VNLib.Utils.Logging;
-using VNLib.Utils.Memory;
-using VNLib.Plugins.Essentials;
-
-namespace VNLib.Net.Messaging.FBM.Server
-{
- /// <summary>
- /// Provides a simple base class for an <see cref="FBMListener"/>
- /// processor
- /// </summary>
- public abstract class FBMListenerBase
- {
-
- /// <summary>
- /// The initialzied listener
- /// </summary>
- protected FBMListener? Listener { get; private set; }
- /// <summary>
- /// A provider to write log information to
- /// </summary>
- protected abstract ILogProvider Log { get; }
-
- /// <summary>
- /// Initializes the <see cref="FBMListener"/>
- /// </summary>
- /// <param name="heap">The heap to alloc buffers from</param>
- protected void InitListener(IUnmangedHeap heap)
- {
- Listener = new(heap);
- //Attach service handler
- Listener.OnProcessError += Listener_OnProcessError;
- }
-
- /// <summary>
- /// A single event service routine for servicing errors that occur within
- /// the listener loop
- /// </summary>
- /// <param name="sender"></param>
- /// <param name="e">The exception that was raised</param>
- protected virtual void Listener_OnProcessError(object? sender, Exception e)
- {
- //Write the error to the log
- Log.Error(e);
- }
-
- private async Task OnReceivedAsync(FBMContext context, object? userState, CancellationToken token)
- {
- try
- {
- await ProcessAsync(context, userState, token);
- }
- catch (OperationCanceledException)
- {
- Log.Debug("Async operation cancelled");
- }
- catch(Exception ex)
- {
- Log.Error(ex);
- }
- }
-
- /// <summary>
- /// Begins listening for requests on the current websocket until
- /// a close message is received or an error occurs
- /// </summary>
- /// <param name="wss">The <see cref="WebSocketSession"/> to receive messages on</param>
- /// <param name="args">The arguments used to configured this listening session</param>
- /// <param name="userState">A state token to use for processing events for this connection</param>
- /// <returns>A <see cref="Task"/> that completes when the connection closes</returns>
- public virtual async Task ListenAsync(WebSocketSession wss, FBMListenerSessionParams args, object? userState)
- {
- _ = Listener ?? throw new InvalidOperationException("The listener has not been intialized");
- await Listener.ListenAsync(wss, OnReceivedAsync, args, userState);
- }
-
- /// <summary>
- /// A method to service an incoming message
- /// </summary>
- /// <param name="context">The context containing the message to be serviced</param>
- /// <param name="userState">A state token passed on client connected</param>
- /// <param name="exitToken">A token that reflects the state of the listener</param>
- /// <returns>A task that completes when the message has been serviced</returns>
- protected abstract Task ProcessAsync(FBMContext context, object? userState, CancellationToken exitToken);
- }
-}
diff --git a/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs b/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs
deleted file mode 100644
index c327475..0000000
--- a/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
-* Copyright (c) 2022 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Net.Messaging.FBM
-* File: FBMListenerSessionParams.cs
-*
-* FBMListenerSessionParams.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System.Text;
-
-namespace VNLib.Net.Messaging.FBM.Server
-{
- /// <summary>
- /// Represents a configuration structure for an <see cref="FBMListener"/>
- /// listening session
- /// </summary>
- public readonly struct FBMListenerSessionParams
- {
- /// <summary>
- /// The size of the buffer to use while reading data from the websocket
- /// in the listener loop
- /// </summary>
- public readonly int RecvBufferSize { get; init; }
- /// <summary>
- /// The size of the character buffer to store FBMheader values in
- /// the <see cref="FBMRequestMessage"/>
- /// </summary>
- public readonly int MaxHeaderBufferSize { get; init; }
- /// <summary>
- /// The size of the internal message response buffer when
- /// not streaming
- /// </summary>
- public readonly int ResponseBufferSize { get; init; }
- /// <summary>
- /// The FMB message header character encoding
- /// </summary>
- public readonly Encoding HeaderEncoding { get; init; }
-
- /// <summary>
- /// The absolute maxium size (in bytes) message to process before
- /// closing the websocket connection. This value should be negotiaed
- /// by clients or hard-coded to avoid connection issues
- /// </summary>
- public readonly int MaxMessageSize { get; init; }
- }
-}
diff --git a/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs b/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs
deleted file mode 100644
index ed36571..0000000
--- a/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
-* Copyright (c) 2022 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Net.Messaging.FBM
-* File: FBMRequestMessage.cs
-*
-* FBMRequestMessage.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Text;
-using System.Buffers;
-using System.Text.Json;
-using System.Collections.Generic;
-
-using VNLib.Utils;
-using VNLib.Utils.IO;
-using VNLib.Utils.Memory;
-using VNLib.Utils.Extensions;
-using VNLib.Utils.Memory.Caching;
-
-namespace VNLib.Net.Messaging.FBM.Server
-{
- /// <summary>
- /// Represents a client request message to be serviced
- /// </summary>
- public sealed class FBMRequestMessage : IReusable
- {
- private readonly List<KeyValuePair<HeaderCommand, ReadOnlyMemory<char>>> _headers;
- private readonly int HeaderCharBufferSize;
- /// <summary>
- /// Creates a new resusable <see cref="FBMRequestMessage"/>
- /// </summary>
- /// <param name="headerBufferSize">The size of the buffer to alloc during initialization</param>
- internal FBMRequestMessage(int headerBufferSize)
- {
- HeaderCharBufferSize = headerBufferSize;
- _headers = new();
- }
-
- private char[]? _headerBuffer;
-
- /// <summary>
- /// The ID of the current message
- /// </summary>
- public int MessageId { get; private set; }
- /// <summary>
- /// Gets the underlying socket-id fot the current connection
- /// </summary>
- public string? ConnectionId { get; private set; }
- /// <summary>
- /// The raw request message, positioned to the body section of the message data
- /// </summary>
- public VnMemoryStream? RequestBody { get; private set; }
- /// <summary>
- /// A collection of headers for the current request
- /// </summary>
- public IReadOnlyList<KeyValuePair<HeaderCommand, ReadOnlyMemory<char>>> Headers => _headers;
- /// <summary>
- /// Status flags set during the message parsing
- /// </summary>
- public HeaderParseError ParseStatus { get; private set; }
- /// <summary>
- /// The message body data as a <see cref="ReadOnlySpan{T}"/>
- /// </summary>
- public ReadOnlySpan<byte> BodyData => Helpers.GetRemainingData(RequestBody!);
-
- /// <summary>
- /// Determines if the current message is considered a control frame
- /// </summary>
- public bool IsControlFrame { get; private set; }
-
- /// <summary>
- /// Prepares the request to be serviced
- /// </summary>
- /// <param name="vms">The request data packet</param>
- /// <param name="socketId">The unique id of the connection</param>
- /// <param name="dataEncoding">The data encoding used to decode header values</param>
- internal void Prepare(VnMemoryStream vms, string socketId, Encoding dataEncoding)
- {
- //Store request body
- RequestBody = vms;
- //Store message id
- MessageId = Helpers.GetMessageId(Helpers.ReadLine(vms));
- //Check mid for control frame
- if(MessageId == Helpers.CONTROL_FRAME_MID)
- {
- IsControlFrame = true;
- }
- else if (MessageId < 1)
- {
- ParseStatus |= HeaderParseError.InvalidId;
- return;
- }
-
- ConnectionId = socketId;
-
- //sliding window over remaining data from internal buffer
- ForwardOnlyMemoryWriter<char> writer = new(_headerBuffer);
-
- //Accumulate headers
- while (true)
- {
- //Read the next line from the current stream
- ReadOnlySpan<byte> line = Helpers.ReadLine(vms);
- if (line.IsEmpty)
- {
- //Done reading headers
- break;
- }
- HeaderCommand cmd = Helpers.GetHeaderCommand(line);
- //Get header value
- ERRNO charsRead = Helpers.GetHeaderValue(line, writer.Remaining.Span, dataEncoding);
- if (charsRead < 0)
- {
- //Out of buffer space
- ParseStatus |= HeaderParseError.HeaderOutOfMem;
- break;
- }
- else if (!charsRead)
- {
- //Invalid header
- ParseStatus |= HeaderParseError.InvalidHeaderRead;
- }
- else
- {
- //Store header as a read-only sequence
- _headers.Add(new(cmd, writer.Remaining[..(int)charsRead]));
- //Shift buffer window
- writer.Advance(charsRead);
- }
- }
- }
-
- /// <summary>
- /// Deserializes the request body into a new specified object type
- /// </summary>
- /// <typeparam name="T">The type of the object to deserialize</typeparam>
- /// <param name="jso">The <see cref="JsonSerializerOptions"/> to use while deserializing data</param>
- /// <returns>The deserialized object from the request body</returns>
- /// <exception cref="JsonException"></exception>
- public T? DeserializeBody<T>(JsonSerializerOptions? jso = default)
- {
- return BodyData.IsEmpty ? default : BodyData.AsJsonObject<T>(jso);
- }
- /// <summary>
- /// Gets a <see cref="JsonDocument"/> of the request body
- /// </summary>
- /// <returns>The parsed <see cref="JsonDocument"/> if parsed successfully, or null otherwise</returns>
- /// <exception cref="JsonException"></exception>
- public JsonDocument? GetBodyAsJson()
- {
- Utf8JsonReader reader = new(BodyData);
- return JsonDocument.TryParseValue(ref reader, out JsonDocument? jdoc) ? jdoc : default;
- }
-
- void IReusable.Prepare()
- {
- ParseStatus = HeaderParseError.None;
- //Alloc header buffer
- _headerBuffer = ArrayPool<char>.Shared.Rent(HeaderCharBufferSize);
- }
-
-
- bool IReusable.Release()
- {
- //Dispose the request message
- RequestBody?.Dispose();
- RequestBody = null;
- //Clear headers before freeing buffer
- _headers.Clear();
- //Free header-buffer
- ArrayPool<char>.Shared.Return(_headerBuffer!);
- _headerBuffer = null;
- ConnectionId = null;
- MessageId = 0;
- IsControlFrame = false;
- return true;
- }
- }
-}
diff --git a/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs b/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs
deleted file mode 100644
index 1536c99..0000000
--- a/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
-* Copyright (c) 2022 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Net.Messaging.FBM
-* File: FBMResponseMessage.cs
-*
-* FBMResponseMessage.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-
-using VNLib.Net.Http;
-using VNLib.Utils.IO;
-using VNLib.Utils.Extensions;
-using VNLib.Utils.Memory.Caching;
-using VNLib.Net.Messaging.FBM.Client;
-
-namespace VNLib.Net.Messaging.FBM.Server
-{
-
- /// <summary>
- /// Represents an FBM request response container.
- /// </summary>
- public sealed class FBMResponseMessage : IReusable, IFBMMessage
- {
- internal FBMResponseMessage(int internalBufferSize, Encoding headerEncoding)
- {
- _headerAccumulator = new HeaderDataAccumulator(internalBufferSize);
- _headerEncoding = headerEncoding;
- _messageEnumerator = new(this);
- }
-
- private readonly MessageSegmentEnumerator _messageEnumerator;
- private readonly ISlindingWindowBuffer<byte> _headerAccumulator;
- private readonly Encoding _headerEncoding;
-
- private IAsyncMessageBody? _messageBody;
-
- ///<inheritdoc/>
- public int MessageId { get; private set; }
-
- void IReusable.Prepare()
- {
- (_headerAccumulator as HeaderDataAccumulator)!.Prepare();
- }
-
- bool IReusable.Release()
- {
- //Release header accumulator
- _headerAccumulator.Close();
-
- _messageBody = null;
-
- MessageId = 0;
-
- return true;
- }
-
- /// <summary>
- /// Initializes the response message with the specified message-id
- /// to respond with
- /// </summary>
- /// <param name="messageId">The message id of the context to respond to</param>
- internal void Prepare(int messageId)
- {
- //Reset accumulator when message id is written
- _headerAccumulator.Reset();
- //Write the messageid to the begining of the headers buffer
- MessageId = messageId;
- _headerAccumulator.Append((byte)HeaderCommand.MessageId);
- _headerAccumulator.Append(messageId);
- _headerAccumulator.WriteTermination();
- }
-
- ///<inheritdoc/>
- public void WriteHeader(HeaderCommand header, ReadOnlySpan<char> value)
- {
- WriteHeader((byte)header, value);
- }
- ///<inheritdoc/>
- public void WriteHeader(byte header, ReadOnlySpan<char> value)
- {
- _headerAccumulator.WriteHeader(header, value, _headerEncoding);
- }
-
- ///<inheritdoc/>
- public void WriteBody(ReadOnlySpan<byte> body, ContentType contentType = ContentType.Binary)
- {
- //Append content type header
- WriteHeader(HeaderCommand.ContentType, HttpHelpers.GetContentTypeString(contentType));
- //end header segment
- _headerAccumulator.WriteTermination();
- //Write message body
- _headerAccumulator.Append(body);
- }
-
- /// <summary>
- /// Sets the response message body
- /// </summary>
- /// <param name="messageBody">The <see cref="IAsyncMessageBody"/> to stream data from</param>
- /// <exception cref="InvalidOperationException"></exception>
- public void AddMessageBody(IAsyncMessageBody messageBody)
- {
- if(_messageBody != null)
- {
- throw new InvalidOperationException("The message body is already set");
- }
-
- //Append message content type header
- WriteHeader(HeaderCommand.ContentType, HttpHelpers.GetContentTypeString(messageBody.ContentType));
-
- //end header segment
- _headerAccumulator.WriteTermination();
-
- //Store message body
- _messageBody = messageBody;
-
- }
-
- /// <summary>
- /// Gets the internal message body enumerator and prepares the message for sending
- /// </summary>
- /// <param name="cancellationToken">A cancellation token</param>
- /// <returns>A value task that returns the message body enumerator</returns>
- internal async ValueTask<IAsyncMessageReader> GetResponseDataAsync(CancellationToken cancellationToken)
- {
- //try to buffer as much data in the header segment first
- if(_messageBody?.RemainingSize > 0 && _headerAccumulator.RemainingSize > 0)
- {
- //Read data from the message
- int read = await _messageBody.ReadAsync(_headerAccumulator.RemainingBuffer, cancellationToken);
- //Advance accumulator to the read bytes
- _headerAccumulator.Advance(read);
- }
- //return reusable enumerator
- return _messageEnumerator;
- }
-
- private class MessageSegmentEnumerator : IAsyncMessageReader
- {
- private readonly FBMResponseMessage _message;
-
- bool HeadersRead;
-
- public MessageSegmentEnumerator(FBMResponseMessage message)
- {
- _message = message;
- }
-
- public ReadOnlyMemory<byte> Current { get; private set; }
-
- public bool DataRemaining { get; private set; }
-
- public async ValueTask<bool> MoveNextAsync()
- {
- //Attempt to read header segment first
- if (!HeadersRead)
- {
- //Set the accumulated buffer
- Current = _message._headerAccumulator.AccumulatedBuffer;
-
- //Update data remaining flag
- DataRemaining = _message._messageBody?.RemainingSize > 0;
-
- //Set headers read flag
- HeadersRead = true;
-
- return true;
- }
- else if (_message._messageBody?.RemainingSize > 0)
- {
- //Use the header buffer as the buffer for the message body
- Memory<byte> buffer = _message._headerAccumulator.Buffer;
-
- //Read body segment
- int read = await _message._messageBody.ReadAsync(buffer);
-
- //Update data remaining flag
- DataRemaining = _message._messageBody.RemainingSize > 0;
-
- if (read > 0)
- {
- //Store the read segment
- Current = buffer[..read];
- return true;
- }
- }
- return false;
- }
-
- public async ValueTask DisposeAsync()
- {
- //Clear current segment
- Current = default;
-
- //Reset headers read flag
- HeadersRead = false;
-
- //Dispose the message body if set
- if (_message._messageBody != null)
- {
- await _message._messageBody.DisposeAsync();
- }
- }
- }
- }
-
-}
diff --git a/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs b/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs
deleted file mode 100644
index 78b378d..0000000
--- a/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
-* Copyright (c) 2022 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Net.Messaging.FBM
-* File: HeaderDataAccumulator.cs
-*
-* HeaderDataAccumulator.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Buffers;
-
-using VNLib.Utils.IO;
-
-
-namespace VNLib.Net.Messaging.FBM.Server
-{
- /// <summary>
- /// Reusable sliding window impl
- /// </summary>
- internal class HeaderDataAccumulator : ISlindingWindowBuffer<byte>
- {
- private readonly int BufferSize;
-
- private byte[]? _memHandle;
-
- public HeaderDataAccumulator(int bufferSize)
- {
- BufferSize = bufferSize;
- }
-
- ///<inheritdoc/>
- public int WindowStartPos { get; private set; }
- ///<inheritdoc/>
- public int WindowEndPos { get; private set; }
- ///<inheritdoc/>
- public Memory<byte> Buffer => _memHandle.AsMemory();
-
- ///<inheritdoc/>
- public void Advance(int count) => WindowEndPos += count;
-
- ///<inheritdoc/>
- public void AdvanceStart(int count) => WindowEndPos += count;
-
- ///<inheritdoc/>
- public void Reset()
- {
- WindowStartPos = 0;
- WindowEndPos = 0;
- }
-
- /// <summary>
- /// Allocates the internal message buffer
- /// </summary>
- public void Prepare()
- {
- _memHandle ??= ArrayPool<byte>.Shared.Rent(BufferSize);
- }
-
- ///<inheritdoc/>
- public void Close()
- {
- Reset();
-
- if (_memHandle != null)
- {
- //Return the buffer to the pool
- ArrayPool<byte>.Shared.Return(_memHandle);
- _memHandle = null;
- }
- }
- }
-
-}
diff --git a/Net.Messaging.FBM/src/Server/IAsyncMessageBody.cs b/Net.Messaging.FBM/src/Server/IAsyncMessageBody.cs
deleted file mode 100644
index 5566520..0000000
--- a/Net.Messaging.FBM/src/Server/IAsyncMessageBody.cs
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
-* Copyright (c) 2022 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Net.Messaging.FBM
-* File: IAsyncMessageBody.cs
-*
-* IAsyncMessageBody.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
-using VNLib.Net.Http;
-
-namespace VNLib.Net.Messaging.FBM
-{
- /// <summary>
- /// A disposable message body container for asynchronously reading a variable length message body
- /// </summary>
- public interface IAsyncMessageBody : IAsyncDisposable
- {
- /// <summary>
- /// The message body content type
- /// </summary>
- ContentType ContentType { get; }
-
- /// <summary>
- /// The number of bytes remaining to be read from the message body
- /// </summary>
- int RemainingSize { get; }
-
- /// <summary>
- /// Reads the next chunk of data from the message body
- /// </summary>
- /// <param name="buffer">The buffer to copy output data to</param>
- /// <param name="token">A token to cancel the operation</param>
- /// <returns></returns>
- ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken token = default);
- }
-
-}
diff --git a/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs b/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs
deleted file mode 100644
index b2abe8d..0000000
--- a/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
-* Copyright (c) 2022 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Net.Messaging.FBM
-* File: IAsyncMessageReader.cs
-*
-* IAsyncMessageReader.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Collections.Generic;
-
-
-namespace VNLib.Net.Messaging.FBM.Server
-{
- /// <summary>
- /// Internal message body reader/enumerator for FBM messages
- /// </summary>
- internal interface IAsyncMessageReader : IAsyncEnumerator<ReadOnlyMemory<byte>>
- {
- /// <summary>
- /// A value that indicates if there is data remaining after a
- /// </summary>
- bool DataRemaining { get; }
- }
-
-}
diff --git a/Net.Messaging.FBM/src/Server/readme.md b/Net.Messaging.FBM/src/Server/readme.md
deleted file mode 100644
index 489e58f..0000000
--- a/Net.Messaging.FBM/src/Server/readme.md
+++ /dev/null
@@ -1,35 +0,0 @@
-# VNLib.Net.Messaging.FBM.Server
-
-Fixed Buffer Messaging Protocol server library. High performance statful messaging
-protocol built on top of HTTP web-sockets. Low/no allocation, completely asynchronous
-while providing a TPL API. This library provides a simple asynchronous request/response
-architecture to web-sockets. This was initially designed to provide an alternative to
-complete HTTP request/response overhead, but allow a simple control flow for work
-across a network.
-
-Messages consist of a 4 byte message id, a collection of headers, and a message body.
-The first 4 bytes of a message is the ID (for normal messages a signed integer greater than 0),
-0 is reserved for error conditions, and negative numbers are reserved for internal
-messages. Headers are identified by a single byte, followed by a variable length UTF8
-encoded character sequence, followed by a termination of 0xFF, 0xF1 (may change).
-
-### Message structure
- 4 byte positive (signed 32-bit integer) message id
- 2 byte termination
- 1 byte header-id
- variable length UTF8 value
- 2 byte termination
- -- other headers --
- 2 byte termination (extra termination, ie: empty header)
- variable length payload
- (end of message is the end of the payload)
-
-
-XML Documentation is or will be provided for almost all public exports. APIs are intended to
-be sensibly public and immutable to allow for easy extensability (via extension methods). I
-often use extension libraries to provide additional functionality. (See cache library)
-
-This library is likely a niche use case, and is probably not for everyone. Unless you care
-about reasonably efficient high frequency request/response messaging, this probably isnt
-for you. This library provides a reasonable building block for distributed lock mechanisms
-and small data caching. \ No newline at end of file