diff options
Diffstat (limited to 'lib/Net.Messaging.FBM')
24 files changed, 746 insertions, 388 deletions
diff --git a/lib/Net.Messaging.FBM/src/Client/FBMBuffer.cs b/lib/Net.Messaging.FBM/src/Client/FBMBuffer.cs index fac41a6..698a98b 100644 --- a/lib/Net.Messaging.FBM/src/Client/FBMBuffer.cs +++ b/lib/Net.Messaging.FBM/src/Client/FBMBuffer.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Messaging.FBM @@ -28,28 +28,50 @@ using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using VNLib.Utils.IO; +using VNLib.Utils.Memory.Caching; namespace VNLib.Net.Messaging.FBM.Client { /// <summary> /// Represents a shared internal character and bianry buffer for /// </summary> - internal sealed class FBMBuffer : IFBMHeaderBuffer, IDisposable + internal sealed class FBMBuffer : IFBMHeaderBuffer, IDisposable, IReusable { - private readonly IMemoryOwner<byte> Handle; + private readonly IFBMMemoryHandle Handle; + private readonly IFBMMemoryManager _memoryManager; + private readonly int _size; private readonly BufferWriter _writer; private readonly BinaryRequestAccumulator _requestAccumulator; - internal FBMBuffer(IMemoryOwner<byte> handle) + internal FBMBuffer(IFBMMemoryManager manager, int bufferSize) { - Handle = handle; + _memoryManager = manager; + Handle = manager.InitHandle(); _writer = new(this); - _requestAccumulator = new(handle.Memory); + _size = bufferSize; + _requestAccumulator = new(this, bufferSize); } + /* + * Reusable methods will alloc and free buffers during + * normal operation. + */ + + ///<inheritdoc/> + public void Prepare() => _memoryManager.AllocBuffer(Handle, _size); + + ///<inheritdoc/> + public bool Release() + { + _memoryManager.FreeBuffer(Handle); + return true; + } + + public void Dispose() => _ = Release(); + /// <summary> /// Gets the internal request data accumulator /// </summary> @@ -73,13 +95,6 @@ namespace VNLib.Net.Messaging.FBM.Client //Return the internal writer return _writer; } - - - public void Dispose() - { - //Dispose handle - Handle.Dispose(); - } /// <summary> /// Resets the request accumulator and writes the initial message id @@ -91,7 +106,7 @@ namespace VNLib.Net.Messaging.FBM.Client _requestAccumulator.Reset(); //Write message id to accumulator, it should already be reset - Helpers.WriteMessageid(RequestBuffer, messageId); + Helpers.WriteMessageid(_requestAccumulator, messageId); } ///<inheritdoc/> @@ -99,24 +114,24 @@ namespace VNLib.Net.Messaging.FBM.Client Span<char> IFBMHeaderBuffer.GetSpan(int offset, int count) { //Get the character span - Span<char> span = MemoryMarshal.Cast<byte, char>(Handle.Memory.Span); + Span<char> span = MemoryMarshal.Cast<byte, char>(Handle.GetSpan()); return span.Slice(offset, count); } ///<inheritdoc/> [MethodImpl(MethodImplOptions.AggressiveInlining)] - Span<char> IFBMHeaderBuffer.GetSpan() => MemoryMarshal.Cast<byte, char>(Handle.Memory.Span); + Span<char> IFBMHeaderBuffer.GetSpan() => MemoryMarshal.Cast<byte, char>(Handle.GetSpan()); private sealed class BinaryRequestAccumulator : IDataAccumulator<byte> { private readonly int Size; - private readonly Memory<byte> Buffer; + private readonly FBMBuffer Buffer; - internal BinaryRequestAccumulator(Memory<byte> buffer) + internal BinaryRequestAccumulator(FBMBuffer buffer, int size) { Buffer = buffer; - Size = buffer.Length; + Size = size; } ///<inheritdoc/> @@ -126,52 +141,47 @@ namespace VNLib.Net.Messaging.FBM.Client public int RemainingSize => Size - AccumulatedSize; ///<inheritdoc/> - public Span<byte> Remaining => Buffer.Span.Slice(AccumulatedSize, RemainingSize); + public Span<byte> Remaining => Buffer.Handle.GetSpan().Slice(AccumulatedSize, RemainingSize); + ///<inheritdoc/> - public Span<byte> Accumulated => Buffer.Span[..AccumulatedSize]; + public Span<byte> Accumulated => Buffer.Handle.GetSpan()[..AccumulatedSize]; /// <summary> /// Gets the accumulated data as a memory segment /// </summary> - public Memory<byte> AccumulatedMemory => Buffer[..AccumulatedSize]; + public Memory<byte> AccumulatedMemory => Buffer.Handle.GetMemory()[..AccumulatedSize]; /// <summary> /// Gets the remaining buffer segment as a memory segment /// </summary> - public Memory<byte> RemainingMemory => Buffer.Slice(AccumulatedSize, RemainingSize); + public Memory<byte> RemainingMemory => Buffer.Handle.GetMemory().Slice(AccumulatedSize, RemainingSize); ///<inheritdoc/> public void Advance(int count) => AccumulatedSize += count; + ///<inheritdoc/> public void Reset() => AccumulatedSize = 0; } + /* + * A buffer writer that wraps the request accumulator to allow + * a finite amount of data to be written to the accumulator since + * it uses a fixed size internal buffer. + */ private sealed class BufferWriter : IBufferWriter<byte> { private readonly FBMBuffer Buffer; - public BufferWriter(FBMBuffer buffer) - { - Buffer = buffer; - } + public BufferWriter(FBMBuffer buffer) => Buffer = buffer; - public void Advance(int count) - { - //Advance the writer - Buffer.RequestBuffer.Advance(count); - } + ///<inheritdoc/> + public void Advance(int count) => Buffer._requestAccumulator.Advance(count); - public Memory<byte> GetMemory(int sizeHint = 0) - { - //Get the remaining memory segment - return Buffer._requestAccumulator.RemainingMemory; - } + ///<inheritdoc/> + public Memory<byte> GetMemory(int sizeHint = 0) => Buffer._requestAccumulator.RemainingMemory; - public Span<byte> GetSpan(int sizeHint = 0) - { - //Get the remaining data segment - return Buffer.RequestBuffer.Remaining; - } + ///<inheritdoc/> + public Span<byte> GetSpan(int sizeHint = 0) => Buffer._requestAccumulator.Remaining; } } } diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClient.cs b/lib/Net.Messaging.FBM/src/Client/FBMClient.cs index 5184c38..c8319fa 100644 --- a/lib/Net.Messaging.FBM/src/Client/FBMClient.cs +++ b/lib/Net.Messaging.FBM/src/Client/FBMClient.cs @@ -24,7 +24,6 @@ using System; using System.IO; -using System.Buffers; using System.Threading; using System.Net.WebSockets; using System.Threading.Tasks; @@ -34,9 +33,12 @@ using System.Collections.Concurrent; using VNLib.Net.Http; using VNLib.Utils; using VNLib.Utils.IO; +using VNLib.Utils.Memory; +using VNLib.Utils.Memory.Caching; using VNLib.Utils.Logging; using VNLib.Utils.Extensions; -using VNLib.Utils.Memory.Caching; + +#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task namespace VNLib.Net.Messaging.FBM.Client { @@ -60,6 +62,8 @@ namespace VNLib.Net.Messaging.FBM.Client /// </summary> public const string REQ_MAX_MESS_QUERY_ARG = "mx"; + public const int MAX_STREAM_BUFFER_SIZE = 128 * 1024; + /// <summary> /// Raised when the websocket has been closed because an error occured. /// You may inspect the event args to determine the cause of the error. @@ -74,12 +78,14 @@ namespace VNLib.Net.Messaging.FBM.Client private readonly SemaphoreSlim SendLock; private readonly ConcurrentDictionary<int, FBMRequest> ActiveRequests; private readonly ObjectRental<FBMRequest> RequestRental; + private readonly FBMClientConfig _config; + private readonly byte[] _streamBuffer; /// <summary> /// The configuration for the current client /// </summary> - public FBMClientConfig Config { get; } - + public ref readonly FBMClientConfig Config => ref _config; + /// <summary> /// A handle that is reset when a connection has been successfully set, and is set /// when the connection exists @@ -103,9 +109,13 @@ namespace VNLib.Net.Messaging.FBM.Client ConnectionStatusHandle = new(true); ActiveRequests = new(Environment.ProcessorCount, 100); - Config = config; + _config = config; //Init the new client socket ClientSocket = new(config.RecvBufferSize, config.RecvBufferSize, config.KeepAliveInterval, config.SubProtocol); + + //Init the stream buffer + int maxStrmBufSize = Math.Min(config.MaxMessageSize, MAX_STREAM_BUFFER_SIZE); + _streamBuffer = new byte[maxStrmBufSize]; } private void Debug(string format, params string[] strings) @@ -127,7 +137,7 @@ namespace VNLib.Net.Messaging.FBM.Client /// Allocates and configures a new <see cref="FBMRequest"/> message object for use within the reusable store /// </summary> /// <returns>The configured <see cref="FBMRequest"/></returns> - protected virtual FBMRequest ReuseableRequestConstructor() => new(Config); + protected virtual FBMRequest ReuseableRequestConstructor() => new(in _config); /// <summary> /// Asynchronously opens a websocket connection with the specifed remote server @@ -180,10 +190,7 @@ namespace VNLib.Net.Messaging.FBM.Client /// <exception cref="ObjectDisposedException"></exception> /// <exception cref="InvalidOperationException"></exception> /// <exception cref="FBMInvalidRequestException"></exception> - public Task<FBMResponse> SendAsync(FBMRequest request, CancellationToken cancellationToken = default) - { - return SendAsync(request, Config.RequestTimeout, cancellationToken); - } + public Task<FBMResponse> SendAsync(FBMRequest request, CancellationToken cancellationToken = default) => SendAsync(request, _config.RequestTimeout, cancellationToken); /// <summary> /// Sends a <see cref="FBMRequest"/> to the connected server @@ -211,6 +218,8 @@ namespace VNLib.Net.Messaging.FBM.Client try { + FBMResponse response = new(); + //Get the request data segment ReadOnlyMemory<byte> requestData = request.GetRequestData(); @@ -224,22 +233,26 @@ namespace VNLib.Net.Messaging.FBM.Client } //wait for the response to be set - await request.Waiter.WaitAsync(timeout, cancellationToken).ConfigureAwait(true); + await request.Waiter.GetTask(timeout, cancellationToken).ConfigureAwait(true); Debug("Received {size} bytes for message {id}", request.Response?.Length ?? 0, request.MessageId); - return request.GetResponse(); + //Get the response data + request.GetResponse(ref response); + + return response; } catch { //Remove the request since packet was never sent ActiveRequests.Remove(request.MessageId, out _); - - //Cleanup waiter - request.Waiter.OnEndRequest(); - throw; } + finally + { + //Always cleanup waiter + request.Waiter.OnEndRequest(); + } } /// <summary> @@ -247,30 +260,28 @@ namespace VNLib.Net.Messaging.FBM.Client /// </summary> /// <param name="request">The request message to send to the server</param> /// <param name="payload">Data to stream to the server</param> - /// <param name="ct">The content type of the stream of data</param> + /// <param name="contentType">The content type of the stream of data</param> /// <param name="cancellationToken">A token to cancel the operation</param> /// <returns>A task that resolves when the data is sent and the resonse is received</returns> /// <exception cref="ArgumentException"></exception> /// <exception cref="ObjectDisposedException"></exception> /// <exception cref="InvalidOperationException"></exception> - public Task StreamDataAsync(FBMRequest request, Stream payload, ContentType ct, CancellationToken cancellationToken = default) - { - return StreamDataAsync(request, payload, ct, Config.RequestTimeout, cancellationToken); - } + public Task<FBMResponse> StreamDataAsync(FBMRequest request, Stream payload, ContentType contentType, CancellationToken cancellationToken = default) + => StreamDataAsync(request, payload, contentType, _config.RequestTimeout, cancellationToken); /// <summary> /// Streams arbitrary binary data to the server with the initial request message /// </summary> /// <param name="request">The request message to send to the server</param> /// <param name="payload">Data to stream to the server</param> - /// <param name="ct">The content type of the stream of data</param> + /// <param name="contentType">The content type of the stream of data</param> /// <param name="cancellationToken">A token to cancel the operation</param> /// <param name="timeout">A maxium wait timeout period. If -1 or 0 the timeout is disabled</param> /// <returns>A task that resolves when the data is sent and the resonse is received</returns> /// <exception cref="ArgumentException"></exception> /// <exception cref="ObjectDisposedException"></exception> /// <exception cref="InvalidOperationException"></exception> - public async Task StreamDataAsync(FBMRequest request, Stream payload, ContentType ct, TimeSpan timeout, CancellationToken cancellationToken = default) + public async Task<FBMResponse> StreamDataAsync(FBMRequest request, Stream payload, ContentType contentType, TimeSpan timeout, CancellationToken cancellationToken = default) { Check(); @@ -282,19 +293,15 @@ namespace VNLib.Net.Messaging.FBM.Client try { + FBMResponse response = new(); + //Get the request data segment ReadOnlyMemory<byte> requestData = request.GetRequestData(); Debug("Streaming {bytes} with id {id}", requestData.Length, request.MessageId); - //Write an empty body in the request - request.WriteBody(ReadOnlySpan<byte>.Empty, ct); - - //Calc buffer size - int bufSize = (int)Math.Clamp(payload.Length, Config.MessageBufferSize, Config.MaxMessageSize); - - //Alloc a streaming buffer - using IMemoryOwner<byte> buffer = Config.BufferHeap.DirectAlloc<byte>(bufSize); + //Write an empty body in the request so a content type header is writen + request.WriteBody(ReadOnlySpan<byte>.Empty, contentType); //Wait for send-lock using (SemSlimReleaser releaser = await SendLock.GetReleaserAsync(cancellationToken)) @@ -306,7 +313,7 @@ namespace VNLib.Net.Messaging.FBM.Client do { //Read data - int read = await payload.ReadAsync(buffer.Memory, cancellationToken); + int read = await payload.ReadAsync(_streamBuffer, cancellationToken); if (read == 0) { @@ -315,29 +322,32 @@ namespace VNLib.Net.Messaging.FBM.Client } //write message to socket, if the read data was smaller than the buffer, we can send the last packet - await ClientSocket.SendAsync(buffer.Memory[..read], WebSocketMessageType.Binary, read < bufSize, cancellationToken); + await ClientSocket.SendAsync(_streamBuffer[..read], WebSocketMessageType.Binary, read < _streamBuffer.Length, cancellationToken); } while (true); } //wait for the server to respond - await request.Waiter.WaitAsync(timeout, cancellationToken).ConfigureAwait(true); + await request.Waiter.GetTask(timeout, cancellationToken).ConfigureAwait(true); Debug("Response recieved {size} bytes for message {id}", request.Response?.Length ?? 0, request.MessageId); + + request.GetResponse(ref response); + return response; } catch { //Remove the request since packet was never sent or cancelled _ = ActiveRequests.TryRemove(request.MessageId, out _); - - //Cleanup request waiter - request.Waiter.OnEndRequest(); - throw; } + finally + { + //Always cleanup waiter + request.Waiter.OnEndRequest(); + } } - /// <summary> /// Closes the underlying <see cref="WebSocket"/> and cancels all pending operations /// </summary> @@ -399,14 +409,20 @@ namespace VNLib.Net.Messaging.FBM.Client Debug("Begining receive loop"); //Alloc recv buffer - IMemoryOwner<byte> recvBuffer = Config.BufferHeap.DirectAlloc<byte>(Config.RecvBufferSize); + IFBMMemoryHandle recvBuffer = _config.MemoryManager.InitHandle(); + _config.MemoryManager.AllocBuffer(recvBuffer, _config.RecvBufferSize); try { + if(!_config.MemoryManager.TryGetHeap(out IUnmangedHeap? heap)) + { + throw new NotSupportedException("The memory manager must support using IUnmanagedHeaps"); + } + //Recv event loop while (true) { //Listen for incoming packets with the intial data buffer - ValueWebSocketReceiveResult result = await socket.ReceiveAsync(recvBuffer.Memory, CancellationToken.None); + ValueWebSocketReceiveResult result = await socket.ReceiveAsync(recvBuffer.GetMemory(), CancellationToken.None); //If the message is a close message, its time to exit if (result.MessageType == WebSocketMessageType.Close) @@ -422,19 +438,19 @@ namespace VNLib.Net.Messaging.FBM.Client } //Alloc data buffer and write initial data - VnMemoryStream responseBuffer = new(Config.BufferHeap); + VnMemoryStream responseBuffer = new(heap); //Copy initial data - responseBuffer.Write(recvBuffer.Memory.Span[..result.Count]); + responseBuffer.Write(recvBuffer.GetSpan()[..result.Count]); //Receive packets until the EOF is reached while (!result.EndOfMessage) { //recive more data - result = await socket.ReceiveAsync(recvBuffer.Memory, CancellationToken.None); + result = await socket.ReceiveAsync(recvBuffer.GetMemory(), CancellationToken.None); //Make sure the buffer is not too large - if ((responseBuffer.Length + result.Count) > Config.MaxMessageSize) + if ((responseBuffer.Length + result.Count) > _config.MaxMessageSize) { //Dispose the buffer before exiting responseBuffer.Dispose(); @@ -443,7 +459,7 @@ namespace VNLib.Net.Messaging.FBM.Client } //Copy continuous data - responseBuffer.Write(recvBuffer.Memory.Span[..result.Count]); + responseBuffer.Write(recvBuffer.GetSpan()[..result.Count]); } //Reset the buffer stream position @@ -472,7 +488,7 @@ namespace VNLib.Net.Messaging.FBM.Client finally { //Dispose the recv buffer - recvBuffer.Dispose(); + _config.MemoryManager.FreeBuffer(recvBuffer); //Cleanup the socket when exiting ClientSocket.Cleanup(); //Set status handle as unset @@ -522,7 +538,12 @@ namespace VNLib.Net.Messaging.FBM.Client if (ActiveRequests.TryRemove(messageId, out FBMRequest? request)) { //Set the new response message - request.Waiter.Complete(responseMessage); + if (!request.Waiter.Complete(responseMessage)) + { + //Falied to complete, dispose the message data + responseMessage.Dispose(); + Debug("Failed to transition waiting request {id}. Message was dropped", messageId, 0); + } } else { diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs b/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs index 30e9a95..735a0a8 100644 --- a/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs +++ b/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs @@ -25,7 +25,6 @@ using System; using System.Text; -using VNLib.Utils.Memory; using VNLib.Utils.Logging; namespace VNLib.Net.Messaging.FBM.Client @@ -59,7 +58,7 @@ namespace VNLib.Net.Messaging.FBM.Client /// <summary> /// The heap to allocate internal (and message) buffers from /// </summary> - public readonly IUnmangedHeap BufferHeap { get; init; } + public readonly IFBMMemoryManager MemoryManager { get; init; } /// <summary> /// The websocket keepalive interval to use (leaving this property default disables keepalives) /// </summary> diff --git a/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs b/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs index c4fb493..418a9ec 100644 --- a/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs +++ b/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs @@ -26,6 +26,7 @@ using System; using System.Text; using System.Buffers; using System.Threading; +using System.Diagnostics; using System.Threading.Tasks; using System.Collections.Generic; using System.Runtime.CompilerServices; @@ -37,7 +38,6 @@ using VNLib.Utils.Memory; using VNLib.Utils.Extensions; using VNLib.Utils.Memory.Caching; - namespace VNLib.Net.Messaging.FBM.Client { /// <summary> @@ -97,29 +97,29 @@ namespace VNLib.Net.Messaging.FBM.Client /// <param name="messageId">The custom message id</param> /// <param name="config">The fbm client config storing required config variables</param> public FBMRequest(int messageId, in FBMClientConfig config) - :this(messageId, config.BufferHeap, config.MessageBufferSize, config.HeaderEncoding) + :this(messageId, config.MemoryManager, config.MessageBufferSize, config.HeaderEncoding) { } /// <summary> /// Initializes a new <see cref="FBMRequest"/> with the sepcified message buffer size and a custom MessageId /// </summary> /// <param name="messageId">The custom message id</param> - /// <param name="heap">The heap to allocate the internal buffer from</param> + /// <param name="manager">The memory manager used to allocate the internal buffers</param> /// <param name="bufferSize">The size of the internal buffer</param> /// <param name="headerEncoding">The encoding instance used for header character encoding</param> - public FBMRequest(int messageId, IUnmangedHeap heap, int bufferSize, Encoding headerEncoding) + public FBMRequest(int messageId, IFBMMemoryManager manager, int bufferSize, Encoding headerEncoding) { MessageId = messageId; - HeaderEncoding = headerEncoding; + HeaderEncoding = headerEncoding ?? throw new ArgumentNullException(nameof(headerEncoding)); + _ = manager ?? throw new ArgumentNullException(nameof(manager)); //Configure waiter Waiter = new FBMMessageWaiter(this); - - //Alloc the buffer as a memory owner so a memory buffer can be used - IMemoryOwner<byte> HeapBuffer = heap.DirectAlloc<byte>(bufferSize); - Buffer = new(HeapBuffer); + + Buffer = new(manager, bufferSize); //Prepare the message incase the request is fresh + Buffer.Prepare(); Reset(); } @@ -152,19 +152,13 @@ namespace VNLib.Net.Messaging.FBM.Client /// The request message packet, this may cause side effects /// </summary> [MethodImpl(MethodImplOptions.AggressiveInlining)] - public ReadOnlyMemory<byte> GetRequestData() - { - return Buffer.RequestData; - } + public ReadOnlyMemory<byte> GetRequestData() => Buffer.RequestData; /// <summary> /// Resets the internal buffer and allows for writing a new message with /// the same message-id /// </summary> - public void Reset() - { - Buffer.Reset(MessageId); - } + public void Reset() => Buffer.Reset(MessageId); ///<inheritdoc/> protected override void Free() @@ -175,7 +169,12 @@ namespace VNLib.Net.Messaging.FBM.Client (Waiter as FBMMessageWaiter)!.Dispose(); } - void IReusable.Prepare() => Reset(); + void IReusable.Prepare() + { + //MUST BE CALLED FIRST! + Buffer.Prepare(); + Reset(); + } bool IReusable.Release() { @@ -186,6 +185,9 @@ namespace VNLib.Net.Messaging.FBM.Client Response?.Dispose(); Response = null; + //Free buffer + Buffer.Release(); + return true; } @@ -195,7 +197,7 @@ namespace VNLib.Net.Messaging.FBM.Client /// Gets the response of the sent message /// </summary> /// <returns>The response message for the current request</returns> - internal FBMResponse GetResponse() + internal void GetResponse(ref FBMResponse response) { if (Response != null) { @@ -215,11 +217,7 @@ namespace VNLib.Net.Messaging.FBM.Client HeaderParseError statusFlags = Helpers.ParseHeaders(Response, Buffer, ResponseHeaderList, HeaderEncoding); //return response structure - return new(Response, statusFlags, ResponseHeaderList, OnResponseDisposed); - } - else - { - return new(); + response = new(Response, statusFlags, ResponseHeaderList, OnResponseDisposed); } } @@ -272,12 +270,13 @@ namespace VNLib.Net.Messaging.FBM.Client #endregion #region waiter - private sealed class FBMMessageWaiter : IFBMMessageWaiter, IDisposable + private sealed class FBMMessageWaiter : IFBMMessageWaiter, IDisposable, IThreadPoolWorkItem { private readonly Timer _timer; private readonly FBMRequest _request; private TaskCompletionSource? _tcs; + private CancellationTokenRegistration _token; public FBMMessageWaiter(FBMRequest request) { @@ -298,79 +297,89 @@ namespace VNLib.Net.Messaging.FBM.Client public void OnEndRequest() { //Cleanup tcs ref - _ = Interlocked.Exchange(ref _tcs, null); + _tcs = null; - //Stop timer if set + //Always stop timer if set _timer.Stop(); + + //Cleanup cancellation token + _token.Dispose(); } ///<inheritdoc/> - public void Complete(VnMemoryStream ms) + public bool Complete(VnMemoryStream ms) { - //Read the current state of the tcs TaskCompletionSource? tcs = _tcs; - if (tcs == null) + //Work is done/cancelled + if (tcs != null && tcs.Task.IsCompleted) { - //Work is done/cancelled, dispose the ms and leave - ms.Dispose(); + return false; } //Store response _request.Response = ms; - //Transition to completed state in background thread - static void OnTpCallback(object? state) - { - _ = (state as TaskCompletionSource)!.TrySetResult(); - } - /* * The calling thread may be a TP thread proccessing an async event loop. * We do not want to block this worker thread. */ - ThreadPool.UnsafeQueueUserWorkItem(OnTpCallback, tcs); + return ThreadPool.UnsafeQueueUserWorkItem(this, true); } + /* + * Called when scheduled on the TP thread pool + */ ///<inheritdoc/> - public async Task WaitAsync(TimeSpan timeout, CancellationToken cancellation) + public void Execute() => _tcs?.TrySetResult(); + + + ///<inheritdoc/> + public Task GetTask(TimeSpan timeout, CancellationToken cancellation) { - if (timeout.Ticks > 0) - { - //Restart timer if timeout is configured - _timer.Restart(timeout); - } + TaskCompletionSource? tcs = _tcs; - //Confim the token may be cancelled - if (cancellation.CanBeCanceled) - { - //Register cancellation - using CancellationTokenRegistration reg = cancellation.Register(OnCancelled, this, false); + Debug.Assert(tcs != null, "A call to GetTask was made outside of the request flow, the TaskCompletionSource was null"); - //await task that may be canclled - await _tcs.Task.ConfigureAwait(false); - } - else + /* + * Get task will only be called after the message has been sent. + * The Complete method may have already scheduled a completion by + * the time this method is called, so we may avoid setting up the + * timer and cancellation if possible. Also since this mthod is + * called from the request side, we know the tcs cannot be null + */ + + if (!tcs.Task.IsCompleted) { - //await the task directly - await _tcs.Task.ConfigureAwait(false); + if (timeout.Ticks > 0) + { + //Restart timer if timeout is configured + _timer.Restart(timeout); + } + + if (cancellation.CanBeCanceled) + { + //Register cancellation + _token = cancellation.Register(OnCancelled, this); + } } + + return tcs.Task; } ///<inheritdoc/> public void ManualCancellation() => OnCancelled(this); - private void OnCancelled(object? state) - { - //Set cancelled state if exists - _ = _tcs?.TrySetCanceled(); - } + //Set cancelled state if exists, the task may have already completed + private void OnCancelled(object? state) => _tcs?.TrySetCanceled(); ///<inheritdoc/> public void Dispose() { _timer.Dispose(); + _token.Dispose(); } + } #endregion diff --git a/lib/Net.Messaging.FBM/src/Client/FBMResponse.cs b/lib/Net.Messaging.FBM/src/Client/FBMResponse.cs index 6f8fec4..f1148f1 100644 --- a/lib/Net.Messaging.FBM/src/Client/FBMResponse.cs +++ b/lib/Net.Messaging.FBM/src/Client/FBMResponse.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Messaging.FBM @@ -90,17 +90,16 @@ namespace VNLib.Net.Messaging.FBM.Client /// <summary> /// Releases any resources associated with the response message /// </summary> - public void Dispose() => _onDispose?.Invoke(); + public readonly void Dispose() => _onDispose?.Invoke(); ///<inheritdoc/> public override bool Equals(object? obj) => obj is FBMResponse response && Equals(response); ///<inheritdoc/> + public bool Equals(FBMResponse other) => IsSet && other.IsSet && ReferenceEquals(MessagePacket, other.MessagePacket); + ///<inheritdoc/> public override int GetHashCode() => IsSet ? MessagePacket!.GetHashCode() : 0; ///<inheritdoc/> public static bool operator ==(FBMResponse left, FBMResponse right) => left.Equals(right); ///<inheritdoc/> public static bool operator !=(FBMResponse left, FBMResponse right) => !(left == right); - ///<inheritdoc/> - public bool Equals(FBMResponse other) => IsSet && other.IsSet && MessagePacket == other.MessagePacket; - } } diff --git a/lib/Net.Messaging.FBM/src/Client/IFBMMessageWaiter.cs b/lib/Net.Messaging.FBM/src/Client/IFBMMessageWaiter.cs index 5000711..cc8e1c4 100644 --- a/lib/Net.Messaging.FBM/src/Client/IFBMMessageWaiter.cs +++ b/lib/Net.Messaging.FBM/src/Client/IFBMMessageWaiter.cs @@ -48,9 +48,9 @@ namespace VNLib.Net.Messaging.FBM.Client /// or a timeout /// </summary> /// <param name="timeout">The maxium time to wait for the server to respond (may be default/0)</param> - /// <param name="cancellation">The cancellation token to observe</param> + /// <param name="cancellation">A token to cancel the wait task</param> /// <returns>A task that completes when the server responds</returns> - Task WaitAsync(TimeSpan timeout, CancellationToken cancellation); + Task GetTask(TimeSpan timeout, CancellationToken cancellation); /// <summary> /// Called by the client to cleanup the waiter when the request is completed @@ -63,7 +63,7 @@ namespace VNLib.Net.Messaging.FBM.Client /// Set by the client when the response has been successfully received by the client /// </summary> /// <param name="ms">The response data to pass to the response</param> - void Complete(VnMemoryStream ms); + bool Complete(VnMemoryStream ms); /// <summary> /// Called to invoke a manual cancellation of a request waiter. This method should diff --git a/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs b/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs index d0352d3..fc2e417 100644 --- a/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs +++ b/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs @@ -43,7 +43,7 @@ namespace VNLib.Net.Messaging.FBM.Client private readonly int TxBufferSize; private readonly int RxBufferSize; private readonly TimeSpan KeepAliveInterval; - private readonly VnTempBuffer<byte> _dataBuffer; + private readonly ArrayPoolBuffer<byte> _dataBuffer; private readonly string? _subProtocol; /// <summary> @@ -95,7 +95,7 @@ namespace VNLib.Net.Messaging.FBM.Client try { //Set buffer - _socket.Options.SetBuffer(RxBufferSize, TxBufferSize, _dataBuffer); + _socket.Options.SetBuffer(RxBufferSize, TxBufferSize, _dataBuffer.AsArraySegment()); //Set remaining stored options _socket.Options.ClientCertificates = Certificates; _socket.Options.KeepAliveInterval = KeepAliveInterval; diff --git a/lib/Net.Messaging.FBM/src/FBMMessageHeader.cs b/lib/Net.Messaging.FBM/src/FBMMessageHeader.cs index d1f4f1c..180ce7d 100644 --- a/lib/Net.Messaging.FBM/src/FBMMessageHeader.cs +++ b/lib/Net.Messaging.FBM/src/FBMMessageHeader.cs @@ -87,6 +87,7 @@ namespace VNLib.Net.Messaging.FBM ///<inheritdoc/> public static bool operator ==(FBMMessageHeader left, FBMMessageHeader right) => left.Equals(right); + ///<inheritdoc/> public static bool operator !=(FBMMessageHeader left, FBMMessageHeader right) => !(left == right); diff --git a/lib/Net.Messaging.FBM/src/FallbackFBMMemoryManager.cs b/lib/Net.Messaging.FBM/src/FallbackFBMMemoryManager.cs new file mode 100644 index 0000000..260cbd6 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/FallbackFBMMemoryManager.cs @@ -0,0 +1,140 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: FallbackFBMMemoryManager.cs +* +* FallbackFBMMemoryManager.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Buffers; +using System.Diagnostics.CodeAnalysis; + +using VNLib.Utils.Memory; +using VNLib.Utils.Extensions; + +namespace VNLib.Net.Messaging.FBM +{ + /// <summary> + /// A default/fallback implementation of a <see cref="IFBMMemoryManager"/> that + /// uses an <see cref="IUnmangedHeap"/> to allocate buffers from + /// </summary> + public sealed class FallbackFBMMemoryManager : IFBMMemoryManager + { + private readonly IUnmangedHeap _heap; + + /// <summary> + /// Initializes a new instance of <see cref="FallbackFBMMemoryManager"/> allocationg + /// memory from the specified <see cref="IUnmangedHeap"/> + /// </summary> + /// <param name="heap">The heap to allocate memory from</param> + /// <exception cref="ArgumentNullException"></exception> + public FallbackFBMMemoryManager(IUnmangedHeap heap) => _heap = heap ?? throw new ArgumentNullException(nameof(heap)); + + ///<inheritdoc/> + public void AllocBuffer(IFBMSpanOnlyMemoryHandle state, int size) + { + _ = state ?? throw new ArgumentNullException(nameof(state)); + (state as IFBMBufferHolder)!.AllocBuffer(size); + } + + ///<inheritdoc/> + public void FreeBuffer(IFBMSpanOnlyMemoryHandle state) + { + _ = state ?? throw new ArgumentNullException(nameof(state)); + (state as IFBMBufferHolder)!.FreeBuffer(); + } + + ///<inheritdoc/> + public IFBMMemoryHandle InitHandle() => new FBMMemHandle(_heap); + + ///<inheritdoc/> + public IFBMSpanOnlyMemoryHandle InitSpanOnly() => new FBMSpanOnlyMemHandle(_heap); + + ///<inheritdoc/> + public bool TryGetHeap([NotNullWhen(true)] out IUnmangedHeap? heap) + { + heap = _heap; + return true; + } + + interface IFBMBufferHolder + { + void AllocBuffer(int size); + + void FreeBuffer(); + } + + private sealed record class FBMMemHandle(IUnmangedHeap Heap) : IFBMMemoryHandle, IFBMBufferHolder + { + private MemoryHandle<byte>? _handle; + private IMemoryOwner<byte>? _memHandle; + + ///<inheritdoc/> + public Memory<byte> GetMemory() + { + _ = _memHandle ?? throw new InvalidOperationException("Buffer has not allocated"); + return _memHandle.Memory; + } + + ///<inheritdoc/> + public Span<byte> GetSpan() + { + _ = _handle ?? throw new InvalidOperationException("Buffer has not allocated"); + return _handle.Span; + } + + ///<inheritdoc/> + public void AllocBuffer(int size) + { + //Alloc buffer and memory manager to wrap it + _handle = Heap.Alloc<byte>(size, false); + _memHandle = _handle.ToMemoryManager(false); + } + + ///<inheritdoc/> + public void FreeBuffer() + { + _handle?.Dispose(); + _memHandle?.Dispose(); + + _handle = null; + _memHandle = null; + } + } + + private sealed record class FBMSpanOnlyMemHandle(IUnmangedHeap Heap) : IFBMSpanOnlyMemoryHandle, IFBMBufferHolder + { + private MemoryHandle<byte>? _handle; + + ///<inheritdoc/> + public void AllocBuffer(int size) => _handle = Heap.Alloc<byte>(size, false); + + ///<inheritdoc/> + public void FreeBuffer() => _handle?.Dispose(); + + ///<inheritdoc/> + public Span<byte> GetSpan() + { + _ = _handle ?? throw new InvalidOperationException("Buffer has not allocated"); + return _handle.Span; + } + } + } +} diff --git a/lib/Net.Messaging.FBM/src/IFBMMemoryHandle.cs b/lib/Net.Messaging.FBM/src/IFBMMemoryHandle.cs new file mode 100644 index 0000000..97a41b4 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/IFBMMemoryHandle.cs @@ -0,0 +1,43 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: IFBMMemoryHandle.cs +* +* IFBMMemoryHandle.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; + + +namespace VNLib.Net.Messaging.FBM +{ + /// <summary> + /// Represents a configurable handle to a memory block + /// </summary> + public interface IFBMMemoryHandle : IFBMSpanOnlyMemoryHandle + { + /// <summary> + /// Gets the block as a <see cref="Memory{T}"/> + /// structure + /// </summary> + /// <returns>The memory structure wrapping the underlying memory block</returns> + Memory<byte> GetMemory(); + } + +} diff --git a/lib/Net.Messaging.FBM/src/IFBMMemoryManager.cs b/lib/Net.Messaging.FBM/src/IFBMMemoryManager.cs new file mode 100644 index 0000000..9342993 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/IFBMMemoryManager.cs @@ -0,0 +1,71 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: IFBMMemoryManager.cs +* +* IFBMMemoryManager.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Diagnostics.CodeAnalysis; + +using VNLib.Utils.Memory; + +namespace VNLib.Net.Messaging.FBM +{ + /// <summary> + /// Manages memory blocks required by the FBM server messages + /// </summary> + public interface IFBMMemoryManager + { + /// <summary> + /// Initializes a new <see cref="IFBMMemoryHandle"/> + /// </summary> + /// <returns>The initialized handle</returns> + IFBMMemoryHandle InitHandle(); + + /// <summary> + /// Initializes a new <see cref="IFBMSpanOnlyMemoryHandle"/> + /// </summary> + /// <returns>The initialized handle</returns> + IFBMSpanOnlyMemoryHandle InitSpanOnly(); + + /// <summary> + /// Allocates the <see cref="IFBMMemoryHandle"/> internal buffer + /// for use + /// </summary> + /// <param name="state">The memory handle to allocate the buffer for</param> + /// <param name="size">The size of the buffer required</param> + void AllocBuffer(IFBMSpanOnlyMemoryHandle state, int size); + + /// <summary> + /// Frees the <see cref="IFBMSpanOnlyMemoryHandle"/> internal buffer + /// </summary> + /// <param name="state">The buffer handle holding the memory to free</param> + void FreeBuffer(IFBMSpanOnlyMemoryHandle state); + + /// <summary> + /// Tries to get the internal <see cref="IUnmangedHeap"/> to allocate internal + /// buffers from + /// </summary> + /// <param name="heap">The internal heap</param> + /// <returns>A value that indicates if a backing heap is supported and can be recovered</returns> + bool TryGetHeap([NotNullWhen(true)]out IUnmangedHeap? heap); + } + +} diff --git a/lib/Net.Messaging.FBM/src/Client/IFBMMessage.cs b/lib/Net.Messaging.FBM/src/IFBMMessage.cs index 18f19ec..dba605d 100644 --- a/lib/Net.Messaging.FBM/src/Client/IFBMMessage.cs +++ b/lib/Net.Messaging.FBM/src/IFBMMessage.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Messaging.FBM @@ -26,7 +26,7 @@ using System; using VNLib.Net.Http; -namespace VNLib.Net.Messaging.FBM.Client +namespace VNLib.Net.Messaging.FBM { /// <summary> /// Represents basic Fixed Buffer Message protocol operations diff --git a/lib/Net.Messaging.FBM/src/IFBMSpanOnlyMemoryHandle.cs b/lib/Net.Messaging.FBM/src/IFBMSpanOnlyMemoryHandle.cs new file mode 100644 index 0000000..0078357 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/IFBMSpanOnlyMemoryHandle.cs @@ -0,0 +1,42 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: IFBMSpanOnlyMemoryHandle.cs +* +* IFBMSpanOnlyMemoryHandle.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; + + +namespace VNLib.Net.Messaging.FBM +{ + /// <summary> + /// Represents a configurable handle to a memory block + /// </summary> + public interface IFBMSpanOnlyMemoryHandle + { + /// <summary> + /// Gets the block as a <see cref="Span{T}"/> + /// </summary> + /// <returns>The memory block as a span</returns> + Span<byte> GetSpan(); + } + +} diff --git a/lib/Net.Messaging.FBM/src/Server/FBMContext.cs b/lib/Net.Messaging.FBM/src/Server/FBMContext.cs index 6d5f3bd..f2a2fea 100644 --- a/lib/Net.Messaging.FBM/src/Server/FBMContext.cs +++ b/lib/Net.Messaging.FBM/src/Server/FBMContext.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Messaging.FBM @@ -55,10 +55,11 @@ namespace VNLib.Net.Messaging.FBM.Server /// <param name="requestHeaderBufferSize">The size in characters of the request header buffer</param> /// <param name="responseBufferSize">The size in characters of the response header buffer</param> /// <param name="headerEncoding">The message header encoding instance</param> - public FBMContext(int requestHeaderBufferSize, int responseBufferSize, Encoding headerEncoding) + /// <param name="manager">The context memory manager</param> + public FBMContext(int requestHeaderBufferSize, int responseBufferSize, Encoding headerEncoding, IFBMMemoryManager manager) { - _request = Request = new(requestHeaderBufferSize); - _response = Response = new(responseBufferSize, headerEncoding); + _request = Request = new(requestHeaderBufferSize, manager); + _response = Response = new(responseBufferSize, headerEncoding, manager); _headerEncoding = headerEncoding; } diff --git a/lib/Net.Messaging.FBM/src/Server/FBMListener.cs b/lib/Net.Messaging.FBM/src/Server/FBMListener.cs index 46ee160..30fa1ac 100644 --- a/lib/Net.Messaging.FBM/src/Server/FBMListener.cs +++ b/lib/Net.Messaging.FBM/src/Server/FBMListener.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Messaging.FBM @@ -24,7 +24,6 @@ using System; using System.IO; -using System.Buffers; using System.Threading; using System.Net.WebSockets; using System.Threading.Tasks; @@ -32,7 +31,6 @@ using System.Threading.Tasks; using VNLib.Utils.IO; using VNLib.Utils.Async; using VNLib.Utils.Memory; -using VNLib.Utils.Extensions; using VNLib.Utils.Memory.Caching; using VNLib.Plugins.Essentials; @@ -40,16 +38,6 @@ namespace VNLib.Net.Messaging.FBM.Server { /// <summary> - /// Method delegate for processing FBM messages from an <see cref="FBMListener"/> - /// when messages are received - /// </summary> - /// <param name="context">The message/connection context</param> - /// <param name="userState">The state parameter passed on client connected</param> - /// <param name="cancellationToken">A token that reflects the state of the listener</param> - /// <returns>A <see cref="Task"/> that resolves when processing is complete</returns> - public delegate Task RequestHandler(FBMContext context, object? userState, CancellationToken cancellationToken); - - /// <summary> /// A FBM protocol listener. Listens for messages on a <see cref="WebSocketSession"/> /// and raises events on requests. /// </summary> @@ -58,22 +46,16 @@ namespace VNLib.Net.Messaging.FBM.Server public const int SEND_SEMAPHORE_TIMEOUT_MS = 10 * 1000; - private readonly IUnmangedHeap Heap; - - /// <summary> - /// Raised when a response processing error occured - /// </summary> - public event EventHandler<Exception>? OnProcessError; + private readonly IFBMMemoryManager MemoryManger; /// <summary> /// Creates a new <see cref="FBMListener"/> instance ready for /// processing connections /// </summary> /// <param name="heap">The heap to alloc buffers from</param> - public FBMListener(IUnmangedHeap heap) - { - Heap = heap; - } + /// <exception cref="ArgumentNullException"></exception> + public FBMListener(IFBMMemoryManager heap) => MemoryManger = heap ?? throw new ArgumentNullException(nameof(heap)); + #pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task /// <summary> @@ -83,31 +65,38 @@ namespace VNLib.Net.Messaging.FBM.Server /// <param name="wss">The <see cref="WebSocketSession"/> to receive messages on</param> /// <param name="handler">The callback method to handle incoming requests</param> /// <param name="args">The arguments used to configured this listening session</param> - /// <param name="userState">A state parameter</param> /// <returns>A <see cref="Task"/> that completes when the connection closes</returns> - public async Task ListenAsync(WebSocketSession wss, RequestHandler handler, FBMListenerSessionParams args, object? userState) + public async Task ListenAsync(WebSocketSession wss, IFBMServerMessageHandler handler, FBMListenerSessionParams args) { _ = wss ?? throw new ArgumentNullException(nameof(wss)); _ = handler ?? throw new ArgumentNullException(nameof(handler)); - ListeningSession session = new(wss, handler, in args, userState); - - //Alloc a recieve buffer - using IMemoryOwner<byte> recvBuffer = Heap.DirectAlloc<byte>(args.RecvBufferSize); + ListeningSession session = new(wss, handler, in args, MemoryManger); //Init new queue for dispatching work AsyncQueue<VnMemoryStream> workQueue = new(true, true); //Start a task to process the queue Task queueWorker = QueueWorkerDoWork(workQueue, session); - + + //Alloc buffer + IFBMMemoryHandle memHandle = MemoryManger.InitHandle(); + MemoryManger.AllocBuffer(memHandle, args.RecvBufferSize); + try { + if(!MemoryManger.TryGetHeap(out IUnmangedHeap? heap)) + { + throw new NotSupportedException("The memory manager must export an unmanaged heap"); + } + + Memory<byte> recvBuffer = memHandle.GetMemory(); + //Listen for incoming messages while (true) { //Receive a message - ValueWebSocketReceiveResult result = await wss.ReceiveAsync(recvBuffer.Memory); + ValueWebSocketReceiveResult result = await wss.ReceiveAsync(recvBuffer); //If a close message has been received, we can gracefully exit if (result.MessageType == WebSocketMessageType.Close) { @@ -118,13 +107,13 @@ namespace VNLib.Net.Messaging.FBM.Server } //create buffer for storing data, pre alloc with initial data - VnMemoryStream request = new(Heap, recvBuffer.Memory[..result.Count]); + VnMemoryStream request = new(heap, recvBuffer[..result.Count]); //Streaming read while (!result.EndOfMessage) { //Read more data - result = await wss.ReceiveAsync(recvBuffer.Memory); + result = await wss.ReceiveAsync(recvBuffer); //Make sure the request is small enough to buffer if (request.Length + result.Count > args.MaxMessageSize) { @@ -135,8 +124,9 @@ namespace VNLib.Net.Messaging.FBM.Server //break listen loop goto Exit; } + //write to buffer - request.Write(recvBuffer.Memory.Span[..result.Count]); + request.Write(memHandle.GetSpan()[..result.Count]); } //Make sure data is available if (request.Length == 0) @@ -195,14 +185,19 @@ namespace VNLib.Net.Messaging.FBM.Server if ((context.Request.ParseStatus & HeaderParseError.InvalidId) > 0) { - OnProcessError?.Invoke(this, new FBMException($"Invalid messageid {context.Request.MessageId}, message length {data.Length}")); - return; + Exception cause = new FBMException($"Invalid messageid {context.Request.MessageId}, message length {data.Length}"); + _ = session.Handler.OnInvalidMessage(context, cause); + return; //Cannot continue on invalid message id } //Check parse status flags if ((context.Request.ParseStatus & HeaderParseError.HeaderOutOfMem) > 0) { - OnProcessError?.Invoke(this, new FBMException("Packet received with not enough space to store headers")); + Exception cause = new FBMException("Packet received with not enough space to store headers"); + if(!session.Handler.OnInvalidMessage(context, cause)) + { + return; + } } //Determine if request is an out-of-band message else if (context.Request.MessageId == Helpers.CONTROL_FRAME_MID) @@ -213,18 +208,15 @@ namespace VNLib.Net.Messaging.FBM.Server else { //Invoke normal message handler - await session.OnRecieved.Invoke(context, session.UserState, session.CancellationToken); + await session.Handler.HandleMessage(context, session.CancellationToken); } - //Get response data - - await using IAsyncMessageReader messageEnumerator = await context.Response.GetResponseDataAsync(session.CancellationToken); - + //Get response data reader + await using IAsyncMessageReader messageEnumerator = context.Response.GetResponseData(); //Load inital segment if (await messageEnumerator.MoveNextAsync() && !session.CancellationToken.IsCancellationRequested) - { - ValueTask sendTask; + { //Syncrhonize access to send data because we may need to stream data to the client await session.ResponseLock.WaitAsync(SEND_SEMAPHORE_TIMEOUT_MS); @@ -233,10 +225,8 @@ namespace VNLib.Net.Messaging.FBM.Server { do { - bool eof = !messageEnumerator.DataRemaining; - - //Send first segment - sendTask = session.Socket.SendAsync(messageEnumerator.Current, WebSocketMessageType.Binary, eof); + //Send current segment + await session.Socket.SendAsync(messageEnumerator.Current, WebSocketMessageType.Binary, !messageEnumerator.DataRemaining); /* * WARNING! @@ -250,9 +240,6 @@ namespace VNLib.Net.Messaging.FBM.Server { break; } - - //Await previous send - await sendTask; } while (true); } @@ -261,15 +248,13 @@ namespace VNLib.Net.Messaging.FBM.Server //release semaphore session.ResponseLock.Release(); } - - await sendTask; } //No data to send } catch (Exception ex) { - OnProcessError?.Invoke(this, ex); + session.Handler.OnProcessError(ex); } finally { @@ -295,25 +280,23 @@ namespace VNLib.Net.Messaging.FBM.Server private readonly CancellationTokenSource Cancellation; private readonly CancellationTokenRegistration Registration; private readonly FBMListenerSessionParams Params; + private readonly IFBMMemoryManager MemManager; - public readonly object? UserState; - public readonly SemaphoreSlim ResponseLock; public readonly WebSocketSession Socket; - public readonly RequestHandler OnRecieved; + public readonly IFBMServerMessageHandler Handler; public CancellationToken CancellationToken => Cancellation.Token; - - public ListeningSession(WebSocketSession session, RequestHandler onRecieved, in FBMListenerSessionParams args, object? userState) + public ListeningSession(WebSocketSession session, IFBMServerMessageHandler handler, in FBMListenerSessionParams args, IFBMMemoryManager memManager) { Params = args; Socket = session; - UserState = userState; - OnRecieved = onRecieved; + Handler = handler; + MemManager = memManager; //Create cancellation and register for session close Cancellation = new(); @@ -323,7 +306,7 @@ namespace VNLib.Net.Messaging.FBM.Server CtxStore = ObjectRental.CreateReusable(ContextCtor); } - private FBMContext ContextCtor() => new(Params.MaxHeaderBufferSize, Params.ResponseBufferSize, Params.HeaderEncoding); + private FBMContext ContextCtor() => new(Params.MaxHeaderBufferSize, Params.ResponseBufferSize, Params.HeaderEncoding, MemManager); /// <summary> /// Cancels any pending opreations relating to the current session @@ -358,7 +341,6 @@ namespace VNLib.Net.Messaging.FBM.Server /// <exception cref="ObjectDisposedException"></exception> public FBMContext RentContext() { - if (Cancellation.IsCancellationRequested) { throw new ObjectDisposedException("The instance has been disposed"); diff --git a/lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs b/lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs index 3e9fde2..71b1c8f 100644 --- a/lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs +++ b/lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Messaging.FBM @@ -27,67 +27,29 @@ using System.Threading; using System.Threading.Tasks; using VNLib.Utils.Logging; -using VNLib.Utils.Memory; using VNLib.Plugins.Essentials; namespace VNLib.Net.Messaging.FBM.Server { + /// <summary> /// Provides a simple base class for an <see cref="FBMListener"/> /// processor /// </summary> - public abstract class FBMListenerBase + public abstract class FBMListenerBase<T> : IFBMServerErrorHandler { /// <summary> /// The initialzied listener /// </summary> - protected FBMListener? Listener { get; private set; } + protected abstract FBMListener Listener { get; } + /// <summary> /// A provider to write log information to /// </summary> protected abstract ILogProvider Log { get; } /// <summary> - /// Initializes the <see cref="FBMListener"/> - /// </summary> - /// <param name="heap">The heap to alloc buffers from</param> - protected void InitListener(IUnmangedHeap heap) - { - Listener = new(heap); - //Attach service handler - Listener.OnProcessError += Listener_OnProcessError; - } - - /// <summary> - /// A single event service routine for servicing errors that occur within - /// the listener loop - /// </summary> - /// <param name="sender"></param> - /// <param name="e">The exception that was raised</param> - protected virtual void Listener_OnProcessError(object? sender, Exception e) - { - //Write the error to the log - Log.Error(e); - } - - private async Task OnReceivedAsync(FBMContext context, object? userState, CancellationToken token) - { - try - { - await ProcessAsync(context, userState, token); - } - catch (OperationCanceledException) - { - Log.Debug("Async operation cancelled"); - } - catch(Exception ex) - { - Log.Error(ex); - } - } - - /// <summary> /// Begins listening for requests on the current websocket until /// a close message is received or an error occurs /// </summary> @@ -95,10 +57,13 @@ namespace VNLib.Net.Messaging.FBM.Server /// <param name="args">The arguments used to configured this listening session</param> /// <param name="userState">A state token to use for processing events for this connection</param> /// <returns>A <see cref="Task"/> that completes when the connection closes</returns> - public virtual async Task ListenAsync(WebSocketSession wss, FBMListenerSessionParams args, object? userState) + /// <exception cref="InvalidOperationException"></exception> + public virtual Task ListenAsync(WebSocketSession wss, T userState, FBMListenerSessionParams args) { _ = Listener ?? throw new InvalidOperationException("The listener has not been intialized"); - await Listener.ListenAsync(wss, OnReceivedAsync, args, userState); + //Initn new event handler + FBMEventHandler handler = new(userState, this); + return Listener.ListenAsync(wss, handler, args); } /// <summary> @@ -108,6 +73,30 @@ namespace VNLib.Net.Messaging.FBM.Server /// <param name="userState">A state token passed on client connected</param> /// <param name="exitToken">A token that reflects the state of the listener</param> /// <returns>A task that completes when the message has been serviced</returns> - protected abstract Task ProcessAsync(FBMContext context, object? userState, CancellationToken exitToken); + protected abstract Task ProcessAsync(FBMContext context, T? userState, CancellationToken exitToken); + + ///<inheritdoc/> + public virtual bool OnInvalidMessage(FBMContext context, Exception ex) + { + Log.Error("Invalid message received for session {ses}\n{ex}", context.Request.ConnectionId, ex); + //Invalid id should be captured already, so if oom, do not allow, but if a single header is invalid, it will be ignored by default + return !context.Request.ParseStatus.HasFlag(HeaderParseError.HeaderOutOfMem); + } + + ///<inheritdoc/> + public virtual void OnProcessError(Exception ex) => Log.Error(ex); + + + private sealed record class FBMEventHandler(T State, FBMListenerBase<T> Lb) : IFBMServerMessageHandler + { + ///<inheritdoc/> + public Task HandleMessage(FBMContext context, CancellationToken cancellationToken) => Lb.ProcessAsync(context, State, cancellationToken); + + ///<inheritdoc/> + public bool OnInvalidMessage(FBMContext context, Exception ex) => Lb.OnInvalidMessage(context, ex); + + ///<inheritdoc/> + public void OnProcessError(Exception ex) => Lb.OnProcessError(ex); + } } } diff --git a/lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs b/lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs index ccf79db..0b4fa5b 100644 --- a/lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs +++ b/lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Messaging.FBM @@ -30,7 +30,7 @@ namespace VNLib.Net.Messaging.FBM.Server /// Represents a configuration structure for an <see cref="FBMListener"/> /// listening session /// </summary> - public readonly struct FBMListenerSessionParams + public readonly record struct FBMListenerSessionParams { /// <summary> /// The size of the buffer to use while reading data from the websocket diff --git a/lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs b/lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs index db0655a..e9ff9f5 100644 --- a/lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs +++ b/lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Messaging.FBM @@ -24,13 +24,12 @@ using System; using System.Text; -using System.Buffers; -using System.Text.Json; using System.Collections.Generic; +using System.Runtime.InteropServices; using System.Runtime.CompilerServices; using VNLib.Utils.IO; -using VNLib.Utils.Extensions; +using VNLib.Utils.Memory; using VNLib.Utils.Memory.Caching; namespace VNLib.Net.Messaging.FBM.Server @@ -42,18 +41,21 @@ namespace VNLib.Net.Messaging.FBM.Server { private readonly List<FBMMessageHeader> _headers; private readonly int HeaderBufferSize; + private readonly IFBMMemoryManager _memoryManager; + private readonly IFBMSpanOnlyMemoryHandle _memHandle; /// <summary> /// Creates a new resusable <see cref="FBMRequestMessage"/> /// </summary> /// <param name="headerBufferSize">The size of the buffer to alloc during initialization</param> - internal FBMRequestMessage(int headerBufferSize) + /// <param name="manager">The memory manager to use for the message</param> + internal FBMRequestMessage(int headerBufferSize, IFBMMemoryManager manager) { HeaderBufferSize = headerBufferSize; _headers = new(); + _memoryManager = manager; + _memHandle = _memoryManager.InitSpanOnly(); } - - private char[]? _headerBuffer; /// <summary> /// The ID of the current message @@ -115,35 +117,13 @@ namespace VNLib.Net.Messaging.FBM.Server //Parse headers ParseStatus = Helpers.ParseHeaders(vms, this, _headers, dataEncoding); } - - /// <summary> - /// Deserializes the request body into a new specified object type - /// </summary> - /// <typeparam name="T">The type of the object to deserialize</typeparam> - /// <param name="jso">The <see cref="JsonSerializerOptions"/> to use while deserializing data</param> - /// <returns>The deserialized object from the request body</returns> - /// <exception cref="JsonException"></exception> - public T? DeserializeBody<T>(JsonSerializerOptions? jso = default) - { - return BodyData.IsEmpty ? default : BodyData.AsJsonObject<T>(jso); - } - - /// <summary> - /// Gets a <see cref="JsonDocument"/> of the request body - /// </summary> - /// <returns>The parsed <see cref="JsonDocument"/> if parsed successfully, or null otherwise</returns> - /// <exception cref="JsonException"></exception> - public JsonDocument? GetBodyAsJson() - { - Utf8JsonReader reader = new(BodyData); - return JsonDocument.TryParseValue(ref reader, out JsonDocument? jdoc) ? jdoc : default; - } + void IReusable.Prepare() { ParseStatus = HeaderParseError.None; //Alloc header buffer - _headerBuffer = ArrayPool<char>.Shared.Rent(HeaderBufferSize); + _memoryManager.AllocBuffer(_memHandle, MemoryUtil.ByteCount<char>(HeaderBufferSize)); } @@ -155,8 +135,7 @@ namespace VNLib.Net.Messaging.FBM.Server //Clear headers before freeing buffer _headers.Clear(); //Free header-buffer - ArrayPool<char>.Shared.Return(_headerBuffer!); - _headerBuffer = null; + _memoryManager.FreeBuffer(_memHandle); ConnectionId = null; MessageId = 0; IsControlFrame = false; @@ -165,11 +144,16 @@ namespace VNLib.Net.Messaging.FBM.Server [MethodImpl(MethodImplOptions.AggressiveInlining)] - Span<char> IFBMHeaderBuffer.GetSpan(int offset, int count) - => _headerBuffer != null ? _headerBuffer.AsSpan(offset, count) : throw new InvalidOperationException("The buffer is no longer available"); + Span<char> IFBMHeaderBuffer.GetSpan(int offset, int count) + { + //Cast to char buffer + Span<char> chars = MemoryMarshal.Cast<byte, char>(_memHandle.GetSpan()); + //Return the requested span + return chars.Slice(offset, count); + } [MethodImpl(MethodImplOptions.AggressiveInlining)] - Span<char> IFBMHeaderBuffer.GetSpan() => _headerBuffer ?? throw new InvalidOperationException("The buffer is no longer available"); + Span<char> IFBMHeaderBuffer.GetSpan() => MemoryMarshal.Cast<byte, char>(_memHandle.GetSpan()); } } diff --git a/lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs b/lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs index 9ca6b4d..1e26140 100644 --- a/lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs +++ b/lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Messaging.FBM @@ -24,14 +24,12 @@ using System; using System.Text; -using System.Threading; using System.Threading.Tasks; using VNLib.Net.Http; using VNLib.Utils.IO; using VNLib.Utils.Extensions; using VNLib.Utils.Memory.Caching; -using VNLib.Net.Messaging.FBM.Client; namespace VNLib.Net.Messaging.FBM.Server { @@ -41,9 +39,9 @@ namespace VNLib.Net.Messaging.FBM.Server /// </summary> public sealed class FBMResponseMessage : IReusable, IFBMMessage { - internal FBMResponseMessage(int internalBufferSize, Encoding headerEncoding) + internal FBMResponseMessage(int internalBufferSize, Encoding headerEncoding, IFBMMemoryManager manager) { - _headerAccumulator = new HeaderDataAccumulator(internalBufferSize); + _headerAccumulator = new HeaderDataAccumulator(internalBufferSize, manager); _headerEncoding = headerEncoding; _messageEnumerator = new(this); } @@ -134,92 +132,76 @@ namespace VNLib.Net.Messaging.FBM.Server /// <summary> /// Gets the internal message body enumerator and prepares the message for sending /// </summary> - /// <param name="cancellationToken">A cancellation token</param> /// <returns>A value task that returns the message body enumerator</returns> - internal async ValueTask<IAsyncMessageReader> GetResponseDataAsync(CancellationToken cancellationToken) - { - //try to buffer as much data in the header segment first - if(MessageBody?.RemainingSize > 0 && _headerAccumulator.RemainingSize > 0) - { - //Read data from the message - int read = await MessageBody.ReadAsync(_headerAccumulator.RemainingBuffer, cancellationToken); - - //Advance accumulator to the read bytes - _headerAccumulator.Advance(read); - } - //return reusable enumerator - return _messageEnumerator; - } + internal IAsyncMessageReader GetResponseData() => _messageEnumerator; private sealed class MessageSegmentEnumerator : IAsyncMessageReader { private readonly FBMResponseMessage _message; + private readonly ISlindingWindowBuffer<byte> _accumulator; bool HeadersRead; public MessageSegmentEnumerator(FBMResponseMessage message) { _message = message; + _accumulator = _message._headerAccumulator; } - public ReadOnlyMemory<byte> Current { get; private set; } + ///<inheritdoc/> + public ReadOnlyMemory<byte> Current => _accumulator.AccumulatedBuffer; - public bool DataRemaining { get; private set; } + ///<inheritdoc/> + public bool DataRemaining => _message.MessageBody?.RemainingSize > 0; + ///<inheritdoc/> public async ValueTask<bool> MoveNextAsync() { //Attempt to read header segment first if (!HeadersRead) { - //Set the accumulated buffer - Current = _message._headerAccumulator.AccumulatedBuffer; + /* + * If headers have not been read yet, we can attempt to buffer as much + * of the message body into the header accumulator buffer as possible. This will + * reduce message fragmentation. + */ + if (DataRemaining && _accumulator.RemainingSize > 0) + { + int read = await _message.MessageBody.ReadAsync(_accumulator.RemainingBuffer).ConfigureAwait(false); - //Update data remaining flag - DataRemaining = _message.MessageBody?.RemainingSize > 0; + //Advance accumulator to the read bytes + _accumulator.Advance(read); + } //Set headers read flag HeadersRead = true; return true; } - else if (_message.MessageBody?.RemainingSize > 0) + else if (DataRemaining) { - //Use the header buffer as the buffer for the message body - Memory<byte> buffer = _message._headerAccumulator.Buffer; + //Reset the accumulator so we can read another segment + _accumulator.Reset(); //Read body segment - int read = await _message.MessageBody.ReadAsync(buffer); + int read = await _message.MessageBody.ReadAsync(_accumulator.RemainingBuffer); - //Update data remaining flag - DataRemaining = _message.MessageBody.RemainingSize > 0; + //Advance accumulator to the read bytes + _accumulator.Advance(read); - if (read > 0) - { - //Store the read segment - Current = buffer[..read]; - return true; - } + return read > 0; } return false; } + ///<inheritdoc/> public ValueTask DisposeAsync() { - //Clear current segment - Current = default; - //Reset headers read flag HeadersRead = false; - + //Dispose the message body if set - if (_message.MessageBody != null) - { - return _message.MessageBody.DisposeAsync(); - } - else - { - return ValueTask.CompletedTask; - } + return _message.MessageBody != null ? _message.MessageBody.DisposeAsync() : ValueTask.CompletedTask; } } } diff --git a/lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs b/lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs index 423a26e..891c583 100644 --- a/lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs +++ b/lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Messaging.FBM @@ -23,25 +23,28 @@ */ using System; -using System.Buffers; using VNLib.Utils.IO; namespace VNLib.Net.Messaging.FBM.Server { + /// <summary> /// Reusable sliding window impl /// </summary> internal sealed class HeaderDataAccumulator : ISlindingWindowBuffer<byte> { - private readonly int BufferSize; - - private byte[]? _memHandle; + private readonly int _bufferSize; + private readonly IFBMMemoryManager _memManager; + private readonly IFBMMemoryHandle _handle; + - public HeaderDataAccumulator(int bufferSize) + public HeaderDataAccumulator(int bufferSize, IFBMMemoryManager memManager) { - BufferSize = bufferSize; + _bufferSize = bufferSize; + _memManager = memManager; + _handle = memManager.InitHandle(); } ///<inheritdoc/> @@ -49,7 +52,7 @@ namespace VNLib.Net.Messaging.FBM.Server ///<inheritdoc/> public int WindowEndPos { get; private set; } ///<inheritdoc/> - public Memory<byte> Buffer => _memHandle.AsMemory(); + public Memory<byte> Buffer => _handle.GetMemory(); ///<inheritdoc/> public void Advance(int count) => WindowEndPos += count; @@ -67,22 +70,13 @@ namespace VNLib.Net.Messaging.FBM.Server /// <summary> /// Allocates the internal message buffer /// </summary> - public void Prepare() - { - _memHandle ??= ArrayPool<byte>.Shared.Rent(BufferSize); - } + public void Prepare() => _memManager.AllocBuffer(_handle, _bufferSize); ///<inheritdoc/> public void Close() { Reset(); - - if (_memHandle != null) - { - //Return the buffer to the pool - ArrayPool<byte>.Shared.Return(_memHandle); - _memHandle = null; - } + _memManager.FreeBuffer(_handle); } } diff --git a/lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs b/lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs index b2abe8d..abb3600 100644 --- a/lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs +++ b/lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Messaging.FBM @@ -34,7 +34,7 @@ namespace VNLib.Net.Messaging.FBM.Server internal interface IAsyncMessageReader : IAsyncEnumerator<ReadOnlyMemory<byte>> { /// <summary> - /// A value that indicates if there is data remaining after a + /// A value that indicates if there is data remaining after a read /// </summary> bool DataRemaining { get; } } diff --git a/lib/Net.Messaging.FBM/src/Server/IFBMServerErrorHandler.cs b/lib/Net.Messaging.FBM/src/Server/IFBMServerErrorHandler.cs new file mode 100644 index 0000000..caa4f96 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Server/IFBMServerErrorHandler.cs @@ -0,0 +1,49 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: IFBMServerErrorHandler.cs +* +* IFBMServerErrorHandler.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; + +namespace VNLib.Net.Messaging.FBM.Server +{ + /// <summary> + /// An server side FBM protocol error handler abstraction + /// </summary> + public interface IFBMServerErrorHandler + { + /// <summary> + /// An exception handler for unhandled events that occur during a listening session + /// </summary> + /// <param name="ex">The exception that caused this handler to be invoked</param> + void OnProcessError(Exception ex); + + /// <summary> + /// An exception handler for invalid messages that occur during a listening session. + /// NOTE: The context parameter is likely in an invlaid state and should be read carefully + /// </summary> + /// <param name="context">The context that the error occured while parsing on</param> + /// <param name="ex">The exception explaining the reason this handler was invoked</param> + /// <returns>A value that indicates if the server should continue processing</returns> + bool OnInvalidMessage(FBMContext context, Exception ex); + } +} diff --git a/lib/Net.Messaging.FBM/src/Server/IFBMServerMessageHandler.cs b/lib/Net.Messaging.FBM/src/Server/IFBMServerMessageHandler.cs new file mode 100644 index 0000000..532db5f --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Server/IFBMServerMessageHandler.cs @@ -0,0 +1,43 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: IFBMServerMessageHandler.cs +* +* IFBMServerMessageHandler.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Threading; +using System.Threading.Tasks; + +namespace VNLib.Net.Messaging.FBM.Server +{ + /// <summary> + /// A server side FBM protocol handler + /// </summary> + public interface IFBMServerMessageHandler : IFBMServerErrorHandler + { + /// <summary> + /// Handles processing of a normal incoming message + /// </summary> + /// <param name="context">The context to process for this new message</param> + /// <param name="cancellationToken">A token that signals the session has been cancelled</param> + /// <returns>A task representing the asynchronous work</returns> + Task HandleMessage(FBMContext context, CancellationToken cancellationToken); + } +} diff --git a/lib/Net.Messaging.FBM/src/VNLib.Net.Messaging.FBM.csproj b/lib/Net.Messaging.FBM/src/VNLib.Net.Messaging.FBM.csproj index 7fade2c..70a640d 100644 --- a/lib/Net.Messaging.FBM/src/VNLib.Net.Messaging.FBM.csproj +++ b/lib/Net.Messaging.FBM/src/VNLib.Net.Messaging.FBM.csproj @@ -35,7 +35,6 @@ </ItemGroup> <ItemGroup> - <ProjectReference Include="..\..\Net.Http\src\VNLib.Net.Http.csproj" /> <ProjectReference Include="..\..\Plugins.Essentials\src\VNLib.Plugins.Essentials.csproj" /> <ProjectReference Include="..\..\Utils\src\VNLib.Utils.csproj" /> </ItemGroup> |