diff options
author | vnugent <public@vaughnnugent.com> | 2023-01-08 16:01:54 -0500 |
---|---|---|
committer | vnugent <public@vaughnnugent.com> | 2023-01-08 16:01:54 -0500 |
commit | de94d788e9a47432a7630a8215896b8dd3628599 (patch) | |
tree | 666dec06eef861d101cb6948aff52a3d354c8d73 /lib/Net.Messaging.FBM/src/Server | |
parent | be6dc557a3b819248b014992eb96c1cb21f8112b (diff) |
Reorder + analyzer cleanup
Diffstat (limited to 'lib/Net.Messaging.FBM/src/Server')
-rw-r--r-- | lib/Net.Messaging.FBM/src/Server/FBMContext.cs | 85 | ||||
-rw-r--r-- | lib/Net.Messaging.FBM/src/Server/FBMListener.cs | 388 | ||||
-rw-r--r-- | lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs | 113 | ||||
-rw-r--r-- | lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs | 62 | ||||
-rw-r--r-- | lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs | 196 | ||||
-rw-r--r-- | lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs | 226 | ||||
-rw-r--r-- | lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs | 89 | ||||
-rw-r--r-- | lib/Net.Messaging.FBM/src/Server/IAsyncMessageBody.cs | 57 | ||||
-rw-r--r-- | lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs | 42 | ||||
-rw-r--r-- | lib/Net.Messaging.FBM/src/Server/readme.md | 35 |
10 files changed, 1293 insertions, 0 deletions
diff --git a/lib/Net.Messaging.FBM/src/Server/FBMContext.cs b/lib/Net.Messaging.FBM/src/Server/FBMContext.cs new file mode 100644 index 0000000..fb39d1b --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Server/FBMContext.cs @@ -0,0 +1,85 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: FBMContext.cs +* +* FBMContext.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Text; + +using VNLib.Utils.IO; +using VNLib.Utils.Memory.Caching; + +namespace VNLib.Net.Messaging.FBM.Server +{ + /// <summary> + /// A request/response pair message context + /// </summary> + public sealed class FBMContext : IReusable + { + private readonly Encoding _headerEncoding; + + /// <summary> + /// The request message to process + /// </summary> + public FBMRequestMessage Request { get; } + /// <summary> + /// The response message + /// </summary> + public FBMResponseMessage Response { get; } + /// <summary> + /// Creates a new reusable <see cref="FBMContext"/> + /// for use within a <see cref="ObjectRental{T}"/> + /// cache + /// </summary> + /// <param name="requestHeaderBufferSize">The size in characters of the request header buffer</param> + /// <param name="responseBufferSize">The size in characters of the response header buffer</param> + /// <param name="headerEncoding">The message header encoding instance</param> + public FBMContext(int requestHeaderBufferSize, int responseBufferSize, Encoding headerEncoding) + { + Request = new(requestHeaderBufferSize); + Response = new(responseBufferSize, headerEncoding); + _headerEncoding = headerEncoding; + } + + /// <summary> + /// Initializes the context with the buffered request data + /// </summary> + /// <param name="requestData">The request data buffer positioned at the begining of the request data</param> + /// <param name="connectionId">The unique id of the connection</param> + internal void Prepare(VnMemoryStream requestData, string connectionId) + { + Request.Prepare(requestData, connectionId, _headerEncoding); + //Message id is set after the request parses the incoming message + Response.Prepare(Request.MessageId); + } + + void IReusable.Prepare() + { + (Request as IReusable).Prepare(); + (Response as IReusable).Prepare(); + } + + bool IReusable.Release() + { + return (Request as IReusable).Release() & (Response as IReusable).Release(); + } + } +} diff --git a/lib/Net.Messaging.FBM/src/Server/FBMListener.cs b/lib/Net.Messaging.FBM/src/Server/FBMListener.cs new file mode 100644 index 0000000..6cca2a9 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Server/FBMListener.cs @@ -0,0 +1,388 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: FBMListener.cs +* +* FBMListener.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.IO; +using System.Buffers; +using System.Threading; +using System.Net.WebSockets; +using System.Threading.Tasks; + +using VNLib.Utils.IO; +using VNLib.Utils.Async; +using VNLib.Utils.Memory; +using VNLib.Utils.Extensions; +using VNLib.Utils.Memory.Caching; +using VNLib.Plugins.Essentials; + +namespace VNLib.Net.Messaging.FBM.Server +{ + + /// <summary> + /// Method delegate for processing FBM messages from an <see cref="FBMListener"/> + /// when messages are received + /// </summary> + /// <param name="context">The message/connection context</param> + /// <param name="userState">The state parameter passed on client connected</param> + /// <param name="cancellationToken">A token that reflects the state of the listener</param> + /// <returns>A <see cref="Task"/> that resolves when processing is complete</returns> + public delegate Task RequestHandler(FBMContext context, object? userState, CancellationToken cancellationToken); + + /// <summary> + /// A FBM protocol listener. Listens for messages on a <see cref="WebSocketSession"/> + /// and raises events on requests. + /// </summary> + public class FBMListener + { + private sealed class ListeningSession + { + private readonly ReusableStore<FBMContext> CtxStore; + private readonly CancellationTokenSource Cancellation; + private readonly CancellationTokenRegistration Registration; + private readonly FBMListenerSessionParams Params; + + + public readonly object? UserState; + + public readonly SemaphoreSlim ResponseLock; + + public readonly WebSocketSession Socket; + + public readonly RequestHandler OnRecieved; + + public CancellationToken CancellationToken => Cancellation.Token; + + + public ListeningSession(WebSocketSession session, RequestHandler onRecieved, in FBMListenerSessionParams args, object? userState) + { + Params = args; + Socket = session; + UserState = userState; + OnRecieved = onRecieved; + + //Create cancellation and register for session close + Cancellation = new(); + Registration = session.Token.Register(Cancellation.Cancel); + + + ResponseLock = new(1); + CtxStore = ObjectRental.CreateReusable(ContextCtor); + } + + private FBMContext ContextCtor() => new(Params.MaxHeaderBufferSize, Params.ResponseBufferSize, Params.HeaderEncoding); + + /// <summary> + /// Cancels any pending opreations relating to the current session + /// </summary> + public void CancelSession() + { + Cancellation.Cancel(); + + //If dispose happens without any outstanding requests, we can dispose the session + if (_counter == 0) + { + CleanupInternal(); + } + } + + private void CleanupInternal() + { + Registration.Dispose(); + CtxStore.Dispose(); + Cancellation.Dispose(); + ResponseLock.Dispose(); + } + + + private uint _counter; + + /// <summary> + /// Rents a new <see cref="FBMContext"/> instance from the pool + /// and increments the counter + /// </summary> + /// <returns>The rented instance</returns> + /// <exception cref="ObjectDisposedException"></exception> + public FBMContext RentContext() + { + + if (Cancellation.IsCancellationRequested) + { + throw new ObjectDisposedException("The instance has been disposed"); + } + + //Rent context + FBMContext ctx = CtxStore.Rent(); + //Increment counter + Interlocked.Increment(ref _counter); + + return ctx; + } + + /// <summary> + /// Returns a previously rented context to the pool + /// and decrements the counter. If the session has been + /// cancelled, when the counter reaches 0, cleanup occurs + /// </summary> + /// <param name="ctx">The context to return</param> + public void ReturnContext(FBMContext ctx) + { + //Return the context + CtxStore.Return(ctx); + + uint current = Interlocked.Decrement(ref _counter); + + //No more contexts in use, dispose internals + if (Cancellation.IsCancellationRequested && current == 0) + { + ResponseLock.Dispose(); + Cancellation.Dispose(); + CtxStore.Dispose(); + } + } + } + + public const int SEND_SEMAPHORE_TIMEOUT_MS = 10 * 1000; + + private readonly IUnmangedHeap Heap; + + /// <summary> + /// Raised when a response processing error occured + /// </summary> + public event EventHandler<Exception>? OnProcessError; + + /// <summary> + /// Creates a new <see cref="FBMListener"/> instance ready for + /// processing connections + /// </summary> + /// <param name="heap">The heap to alloc buffers from</param> + public FBMListener(IUnmangedHeap heap) + { + Heap = heap; + } + + /// <summary> + /// Begins listening for requests on the current websocket until + /// a close message is received or an error occurs + /// </summary> + /// <param name="wss">The <see cref="WebSocketSession"/> to receive messages on</param> + /// <param name="handler">The callback method to handle incoming requests</param> + /// <param name="args">The arguments used to configured this listening session</param> + /// <param name="userState">A state parameter</param> + /// <returns>A <see cref="Task"/> that completes when the connection closes</returns> + public async Task ListenAsync(WebSocketSession wss, RequestHandler handler, FBMListenerSessionParams args, object? userState) + { + ListeningSession session = new(wss, handler, args, userState); + //Alloc a recieve buffer + using IMemoryOwner<byte> recvBuffer = Heap.DirectAlloc<byte>(args.RecvBufferSize); + + //Init new queue for dispatching work + AsyncQueue<VnMemoryStream> workQueue = new(true, true); + + //Start a task to process the queue + Task queueWorker = QueueWorkerDoWork(workQueue, session); + + try + { + //Listen for incoming messages + while (true) + { + //Receive a message + ValueWebSocketReceiveResult result = await wss.ReceiveAsync(recvBuffer.Memory); + //If a close message has been received, we can gracefully exit + if (result.MessageType == WebSocketMessageType.Close) + { + //Return close message + await wss.CloseSocketAsync(WebSocketCloseStatus.NormalClosure, "Goodbye"); + //break listen loop + break; + } + //create buffer for storing data + VnMemoryStream request = new(Heap); + //Copy initial data + request.Write(recvBuffer.Memory.Span[..result.Count]); + //Streaming read + while (!result.EndOfMessage) + { + //Read more data + result = await wss.ReceiveAsync(recvBuffer.Memory); + //Make sure the request is small enough to buffer + if (request.Length + result.Count > args.MaxMessageSize) + { + //dispose the buffer + request.Dispose(); + //close the socket with a message too big + await wss.CloseSocketAsync(WebSocketCloseStatus.MessageTooBig, "Buffer space exceeded for message. Goodbye"); + //break listen loop + goto Exit; + } + //write to buffer + request.Write(recvBuffer.Memory.Span[..result.Count]); + } + //Make sure data is available + if (request.Length == 0) + { + request.Dispose(); + continue; + } + //reset buffer position + _ = request.Seek(0, SeekOrigin.Begin); + //Enqueue the request + await workQueue.EnqueueAsync(request); + } + + Exit: + ; + } + finally + { + session.CancelSession(); + await queueWorker.ConfigureAwait(false); + } + } + + private async Task QueueWorkerDoWork(AsyncQueue<VnMemoryStream> queue, ListeningSession session) + { + try + { + while (true) + { + //Get work from queue + VnMemoryStream request = await queue.DequeueAsync(session.CancellationToken); + //Process request without waiting + _ = ProcessAsync(request, session).ConfigureAwait(false); + } + } + catch (OperationCanceledException) + { } + finally + { + //Cleanup any queued requests + while (queue.TryDequeue(out VnMemoryStream? stream)) + { + stream.Dispose(); + } + } + } + + private async Task ProcessAsync(VnMemoryStream data, ListeningSession session) + { + //Rent a new request object + FBMContext context = session.RentContext(); + try + { + //Prepare the request/response + context.Prepare(data, session.Socket.SocketID); + + if ((context.Request.ParseStatus & HeaderParseError.InvalidId) > 0) + { + OnProcessError?.Invoke(this, new FBMException($"Invalid messageid {context.Request.MessageId}, message length {data.Length}")); + return; + } + + //Check parse status flags + if ((context.Request.ParseStatus & HeaderParseError.HeaderOutOfMem) > 0) + { + OnProcessError?.Invoke(this, new FBMException("Packet received with not enough space to store headers")); + } + //Determine if request is an out-of-band message + else if (context.Request.MessageId == Helpers.CONTROL_FRAME_MID) + { + //Process control frame + await ProcessOOBAsync(context); + } + else + { + //Invoke normal message handler + await session.OnRecieved.Invoke(context, session.UserState, session.CancellationToken); + } + + //Get response data + await using IAsyncMessageReader messageEnumerator = await context.Response.GetResponseDataAsync(session.CancellationToken); + + //Load inital segment + if (await messageEnumerator.MoveNextAsync() && !session.CancellationToken.IsCancellationRequested) + { + ValueTask sendTask; + + //Syncrhonize access to send data because we may need to stream data to the client + await session.ResponseLock.WaitAsync(SEND_SEMAPHORE_TIMEOUT_MS); + try + { + do + { + bool eof = !messageEnumerator.DataRemaining; + + //Send first segment + sendTask = session.Socket.SendAsync(messageEnumerator.Current, WebSocketMessageType.Binary, eof); + + /* + * WARNING! + * this code relies on the managed websocket impl that the websocket will read + * the entire buffer before returning. If this is not the case, this code will + * overwrite the memory buffer on the next call to move next. + */ + + //Move to next segment + if (!await messageEnumerator.MoveNextAsync()) + { + break; + } + + //Await previous send + await sendTask; + + } while (true); + } + finally + { + //release semaphore + session.ResponseLock.Release(); + } + + await sendTask; + } + + //No data to send + } + catch (Exception ex) + { + OnProcessError?.Invoke(this, ex); + } + finally + { + session.ReturnContext(context); + } + } + + /// <summary> + /// Processes an out-of-band request message (internal communications) + /// </summary> + /// <param name="outOfBandContext">The <see cref="FBMContext"/> containing the OOB message</param> + /// <returns>A <see cref="Task"/> that completes when the operation completes</returns> + protected virtual Task ProcessOOBAsync(FBMContext outOfBandContext) + { + return Task.CompletedTask; + } + } +} diff --git a/lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs b/lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs new file mode 100644 index 0000000..3e9fde2 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs @@ -0,0 +1,113 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: FBMListenerBase.cs +* +* FBMListenerBase.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Utils.Logging; +using VNLib.Utils.Memory; +using VNLib.Plugins.Essentials; + +namespace VNLib.Net.Messaging.FBM.Server +{ + /// <summary> + /// Provides a simple base class for an <see cref="FBMListener"/> + /// processor + /// </summary> + public abstract class FBMListenerBase + { + + /// <summary> + /// The initialzied listener + /// </summary> + protected FBMListener? Listener { get; private set; } + /// <summary> + /// A provider to write log information to + /// </summary> + protected abstract ILogProvider Log { get; } + + /// <summary> + /// Initializes the <see cref="FBMListener"/> + /// </summary> + /// <param name="heap">The heap to alloc buffers from</param> + protected void InitListener(IUnmangedHeap heap) + { + Listener = new(heap); + //Attach service handler + Listener.OnProcessError += Listener_OnProcessError; + } + + /// <summary> + /// A single event service routine for servicing errors that occur within + /// the listener loop + /// </summary> + /// <param name="sender"></param> + /// <param name="e">The exception that was raised</param> + protected virtual void Listener_OnProcessError(object? sender, Exception e) + { + //Write the error to the log + Log.Error(e); + } + + private async Task OnReceivedAsync(FBMContext context, object? userState, CancellationToken token) + { + try + { + await ProcessAsync(context, userState, token); + } + catch (OperationCanceledException) + { + Log.Debug("Async operation cancelled"); + } + catch(Exception ex) + { + Log.Error(ex); + } + } + + /// <summary> + /// Begins listening for requests on the current websocket until + /// a close message is received or an error occurs + /// </summary> + /// <param name="wss">The <see cref="WebSocketSession"/> to receive messages on</param> + /// <param name="args">The arguments used to configured this listening session</param> + /// <param name="userState">A state token to use for processing events for this connection</param> + /// <returns>A <see cref="Task"/> that completes when the connection closes</returns> + public virtual async Task ListenAsync(WebSocketSession wss, FBMListenerSessionParams args, object? userState) + { + _ = Listener ?? throw new InvalidOperationException("The listener has not been intialized"); + await Listener.ListenAsync(wss, OnReceivedAsync, args, userState); + } + + /// <summary> + /// A method to service an incoming message + /// </summary> + /// <param name="context">The context containing the message to be serviced</param> + /// <param name="userState">A state token passed on client connected</param> + /// <param name="exitToken">A token that reflects the state of the listener</param> + /// <returns>A task that completes when the message has been serviced</returns> + protected abstract Task ProcessAsync(FBMContext context, object? userState, CancellationToken exitToken); + } +} diff --git a/lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs b/lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs new file mode 100644 index 0000000..c327475 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs @@ -0,0 +1,62 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: FBMListenerSessionParams.cs +* +* FBMListenerSessionParams.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Text; + +namespace VNLib.Net.Messaging.FBM.Server +{ + /// <summary> + /// Represents a configuration structure for an <see cref="FBMListener"/> + /// listening session + /// </summary> + public readonly struct FBMListenerSessionParams + { + /// <summary> + /// The size of the buffer to use while reading data from the websocket + /// in the listener loop + /// </summary> + public readonly int RecvBufferSize { get; init; } + /// <summary> + /// The size of the character buffer to store FBMheader values in + /// the <see cref="FBMRequestMessage"/> + /// </summary> + public readonly int MaxHeaderBufferSize { get; init; } + /// <summary> + /// The size of the internal message response buffer when + /// not streaming + /// </summary> + public readonly int ResponseBufferSize { get; init; } + /// <summary> + /// The FMB message header character encoding + /// </summary> + public readonly Encoding HeaderEncoding { get; init; } + + /// <summary> + /// The absolute maxium size (in bytes) message to process before + /// closing the websocket connection. This value should be negotiaed + /// by clients or hard-coded to avoid connection issues + /// </summary> + public readonly int MaxMessageSize { get; init; } + } +} diff --git a/lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs b/lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs new file mode 100644 index 0000000..ed36571 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs @@ -0,0 +1,196 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: FBMRequestMessage.cs +* +* FBMRequestMessage.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Text; +using System.Buffers; +using System.Text.Json; +using System.Collections.Generic; + +using VNLib.Utils; +using VNLib.Utils.IO; +using VNLib.Utils.Memory; +using VNLib.Utils.Extensions; +using VNLib.Utils.Memory.Caching; + +namespace VNLib.Net.Messaging.FBM.Server +{ + /// <summary> + /// Represents a client request message to be serviced + /// </summary> + public sealed class FBMRequestMessage : IReusable + { + private readonly List<KeyValuePair<HeaderCommand, ReadOnlyMemory<char>>> _headers; + private readonly int HeaderCharBufferSize; + /// <summary> + /// Creates a new resusable <see cref="FBMRequestMessage"/> + /// </summary> + /// <param name="headerBufferSize">The size of the buffer to alloc during initialization</param> + internal FBMRequestMessage(int headerBufferSize) + { + HeaderCharBufferSize = headerBufferSize; + _headers = new(); + } + + private char[]? _headerBuffer; + + /// <summary> + /// The ID of the current message + /// </summary> + public int MessageId { get; private set; } + /// <summary> + /// Gets the underlying socket-id fot the current connection + /// </summary> + public string? ConnectionId { get; private set; } + /// <summary> + /// The raw request message, positioned to the body section of the message data + /// </summary> + public VnMemoryStream? RequestBody { get; private set; } + /// <summary> + /// A collection of headers for the current request + /// </summary> + public IReadOnlyList<KeyValuePair<HeaderCommand, ReadOnlyMemory<char>>> Headers => _headers; + /// <summary> + /// Status flags set during the message parsing + /// </summary> + public HeaderParseError ParseStatus { get; private set; } + /// <summary> + /// The message body data as a <see cref="ReadOnlySpan{T}"/> + /// </summary> + public ReadOnlySpan<byte> BodyData => Helpers.GetRemainingData(RequestBody!); + + /// <summary> + /// Determines if the current message is considered a control frame + /// </summary> + public bool IsControlFrame { get; private set; } + + /// <summary> + /// Prepares the request to be serviced + /// </summary> + /// <param name="vms">The request data packet</param> + /// <param name="socketId">The unique id of the connection</param> + /// <param name="dataEncoding">The data encoding used to decode header values</param> + internal void Prepare(VnMemoryStream vms, string socketId, Encoding dataEncoding) + { + //Store request body + RequestBody = vms; + //Store message id + MessageId = Helpers.GetMessageId(Helpers.ReadLine(vms)); + //Check mid for control frame + if(MessageId == Helpers.CONTROL_FRAME_MID) + { + IsControlFrame = true; + } + else if (MessageId < 1) + { + ParseStatus |= HeaderParseError.InvalidId; + return; + } + + ConnectionId = socketId; + + //sliding window over remaining data from internal buffer + ForwardOnlyMemoryWriter<char> writer = new(_headerBuffer); + + //Accumulate headers + while (true) + { + //Read the next line from the current stream + ReadOnlySpan<byte> line = Helpers.ReadLine(vms); + if (line.IsEmpty) + { + //Done reading headers + break; + } + HeaderCommand cmd = Helpers.GetHeaderCommand(line); + //Get header value + ERRNO charsRead = Helpers.GetHeaderValue(line, writer.Remaining.Span, dataEncoding); + if (charsRead < 0) + { + //Out of buffer space + ParseStatus |= HeaderParseError.HeaderOutOfMem; + break; + } + else if (!charsRead) + { + //Invalid header + ParseStatus |= HeaderParseError.InvalidHeaderRead; + } + else + { + //Store header as a read-only sequence + _headers.Add(new(cmd, writer.Remaining[..(int)charsRead])); + //Shift buffer window + writer.Advance(charsRead); + } + } + } + + /// <summary> + /// Deserializes the request body into a new specified object type + /// </summary> + /// <typeparam name="T">The type of the object to deserialize</typeparam> + /// <param name="jso">The <see cref="JsonSerializerOptions"/> to use while deserializing data</param> + /// <returns>The deserialized object from the request body</returns> + /// <exception cref="JsonException"></exception> + public T? DeserializeBody<T>(JsonSerializerOptions? jso = default) + { + return BodyData.IsEmpty ? default : BodyData.AsJsonObject<T>(jso); + } + /// <summary> + /// Gets a <see cref="JsonDocument"/> of the request body + /// </summary> + /// <returns>The parsed <see cref="JsonDocument"/> if parsed successfully, or null otherwise</returns> + /// <exception cref="JsonException"></exception> + public JsonDocument? GetBodyAsJson() + { + Utf8JsonReader reader = new(BodyData); + return JsonDocument.TryParseValue(ref reader, out JsonDocument? jdoc) ? jdoc : default; + } + + void IReusable.Prepare() + { + ParseStatus = HeaderParseError.None; + //Alloc header buffer + _headerBuffer = ArrayPool<char>.Shared.Rent(HeaderCharBufferSize); + } + + + bool IReusable.Release() + { + //Dispose the request message + RequestBody?.Dispose(); + RequestBody = null; + //Clear headers before freeing buffer + _headers.Clear(); + //Free header-buffer + ArrayPool<char>.Shared.Return(_headerBuffer!); + _headerBuffer = null; + ConnectionId = null; + MessageId = 0; + IsControlFrame = false; + return true; + } + } +} diff --git a/lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs b/lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs new file mode 100644 index 0000000..ac34dda --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs @@ -0,0 +1,226 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: FBMResponseMessage.cs +* +* FBMResponseMessage.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Net.Http; +using VNLib.Utils.IO; +using VNLib.Utils.Extensions; +using VNLib.Utils.Memory.Caching; +using VNLib.Net.Messaging.FBM.Client; + +namespace VNLib.Net.Messaging.FBM.Server +{ + + /// <summary> + /// Represents an FBM request response container. + /// </summary> + public sealed class FBMResponseMessage : IReusable, IFBMMessage + { + internal FBMResponseMessage(int internalBufferSize, Encoding headerEncoding) + { + _headerAccumulator = new HeaderDataAccumulator(internalBufferSize); + _headerEncoding = headerEncoding; + _messageEnumerator = new(this); + } + + private readonly MessageSegmentEnumerator _messageEnumerator; + private readonly ISlindingWindowBuffer<byte> _headerAccumulator; + private readonly Encoding _headerEncoding; + + private IAsyncMessageBody? _messageBody; + + ///<inheritdoc/> + public int MessageId { get; private set; } + + void IReusable.Prepare() + { + (_headerAccumulator as HeaderDataAccumulator)!.Prepare(); + } + + bool IReusable.Release() + { + //Release header accumulator + _headerAccumulator.Close(); + + _messageBody = null; + + MessageId = 0; + + return true; + } + + /// <summary> + /// Initializes the response message with the specified message-id + /// to respond with + /// </summary> + /// <param name="messageId">The message id of the context to respond to</param> + internal void Prepare(int messageId) + { + //Reset accumulator when message id is written + _headerAccumulator.Reset(); + //Write the messageid to the begining of the headers buffer + MessageId = messageId; + _headerAccumulator.Append((byte)HeaderCommand.MessageId); + _headerAccumulator.Append(messageId); + _headerAccumulator.WriteTermination(); + } + + ///<inheritdoc/> + public void WriteHeader(HeaderCommand header, ReadOnlySpan<char> value) + { + WriteHeader((byte)header, value); + } + ///<inheritdoc/> + public void WriteHeader(byte header, ReadOnlySpan<char> value) + { + _headerAccumulator.WriteHeader(header, value, _headerEncoding); + } + + ///<inheritdoc/> + public void WriteBody(ReadOnlySpan<byte> body, ContentType contentType = ContentType.Binary) + { + //Append content type header + WriteHeader(HeaderCommand.ContentType, HttpHelpers.GetContentTypeString(contentType)); + //end header segment + _headerAccumulator.WriteTermination(); + //Write message body + _headerAccumulator.Append(body); + } + + /// <summary> + /// Sets the response message body + /// </summary> + /// <param name="messageBody">The <see cref="IAsyncMessageBody"/> to stream data from</param> + /// <exception cref="InvalidOperationException"></exception> + public void AddMessageBody(IAsyncMessageBody messageBody) + { + if(_messageBody != null) + { + throw new InvalidOperationException("The message body is already set"); + } + + //Append message content type header + WriteHeader(HeaderCommand.ContentType, HttpHelpers.GetContentTypeString(messageBody.ContentType)); + + //end header segment + _headerAccumulator.WriteTermination(); + + //Store message body + _messageBody = messageBody; + + } + + /// <summary> + /// Gets the internal message body enumerator and prepares the message for sending + /// </summary> + /// <param name="cancellationToken">A cancellation token</param> + /// <returns>A value task that returns the message body enumerator</returns> + internal async ValueTask<IAsyncMessageReader> GetResponseDataAsync(CancellationToken cancellationToken) + { + //try to buffer as much data in the header segment first + if(_messageBody?.RemainingSize > 0 && _headerAccumulator.RemainingSize > 0) + { + //Read data from the message + int read = await _messageBody.ReadAsync(_headerAccumulator.RemainingBuffer, cancellationToken); + //Advance accumulator to the read bytes + _headerAccumulator.Advance(read); + } + //return reusable enumerator + return _messageEnumerator; + } + + private sealed class MessageSegmentEnumerator : IAsyncMessageReader + { + private readonly FBMResponseMessage _message; + + bool HeadersRead; + + public MessageSegmentEnumerator(FBMResponseMessage message) + { + _message = message; + } + + public ReadOnlyMemory<byte> Current { get; private set; } + + public bool DataRemaining { get; private set; } + + public async ValueTask<bool> MoveNextAsync() + { + //Attempt to read header segment first + if (!HeadersRead) + { + //Set the accumulated buffer + Current = _message._headerAccumulator.AccumulatedBuffer; + + //Update data remaining flag + DataRemaining = _message._messageBody?.RemainingSize > 0; + + //Set headers read flag + HeadersRead = true; + + return true; + } + else if (_message._messageBody?.RemainingSize > 0) + { + //Use the header buffer as the buffer for the message body + Memory<byte> buffer = _message._headerAccumulator.Buffer; + + //Read body segment + int read = await _message._messageBody.ReadAsync(buffer); + + //Update data remaining flag + DataRemaining = _message._messageBody.RemainingSize > 0; + + if (read > 0) + { + //Store the read segment + Current = buffer[..read]; + return true; + } + } + return false; + } + + public async ValueTask DisposeAsync() + { + //Clear current segment + Current = default; + + //Reset headers read flag + HeadersRead = false; + + //Dispose the message body if set + if (_message._messageBody != null) + { + await _message._messageBody.DisposeAsync(); + } + } + } + } + +} diff --git a/lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs b/lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs new file mode 100644 index 0000000..423a26e --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs @@ -0,0 +1,89 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: HeaderDataAccumulator.cs +* +* HeaderDataAccumulator.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Buffers; + +using VNLib.Utils.IO; + + +namespace VNLib.Net.Messaging.FBM.Server +{ + /// <summary> + /// Reusable sliding window impl + /// </summary> + internal sealed class HeaderDataAccumulator : ISlindingWindowBuffer<byte> + { + private readonly int BufferSize; + + private byte[]? _memHandle; + + public HeaderDataAccumulator(int bufferSize) + { + BufferSize = bufferSize; + } + + ///<inheritdoc/> + public int WindowStartPos { get; private set; } + ///<inheritdoc/> + public int WindowEndPos { get; private set; } + ///<inheritdoc/> + public Memory<byte> Buffer => _memHandle.AsMemory(); + + ///<inheritdoc/> + public void Advance(int count) => WindowEndPos += count; + + ///<inheritdoc/> + public void AdvanceStart(int count) => WindowEndPos += count; + + ///<inheritdoc/> + public void Reset() + { + WindowStartPos = 0; + WindowEndPos = 0; + } + + /// <summary> + /// Allocates the internal message buffer + /// </summary> + public void Prepare() + { + _memHandle ??= ArrayPool<byte>.Shared.Rent(BufferSize); + } + + ///<inheritdoc/> + public void Close() + { + Reset(); + + if (_memHandle != null) + { + //Return the buffer to the pool + ArrayPool<byte>.Shared.Return(_memHandle); + _memHandle = null; + } + } + } + +} diff --git a/lib/Net.Messaging.FBM/src/Server/IAsyncMessageBody.cs b/lib/Net.Messaging.FBM/src/Server/IAsyncMessageBody.cs new file mode 100644 index 0000000..5566520 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Server/IAsyncMessageBody.cs @@ -0,0 +1,57 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: IAsyncMessageBody.cs +* +* IAsyncMessageBody.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Net.Http; + +namespace VNLib.Net.Messaging.FBM +{ + /// <summary> + /// A disposable message body container for asynchronously reading a variable length message body + /// </summary> + public interface IAsyncMessageBody : IAsyncDisposable + { + /// <summary> + /// The message body content type + /// </summary> + ContentType ContentType { get; } + + /// <summary> + /// The number of bytes remaining to be read from the message body + /// </summary> + int RemainingSize { get; } + + /// <summary> + /// Reads the next chunk of data from the message body + /// </summary> + /// <param name="buffer">The buffer to copy output data to</param> + /// <param name="token">A token to cancel the operation</param> + /// <returns></returns> + ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken token = default); + } + +} diff --git a/lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs b/lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs new file mode 100644 index 0000000..b2abe8d --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs @@ -0,0 +1,42 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: IAsyncMessageReader.cs +* +* IAsyncMessageReader.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Collections.Generic; + + +namespace VNLib.Net.Messaging.FBM.Server +{ + /// <summary> + /// Internal message body reader/enumerator for FBM messages + /// </summary> + internal interface IAsyncMessageReader : IAsyncEnumerator<ReadOnlyMemory<byte>> + { + /// <summary> + /// A value that indicates if there is data remaining after a + /// </summary> + bool DataRemaining { get; } + } + +} diff --git a/lib/Net.Messaging.FBM/src/Server/readme.md b/lib/Net.Messaging.FBM/src/Server/readme.md new file mode 100644 index 0000000..489e58f --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Server/readme.md @@ -0,0 +1,35 @@ +# VNLib.Net.Messaging.FBM.Server + +Fixed Buffer Messaging Protocol server library. High performance statful messaging +protocol built on top of HTTP web-sockets. Low/no allocation, completely asynchronous +while providing a TPL API. This library provides a simple asynchronous request/response +architecture to web-sockets. This was initially designed to provide an alternative to +complete HTTP request/response overhead, but allow a simple control flow for work +across a network. + +Messages consist of a 4 byte message id, a collection of headers, and a message body. +The first 4 bytes of a message is the ID (for normal messages a signed integer greater than 0), +0 is reserved for error conditions, and negative numbers are reserved for internal +messages. Headers are identified by a single byte, followed by a variable length UTF8 +encoded character sequence, followed by a termination of 0xFF, 0xF1 (may change). + +### Message structure + 4 byte positive (signed 32-bit integer) message id + 2 byte termination + 1 byte header-id + variable length UTF8 value + 2 byte termination + -- other headers -- + 2 byte termination (extra termination, ie: empty header) + variable length payload + (end of message is the end of the payload) + + +XML Documentation is or will be provided for almost all public exports. APIs are intended to +be sensibly public and immutable to allow for easy extensability (via extension methods). I +often use extension libraries to provide additional functionality. (See cache library) + +This library is likely a niche use case, and is probably not for everyone. Unless you care +about reasonably efficient high frequency request/response messaging, this probably isnt +for you. This library provides a reasonable building block for distributed lock mechanisms +and small data caching.
\ No newline at end of file |