diff options
author | vnugent <public@vaughnnugent.com> | 2023-11-02 01:49:02 -0400 |
---|---|---|
committer | vnugent <public@vaughnnugent.com> | 2023-11-02 01:49:02 -0400 |
commit | 9e3dd9be0f0ec7aaef1a719f09f96425e66369df (patch) | |
tree | 59b8bd4ace8750327db80823fa1e5eccdf44bc74 /lib/Net.Messaging.FBM/src/Server | |
parent | eafefadc4b858e5b5be481662a2b0c8e47a43bf4 (diff) |
may have gottem carried away
Diffstat (limited to 'lib/Net.Messaging.FBM/src/Server')
10 files changed, 246 insertions, 222 deletions
diff --git a/lib/Net.Messaging.FBM/src/Server/FBMContext.cs b/lib/Net.Messaging.FBM/src/Server/FBMContext.cs index 6d5f3bd..f2a2fea 100644 --- a/lib/Net.Messaging.FBM/src/Server/FBMContext.cs +++ b/lib/Net.Messaging.FBM/src/Server/FBMContext.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Messaging.FBM @@ -55,10 +55,11 @@ namespace VNLib.Net.Messaging.FBM.Server /// <param name="requestHeaderBufferSize">The size in characters of the request header buffer</param> /// <param name="responseBufferSize">The size in characters of the response header buffer</param> /// <param name="headerEncoding">The message header encoding instance</param> - public FBMContext(int requestHeaderBufferSize, int responseBufferSize, Encoding headerEncoding) + /// <param name="manager">The context memory manager</param> + public FBMContext(int requestHeaderBufferSize, int responseBufferSize, Encoding headerEncoding, IFBMMemoryManager manager) { - _request = Request = new(requestHeaderBufferSize); - _response = Response = new(responseBufferSize, headerEncoding); + _request = Request = new(requestHeaderBufferSize, manager); + _response = Response = new(responseBufferSize, headerEncoding, manager); _headerEncoding = headerEncoding; } diff --git a/lib/Net.Messaging.FBM/src/Server/FBMListener.cs b/lib/Net.Messaging.FBM/src/Server/FBMListener.cs index 46ee160..30fa1ac 100644 --- a/lib/Net.Messaging.FBM/src/Server/FBMListener.cs +++ b/lib/Net.Messaging.FBM/src/Server/FBMListener.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Messaging.FBM @@ -24,7 +24,6 @@ using System; using System.IO; -using System.Buffers; using System.Threading; using System.Net.WebSockets; using System.Threading.Tasks; @@ -32,7 +31,6 @@ using System.Threading.Tasks; using VNLib.Utils.IO; using VNLib.Utils.Async; using VNLib.Utils.Memory; -using VNLib.Utils.Extensions; using VNLib.Utils.Memory.Caching; using VNLib.Plugins.Essentials; @@ -40,16 +38,6 @@ namespace VNLib.Net.Messaging.FBM.Server { /// <summary> - /// Method delegate for processing FBM messages from an <see cref="FBMListener"/> - /// when messages are received - /// </summary> - /// <param name="context">The message/connection context</param> - /// <param name="userState">The state parameter passed on client connected</param> - /// <param name="cancellationToken">A token that reflects the state of the listener</param> - /// <returns>A <see cref="Task"/> that resolves when processing is complete</returns> - public delegate Task RequestHandler(FBMContext context, object? userState, CancellationToken cancellationToken); - - /// <summary> /// A FBM protocol listener. Listens for messages on a <see cref="WebSocketSession"/> /// and raises events on requests. /// </summary> @@ -58,22 +46,16 @@ namespace VNLib.Net.Messaging.FBM.Server public const int SEND_SEMAPHORE_TIMEOUT_MS = 10 * 1000; - private readonly IUnmangedHeap Heap; - - /// <summary> - /// Raised when a response processing error occured - /// </summary> - public event EventHandler<Exception>? OnProcessError; + private readonly IFBMMemoryManager MemoryManger; /// <summary> /// Creates a new <see cref="FBMListener"/> instance ready for /// processing connections /// </summary> /// <param name="heap">The heap to alloc buffers from</param> - public FBMListener(IUnmangedHeap heap) - { - Heap = heap; - } + /// <exception cref="ArgumentNullException"></exception> + public FBMListener(IFBMMemoryManager heap) => MemoryManger = heap ?? throw new ArgumentNullException(nameof(heap)); + #pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task /// <summary> @@ -83,31 +65,38 @@ namespace VNLib.Net.Messaging.FBM.Server /// <param name="wss">The <see cref="WebSocketSession"/> to receive messages on</param> /// <param name="handler">The callback method to handle incoming requests</param> /// <param name="args">The arguments used to configured this listening session</param> - /// <param name="userState">A state parameter</param> /// <returns>A <see cref="Task"/> that completes when the connection closes</returns> - public async Task ListenAsync(WebSocketSession wss, RequestHandler handler, FBMListenerSessionParams args, object? userState) + public async Task ListenAsync(WebSocketSession wss, IFBMServerMessageHandler handler, FBMListenerSessionParams args) { _ = wss ?? throw new ArgumentNullException(nameof(wss)); _ = handler ?? throw new ArgumentNullException(nameof(handler)); - ListeningSession session = new(wss, handler, in args, userState); - - //Alloc a recieve buffer - using IMemoryOwner<byte> recvBuffer = Heap.DirectAlloc<byte>(args.RecvBufferSize); + ListeningSession session = new(wss, handler, in args, MemoryManger); //Init new queue for dispatching work AsyncQueue<VnMemoryStream> workQueue = new(true, true); //Start a task to process the queue Task queueWorker = QueueWorkerDoWork(workQueue, session); - + + //Alloc buffer + IFBMMemoryHandle memHandle = MemoryManger.InitHandle(); + MemoryManger.AllocBuffer(memHandle, args.RecvBufferSize); + try { + if(!MemoryManger.TryGetHeap(out IUnmangedHeap? heap)) + { + throw new NotSupportedException("The memory manager must export an unmanaged heap"); + } + + Memory<byte> recvBuffer = memHandle.GetMemory(); + //Listen for incoming messages while (true) { //Receive a message - ValueWebSocketReceiveResult result = await wss.ReceiveAsync(recvBuffer.Memory); + ValueWebSocketReceiveResult result = await wss.ReceiveAsync(recvBuffer); //If a close message has been received, we can gracefully exit if (result.MessageType == WebSocketMessageType.Close) { @@ -118,13 +107,13 @@ namespace VNLib.Net.Messaging.FBM.Server } //create buffer for storing data, pre alloc with initial data - VnMemoryStream request = new(Heap, recvBuffer.Memory[..result.Count]); + VnMemoryStream request = new(heap, recvBuffer[..result.Count]); //Streaming read while (!result.EndOfMessage) { //Read more data - result = await wss.ReceiveAsync(recvBuffer.Memory); + result = await wss.ReceiveAsync(recvBuffer); //Make sure the request is small enough to buffer if (request.Length + result.Count > args.MaxMessageSize) { @@ -135,8 +124,9 @@ namespace VNLib.Net.Messaging.FBM.Server //break listen loop goto Exit; } + //write to buffer - request.Write(recvBuffer.Memory.Span[..result.Count]); + request.Write(memHandle.GetSpan()[..result.Count]); } //Make sure data is available if (request.Length == 0) @@ -195,14 +185,19 @@ namespace VNLib.Net.Messaging.FBM.Server if ((context.Request.ParseStatus & HeaderParseError.InvalidId) > 0) { - OnProcessError?.Invoke(this, new FBMException($"Invalid messageid {context.Request.MessageId}, message length {data.Length}")); - return; + Exception cause = new FBMException($"Invalid messageid {context.Request.MessageId}, message length {data.Length}"); + _ = session.Handler.OnInvalidMessage(context, cause); + return; //Cannot continue on invalid message id } //Check parse status flags if ((context.Request.ParseStatus & HeaderParseError.HeaderOutOfMem) > 0) { - OnProcessError?.Invoke(this, new FBMException("Packet received with not enough space to store headers")); + Exception cause = new FBMException("Packet received with not enough space to store headers"); + if(!session.Handler.OnInvalidMessage(context, cause)) + { + return; + } } //Determine if request is an out-of-band message else if (context.Request.MessageId == Helpers.CONTROL_FRAME_MID) @@ -213,18 +208,15 @@ namespace VNLib.Net.Messaging.FBM.Server else { //Invoke normal message handler - await session.OnRecieved.Invoke(context, session.UserState, session.CancellationToken); + await session.Handler.HandleMessage(context, session.CancellationToken); } - //Get response data - - await using IAsyncMessageReader messageEnumerator = await context.Response.GetResponseDataAsync(session.CancellationToken); - + //Get response data reader + await using IAsyncMessageReader messageEnumerator = context.Response.GetResponseData(); //Load inital segment if (await messageEnumerator.MoveNextAsync() && !session.CancellationToken.IsCancellationRequested) - { - ValueTask sendTask; + { //Syncrhonize access to send data because we may need to stream data to the client await session.ResponseLock.WaitAsync(SEND_SEMAPHORE_TIMEOUT_MS); @@ -233,10 +225,8 @@ namespace VNLib.Net.Messaging.FBM.Server { do { - bool eof = !messageEnumerator.DataRemaining; - - //Send first segment - sendTask = session.Socket.SendAsync(messageEnumerator.Current, WebSocketMessageType.Binary, eof); + //Send current segment + await session.Socket.SendAsync(messageEnumerator.Current, WebSocketMessageType.Binary, !messageEnumerator.DataRemaining); /* * WARNING! @@ -250,9 +240,6 @@ namespace VNLib.Net.Messaging.FBM.Server { break; } - - //Await previous send - await sendTask; } while (true); } @@ -261,15 +248,13 @@ namespace VNLib.Net.Messaging.FBM.Server //release semaphore session.ResponseLock.Release(); } - - await sendTask; } //No data to send } catch (Exception ex) { - OnProcessError?.Invoke(this, ex); + session.Handler.OnProcessError(ex); } finally { @@ -295,25 +280,23 @@ namespace VNLib.Net.Messaging.FBM.Server private readonly CancellationTokenSource Cancellation; private readonly CancellationTokenRegistration Registration; private readonly FBMListenerSessionParams Params; + private readonly IFBMMemoryManager MemManager; - public readonly object? UserState; - public readonly SemaphoreSlim ResponseLock; public readonly WebSocketSession Socket; - public readonly RequestHandler OnRecieved; + public readonly IFBMServerMessageHandler Handler; public CancellationToken CancellationToken => Cancellation.Token; - - public ListeningSession(WebSocketSession session, RequestHandler onRecieved, in FBMListenerSessionParams args, object? userState) + public ListeningSession(WebSocketSession session, IFBMServerMessageHandler handler, in FBMListenerSessionParams args, IFBMMemoryManager memManager) { Params = args; Socket = session; - UserState = userState; - OnRecieved = onRecieved; + Handler = handler; + MemManager = memManager; //Create cancellation and register for session close Cancellation = new(); @@ -323,7 +306,7 @@ namespace VNLib.Net.Messaging.FBM.Server CtxStore = ObjectRental.CreateReusable(ContextCtor); } - private FBMContext ContextCtor() => new(Params.MaxHeaderBufferSize, Params.ResponseBufferSize, Params.HeaderEncoding); + private FBMContext ContextCtor() => new(Params.MaxHeaderBufferSize, Params.ResponseBufferSize, Params.HeaderEncoding, MemManager); /// <summary> /// Cancels any pending opreations relating to the current session @@ -358,7 +341,6 @@ namespace VNLib.Net.Messaging.FBM.Server /// <exception cref="ObjectDisposedException"></exception> public FBMContext RentContext() { - if (Cancellation.IsCancellationRequested) { throw new ObjectDisposedException("The instance has been disposed"); diff --git a/lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs b/lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs index 3e9fde2..71b1c8f 100644 --- a/lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs +++ b/lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Messaging.FBM @@ -27,67 +27,29 @@ using System.Threading; using System.Threading.Tasks; using VNLib.Utils.Logging; -using VNLib.Utils.Memory; using VNLib.Plugins.Essentials; namespace VNLib.Net.Messaging.FBM.Server { + /// <summary> /// Provides a simple base class for an <see cref="FBMListener"/> /// processor /// </summary> - public abstract class FBMListenerBase + public abstract class FBMListenerBase<T> : IFBMServerErrorHandler { /// <summary> /// The initialzied listener /// </summary> - protected FBMListener? Listener { get; private set; } + protected abstract FBMListener Listener { get; } + /// <summary> /// A provider to write log information to /// </summary> protected abstract ILogProvider Log { get; } /// <summary> - /// Initializes the <see cref="FBMListener"/> - /// </summary> - /// <param name="heap">The heap to alloc buffers from</param> - protected void InitListener(IUnmangedHeap heap) - { - Listener = new(heap); - //Attach service handler - Listener.OnProcessError += Listener_OnProcessError; - } - - /// <summary> - /// A single event service routine for servicing errors that occur within - /// the listener loop - /// </summary> - /// <param name="sender"></param> - /// <param name="e">The exception that was raised</param> - protected virtual void Listener_OnProcessError(object? sender, Exception e) - { - //Write the error to the log - Log.Error(e); - } - - private async Task OnReceivedAsync(FBMContext context, object? userState, CancellationToken token) - { - try - { - await ProcessAsync(context, userState, token); - } - catch (OperationCanceledException) - { - Log.Debug("Async operation cancelled"); - } - catch(Exception ex) - { - Log.Error(ex); - } - } - - /// <summary> /// Begins listening for requests on the current websocket until /// a close message is received or an error occurs /// </summary> @@ -95,10 +57,13 @@ namespace VNLib.Net.Messaging.FBM.Server /// <param name="args">The arguments used to configured this listening session</param> /// <param name="userState">A state token to use for processing events for this connection</param> /// <returns>A <see cref="Task"/> that completes when the connection closes</returns> - public virtual async Task ListenAsync(WebSocketSession wss, FBMListenerSessionParams args, object? userState) + /// <exception cref="InvalidOperationException"></exception> + public virtual Task ListenAsync(WebSocketSession wss, T userState, FBMListenerSessionParams args) { _ = Listener ?? throw new InvalidOperationException("The listener has not been intialized"); - await Listener.ListenAsync(wss, OnReceivedAsync, args, userState); + //Initn new event handler + FBMEventHandler handler = new(userState, this); + return Listener.ListenAsync(wss, handler, args); } /// <summary> @@ -108,6 +73,30 @@ namespace VNLib.Net.Messaging.FBM.Server /// <param name="userState">A state token passed on client connected</param> /// <param name="exitToken">A token that reflects the state of the listener</param> /// <returns>A task that completes when the message has been serviced</returns> - protected abstract Task ProcessAsync(FBMContext context, object? userState, CancellationToken exitToken); + protected abstract Task ProcessAsync(FBMContext context, T? userState, CancellationToken exitToken); + + ///<inheritdoc/> + public virtual bool OnInvalidMessage(FBMContext context, Exception ex) + { + Log.Error("Invalid message received for session {ses}\n{ex}", context.Request.ConnectionId, ex); + //Invalid id should be captured already, so if oom, do not allow, but if a single header is invalid, it will be ignored by default + return !context.Request.ParseStatus.HasFlag(HeaderParseError.HeaderOutOfMem); + } + + ///<inheritdoc/> + public virtual void OnProcessError(Exception ex) => Log.Error(ex); + + + private sealed record class FBMEventHandler(T State, FBMListenerBase<T> Lb) : IFBMServerMessageHandler + { + ///<inheritdoc/> + public Task HandleMessage(FBMContext context, CancellationToken cancellationToken) => Lb.ProcessAsync(context, State, cancellationToken); + + ///<inheritdoc/> + public bool OnInvalidMessage(FBMContext context, Exception ex) => Lb.OnInvalidMessage(context, ex); + + ///<inheritdoc/> + public void OnProcessError(Exception ex) => Lb.OnProcessError(ex); + } } } diff --git a/lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs b/lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs index ccf79db..0b4fa5b 100644 --- a/lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs +++ b/lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Messaging.FBM @@ -30,7 +30,7 @@ namespace VNLib.Net.Messaging.FBM.Server /// Represents a configuration structure for an <see cref="FBMListener"/> /// listening session /// </summary> - public readonly struct FBMListenerSessionParams + public readonly record struct FBMListenerSessionParams { /// <summary> /// The size of the buffer to use while reading data from the websocket diff --git a/lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs b/lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs index db0655a..e9ff9f5 100644 --- a/lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs +++ b/lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Messaging.FBM @@ -24,13 +24,12 @@ using System; using System.Text; -using System.Buffers; -using System.Text.Json; using System.Collections.Generic; +using System.Runtime.InteropServices; using System.Runtime.CompilerServices; using VNLib.Utils.IO; -using VNLib.Utils.Extensions; +using VNLib.Utils.Memory; using VNLib.Utils.Memory.Caching; namespace VNLib.Net.Messaging.FBM.Server @@ -42,18 +41,21 @@ namespace VNLib.Net.Messaging.FBM.Server { private readonly List<FBMMessageHeader> _headers; private readonly int HeaderBufferSize; + private readonly IFBMMemoryManager _memoryManager; + private readonly IFBMSpanOnlyMemoryHandle _memHandle; /// <summary> /// Creates a new resusable <see cref="FBMRequestMessage"/> /// </summary> /// <param name="headerBufferSize">The size of the buffer to alloc during initialization</param> - internal FBMRequestMessage(int headerBufferSize) + /// <param name="manager">The memory manager to use for the message</param> + internal FBMRequestMessage(int headerBufferSize, IFBMMemoryManager manager) { HeaderBufferSize = headerBufferSize; _headers = new(); + _memoryManager = manager; + _memHandle = _memoryManager.InitSpanOnly(); } - - private char[]? _headerBuffer; /// <summary> /// The ID of the current message @@ -115,35 +117,13 @@ namespace VNLib.Net.Messaging.FBM.Server //Parse headers ParseStatus = Helpers.ParseHeaders(vms, this, _headers, dataEncoding); } - - /// <summary> - /// Deserializes the request body into a new specified object type - /// </summary> - /// <typeparam name="T">The type of the object to deserialize</typeparam> - /// <param name="jso">The <see cref="JsonSerializerOptions"/> to use while deserializing data</param> - /// <returns>The deserialized object from the request body</returns> - /// <exception cref="JsonException"></exception> - public T? DeserializeBody<T>(JsonSerializerOptions? jso = default) - { - return BodyData.IsEmpty ? default : BodyData.AsJsonObject<T>(jso); - } - - /// <summary> - /// Gets a <see cref="JsonDocument"/> of the request body - /// </summary> - /// <returns>The parsed <see cref="JsonDocument"/> if parsed successfully, or null otherwise</returns> - /// <exception cref="JsonException"></exception> - public JsonDocument? GetBodyAsJson() - { - Utf8JsonReader reader = new(BodyData); - return JsonDocument.TryParseValue(ref reader, out JsonDocument? jdoc) ? jdoc : default; - } + void IReusable.Prepare() { ParseStatus = HeaderParseError.None; //Alloc header buffer - _headerBuffer = ArrayPool<char>.Shared.Rent(HeaderBufferSize); + _memoryManager.AllocBuffer(_memHandle, MemoryUtil.ByteCount<char>(HeaderBufferSize)); } @@ -155,8 +135,7 @@ namespace VNLib.Net.Messaging.FBM.Server //Clear headers before freeing buffer _headers.Clear(); //Free header-buffer - ArrayPool<char>.Shared.Return(_headerBuffer!); - _headerBuffer = null; + _memoryManager.FreeBuffer(_memHandle); ConnectionId = null; MessageId = 0; IsControlFrame = false; @@ -165,11 +144,16 @@ namespace VNLib.Net.Messaging.FBM.Server [MethodImpl(MethodImplOptions.AggressiveInlining)] - Span<char> IFBMHeaderBuffer.GetSpan(int offset, int count) - => _headerBuffer != null ? _headerBuffer.AsSpan(offset, count) : throw new InvalidOperationException("The buffer is no longer available"); + Span<char> IFBMHeaderBuffer.GetSpan(int offset, int count) + { + //Cast to char buffer + Span<char> chars = MemoryMarshal.Cast<byte, char>(_memHandle.GetSpan()); + //Return the requested span + return chars.Slice(offset, count); + } [MethodImpl(MethodImplOptions.AggressiveInlining)] - Span<char> IFBMHeaderBuffer.GetSpan() => _headerBuffer ?? throw new InvalidOperationException("The buffer is no longer available"); + Span<char> IFBMHeaderBuffer.GetSpan() => MemoryMarshal.Cast<byte, char>(_memHandle.GetSpan()); } } diff --git a/lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs b/lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs index 9ca6b4d..1e26140 100644 --- a/lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs +++ b/lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Messaging.FBM @@ -24,14 +24,12 @@ using System; using System.Text; -using System.Threading; using System.Threading.Tasks; using VNLib.Net.Http; using VNLib.Utils.IO; using VNLib.Utils.Extensions; using VNLib.Utils.Memory.Caching; -using VNLib.Net.Messaging.FBM.Client; namespace VNLib.Net.Messaging.FBM.Server { @@ -41,9 +39,9 @@ namespace VNLib.Net.Messaging.FBM.Server /// </summary> public sealed class FBMResponseMessage : IReusable, IFBMMessage { - internal FBMResponseMessage(int internalBufferSize, Encoding headerEncoding) + internal FBMResponseMessage(int internalBufferSize, Encoding headerEncoding, IFBMMemoryManager manager) { - _headerAccumulator = new HeaderDataAccumulator(internalBufferSize); + _headerAccumulator = new HeaderDataAccumulator(internalBufferSize, manager); _headerEncoding = headerEncoding; _messageEnumerator = new(this); } @@ -134,92 +132,76 @@ namespace VNLib.Net.Messaging.FBM.Server /// <summary> /// Gets the internal message body enumerator and prepares the message for sending /// </summary> - /// <param name="cancellationToken">A cancellation token</param> /// <returns>A value task that returns the message body enumerator</returns> - internal async ValueTask<IAsyncMessageReader> GetResponseDataAsync(CancellationToken cancellationToken) - { - //try to buffer as much data in the header segment first - if(MessageBody?.RemainingSize > 0 && _headerAccumulator.RemainingSize > 0) - { - //Read data from the message - int read = await MessageBody.ReadAsync(_headerAccumulator.RemainingBuffer, cancellationToken); - - //Advance accumulator to the read bytes - _headerAccumulator.Advance(read); - } - //return reusable enumerator - return _messageEnumerator; - } + internal IAsyncMessageReader GetResponseData() => _messageEnumerator; private sealed class MessageSegmentEnumerator : IAsyncMessageReader { private readonly FBMResponseMessage _message; + private readonly ISlindingWindowBuffer<byte> _accumulator; bool HeadersRead; public MessageSegmentEnumerator(FBMResponseMessage message) { _message = message; + _accumulator = _message._headerAccumulator; } - public ReadOnlyMemory<byte> Current { get; private set; } + ///<inheritdoc/> + public ReadOnlyMemory<byte> Current => _accumulator.AccumulatedBuffer; - public bool DataRemaining { get; private set; } + ///<inheritdoc/> + public bool DataRemaining => _message.MessageBody?.RemainingSize > 0; + ///<inheritdoc/> public async ValueTask<bool> MoveNextAsync() { //Attempt to read header segment first if (!HeadersRead) { - //Set the accumulated buffer - Current = _message._headerAccumulator.AccumulatedBuffer; + /* + * If headers have not been read yet, we can attempt to buffer as much + * of the message body into the header accumulator buffer as possible. This will + * reduce message fragmentation. + */ + if (DataRemaining && _accumulator.RemainingSize > 0) + { + int read = await _message.MessageBody.ReadAsync(_accumulator.RemainingBuffer).ConfigureAwait(false); - //Update data remaining flag - DataRemaining = _message.MessageBody?.RemainingSize > 0; + //Advance accumulator to the read bytes + _accumulator.Advance(read); + } //Set headers read flag HeadersRead = true; return true; } - else if (_message.MessageBody?.RemainingSize > 0) + else if (DataRemaining) { - //Use the header buffer as the buffer for the message body - Memory<byte> buffer = _message._headerAccumulator.Buffer; + //Reset the accumulator so we can read another segment + _accumulator.Reset(); //Read body segment - int read = await _message.MessageBody.ReadAsync(buffer); + int read = await _message.MessageBody.ReadAsync(_accumulator.RemainingBuffer); - //Update data remaining flag - DataRemaining = _message.MessageBody.RemainingSize > 0; + //Advance accumulator to the read bytes + _accumulator.Advance(read); - if (read > 0) - { - //Store the read segment - Current = buffer[..read]; - return true; - } + return read > 0; } return false; } + ///<inheritdoc/> public ValueTask DisposeAsync() { - //Clear current segment - Current = default; - //Reset headers read flag HeadersRead = false; - + //Dispose the message body if set - if (_message.MessageBody != null) - { - return _message.MessageBody.DisposeAsync(); - } - else - { - return ValueTask.CompletedTask; - } + return _message.MessageBody != null ? _message.MessageBody.DisposeAsync() : ValueTask.CompletedTask; } } } diff --git a/lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs b/lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs index 423a26e..891c583 100644 --- a/lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs +++ b/lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Messaging.FBM @@ -23,25 +23,28 @@ */ using System; -using System.Buffers; using VNLib.Utils.IO; namespace VNLib.Net.Messaging.FBM.Server { + /// <summary> /// Reusable sliding window impl /// </summary> internal sealed class HeaderDataAccumulator : ISlindingWindowBuffer<byte> { - private readonly int BufferSize; - - private byte[]? _memHandle; + private readonly int _bufferSize; + private readonly IFBMMemoryManager _memManager; + private readonly IFBMMemoryHandle _handle; + - public HeaderDataAccumulator(int bufferSize) + public HeaderDataAccumulator(int bufferSize, IFBMMemoryManager memManager) { - BufferSize = bufferSize; + _bufferSize = bufferSize; + _memManager = memManager; + _handle = memManager.InitHandle(); } ///<inheritdoc/> @@ -49,7 +52,7 @@ namespace VNLib.Net.Messaging.FBM.Server ///<inheritdoc/> public int WindowEndPos { get; private set; } ///<inheritdoc/> - public Memory<byte> Buffer => _memHandle.AsMemory(); + public Memory<byte> Buffer => _handle.GetMemory(); ///<inheritdoc/> public void Advance(int count) => WindowEndPos += count; @@ -67,22 +70,13 @@ namespace VNLib.Net.Messaging.FBM.Server /// <summary> /// Allocates the internal message buffer /// </summary> - public void Prepare() - { - _memHandle ??= ArrayPool<byte>.Shared.Rent(BufferSize); - } + public void Prepare() => _memManager.AllocBuffer(_handle, _bufferSize); ///<inheritdoc/> public void Close() { Reset(); - - if (_memHandle != null) - { - //Return the buffer to the pool - ArrayPool<byte>.Shared.Return(_memHandle); - _memHandle = null; - } + _memManager.FreeBuffer(_handle); } } diff --git a/lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs b/lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs index b2abe8d..abb3600 100644 --- a/lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs +++ b/lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Messaging.FBM @@ -34,7 +34,7 @@ namespace VNLib.Net.Messaging.FBM.Server internal interface IAsyncMessageReader : IAsyncEnumerator<ReadOnlyMemory<byte>> { /// <summary> - /// A value that indicates if there is data remaining after a + /// A value that indicates if there is data remaining after a read /// </summary> bool DataRemaining { get; } } diff --git a/lib/Net.Messaging.FBM/src/Server/IFBMServerErrorHandler.cs b/lib/Net.Messaging.FBM/src/Server/IFBMServerErrorHandler.cs new file mode 100644 index 0000000..caa4f96 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Server/IFBMServerErrorHandler.cs @@ -0,0 +1,49 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: IFBMServerErrorHandler.cs +* +* IFBMServerErrorHandler.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; + +namespace VNLib.Net.Messaging.FBM.Server +{ + /// <summary> + /// An server side FBM protocol error handler abstraction + /// </summary> + public interface IFBMServerErrorHandler + { + /// <summary> + /// An exception handler for unhandled events that occur during a listening session + /// </summary> + /// <param name="ex">The exception that caused this handler to be invoked</param> + void OnProcessError(Exception ex); + + /// <summary> + /// An exception handler for invalid messages that occur during a listening session. + /// NOTE: The context parameter is likely in an invlaid state and should be read carefully + /// </summary> + /// <param name="context">The context that the error occured while parsing on</param> + /// <param name="ex">The exception explaining the reason this handler was invoked</param> + /// <returns>A value that indicates if the server should continue processing</returns> + bool OnInvalidMessage(FBMContext context, Exception ex); + } +} diff --git a/lib/Net.Messaging.FBM/src/Server/IFBMServerMessageHandler.cs b/lib/Net.Messaging.FBM/src/Server/IFBMServerMessageHandler.cs new file mode 100644 index 0000000..532db5f --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Server/IFBMServerMessageHandler.cs @@ -0,0 +1,43 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: IFBMServerMessageHandler.cs +* +* IFBMServerMessageHandler.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Threading; +using System.Threading.Tasks; + +namespace VNLib.Net.Messaging.FBM.Server +{ + /// <summary> + /// A server side FBM protocol handler + /// </summary> + public interface IFBMServerMessageHandler : IFBMServerErrorHandler + { + /// <summary> + /// Handles processing of a normal incoming message + /// </summary> + /// <param name="context">The context to process for this new message</param> + /// <param name="cancellationToken">A token that signals the session has been cancelled</param> + /// <returns>A task representing the asynchronous work</returns> + Task HandleMessage(FBMContext context, CancellationToken cancellationToken); + } +} |