diff options
author | vnugent <public@vaughnnugent.com> | 2023-01-08 16:01:54 -0500 |
---|---|---|
committer | vnugent <public@vaughnnugent.com> | 2023-01-08 16:01:54 -0500 |
commit | de94d788e9a47432a7630a8215896b8dd3628599 (patch) | |
tree | 666dec06eef861d101cb6948aff52a3d354c8d73 /Net.Messaging.FBM/src/Server | |
parent | be6dc557a3b819248b014992eb96c1cb21f8112b (diff) |
Reorder + analyzer cleanup
Diffstat (limited to 'Net.Messaging.FBM/src/Server')
-rw-r--r-- | Net.Messaging.FBM/src/Server/FBMContext.cs | 85 | ||||
-rw-r--r-- | Net.Messaging.FBM/src/Server/FBMListener.cs | 389 | ||||
-rw-r--r-- | Net.Messaging.FBM/src/Server/FBMListenerBase.cs | 113 | ||||
-rw-r--r-- | Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs | 62 | ||||
-rw-r--r-- | Net.Messaging.FBM/src/Server/FBMRequestMessage.cs | 196 | ||||
-rw-r--r-- | Net.Messaging.FBM/src/Server/FBMResponseMessage.cs | 226 | ||||
-rw-r--r-- | Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs | 89 | ||||
-rw-r--r-- | Net.Messaging.FBM/src/Server/IAsyncMessageBody.cs | 57 | ||||
-rw-r--r-- | Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs | 42 | ||||
-rw-r--r-- | Net.Messaging.FBM/src/Server/readme.md | 35 |
10 files changed, 0 insertions, 1294 deletions
diff --git a/Net.Messaging.FBM/src/Server/FBMContext.cs b/Net.Messaging.FBM/src/Server/FBMContext.cs deleted file mode 100644 index fb39d1b..0000000 --- a/Net.Messaging.FBM/src/Server/FBMContext.cs +++ /dev/null @@ -1,85 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: FBMContext.cs -* -* FBMContext.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System.Text; - -using VNLib.Utils.IO; -using VNLib.Utils.Memory.Caching; - -namespace VNLib.Net.Messaging.FBM.Server -{ - /// <summary> - /// A request/response pair message context - /// </summary> - public sealed class FBMContext : IReusable - { - private readonly Encoding _headerEncoding; - - /// <summary> - /// The request message to process - /// </summary> - public FBMRequestMessage Request { get; } - /// <summary> - /// The response message - /// </summary> - public FBMResponseMessage Response { get; } - /// <summary> - /// Creates a new reusable <see cref="FBMContext"/> - /// for use within a <see cref="ObjectRental{T}"/> - /// cache - /// </summary> - /// <param name="requestHeaderBufferSize">The size in characters of the request header buffer</param> - /// <param name="responseBufferSize">The size in characters of the response header buffer</param> - /// <param name="headerEncoding">The message header encoding instance</param> - public FBMContext(int requestHeaderBufferSize, int responseBufferSize, Encoding headerEncoding) - { - Request = new(requestHeaderBufferSize); - Response = new(responseBufferSize, headerEncoding); - _headerEncoding = headerEncoding; - } - - /// <summary> - /// Initializes the context with the buffered request data - /// </summary> - /// <param name="requestData">The request data buffer positioned at the begining of the request data</param> - /// <param name="connectionId">The unique id of the connection</param> - internal void Prepare(VnMemoryStream requestData, string connectionId) - { - Request.Prepare(requestData, connectionId, _headerEncoding); - //Message id is set after the request parses the incoming message - Response.Prepare(Request.MessageId); - } - - void IReusable.Prepare() - { - (Request as IReusable).Prepare(); - (Response as IReusable).Prepare(); - } - - bool IReusable.Release() - { - return (Request as IReusable).Release() & (Response as IReusable).Release(); - } - } -} diff --git a/Net.Messaging.FBM/src/Server/FBMListener.cs b/Net.Messaging.FBM/src/Server/FBMListener.cs deleted file mode 100644 index 1d50953..0000000 --- a/Net.Messaging.FBM/src/Server/FBMListener.cs +++ /dev/null @@ -1,389 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: FBMListener.cs -* -* FBMListener.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.IO; -using System.Buffers; -using System.Threading; -using System.Net.WebSockets; -using System.Threading.Tasks; - -using VNLib.Utils; -using VNLib.Utils.IO; -using VNLib.Utils.Async; -using VNLib.Utils.Memory; -using VNLib.Utils.Extensions; -using VNLib.Utils.Memory.Caching; -using VNLib.Plugins.Essentials; - -namespace VNLib.Net.Messaging.FBM.Server -{ - - /// <summary> - /// Method delegate for processing FBM messages from an <see cref="FBMListener"/> - /// when messages are received - /// </summary> - /// <param name="context">The message/connection context</param> - /// <param name="userState">The state parameter passed on client connected</param> - /// <param name="cancellationToken">A token that reflects the state of the listener</param> - /// <returns>A <see cref="Task"/> that resolves when processing is complete</returns> - public delegate Task RequestHandler(FBMContext context, object? userState, CancellationToken cancellationToken); - - /// <summary> - /// A FBM protocol listener. Listens for messages on a <see cref="WebSocketSession"/> - /// and raises events on requests. - /// </summary> - public class FBMListener - { - class ListeningSession - { - private readonly ReusableStore<FBMContext> CtxStore; - private readonly CancellationTokenSource Cancellation; - private readonly CancellationTokenRegistration Registration; - private readonly FBMListenerSessionParams Params; - - - public readonly object? UserState; - - public readonly SemaphoreSlim ResponseLock; - - public readonly WebSocketSession Socket; - - public readonly RequestHandler OnRecieved; - - public CancellationToken CancellationToken => Cancellation.Token; - - - public ListeningSession(WebSocketSession session, RequestHandler onRecieved, in FBMListenerSessionParams args, object? userState) - { - Params = args; - Socket = session; - UserState = userState; - OnRecieved = onRecieved; - - //Create cancellation and register for session close - Cancellation = new(); - Registration = session.Token.Register(Cancellation.Cancel); - - - ResponseLock = new(1); - CtxStore = ObjectRental.CreateReusable(ContextCtor); - } - - private FBMContext ContextCtor() => new(Params.MaxHeaderBufferSize, Params.ResponseBufferSize, Params.HeaderEncoding); - - /// <summary> - /// Cancels any pending opreations relating to the current session - /// </summary> - public void CancelSession() - { - Cancellation.Cancel(); - - //If dispose happens without any outstanding requests, we can dispose the session - if (_counter == 0) - { - CleanupInternal(); - } - } - - private void CleanupInternal() - { - Registration.Dispose(); - CtxStore.Dispose(); - Cancellation.Dispose(); - ResponseLock.Dispose(); - } - - - private uint _counter; - - /// <summary> - /// Rents a new <see cref="FBMContext"/> instance from the pool - /// and increments the counter - /// </summary> - /// <returns>The rented instance</returns> - /// <exception cref="ObjectDisposedException"></exception> - public FBMContext RentContext() - { - - if (Cancellation.IsCancellationRequested) - { - throw new ObjectDisposedException("The instance has been disposed"); - } - - //Rent context - FBMContext ctx = CtxStore.Rent(); - //Increment counter - Interlocked.Increment(ref _counter); - - return ctx; - } - - /// <summary> - /// Returns a previously rented context to the pool - /// and decrements the counter. If the session has been - /// cancelled, when the counter reaches 0, cleanup occurs - /// </summary> - /// <param name="ctx">The context to return</param> - public void ReturnContext(FBMContext ctx) - { - //Return the context - CtxStore.Return(ctx); - - uint current = Interlocked.Decrement(ref _counter); - - //No more contexts in use, dispose internals - if (Cancellation.IsCancellationRequested && current == 0) - { - ResponseLock.Dispose(); - Cancellation.Dispose(); - CtxStore.Dispose(); - } - } - } - - public const int SEND_SEMAPHORE_TIMEOUT_MS = 10 * 1000; - - private readonly IUnmangedHeap Heap; - - /// <summary> - /// Raised when a response processing error occured - /// </summary> - public event EventHandler<Exception>? OnProcessError; - - /// <summary> - /// Creates a new <see cref="FBMListener"/> instance ready for - /// processing connections - /// </summary> - /// <param name="heap">The heap to alloc buffers from</param> - public FBMListener(IUnmangedHeap heap) - { - Heap = heap; - } - - /// <summary> - /// Begins listening for requests on the current websocket until - /// a close message is received or an error occurs - /// </summary> - /// <param name="wss">The <see cref="WebSocketSession"/> to receive messages on</param> - /// <param name="handler">The callback method to handle incoming requests</param> - /// <param name="args">The arguments used to configured this listening session</param> - /// <param name="userState">A state parameter</param> - /// <returns>A <see cref="Task"/> that completes when the connection closes</returns> - public async Task ListenAsync(WebSocketSession wss, RequestHandler handler, FBMListenerSessionParams args, object? userState) - { - ListeningSession session = new(wss, handler, args, userState); - //Alloc a recieve buffer - using IMemoryOwner<byte> recvBuffer = Heap.DirectAlloc<byte>(args.RecvBufferSize); - - //Init new queue for dispatching work - AsyncQueue<VnMemoryStream> workQueue = new(true, true); - - //Start a task to process the queue - Task queueWorker = QueueWorkerDoWork(workQueue, session); - - try - { - //Listen for incoming messages - while (true) - { - //Receive a message - ValueWebSocketReceiveResult result = await wss.ReceiveAsync(recvBuffer.Memory); - //If a close message has been received, we can gracefully exit - if (result.MessageType == WebSocketMessageType.Close) - { - //Return close message - await wss.CloseSocketAsync(WebSocketCloseStatus.NormalClosure, "Goodbye"); - //break listen loop - break; - } - //create buffer for storing data - VnMemoryStream request = new(Heap); - //Copy initial data - request.Write(recvBuffer.Memory.Span[..result.Count]); - //Streaming read - while (!result.EndOfMessage) - { - //Read more data - result = await wss.ReceiveAsync(recvBuffer.Memory); - //Make sure the request is small enough to buffer - if (request.Length + result.Count > args.MaxMessageSize) - { - //dispose the buffer - request.Dispose(); - //close the socket with a message too big - await wss.CloseSocketAsync(WebSocketCloseStatus.MessageTooBig, "Buffer space exceeded for message. Goodbye"); - //break listen loop - goto Exit; - } - //write to buffer - request.Write(recvBuffer.Memory.Span[..result.Count]); - } - //Make sure data is available - if (request.Length == 0) - { - request.Dispose(); - continue; - } - //reset buffer position - _ = request.Seek(0, SeekOrigin.Begin); - //Enqueue the request - await workQueue.EnqueueAsync(request); - } - - Exit: - ; - } - finally - { - session.CancelSession(); - await queueWorker.ConfigureAwait(false); - } - } - - private async Task QueueWorkerDoWork(AsyncQueue<VnMemoryStream> queue, ListeningSession session) - { - try - { - while (true) - { - //Get work from queue - VnMemoryStream request = await queue.DequeueAsync(session.CancellationToken); - //Process request without waiting - _ = ProcessAsync(request, session).ConfigureAwait(false); - } - } - catch (OperationCanceledException) - { } - finally - { - //Cleanup any queued requests - while (queue.TryDequeue(out VnMemoryStream? stream)) - { - stream.Dispose(); - } - } - } - - private async Task ProcessAsync(VnMemoryStream data, ListeningSession session) - { - //Rent a new request object - FBMContext context = session.RentContext(); - try - { - //Prepare the request/response - context.Prepare(data, session.Socket.SocketID); - - if ((context.Request.ParseStatus & HeaderParseError.InvalidId) > 0) - { - OnProcessError?.Invoke(this, new FBMException($"Invalid messageid {context.Request.MessageId}, message length {data.Length}")); - return; - } - - //Check parse status flags - if ((context.Request.ParseStatus & HeaderParseError.HeaderOutOfMem) > 0) - { - OnProcessError?.Invoke(this, new OutOfMemoryException("Packet received with not enough space to store headers")); - } - //Determine if request is an out-of-band message - else if (context.Request.MessageId == Helpers.CONTROL_FRAME_MID) - { - //Process control frame - await ProcessOOBAsync(context); - } - else - { - //Invoke normal message handler - await session.OnRecieved.Invoke(context, session.UserState, session.CancellationToken); - } - - //Get response data - await using IAsyncMessageReader messageEnumerator = await context.Response.GetResponseDataAsync(session.CancellationToken); - - //Load inital segment - if (await messageEnumerator.MoveNextAsync() && !session.CancellationToken.IsCancellationRequested) - { - ValueTask sendTask; - - //Syncrhonize access to send data because we may need to stream data to the client - await session.ResponseLock.WaitAsync(SEND_SEMAPHORE_TIMEOUT_MS); - try - { - do - { - bool eof = !messageEnumerator.DataRemaining; - - //Send first segment - sendTask = session.Socket.SendAsync(messageEnumerator.Current, WebSocketMessageType.Binary, eof); - - /* - * WARNING! - * this code relies on the managed websocket impl that the websocket will read - * the entire buffer before returning. If this is not the case, this code will - * overwrite the memory buffer on the next call to move next. - */ - - //Move to next segment - if (!await messageEnumerator.MoveNextAsync()) - { - break; - } - - //Await previous send - await sendTask; - - } while (true); - } - finally - { - //release semaphore - session.ResponseLock.Release(); - } - - await sendTask; - } - - //No data to send - } - catch (Exception ex) - { - OnProcessError?.Invoke(this, ex); - } - finally - { - session.ReturnContext(context); - } - } - - /// <summary> - /// Processes an out-of-band request message (internal communications) - /// </summary> - /// <param name="outOfBandContext">The <see cref="FBMContext"/> containing the OOB message</param> - /// <returns>A <see cref="Task"/> that completes when the operation completes</returns> - protected virtual Task ProcessOOBAsync(FBMContext outOfBandContext) - { - return Task.CompletedTask; - } - } -} diff --git a/Net.Messaging.FBM/src/Server/FBMListenerBase.cs b/Net.Messaging.FBM/src/Server/FBMListenerBase.cs deleted file mode 100644 index 3e9fde2..0000000 --- a/Net.Messaging.FBM/src/Server/FBMListenerBase.cs +++ /dev/null @@ -1,113 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: FBMListenerBase.cs -* -* FBMListenerBase.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Threading; -using System.Threading.Tasks; - -using VNLib.Utils.Logging; -using VNLib.Utils.Memory; -using VNLib.Plugins.Essentials; - -namespace VNLib.Net.Messaging.FBM.Server -{ - /// <summary> - /// Provides a simple base class for an <see cref="FBMListener"/> - /// processor - /// </summary> - public abstract class FBMListenerBase - { - - /// <summary> - /// The initialzied listener - /// </summary> - protected FBMListener? Listener { get; private set; } - /// <summary> - /// A provider to write log information to - /// </summary> - protected abstract ILogProvider Log { get; } - - /// <summary> - /// Initializes the <see cref="FBMListener"/> - /// </summary> - /// <param name="heap">The heap to alloc buffers from</param> - protected void InitListener(IUnmangedHeap heap) - { - Listener = new(heap); - //Attach service handler - Listener.OnProcessError += Listener_OnProcessError; - } - - /// <summary> - /// A single event service routine for servicing errors that occur within - /// the listener loop - /// </summary> - /// <param name="sender"></param> - /// <param name="e">The exception that was raised</param> - protected virtual void Listener_OnProcessError(object? sender, Exception e) - { - //Write the error to the log - Log.Error(e); - } - - private async Task OnReceivedAsync(FBMContext context, object? userState, CancellationToken token) - { - try - { - await ProcessAsync(context, userState, token); - } - catch (OperationCanceledException) - { - Log.Debug("Async operation cancelled"); - } - catch(Exception ex) - { - Log.Error(ex); - } - } - - /// <summary> - /// Begins listening for requests on the current websocket until - /// a close message is received or an error occurs - /// </summary> - /// <param name="wss">The <see cref="WebSocketSession"/> to receive messages on</param> - /// <param name="args">The arguments used to configured this listening session</param> - /// <param name="userState">A state token to use for processing events for this connection</param> - /// <returns>A <see cref="Task"/> that completes when the connection closes</returns> - public virtual async Task ListenAsync(WebSocketSession wss, FBMListenerSessionParams args, object? userState) - { - _ = Listener ?? throw new InvalidOperationException("The listener has not been intialized"); - await Listener.ListenAsync(wss, OnReceivedAsync, args, userState); - } - - /// <summary> - /// A method to service an incoming message - /// </summary> - /// <param name="context">The context containing the message to be serviced</param> - /// <param name="userState">A state token passed on client connected</param> - /// <param name="exitToken">A token that reflects the state of the listener</param> - /// <returns>A task that completes when the message has been serviced</returns> - protected abstract Task ProcessAsync(FBMContext context, object? userState, CancellationToken exitToken); - } -} diff --git a/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs b/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs deleted file mode 100644 index c327475..0000000 --- a/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs +++ /dev/null @@ -1,62 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: FBMListenerSessionParams.cs -* -* FBMListenerSessionParams.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System.Text; - -namespace VNLib.Net.Messaging.FBM.Server -{ - /// <summary> - /// Represents a configuration structure for an <see cref="FBMListener"/> - /// listening session - /// </summary> - public readonly struct FBMListenerSessionParams - { - /// <summary> - /// The size of the buffer to use while reading data from the websocket - /// in the listener loop - /// </summary> - public readonly int RecvBufferSize { get; init; } - /// <summary> - /// The size of the character buffer to store FBMheader values in - /// the <see cref="FBMRequestMessage"/> - /// </summary> - public readonly int MaxHeaderBufferSize { get; init; } - /// <summary> - /// The size of the internal message response buffer when - /// not streaming - /// </summary> - public readonly int ResponseBufferSize { get; init; } - /// <summary> - /// The FMB message header character encoding - /// </summary> - public readonly Encoding HeaderEncoding { get; init; } - - /// <summary> - /// The absolute maxium size (in bytes) message to process before - /// closing the websocket connection. This value should be negotiaed - /// by clients or hard-coded to avoid connection issues - /// </summary> - public readonly int MaxMessageSize { get; init; } - } -} diff --git a/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs b/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs deleted file mode 100644 index ed36571..0000000 --- a/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs +++ /dev/null @@ -1,196 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: FBMRequestMessage.cs -* -* FBMRequestMessage.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Text; -using System.Buffers; -using System.Text.Json; -using System.Collections.Generic; - -using VNLib.Utils; -using VNLib.Utils.IO; -using VNLib.Utils.Memory; -using VNLib.Utils.Extensions; -using VNLib.Utils.Memory.Caching; - -namespace VNLib.Net.Messaging.FBM.Server -{ - /// <summary> - /// Represents a client request message to be serviced - /// </summary> - public sealed class FBMRequestMessage : IReusable - { - private readonly List<KeyValuePair<HeaderCommand, ReadOnlyMemory<char>>> _headers; - private readonly int HeaderCharBufferSize; - /// <summary> - /// Creates a new resusable <see cref="FBMRequestMessage"/> - /// </summary> - /// <param name="headerBufferSize">The size of the buffer to alloc during initialization</param> - internal FBMRequestMessage(int headerBufferSize) - { - HeaderCharBufferSize = headerBufferSize; - _headers = new(); - } - - private char[]? _headerBuffer; - - /// <summary> - /// The ID of the current message - /// </summary> - public int MessageId { get; private set; } - /// <summary> - /// Gets the underlying socket-id fot the current connection - /// </summary> - public string? ConnectionId { get; private set; } - /// <summary> - /// The raw request message, positioned to the body section of the message data - /// </summary> - public VnMemoryStream? RequestBody { get; private set; } - /// <summary> - /// A collection of headers for the current request - /// </summary> - public IReadOnlyList<KeyValuePair<HeaderCommand, ReadOnlyMemory<char>>> Headers => _headers; - /// <summary> - /// Status flags set during the message parsing - /// </summary> - public HeaderParseError ParseStatus { get; private set; } - /// <summary> - /// The message body data as a <see cref="ReadOnlySpan{T}"/> - /// </summary> - public ReadOnlySpan<byte> BodyData => Helpers.GetRemainingData(RequestBody!); - - /// <summary> - /// Determines if the current message is considered a control frame - /// </summary> - public bool IsControlFrame { get; private set; } - - /// <summary> - /// Prepares the request to be serviced - /// </summary> - /// <param name="vms">The request data packet</param> - /// <param name="socketId">The unique id of the connection</param> - /// <param name="dataEncoding">The data encoding used to decode header values</param> - internal void Prepare(VnMemoryStream vms, string socketId, Encoding dataEncoding) - { - //Store request body - RequestBody = vms; - //Store message id - MessageId = Helpers.GetMessageId(Helpers.ReadLine(vms)); - //Check mid for control frame - if(MessageId == Helpers.CONTROL_FRAME_MID) - { - IsControlFrame = true; - } - else if (MessageId < 1) - { - ParseStatus |= HeaderParseError.InvalidId; - return; - } - - ConnectionId = socketId; - - //sliding window over remaining data from internal buffer - ForwardOnlyMemoryWriter<char> writer = new(_headerBuffer); - - //Accumulate headers - while (true) - { - //Read the next line from the current stream - ReadOnlySpan<byte> line = Helpers.ReadLine(vms); - if (line.IsEmpty) - { - //Done reading headers - break; - } - HeaderCommand cmd = Helpers.GetHeaderCommand(line); - //Get header value - ERRNO charsRead = Helpers.GetHeaderValue(line, writer.Remaining.Span, dataEncoding); - if (charsRead < 0) - { - //Out of buffer space - ParseStatus |= HeaderParseError.HeaderOutOfMem; - break; - } - else if (!charsRead) - { - //Invalid header - ParseStatus |= HeaderParseError.InvalidHeaderRead; - } - else - { - //Store header as a read-only sequence - _headers.Add(new(cmd, writer.Remaining[..(int)charsRead])); - //Shift buffer window - writer.Advance(charsRead); - } - } - } - - /// <summary> - /// Deserializes the request body into a new specified object type - /// </summary> - /// <typeparam name="T">The type of the object to deserialize</typeparam> - /// <param name="jso">The <see cref="JsonSerializerOptions"/> to use while deserializing data</param> - /// <returns>The deserialized object from the request body</returns> - /// <exception cref="JsonException"></exception> - public T? DeserializeBody<T>(JsonSerializerOptions? jso = default) - { - return BodyData.IsEmpty ? default : BodyData.AsJsonObject<T>(jso); - } - /// <summary> - /// Gets a <see cref="JsonDocument"/> of the request body - /// </summary> - /// <returns>The parsed <see cref="JsonDocument"/> if parsed successfully, or null otherwise</returns> - /// <exception cref="JsonException"></exception> - public JsonDocument? GetBodyAsJson() - { - Utf8JsonReader reader = new(BodyData); - return JsonDocument.TryParseValue(ref reader, out JsonDocument? jdoc) ? jdoc : default; - } - - void IReusable.Prepare() - { - ParseStatus = HeaderParseError.None; - //Alloc header buffer - _headerBuffer = ArrayPool<char>.Shared.Rent(HeaderCharBufferSize); - } - - - bool IReusable.Release() - { - //Dispose the request message - RequestBody?.Dispose(); - RequestBody = null; - //Clear headers before freeing buffer - _headers.Clear(); - //Free header-buffer - ArrayPool<char>.Shared.Return(_headerBuffer!); - _headerBuffer = null; - ConnectionId = null; - MessageId = 0; - IsControlFrame = false; - return true; - } - } -} diff --git a/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs b/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs deleted file mode 100644 index 1536c99..0000000 --- a/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs +++ /dev/null @@ -1,226 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: FBMResponseMessage.cs -* -* FBMResponseMessage.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -using VNLib.Net.Http; -using VNLib.Utils.IO; -using VNLib.Utils.Extensions; -using VNLib.Utils.Memory.Caching; -using VNLib.Net.Messaging.FBM.Client; - -namespace VNLib.Net.Messaging.FBM.Server -{ - - /// <summary> - /// Represents an FBM request response container. - /// </summary> - public sealed class FBMResponseMessage : IReusable, IFBMMessage - { - internal FBMResponseMessage(int internalBufferSize, Encoding headerEncoding) - { - _headerAccumulator = new HeaderDataAccumulator(internalBufferSize); - _headerEncoding = headerEncoding; - _messageEnumerator = new(this); - } - - private readonly MessageSegmentEnumerator _messageEnumerator; - private readonly ISlindingWindowBuffer<byte> _headerAccumulator; - private readonly Encoding _headerEncoding; - - private IAsyncMessageBody? _messageBody; - - ///<inheritdoc/> - public int MessageId { get; private set; } - - void IReusable.Prepare() - { - (_headerAccumulator as HeaderDataAccumulator)!.Prepare(); - } - - bool IReusable.Release() - { - //Release header accumulator - _headerAccumulator.Close(); - - _messageBody = null; - - MessageId = 0; - - return true; - } - - /// <summary> - /// Initializes the response message with the specified message-id - /// to respond with - /// </summary> - /// <param name="messageId">The message id of the context to respond to</param> - internal void Prepare(int messageId) - { - //Reset accumulator when message id is written - _headerAccumulator.Reset(); - //Write the messageid to the begining of the headers buffer - MessageId = messageId; - _headerAccumulator.Append((byte)HeaderCommand.MessageId); - _headerAccumulator.Append(messageId); - _headerAccumulator.WriteTermination(); - } - - ///<inheritdoc/> - public void WriteHeader(HeaderCommand header, ReadOnlySpan<char> value) - { - WriteHeader((byte)header, value); - } - ///<inheritdoc/> - public void WriteHeader(byte header, ReadOnlySpan<char> value) - { - _headerAccumulator.WriteHeader(header, value, _headerEncoding); - } - - ///<inheritdoc/> - public void WriteBody(ReadOnlySpan<byte> body, ContentType contentType = ContentType.Binary) - { - //Append content type header - WriteHeader(HeaderCommand.ContentType, HttpHelpers.GetContentTypeString(contentType)); - //end header segment - _headerAccumulator.WriteTermination(); - //Write message body - _headerAccumulator.Append(body); - } - - /// <summary> - /// Sets the response message body - /// </summary> - /// <param name="messageBody">The <see cref="IAsyncMessageBody"/> to stream data from</param> - /// <exception cref="InvalidOperationException"></exception> - public void AddMessageBody(IAsyncMessageBody messageBody) - { - if(_messageBody != null) - { - throw new InvalidOperationException("The message body is already set"); - } - - //Append message content type header - WriteHeader(HeaderCommand.ContentType, HttpHelpers.GetContentTypeString(messageBody.ContentType)); - - //end header segment - _headerAccumulator.WriteTermination(); - - //Store message body - _messageBody = messageBody; - - } - - /// <summary> - /// Gets the internal message body enumerator and prepares the message for sending - /// </summary> - /// <param name="cancellationToken">A cancellation token</param> - /// <returns>A value task that returns the message body enumerator</returns> - internal async ValueTask<IAsyncMessageReader> GetResponseDataAsync(CancellationToken cancellationToken) - { - //try to buffer as much data in the header segment first - if(_messageBody?.RemainingSize > 0 && _headerAccumulator.RemainingSize > 0) - { - //Read data from the message - int read = await _messageBody.ReadAsync(_headerAccumulator.RemainingBuffer, cancellationToken); - //Advance accumulator to the read bytes - _headerAccumulator.Advance(read); - } - //return reusable enumerator - return _messageEnumerator; - } - - private class MessageSegmentEnumerator : IAsyncMessageReader - { - private readonly FBMResponseMessage _message; - - bool HeadersRead; - - public MessageSegmentEnumerator(FBMResponseMessage message) - { - _message = message; - } - - public ReadOnlyMemory<byte> Current { get; private set; } - - public bool DataRemaining { get; private set; } - - public async ValueTask<bool> MoveNextAsync() - { - //Attempt to read header segment first - if (!HeadersRead) - { - //Set the accumulated buffer - Current = _message._headerAccumulator.AccumulatedBuffer; - - //Update data remaining flag - DataRemaining = _message._messageBody?.RemainingSize > 0; - - //Set headers read flag - HeadersRead = true; - - return true; - } - else if (_message._messageBody?.RemainingSize > 0) - { - //Use the header buffer as the buffer for the message body - Memory<byte> buffer = _message._headerAccumulator.Buffer; - - //Read body segment - int read = await _message._messageBody.ReadAsync(buffer); - - //Update data remaining flag - DataRemaining = _message._messageBody.RemainingSize > 0; - - if (read > 0) - { - //Store the read segment - Current = buffer[..read]; - return true; - } - } - return false; - } - - public async ValueTask DisposeAsync() - { - //Clear current segment - Current = default; - - //Reset headers read flag - HeadersRead = false; - - //Dispose the message body if set - if (_message._messageBody != null) - { - await _message._messageBody.DisposeAsync(); - } - } - } - } - -} diff --git a/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs b/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs deleted file mode 100644 index 78b378d..0000000 --- a/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs +++ /dev/null @@ -1,89 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: HeaderDataAccumulator.cs -* -* HeaderDataAccumulator.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Buffers; - -using VNLib.Utils.IO; - - -namespace VNLib.Net.Messaging.FBM.Server -{ - /// <summary> - /// Reusable sliding window impl - /// </summary> - internal class HeaderDataAccumulator : ISlindingWindowBuffer<byte> - { - private readonly int BufferSize; - - private byte[]? _memHandle; - - public HeaderDataAccumulator(int bufferSize) - { - BufferSize = bufferSize; - } - - ///<inheritdoc/> - public int WindowStartPos { get; private set; } - ///<inheritdoc/> - public int WindowEndPos { get; private set; } - ///<inheritdoc/> - public Memory<byte> Buffer => _memHandle.AsMemory(); - - ///<inheritdoc/> - public void Advance(int count) => WindowEndPos += count; - - ///<inheritdoc/> - public void AdvanceStart(int count) => WindowEndPos += count; - - ///<inheritdoc/> - public void Reset() - { - WindowStartPos = 0; - WindowEndPos = 0; - } - - /// <summary> - /// Allocates the internal message buffer - /// </summary> - public void Prepare() - { - _memHandle ??= ArrayPool<byte>.Shared.Rent(BufferSize); - } - - ///<inheritdoc/> - public void Close() - { - Reset(); - - if (_memHandle != null) - { - //Return the buffer to the pool - ArrayPool<byte>.Shared.Return(_memHandle); - _memHandle = null; - } - } - } - -} diff --git a/Net.Messaging.FBM/src/Server/IAsyncMessageBody.cs b/Net.Messaging.FBM/src/Server/IAsyncMessageBody.cs deleted file mode 100644 index 5566520..0000000 --- a/Net.Messaging.FBM/src/Server/IAsyncMessageBody.cs +++ /dev/null @@ -1,57 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: IAsyncMessageBody.cs -* -* IAsyncMessageBody.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Threading; -using System.Threading.Tasks; - -using VNLib.Net.Http; - -namespace VNLib.Net.Messaging.FBM -{ - /// <summary> - /// A disposable message body container for asynchronously reading a variable length message body - /// </summary> - public interface IAsyncMessageBody : IAsyncDisposable - { - /// <summary> - /// The message body content type - /// </summary> - ContentType ContentType { get; } - - /// <summary> - /// The number of bytes remaining to be read from the message body - /// </summary> - int RemainingSize { get; } - - /// <summary> - /// Reads the next chunk of data from the message body - /// </summary> - /// <param name="buffer">The buffer to copy output data to</param> - /// <param name="token">A token to cancel the operation</param> - /// <returns></returns> - ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken token = default); - } - -} diff --git a/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs b/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs deleted file mode 100644 index b2abe8d..0000000 --- a/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs +++ /dev/null @@ -1,42 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: IAsyncMessageReader.cs -* -* IAsyncMessageReader.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Collections.Generic; - - -namespace VNLib.Net.Messaging.FBM.Server -{ - /// <summary> - /// Internal message body reader/enumerator for FBM messages - /// </summary> - internal interface IAsyncMessageReader : IAsyncEnumerator<ReadOnlyMemory<byte>> - { - /// <summary> - /// A value that indicates if there is data remaining after a - /// </summary> - bool DataRemaining { get; } - } - -} diff --git a/Net.Messaging.FBM/src/Server/readme.md b/Net.Messaging.FBM/src/Server/readme.md deleted file mode 100644 index 489e58f..0000000 --- a/Net.Messaging.FBM/src/Server/readme.md +++ /dev/null @@ -1,35 +0,0 @@ -# VNLib.Net.Messaging.FBM.Server - -Fixed Buffer Messaging Protocol server library. High performance statful messaging -protocol built on top of HTTP web-sockets. Low/no allocation, completely asynchronous -while providing a TPL API. This library provides a simple asynchronous request/response -architecture to web-sockets. This was initially designed to provide an alternative to -complete HTTP request/response overhead, but allow a simple control flow for work -across a network. - -Messages consist of a 4 byte message id, a collection of headers, and a message body. -The first 4 bytes of a message is the ID (for normal messages a signed integer greater than 0), -0 is reserved for error conditions, and negative numbers are reserved for internal -messages. Headers are identified by a single byte, followed by a variable length UTF8 -encoded character sequence, followed by a termination of 0xFF, 0xF1 (may change). - -### Message structure - 4 byte positive (signed 32-bit integer) message id - 2 byte termination - 1 byte header-id - variable length UTF8 value - 2 byte termination - -- other headers -- - 2 byte termination (extra termination, ie: empty header) - variable length payload - (end of message is the end of the payload) - - -XML Documentation is or will be provided for almost all public exports. APIs are intended to -be sensibly public and immutable to allow for easy extensability (via extension methods). I -often use extension libraries to provide additional functionality. (See cache library) - -This library is likely a niche use case, and is probably not for everyone. Unless you care -about reasonably efficient high frequency request/response messaging, this probably isnt -for you. This library provides a reasonable building block for distributed lock mechanisms -and small data caching.
\ No newline at end of file |