From de94d788e9a47432a7630a8215896b8dd3628599 Mon Sep 17 00:00:00 2001 From: vnugent Date: Sun, 8 Jan 2023 16:01:54 -0500 Subject: Reorder + analyzer cleanup --- .../src/Client/ClientExtensions.cs | 69 +++ lib/Net.Messaging.FBM/src/Client/FBMClient.cs | 475 +++++++++++++++++++++ .../src/Client/FBMClientConfig.cs | 81 ++++ .../src/Client/FBMClientWorkerBase.cs | 125 ++++++ lib/Net.Messaging.FBM/src/Client/FBMRequest.cs | 302 +++++++++++++ lib/Net.Messaging.FBM/src/Client/FBMResponse.cs | 106 +++++ .../src/Client/FMBClientErrorEventArgs.cs | 46 ++ lib/Net.Messaging.FBM/src/Client/HeaderCommand.cs | 57 +++ .../src/Client/HeaderParseStatus.cs | 40 ++ lib/Net.Messaging.FBM/src/Client/Helpers.cs | 272 ++++++++++++ lib/Net.Messaging.FBM/src/Client/IFBMMessage.cs | 62 +++ .../src/Client/IStatefulConnection.cs | 54 +++ .../src/Client/ManagedClientWebSocket.cs | 201 +++++++++ lib/Net.Messaging.FBM/src/Client/README.md | 169 ++++++++ .../src/Exceptions/FBMException.cs | 52 +++ .../src/Exceptions/FBMInvalidRequestException.cs | 47 ++ .../src/Exceptions/InvalidResponseException.cs | 52 +++ lib/Net.Messaging.FBM/src/Server/FBMContext.cs | 85 ++++ lib/Net.Messaging.FBM/src/Server/FBMListener.cs | 388 +++++++++++++++++ .../src/Server/FBMListenerBase.cs | 113 +++++ .../src/Server/FBMListenerSessionParams.cs | 62 +++ .../src/Server/FBMRequestMessage.cs | 196 +++++++++ .../src/Server/FBMResponseMessage.cs | 226 ++++++++++ .../src/Server/HeaderDataAccumulator.cs | 89 ++++ .../src/Server/IAsyncMessageBody.cs | 57 +++ .../src/Server/IAsyncMessageReader.cs | 42 ++ lib/Net.Messaging.FBM/src/Server/readme.md | 35 ++ .../src/VNLib.Net.Messaging.FBM.csproj | 33 ++ 28 files changed, 3536 insertions(+) create mode 100644 lib/Net.Messaging.FBM/src/Client/ClientExtensions.cs create mode 100644 lib/Net.Messaging.FBM/src/Client/FBMClient.cs create mode 100644 lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs create mode 100644 lib/Net.Messaging.FBM/src/Client/FBMClientWorkerBase.cs create mode 100644 lib/Net.Messaging.FBM/src/Client/FBMRequest.cs create mode 100644 lib/Net.Messaging.FBM/src/Client/FBMResponse.cs create mode 100644 lib/Net.Messaging.FBM/src/Client/FMBClientErrorEventArgs.cs create mode 100644 lib/Net.Messaging.FBM/src/Client/HeaderCommand.cs create mode 100644 lib/Net.Messaging.FBM/src/Client/HeaderParseStatus.cs create mode 100644 lib/Net.Messaging.FBM/src/Client/Helpers.cs create mode 100644 lib/Net.Messaging.FBM/src/Client/IFBMMessage.cs create mode 100644 lib/Net.Messaging.FBM/src/Client/IStatefulConnection.cs create mode 100644 lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs create mode 100644 lib/Net.Messaging.FBM/src/Client/README.md create mode 100644 lib/Net.Messaging.FBM/src/Exceptions/FBMException.cs create mode 100644 lib/Net.Messaging.FBM/src/Exceptions/FBMInvalidRequestException.cs create mode 100644 lib/Net.Messaging.FBM/src/Exceptions/InvalidResponseException.cs create mode 100644 lib/Net.Messaging.FBM/src/Server/FBMContext.cs create mode 100644 lib/Net.Messaging.FBM/src/Server/FBMListener.cs create mode 100644 lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs create mode 100644 lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs create mode 100644 lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs create mode 100644 lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs create mode 100644 lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs create mode 100644 lib/Net.Messaging.FBM/src/Server/IAsyncMessageBody.cs create mode 100644 lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs create mode 100644 lib/Net.Messaging.FBM/src/Server/readme.md create mode 100644 lib/Net.Messaging.FBM/src/VNLib.Net.Messaging.FBM.csproj (limited to 'lib/Net.Messaging.FBM/src') diff --git a/lib/Net.Messaging.FBM/src/Client/ClientExtensions.cs b/lib/Net.Messaging.FBM/src/Client/ClientExtensions.cs new file mode 100644 index 0000000..102b6c9 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Client/ClientExtensions.cs @@ -0,0 +1,69 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: ClientExtensions.cs +* +* ClientExtensions.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Runtime.CompilerServices; + +namespace VNLib.Net.Messaging.FBM.Client +{ + + public static class ClientExtensions + { + /// + /// Writes the location header of the requested resource + /// + /// + /// The location address + /// + public static void WriteLocation(this FBMRequest request, ReadOnlySpan location) + { + request.WriteHeader(HeaderCommand.Location, location); + } + + /// + /// Writes the location header of the requested resource + /// + /// + /// The location address + /// + public static void WriteLocation(this FBMRequest request, Uri location) + { + request.WriteHeader(HeaderCommand.Location, location.ToString()); + } + + /// + /// If the property is false, raises an + /// + /// + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static void ThrowIfNotSet(this in FBMResponse response) + { + if (!response.IsSet) + { + throw new InvalidResponseException("The response state is undefined (no response received)"); + } + } + } +} diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClient.cs b/lib/Net.Messaging.FBM/src/Client/FBMClient.cs new file mode 100644 index 0000000..db52c03 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Client/FBMClient.cs @@ -0,0 +1,475 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: FBMClient.cs +* +* FBMClient.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.IO; +using System.Buffers; +using System.Threading; +using System.Net.WebSockets; +using System.Threading.Tasks; +using System.Collections.Generic; +using System.Collections.Concurrent; + +using VNLib.Net.Http; +using VNLib.Utils; +using VNLib.Utils.IO; +using VNLib.Utils.Logging; +using VNLib.Utils.Extensions; +using VNLib.Utils.Memory.Caching; + +namespace VNLib.Net.Messaging.FBM.Client +{ + /// + /// A Fixed Buffer Message Protocol client. Allows for high performance client-server messaging + /// with minimal memory overhead. + /// + public class FBMClient : VnDisposeable, IStatefulConnection, ICacheHolder + { + /// + /// The WS connection query arguments to specify a receive buffer size + /// + public const string REQ_RECV_BUF_QUERY_ARG = "b"; + /// + /// The WS connection query argument to suggest a maximum response header buffer size + /// + public const string REQ_HEAD_BUF_QUERY_ARG = "hb"; + /// + /// The WS connection query argument to suggest a maximum message size + /// + public const string REQ_MAX_MESS_QUERY_ARG = "mx"; + + /// + /// Raised when the websocket has been closed because an error occured. + /// You may inspect the event args to determine the cause of the error. + /// + public event EventHandler? ConnectionClosedOnError; + /// + /// Raised when the client listener operaiton has completed as a normal closure + /// + public event EventHandler? ConnectionClosed; + + private readonly SemaphoreSlim SendLock; + private readonly ConcurrentDictionary ActiveRequests; + private readonly ReusableStore RequestRental; + private readonly FBMRequest _controlFrame; + /// + /// The configuration for the current client + /// + public FBMClientConfig Config { get; } + /// + /// A handle that is reset when a connection has been successfully set, and is set + /// when the connection exists + /// + public ManualResetEvent ConnectionStatusHandle { get; } + /// + /// The to send/recieve message on + /// + public ManagedClientWebSocket ClientSocket { get; } + /// + /// Gets the shared control frame for the current instance. The request is reset when + /// this property is called. (Not thread safe) + /// + protected FBMRequest ControlFrame + { + get + { + _controlFrame.Reset(); + return _controlFrame; + } + } + + /// + /// Creates a new in a closed state + /// + /// The client configuration + public FBMClient(FBMClientConfig config) + { + RequestRental = ObjectRental.CreateReusable(ReuseableRequestConstructor); + SendLock = new(1); + ConnectionStatusHandle = new(true); + ActiveRequests = new(Environment.ProcessorCount, 100); + + Config = config; + //Init control frame + _controlFrame = new (Helpers.CONTROL_FRAME_MID, in config); + //Init the new client socket + ClientSocket = new(config.RecvBufferSize, config.RecvBufferSize, config.KeepAliveInterval, config.SubProtocol); + } + + private void Debug(string format, params string[] strings) + { + if(Config.DebugLog != null) + { + Config.DebugLog.Debug($"[DEBUG] FBM Client: {format}", strings); + } + } + private void Debug(string format, long value, long other) + { + if (Config.DebugLog != null) + { + Config.DebugLog.Debug($"[DEBUG] FBM Client: {format}", value, other); + } + } + + /// + /// Allocates and configures a new message object for use within the reusable store + /// + /// The configured + protected virtual FBMRequest ReuseableRequestConstructor() => new(Config); + + /// + /// Asynchronously opens a websocket connection with the specifed remote server + /// + /// The address of the server to connect to + /// A cancellation token + /// + public async Task ConnectAsync(Uri serverUri, CancellationToken cancellationToken = default) + { + //Uribuilder to send config parameters to the server + UriBuilder urib = new(serverUri); + urib.Query += + $"{REQ_RECV_BUF_QUERY_ARG}={Config.RecvBufferSize}" + + $"&{REQ_HEAD_BUF_QUERY_ARG}={Config.MaxHeaderBufferSize}" + + $"&{REQ_MAX_MESS_QUERY_ARG}={Config.MaxMessageSize}"; + Debug("Connection string {con}", urib.Uri.ToString()); + //Connect to server + await ClientSocket.ConnectAsync(urib.Uri, cancellationToken); + //Reset wait handle before return + ConnectionStatusHandle.Reset(); + //Begin listeing for requets in a background task + _ = Task.Run(ProcessContinuousRecvAsync, cancellationToken); + } + + /// + /// Rents a new from the internal . + /// Use when request is no longer in use + /// + /// The configured (rented or new) ready for use + public FBMRequest RentRequest() => RequestRental.Rent(); + /// + /// Stores (or returns) the reusable request in cache for use with + /// + /// The request to return to the store + /// + public void ReturnRequest(FBMRequest request) => RequestRental.Return(request); + + /// + /// Sends a to the connected server + /// + /// The request message to send to the server + /// + /// When awaited, yields the server response + /// + /// + /// + /// + public async Task SendAsync(FBMRequest request, CancellationToken cancellationToken = default) + { + Check(); + //Length of the request must contains at least 1 int and header byte + if (request.Length < 1 + sizeof(int)) + { + throw new FBMInvalidRequestException("Message is not initialized"); + } + //Store a null value in the request queue so the response can store a buffer + if (!ActiveRequests.TryAdd(request.MessageId, request)) + { + throw new ArgumentException("Message with the same ID is already being processed"); + } + try + { + Debug("Sending {bytes} with id {id}", request.RequestData.Length, request.MessageId); + + //Reset the wait handle + request.ResponseWaitEvent.Reset(); + + //Wait for send-lock + using (SemSlimReleaser releaser = await SendLock.GetReleaserAsync(cancellationToken)) + { + //Send the data to the server + await ClientSocket.SendAsync(request.RequestData, WebSocketMessageType.Binary, true, cancellationToken); + } + + //wait for the response to be set + await request.WaitForResponseAsync(cancellationToken); + + Debug("Received {size} bytes for message {id}", request.Response?.Length ?? 0, request.MessageId); + + return request.GetResponse(); + } + catch + { + //Remove the request since packet was never sent + ActiveRequests.Remove(request.MessageId, out _); + //Clear waiting flag + request.ResponseWaitEvent.Set(); + throw; + } + } + /// + /// Streams arbitrary binary data to the server with the initial request message + /// + /// The request message to send to the server + /// Data to stream to the server + /// The content type of the stream of data + /// + /// When awaited, yields the server response + /// + /// + /// + public async Task StreamDataAsync(FBMRequest request, Stream payload, ContentType ct, CancellationToken cancellationToken = default) + { + Check(); + //Length of the request must contains at least 1 int and header byte + if(request.Length < 1 + sizeof(int)) + { + throw new FBMInvalidRequestException("Message is not initialized"); + } + //Store a null value in the request queue so the response can store a buffer + if (!ActiveRequests.TryAdd(request.MessageId, request)) + { + throw new ArgumentException("Message with the same ID is already being processed"); + } + try + { + Debug("Streaming {bytes} with id {id}", request.RequestData.Length, request.MessageId); + //Reset the wait handle + request.ResponseWaitEvent.Reset(); + //Write an empty body in the request + request.WriteBody(ReadOnlySpan.Empty, ct); + //Wait for send-lock + using (SemSlimReleaser releaser = await SendLock.GetReleaserAsync(cancellationToken)) + { + //Send the initial request packet + await ClientSocket.SendAsync(request.RequestData, WebSocketMessageType.Binary, false, cancellationToken); + //Calc buffer size + int bufSize = (int)Math.Clamp(payload.Length, Config.MessageBufferSize, Config.MaxMessageSize); + //Alloc a streaming buffer + using IMemoryOwner buffer = Config.BufferHeap.DirectAlloc(bufSize); + //Stream mesage body + do + { + //Read data + int read = await payload.ReadAsync(buffer.Memory, cancellationToken); + if (read == 0) + { + //No more data avialable + break; + } + //write message to socket, if the read data was smaller than the buffer, we can send the last packet + await ClientSocket.SendAsync(buffer.Memory[..read], WebSocketMessageType.Binary, read < bufSize, cancellationToken); + + } while (true); + } + //wait for the server to respond + await request.WaitForResponseAsync(cancellationToken); + + Debug("Response recieved {size} bytes for message {id}", request.Response?.Length ?? 0, request.MessageId); + } + catch + { + //Remove the request since packet was never sent or cancelled + ActiveRequests.Remove(request.MessageId, out _); + //Clear wait lock so the request state is reset + request.ResponseWaitEvent.Set(); + throw; + } + } + + /// + /// Begins listening for messages from the server on the internal socket (must be connected), + /// until the socket is closed, or canceled + /// + /// + protected async Task ProcessContinuousRecvAsync() + { + Debug("Begining receive loop"); + //Alloc recv buffer + IMemoryOwner recvBuffer = Config.BufferHeap.DirectAlloc(Config.RecvBufferSize); + try + { + //Recv event loop + while (true) + { + //Listen for incoming packets with the intial data buffer + ValueWebSocketReceiveResult result = await ClientSocket.ReceiveAsync(recvBuffer.Memory, CancellationToken.None); + //If the message is a close message, its time to exit + if (result.MessageType == WebSocketMessageType.Close) + { + //Notify the event handler that the connection was closed + ConnectionClosed?.Invoke(this, EventArgs.Empty); + break; + } + if (result.Count <= 4) + { + Debug("Empty message recieved from server"); + continue; + } + //Alloc data buffer and write initial data + VnMemoryStream responseBuffer = new(Config.BufferHeap); + //Copy initial data + responseBuffer.Write(recvBuffer.Memory.Span[..result.Count]); + //Receive packets until the EOF is reached + while (!result.EndOfMessage) + { + //recive more data + result = await ClientSocket.ReceiveAsync(recvBuffer.Memory, CancellationToken.None); + //Make sure the buffer is not too large + if ((responseBuffer.Length + result.Count) > Config.MaxMessageSize) + { + //Dispose the buffer before exiting + responseBuffer.Dispose(); + Debug("Recieved a message that was too large, skipped"); + goto Skip; + } + //Copy continuous data + responseBuffer.Write(recvBuffer.Memory.Span[..result.Count]); + } + //Reset the buffer stream position + _ = responseBuffer.Seek(0, SeekOrigin.Begin); + ProcessResponse(responseBuffer); + //Goto skip statment to cleanup resources + Skip:; + } + } + catch (OperationCanceledException) + { + //Normal closeure, do nothing + } + catch (Exception ex) + { + //Error event args + FMBClientErrorEventArgs wsEventArgs = new() + { + Cause = ex, + ErrorClient = this + }; + //Invoke error handler + ConnectionClosedOnError?.Invoke(this, wsEventArgs); + } + finally + { + //Dispose the recv buffer + recvBuffer.Dispose(); + //Set all pending events + foreach (FBMRequest request in ActiveRequests.Values) + { + request.ResponseWaitEvent.Set(); + } + //Clear dict + ActiveRequests.Clear(); + //Cleanup the socket when exiting + ClientSocket.Cleanup(); + //Set status handle as unset + ConnectionStatusHandle.Set(); + //Invoke connection closed + ConnectionClosed?.Invoke(this, EventArgs.Empty); + } + Debug("Receive loop exited"); + } + + /// + /// Syncrhonously processes a buffered response packet + /// + /// The buffered response body recieved from the server + /// This method blocks the listening task. So operations should be quick + protected virtual void ProcessResponse(VnMemoryStream responseMessage) + { + //read first response line + ReadOnlySpan line = Helpers.ReadLine(responseMessage); + //get the id of the message + int messageId = Helpers.GetMessageId(line); + //Finalze control frame + if(messageId == Helpers.CONTROL_FRAME_MID) + { + Debug("Control frame received"); + ProcessControlFrame(responseMessage); + return; + } + else if (messageId < 0) + { + //Cannot process request + responseMessage.Dispose(); + Debug("Invalid messageid"); + return; + } + //Search for the request that has the same id + if (ActiveRequests.TryRemove(messageId, out FBMRequest? request)) + { + //Set the new response message + request.SetResponse(responseMessage); + } + else + { + Debug("Message {id} was not found in the waiting message queue", messageId, 0); + + //Cleanup no request was waiting + responseMessage.Dispose(); + } + } + /// + /// Processes a control frame response from the server + /// + /// The raw response packet from the server + private void ProcessControlFrame(VnMemoryStream vms) + { + vms.Dispose(); + } + /// + /// Processes a control frame response from the server + /// + /// The parsed response-packet + protected virtual void ProcessControlFrame(in FBMResponse response) + { + + } + + /// + /// Closes the underlying and cancels all pending operations + /// + /// + /// + /// + public async Task DisconnectAsync(CancellationToken cancellationToken = default) + { + Check(); + //Close the connection + await ClientSocket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", cancellationToken); + } + /// + protected override void Free() + { + //Dispose socket + ClientSocket.Dispose(); + //Dispose client buffer + RequestRental.Dispose(); + SendLock.Dispose(); + ConnectionStatusHandle.Dispose(); + } + /// + public void CacheClear() => RequestRental.CacheClear(); + /// + public void CacheHardClear() => RequestRental.CacheHardClear(); + } +} diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs b/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs new file mode 100644 index 0000000..229eb76 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs @@ -0,0 +1,81 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: FBMClientConfig.cs +* +* FBMClientConfig.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Text; + +using VNLib.Utils.Memory; +using VNLib.Utils.Logging; + +namespace VNLib.Net.Messaging.FBM.Client +{ + /// + /// A structure that defines readonly constants for the to use + /// + public readonly struct FBMClientConfig + { + /// + /// The size (in bytes) of the internal buffer to use when receiving messages from the server + /// + public readonly int RecvBufferSize { get; init; } + /// + /// The size (in bytes) of the internal buffer size, when requests are rented from the client + /// + /// + /// This is the entire size of the request buffer including headers and payload data, unless + /// data is streamed to the server + /// + public readonly int MessageBufferSize { get; init; } + /// + /// The size (in chars) of the client/server message header buffer + /// + public readonly int MaxHeaderBufferSize { get; init; } + /// + /// The maximum size (in bytes) of messages sent or recieved from the server + /// + public readonly int MaxMessageSize { get; init; } + /// + /// The heap to allocate interal (and message) buffers from + /// + public readonly IUnmangedHeap BufferHeap { get; init; } + /// + /// The websocket keepalive interval to use (leaving this property default disables keepalives) + /// + public readonly TimeSpan KeepAliveInterval { get; init; } + /// + /// The websocket sub-protocol to use + /// + public readonly string? SubProtocol { get; init; } + /// + /// The encoding instance used to encode header values + /// + public readonly Encoding HeaderEncoding { get; init; } + + /// + /// An optional log provider to write debug logs to. If this propery is not null, + /// debugging information will be logged with the debug log-level + /// + public readonly ILogProvider? DebugLog { get; init; } + } +} diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClientWorkerBase.cs b/lib/Net.Messaging.FBM/src/Client/FBMClientWorkerBase.cs new file mode 100644 index 0000000..b4056dc --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Client/FBMClientWorkerBase.cs @@ -0,0 +1,125 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: FBMClientWorkerBase.cs +* +* FBMClientWorkerBase.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Utils; + +namespace VNLib.Net.Messaging.FBM.Client +{ + /// + /// A base class for objects that implement + /// operations + /// + public abstract class FBMClientWorkerBase : VnDisposeable, IStatefulConnection + { + /// + /// Allows configuration of websocket configuration options + /// + public ManagedClientWebSocket SocketOptions => Client.ClientSocket; + +#nullable disable + /// + /// The to sent requests from + /// + public FBMClient Client { get; private set; } + + /// + /// Raised when the client has connected successfully + /// + public event Action Connected; +#nullable enable + + /// + public event EventHandler ConnectionClosed + { + add => Client.ConnectionClosed += value; + remove => Client.ConnectionClosed -= value; + } + + /// + /// Creates and initializes a the internal + /// + /// The client config + protected void InitClient(in FBMClientConfig config) + { + Client = new(config); + Client.ConnectionClosedOnError += Client_ConnectionClosedOnError; + Client.ConnectionClosed += Client_ConnectionClosed; + } + + private void Client_ConnectionClosed(object? sender, EventArgs e) => OnDisconnected(); + private void Client_ConnectionClosedOnError(object? sender, FMBClientErrorEventArgs e) => OnError(e); + + /// + /// Asynchronously connects to a remote server by the specified uri + /// + /// The remote uri of a server to connect to + /// A token to cancel the connect operation + /// A task that compeltes when the client has connected to the remote server + public virtual async Task ConnectAsync(Uri serverUri, CancellationToken cancellationToken = default) + { + //Connect to server + await Client.ConnectAsync(serverUri, cancellationToken).ConfigureAwait(true); + //Invoke child on-connected event + OnConnected(); + Connected?.Invoke(Client, this); + } + + /// + /// Asynchronously disonnects a client only if the client is currently connected, + /// returns otherwise + /// + /// + /// A task that compeltes when the client has disconnected + public virtual Task DisconnectAsync(CancellationToken cancellationToken = default) + { + return Client.DisconnectAsync(cancellationToken); + } + + /// + /// Invoked when a client has successfully connected to the remote server + /// + protected abstract void OnConnected(); + /// + /// Invoked when the client has disconnected cleanly + /// + protected abstract void OnDisconnected(); + /// + /// Invoked when the connected client is closed because of a connection error + /// + /// A that contains the client error data + protected abstract void OnError(FMBClientErrorEventArgs e); + + /// + protected override void Free() + { + Client.ConnectionClosedOnError -= Client_ConnectionClosedOnError; + Client.ConnectionClosed -= Client_ConnectionClosed; + Client.Dispose(); + } + } +} diff --git a/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs b/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs new file mode 100644 index 0000000..f02724a --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs @@ -0,0 +1,302 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: FBMRequest.cs +* +* FBMRequest.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Text; +using System.Buffers; +using System.Threading; +using System.Threading.Tasks; +using System.Collections.Generic; + +using VNLib.Net.Http; +using VNLib.Utils; +using VNLib.Utils.IO; +using VNLib.Utils.Memory; +using VNLib.Utils.Extensions; +using VNLib.Utils.Memory.Caching; + +namespace VNLib.Net.Messaging.FBM.Client +{ + /// + /// A reusable Fixed Buffer Message request container. This class is not thread-safe + /// + public sealed class FBMRequest : VnDisposeable, IReusable, IFBMMessage, IStringSerializeable + { + private sealed class BufferWriter : IBufferWriter + { + private readonly FBMRequest _request; + + public BufferWriter(FBMRequest request) + { + _request = request; + } + + public void Advance(int count) + { + _request.Position += count; + } + + public Memory GetMemory(int sizeHint = 0) + { + return sizeHint > 0 ? _request.RemainingBuffer[0..sizeHint] : _request.RemainingBuffer; + } + + public Span GetSpan(int sizeHint = 0) + { + return sizeHint > 0 ? _request.RemainingBuffer.Span[0..sizeHint] : _request.RemainingBuffer.Span; + } + } + + private readonly IMemoryOwner HeapBuffer; + + + private readonly BufferWriter _writer; + private int Position; + + private readonly Encoding HeaderEncoding; + private readonly int ResponseHeaderBufferSize; + private readonly List>> ResponseHeaderList = new(); + private char[]? ResponseHeaderBuffer; + + /// + /// The size (in bytes) of the request message + /// + public int Length => Position; + private Memory RemainingBuffer => HeapBuffer.Memory[Position..]; + + /// + /// The id of the current request message + /// + public int MessageId { get; } + /// + /// The request message packet + /// + public ReadOnlyMemory RequestData => HeapBuffer.Memory[..Position]; + /// + /// An to signal request/response + /// event completion + /// + internal ManualResetEvent ResponseWaitEvent { get; } + + internal VnMemoryStream? Response { get; private set; } + /// + /// Initializes a new with the sepcified message buffer size, + /// and a random messageid + /// + /// The fbm client config storing required config variables + public FBMRequest(in FBMClientConfig config) : this(Helpers.RandomMessageId, in config) + { } + /// + /// Initializes a new with the sepcified message buffer size and a custom MessageId + /// + /// The custom message id + /// The fbm client config storing required config variables + public FBMRequest(int messageId, in FBMClientConfig config) + { + //Setup response wait handle but make sure the contuation runs async + ResponseWaitEvent = new(true); + + //Alloc the buffer as a memory owner so a memory buffer can be used + HeapBuffer = config.BufferHeap.DirectAlloc(config.MessageBufferSize); + + MessageId = messageId; + + HeaderEncoding = config.HeaderEncoding; + ResponseHeaderBufferSize = config.MaxHeaderBufferSize; + + WriteMessageId(); + _writer = new(this); + } + + /// + /// Resets the internal buffer and writes the message-id header to the begining + /// of the buffer + /// + private void WriteMessageId() + { + //Get writer over buffer + ForwardOnlyWriter buffer = new(HeapBuffer.Memory.Span); + //write messageid header to the buffer + buffer.Append((byte)HeaderCommand.MessageId); + buffer.Append(MessageId); + buffer.WriteTermination(); + //Store intial position + Position = buffer.Written; + } + + /// + public void WriteHeader(HeaderCommand header, ReadOnlySpan value) => WriteHeader((byte)header, value); + /// + public void WriteHeader(byte header, ReadOnlySpan value) + { + ForwardOnlyWriter buffer = new(RemainingBuffer.Span); + buffer.WriteHeader(header, value, Helpers.DefaultEncoding); + //Update position + Position += buffer.Written; + } + /// + public void WriteBody(ReadOnlySpan body, ContentType contentType = ContentType.Binary) + { + //Write content type header + WriteHeader(HeaderCommand.ContentType, HttpHelpers.GetContentTypeString(contentType)); + //Get writer over buffer + ForwardOnlyWriter buffer = new(RemainingBuffer.Span); + //Now safe to write body + buffer.WriteBody(body); + //Update position + Position += buffer.Written; + } + /// + /// Returns buffer writer for writing the body data to the internal message buffer + /// + /// A to write message body to + public IBufferWriter GetBodyWriter() + { + //Write body termination + Helpers.Termination.CopyTo(RemainingBuffer); + Position += Helpers.Termination.Length; + //Return buffer writer + return _writer; + } + + /// + /// Resets the internal buffer and allows for writing a new message with + /// the same message-id + /// + public void Reset() + { + //Re-writing the message-id will reset the buffer + WriteMessageId(); + } + + internal void SetResponse(VnMemoryStream? vms) + { + Response = vms; + ResponseWaitEvent.Set(); + } + + internal Task WaitForResponseAsync(CancellationToken token) + { + return ResponseWaitEvent.WaitAsync().WaitAsync(token); + } + + /// + protected override void Free() + { + HeapBuffer.Dispose(); + ResponseWaitEvent.Dispose(); + OnResponseDisposed(); + } + void IReusable.Prepare() => Reset(); + bool IReusable.Release() + { + //Clear old response data if error occured + Response?.Dispose(); + Response = null; + + return true; + } + + /// + /// Gets the response of the sent message + /// + /// The response message for the current request + internal FBMResponse GetResponse() + { + if (Response != null) + { + /* + * NOTICE + * + * The FBM Client will position the response stream to the start + * of the header section (missing the message-id header) + * + * The message id belongs to this request so it cannot be mismatched + * + * The headers are read into a list of key-value pairs and the stream + * is positioned to the start of the message body + */ + + + //Alloc rseponse buffer + ResponseHeaderBuffer ??= ArrayPool.Shared.Rent(ResponseHeaderBufferSize); + + //Parse message headers + HeaderParseError statusFlags = Helpers.ParseHeaders(Response, ResponseHeaderBuffer, ResponseHeaderList, HeaderEncoding); + + //return response structure + return new(Response, statusFlags, ResponseHeaderList, OnResponseDisposed); + } + else + { + return new(); + } + } + + //Called when a response message is disposed to cleanup resources held by the response + private void OnResponseDisposed() + { + //Clear response header list + ResponseHeaderList.Clear(); + + //Clear old response + Response?.Dispose(); + Response = null; + + if (ResponseHeaderBuffer != null) + { + //Free response buffer + ArrayPool.Shared.Return(ResponseHeaderBuffer!); + ResponseHeaderBuffer = null; + } + } + + /// + public string Compile() + { + int charSize = Helpers.DefaultEncoding.GetCharCount(RequestData.Span); + using UnsafeMemoryHandle buffer = Memory.UnsafeAlloc(charSize + 128); + ERRNO count = Compile(buffer.Span); + return buffer.AsSpan(0, count).ToString(); + } + /// + public void Compile(ref ForwardOnlyWriter writer) + { + writer.Append("Message ID:"); + writer.Append(MessageId); + writer.Append(Environment.NewLine); + Helpers.DefaultEncoding.GetChars(RequestData.Span, ref writer); + } + /// + public ERRNO Compile(in Span buffer) + { + ForwardOnlyWriter writer = new(buffer); + Compile(ref writer); + return writer.Written; + } + /// + public override string ToString() => Compile(); + + } +} diff --git a/lib/Net.Messaging.FBM/src/Client/FBMResponse.cs b/lib/Net.Messaging.FBM/src/Client/FBMResponse.cs new file mode 100644 index 0000000..da36956 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Client/FBMResponse.cs @@ -0,0 +1,106 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: FBMResponse.cs +* +* FBMResponse.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Collections.Generic; + +using VNLib.Utils.IO; + +namespace VNLib.Net.Messaging.FBM.Client +{ + /// + /// A Fixed Buffer Message client response linked to the request that generated it. + /// Once the request is disposed or returned this message state is invalid + /// + public readonly struct FBMResponse : IDisposable, IEquatable + { + private readonly Action? _onDispose; + + /// + /// True when a response body was recieved and properly parsed + /// + public readonly bool IsSet { get; } + /// + /// The raw response message packet + /// + public readonly VnMemoryStream? MessagePacket { get; } + /// + /// A collection of response message headers + /// + public readonly IReadOnlyList>> Headers { get; } + /// + /// Status flags of the message parse operation + /// + public readonly HeaderParseError StatusFlags { get; } + /// + /// The body segment of the response message + /// + public readonly ReadOnlySpan ResponseBody => IsSet ? Helpers.GetRemainingData(MessagePacket!) : ReadOnlySpan.Empty; + + /// + /// Initailzies a response message structure and parses response + /// packet structure + /// + /// The message buffer (message packet) + /// The size of the buffer to alloc for header value storage + /// The collection of headerse + /// A method that will be invoked when the message response body is disposed + public FBMResponse(VnMemoryStream? vms, HeaderParseError status, IReadOnlyList>> headerList, Action onDispose) + { + MessagePacket = vms; + StatusFlags = status; + Headers = headerList; + IsSet = true; + _onDispose = onDispose; + } + + /// + /// Creates an unset response structure + /// + public FBMResponse() + { + MessagePacket = null; + StatusFlags = HeaderParseError.InvalidHeaderRead; + Headers = Array.Empty>>(); + IsSet = false; + _onDispose = null; + } + + /// + /// Releases any resources associated with the response message + /// + public void Dispose() => _onDispose?.Invoke(); + /// + public override bool Equals(object? obj) => obj is FBMResponse response && Equals(response); + /// + public override int GetHashCode() => IsSet ? MessagePacket!.GetHashCode() : 0; + /// + public static bool operator ==(FBMResponse left, FBMResponse right) => left.Equals(right); + /// + public static bool operator !=(FBMResponse left, FBMResponse right) => !(left == right); + /// + public bool Equals(FBMResponse other) => IsSet && other.IsSet && MessagePacket == other.MessagePacket; + + } +} diff --git a/lib/Net.Messaging.FBM/src/Client/FMBClientErrorEventArgs.cs b/lib/Net.Messaging.FBM/src/Client/FMBClientErrorEventArgs.cs new file mode 100644 index 0000000..96e9414 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Client/FMBClientErrorEventArgs.cs @@ -0,0 +1,46 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: FMBClientErrorEventArgs.cs +* +* FMBClientErrorEventArgs.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; + +#nullable disable + +namespace VNLib.Net.Messaging.FBM.Client +{ + /// + /// that is raised when an error occurs + /// in the background listener loop + /// + public class FMBClientErrorEventArgs : EventArgs + { + /// + /// The client that the exception was raised from + /// + public FBMClient ErrorClient { get; init; } + /// + /// The exception that was raised + /// + public Exception Cause { get; init; } + } +} diff --git a/lib/Net.Messaging.FBM/src/Client/HeaderCommand.cs b/lib/Net.Messaging.FBM/src/Client/HeaderCommand.cs new file mode 100644 index 0000000..5a57d85 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Client/HeaderCommand.cs @@ -0,0 +1,57 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: HeaderCommand.cs +* +* HeaderCommand.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +namespace VNLib.Net.Messaging.FBM +{ + /// + /// A Fixed Buffer Message header command value + /// + public enum HeaderCommand : byte + { + /// + /// Default, do not use + /// + NOT_USED, + /// + /// Specifies the header for a message-id + /// + MessageId, + /// + /// Specifies a resource location + /// + Location, + /// + /// Specifies a standard MIME content type header + /// + ContentType, + /// + /// Specifies an action on a request + /// + Action, + /// + /// Specifies a status header + /// + Status + } +} diff --git a/lib/Net.Messaging.FBM/src/Client/HeaderParseStatus.cs b/lib/Net.Messaging.FBM/src/Client/HeaderParseStatus.cs new file mode 100644 index 0000000..d38df26 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Client/HeaderParseStatus.cs @@ -0,0 +1,40 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: HeaderParseStatus.cs +* +* HeaderParseStatus.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; + +namespace VNLib.Net.Messaging.FBM +{ + /// + /// Specifies the results of a response parsing operation + /// + [Flags] + public enum HeaderParseError + { + None = 0, + InvalidId = 1, + HeaderOutOfMem = 2, + InvalidHeaderRead = 4 + } +} diff --git a/lib/Net.Messaging.FBM/src/Client/Helpers.cs b/lib/Net.Messaging.FBM/src/Client/Helpers.cs new file mode 100644 index 0000000..8f895fa --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Client/Helpers.cs @@ -0,0 +1,272 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: Helpers.cs +* +* Helpers.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.IO; +using System.Text; +using System.Collections.Generic; +using System.Security.Cryptography; + +using VNLib.Utils; +using VNLib.Utils.IO; +using VNLib.Utils.Memory; +using VNLib.Utils.Extensions; + + +namespace VNLib.Net.Messaging.FBM +{ + /// + /// Contains FBM library helper methods + /// + public static class Helpers + { + /// + /// The message-id of a connection control frame / out of band message + /// + public const int CONTROL_FRAME_MID = -500; + + public static readonly Encoding DefaultEncoding = Encoding.UTF8; + public static readonly ReadOnlyMemory Termination = new byte[] { 0xFF, 0xF1 }; + + /// + /// Parses the header line for a message-id + /// + /// A sequence of bytes that make up a header line + /// The message-id if parsed, -1 if message-id is not valid + public static int GetMessageId(ReadOnlySpan line) + { + //Make sure the message line is large enough to contain a message-id + if (line.Length < 1 + sizeof(int)) + { + return -1; + } + //The first byte should be the header id + HeaderCommand headerId = (HeaderCommand)line[0]; + //Make sure the headerid is set + if (headerId != HeaderCommand.MessageId) + { + return -2; + } + //Get the messageid after the header byte + ReadOnlySpan messageIdSegment = line.Slice(1, sizeof(int)); + //get the messageid from the messageid segment + return BitConverter.ToInt32(messageIdSegment); + } + + /// + /// Alloctes a random integer to use as a message id + /// + public static int RandomMessageId => RandomNumberGenerator.GetInt32(1, int.MaxValue); + /// + /// Gets the remaining data after the current position of the stream. + /// + /// The stream to segment + /// The remaining data segment + public static ReadOnlySpan GetRemainingData(VnMemoryStream response) + { + return response.AsSpan()[(int)response.Position..]; + } + + /// + /// Reads the next available line from the response message + /// + /// + /// The read line + public static ReadOnlySpan ReadLine(VnMemoryStream response) + { + //Get the span from the current stream position to end of the stream + ReadOnlySpan line = GetRemainingData(response); + //Search for next line termination + int index = line.IndexOf(Termination.Span); + if (index == -1) + { + return ReadOnlySpan.Empty; + } + //Update stream position to end of termination + response.Seek(index + Termination.Length, SeekOrigin.Current); + //slice up line and exclude the termination + return line[..index]; + } + /// + /// Parses headers from the request stream, stores headers from the buffer into the + /// header collection + /// + /// The FBM packet buffer + /// The header character buffer to write headers to + /// The collection to store headers in + /// The encoding type used to deocde header values + /// The results of the parse operation + public static HeaderParseError ParseHeaders(VnMemoryStream vms, char[] buffer, ICollection>> headers, Encoding encoding) + { + HeaderParseError status = HeaderParseError.None; + //sliding window + Memory currentWindow = buffer; + //Accumulate headers + while (true) + { + //Read the next line from the current stream + ReadOnlySpan line = ReadLine(vms); + if (line.IsEmpty) + { + //Done reading headers + break; + } + HeaderCommand cmd = GetHeaderCommand(line); + //Get header value + ERRNO charsRead = GetHeaderValue(line, currentWindow.Span, encoding); + if (charsRead < 0) + { + //Out of buffer space + status |= HeaderParseError.HeaderOutOfMem; + break; + } + else if (!charsRead) + { + //Invalid header + status |= HeaderParseError.InvalidHeaderRead; + } + else + { + //Store header as a read-only sequence + headers.Add(new(cmd, currentWindow[..(int)charsRead])); + //Shift buffer window + currentWindow = currentWindow[(int)charsRead..]; + } + } + return status; + } + + /// + /// Gets a enum from the first byte of the message + /// + /// + /// The enum value from hte first byte of the message + public static HeaderCommand GetHeaderCommand(ReadOnlySpan line) + { + return (HeaderCommand)line[0]; + } + /// + /// Gets the value of the header following the colon bytes in the specifed + /// data message data line + /// + /// The message header line to get the value of + /// The output character buffer to write characters to + /// The encoding to decode the specified data with + /// The number of characters encoded + public static ERRNO GetHeaderValue(ReadOnlySpan line, Span output, Encoding encoding) + { + //Get the data following the header byte + ReadOnlySpan value = line[1..]; + //Calculate the character account + int charCount = encoding.GetCharCount(value); + //Determine if the output buffer is large enough + if (charCount > output.Length) + { + return -1; + } + //Decode the characters and return the char count + _ = encoding.GetChars(value, output); + return charCount; + } + + /// + /// Appends an arbitrary header to the current request buffer + /// + /// + /// The of the header + /// The value of the header + /// Encoding to use when writing character message + /// + public static void WriteHeader(ref this ForwardOnlyWriter buffer, byte header, ReadOnlySpan value, Encoding encoding) + { + //get char count + int byteCount = encoding.GetByteCount(value); + //make sure there is enough room in the buffer + if (buffer.RemainingSize < byteCount) + { + throw new OutOfMemoryException(); + } + //Write header command enum value + buffer.Append(header); + //Convert the characters to binary and write to the buffer + encoding.GetBytes(value, ref buffer); + //Write termination (0) + buffer.WriteTermination(); + } + + /// + /// Ends the header section of the request and appends the message body to + /// the end of the request + /// + /// + /// The message body to send with request + /// + public static void WriteBody(ref this ForwardOnlyWriter buffer, ReadOnlySpan body) + { + //start with termination + buffer.WriteTermination(); + //Write the body + buffer.Append(body); + } + /// + /// Writes a line termination to the message buffer + /// + /// + public static void WriteTermination(ref this ForwardOnlyWriter buffer) + { + //write termination + buffer.Append(Termination.Span); + } + + /// + /// Writes a line termination to the message buffer + /// + /// + public static void WriteTermination(this IDataAccumulator buffer) + { + //write termination + buffer.Append(Termination.Span); + } + + /// + /// Appends an arbitrary header to the current request buffer + /// + /// + /// The of the header + /// The value of the header + /// Encoding to use when writing character message + /// + public static void WriteHeader(this IDataAccumulator buffer, byte header, ReadOnlySpan value, Encoding encoding) + { + //Write header command enum value + buffer.Append(header); + //Convert the characters to binary and write to the buffer + int written = encoding.GetBytes(value, buffer.Remaining); + //Advance the buffer + buffer.Advance(written); + //Write termination (0) + buffer.WriteTermination(); + } + } +} diff --git a/lib/Net.Messaging.FBM/src/Client/IFBMMessage.cs b/lib/Net.Messaging.FBM/src/Client/IFBMMessage.cs new file mode 100644 index 0000000..18f19ec --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Client/IFBMMessage.cs @@ -0,0 +1,62 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: IFBMMessage.cs +* +* IFBMMessage.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; + +using VNLib.Net.Http; + +namespace VNLib.Net.Messaging.FBM.Client +{ + /// + /// Represents basic Fixed Buffer Message protocol operations + /// + public interface IFBMMessage + { + /// + /// The unique id of the message (nonzero) + /// + int MessageId { get; } + /// + /// Writes a data body to the message of the specified content type + /// + /// The body of the message to copy + /// The content type of the message body + /// + void WriteBody(ReadOnlySpan body, ContentType contentType = ContentType.Binary); + /// + /// Appends an arbitrary header to the current request buffer + /// + /// The header id + /// The value of the header + /// + void WriteHeader(byte header, ReadOnlySpan value); + /// + /// Appends an arbitrary header to the current request buffer + /// + /// The of the header + /// The value of the header + /// + void WriteHeader(HeaderCommand header, ReadOnlySpan value); + } +} \ No newline at end of file diff --git a/lib/Net.Messaging.FBM/src/Client/IStatefulConnection.cs b/lib/Net.Messaging.FBM/src/Client/IStatefulConnection.cs new file mode 100644 index 0000000..3b9dd3b --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Client/IStatefulConnection.cs @@ -0,0 +1,54 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: IStatefulConnection.cs +* +* IStatefulConnection.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace VNLib.Net.Messaging.FBM.Client +{ + /// + /// An abstraction for a stateful connection client that reports its status + /// + public interface IStatefulConnection + { + /// + /// An event that is raised when the connection state has transition from connected to disconnected + /// + event EventHandler ConnectionClosed; + /// + /// Connects the client to the remote resource + /// + /// The resource location to connect to + /// A token to cancel the connect opreation + /// A task that compeltes when the connection has succedded + Task ConnectAsync(Uri serverUri, CancellationToken cancellationToken = default); + /// + /// Gracefully disconnects the client from the remote resource + /// + /// A token to cancel the disconnect operation + /// A task that completes when the client has been disconnected + Task DisconnectAsync(CancellationToken cancellationToken = default); + } +} diff --git a/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs b/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs new file mode 100644 index 0000000..acac369 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs @@ -0,0 +1,201 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: ManagedClientWebSocket.cs +* +* ManagedClientWebSocket.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Net; +using System.Threading; +using System.Net.Security; +using System.Net.WebSockets; +using System.Threading.Tasks; +using System.Security.Cryptography.X509Certificates; + +using VNLib.Utils.Memory; + +#nullable enable + +namespace VNLib.Net.Messaging.FBM.Client +{ + + /// + /// A wrapper container to manage client websockets + /// + public class ManagedClientWebSocket : WebSocket + { + private readonly int TxBufferSize; + private readonly int RxBufferSize; + private readonly TimeSpan KeepAliveInterval; + private readonly VnTempBuffer _dataBuffer; + private readonly string? _subProtocol; + + /// + /// A collection of headers to add to the client + /// + public WebHeaderCollection Headers { get; } + public X509CertificateCollection Certificates { get; } + public IWebProxy? Proxy { get; set; } + public CookieContainer? Cookies { get; set; } + public RemoteCertificateValidationCallback? RemoteCertificateValidationCallback { get; set; } + + + private ClientWebSocket? _socket; + + /// + /// Initiaizes a new that accepts an optional sub-protocol for connections + /// + /// The size (in bytes) of the send buffer size + /// The size (in bytes) of the receive buffer size to use + /// The WS keepalive interval + /// The optional sub-protocol to use + public ManagedClientWebSocket(int txBufferSize, int rxBufferSize, TimeSpan keepAlive, string? subProtocol) + { + //Init header collection + Headers = new(); + Certificates = new(); + //Alloc buffer + _dataBuffer = new(rxBufferSize); + TxBufferSize = txBufferSize; + RxBufferSize = rxBufferSize; + KeepAliveInterval = keepAlive; + _subProtocol = subProtocol; + } + + /// + /// Asyncrhonously prepares a new client web-socket and connects to the remote endpoint + /// + /// The endpoint to connect to + /// A token to cancel the connect operation + /// A task that compeltes when the client has connected + public async Task ConnectAsync(Uri serverUri, CancellationToken token) + { + //Init new socket + _socket = new(); + try + { + //Set buffer + _socket.Options.SetBuffer(RxBufferSize, TxBufferSize, _dataBuffer); + //Set remaining stored options + _socket.Options.ClientCertificates = Certificates; + _socket.Options.KeepAliveInterval = KeepAliveInterval; + _socket.Options.Cookies = Cookies; + _socket.Options.Proxy = Proxy; + _socket.Options.RemoteCertificateValidationCallback = RemoteCertificateValidationCallback; + //Specify sub-protocol + if (!string.IsNullOrEmpty(_subProtocol)) + { + _socket.Options.AddSubProtocol(_subProtocol); + } + //Set headers + for (int i = 0; i < Headers.Count; i++) + { + string name = Headers.GetKey(i); + string? value = Headers.Get(i); + //Set header + _socket.Options.SetRequestHeader(name, value); + } + //Connect to server + await _socket.ConnectAsync(serverUri, token); + } + catch + { + //Cleanup the socket + Cleanup(); + throw; + } + } + + /// + /// Cleans up internal resources to prepare for another connection + /// + public void Cleanup() + { + //Dispose old socket if set + _socket?.Dispose(); + _socket = null; + } + /// + public override WebSocketCloseStatus? CloseStatus => _socket?.CloseStatus; + /// + public override string CloseStatusDescription => _socket?.CloseStatusDescription ?? string.Empty; + /// + public override WebSocketState State => _socket?.State ?? WebSocketState.Closed; + /// + public override string SubProtocol => _subProtocol ?? string.Empty; + + + /// + public override void Abort() + { + _socket?.Abort(); + } + /// + public override Task CloseAsync(WebSocketCloseStatus closeStatus, string? statusDescription, CancellationToken cancellationToken) + { + return _socket?.CloseAsync(closeStatus, statusDescription, cancellationToken) ?? Task.CompletedTask; + } + /// + public override Task CloseOutputAsync(WebSocketCloseStatus closeStatus, string? statusDescription, CancellationToken cancellationToken) + { + if (_socket?.State == WebSocketState.Open || _socket?.State == WebSocketState.CloseSent) + { + return _socket.CloseOutputAsync(closeStatus, statusDescription, cancellationToken); + } + return Task.CompletedTask; + } + /// + public override ValueTask ReceiveAsync(Memory buffer, CancellationToken cancellationToken) + { + _ = _socket ?? throw new WebSocketException(WebSocketError.ConnectionClosedPrematurely, "The connected socket has been disconnected"); + + return _socket.ReceiveAsync(buffer, cancellationToken); + } + /// + public override Task ReceiveAsync(ArraySegment buffer, CancellationToken cancellationToken) + { + _ = _socket ?? throw new WebSocketException(WebSocketError.ConnectionClosedPrematurely, "The connected socket has been disconnected"); + + return _socket.ReceiveAsync(buffer, cancellationToken); + } + /// + public override ValueTask SendAsync(ReadOnlyMemory buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken) + { + _ = _socket ?? throw new WebSocketException(WebSocketError.ConnectionClosedPrematurely, "The connected socket has been disconnected"); + return _socket.SendAsync(buffer, messageType, endOfMessage, cancellationToken); + } + /// + public override Task SendAsync(ArraySegment buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken) + { + _ = _socket ?? throw new WebSocketException(WebSocketError.ConnectionClosedPrematurely, "The connected socket has been disconnected"); + return _socket.SendAsync(buffer, messageType, endOfMessage, cancellationToken); + } + + /// + public override void Dispose() + { + //Free buffer + _dataBuffer.Dispose(); + _socket?.Dispose(); + GC.SuppressFinalize(this); + } + } +} diff --git a/lib/Net.Messaging.FBM/src/Client/README.md b/lib/Net.Messaging.FBM/src/Client/README.md new file mode 100644 index 0000000..5aa8e76 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Client/README.md @@ -0,0 +1,169 @@ +# VNLib.Net.Messaging.FBM.Client + +Fixed Buffer Messaging Protocol client library. High performance statful messaging +protocol built on top of HTTP web-sockets. Low/no allocation, completely asynchronous +while providing a TPL API. This library provides a simple asynchronous request/response +architecture to web-sockets. This was initially designed to provide an alternative to +complete HTTP request/response overhead, but allow a simple control flow for work +across a network. + +The base of the library relies on creating message objects that allocate fixed size +buffers are configured when the IFBMMessageis constructed. All data is written to the +internal buffer adhering to the format below. + +Messages consist of a 4 byte message id, a collection of headers, and a message body. +The first 4 bytes of a message is the ID (for normal messages a signed integer greater than 0), +0 is reserved for error conditions, and negative numbers are reserved for internal +messages. Headers are identified by a single byte, followed by a variable length UTF8 +encoded character sequence, followed by a termination of 0xFF, 0xF1 (may change). + +### Message structure + 4 byte positive (signed 32-bit integer) message id + 2 byte termination + 1 byte header-id + variable length UTF8 value + 2 byte termination + -- other headers -- + 2 byte termination (extra termination, ie: empty header) + variable length payload + (end of message is the end of the payload) + +Buffer sizes are generally negotiated on initial web-socket upgrade, so to buffer entire messages +in a single read/write from the web-socket. Received messages are read into memory until +the web-socket has no more data available. The message is then parsed and passed for processing +on the server side, or complete a pending request on the client side. Servers may drop the +web-socket connection and return an error if messages exceed the size of the pre-negotiated +buffer. Servers should validate buffer sizes before accepting a connection. + +This client library allows for messages to be streamed to the server, however this library +is optimized for fixed buffers, so streaming will not be the most efficient, and will likely +cause slow-downs in message transmission. However, since FBM relies on a streaming protocol, +so it was silly not to provide it. Streams add overhead of additional buffer allocation, +additional copy, and message fragmentation (multiple writes to the web-socket). Since frames +written to the web-socket must be synchronized, a mutex is held during transmission, which +means the more message overhead, the longer the blocking period on new messages. Mutex +acquisition will wait asynchronously when necessary. + +The goal of the FBM protocol for is to provide efficient use of resources (memory, network, +and minimize GC load) to transfer small messages truly asynchronously, at wire speeds, with +only web-socket and transport overhead. Using web-sockets simplifies implementation, and allows +comparability across platforms, languages, and versions. + +## fundamentals + +The main implementation is the FBMClient class. This class provides the means for creating +the stateful connection to the remote server. It also provides an internal FBMRequest message +rental (object cache) that created initialized FBMRequest messages. This class may be derrived +to provide additional functionality, such as handling control frames that may dynamically +alter the state of the connection (negotiation etc). A mechanism to do so is provided. + +### FBMClient layout + +``` + public class FBMClient : VnDisposeable, IStatefulConnection, ICacheHolder + { + //Raised when an error occurs during receiving or parsing + public event EventHandler? ConnectionClosedOnError; + + //Raised when connection is closed, regardless of the cause + public event EventHandler? ConnectionClosed; + + //Connects to the remote server at the specified websocket address (ws:// or wss://) + public async Task ConnectAsync(Uri address, CancellationToken cancellation = default); + + //When connected, sends the specified message to the remote server + public async Task SendAsync(FBMRequest request, CancellationToken cancellationToken = default); + + //When connected, streams a message to the remote server, * the message payload must not be configured * + public async Task StreamDataAsync(FBMRequest request, Stream payload, ContentType ct, CancellationToken cancellationToken = default); + + //Disconnects from the remote server + public async Task DisconnectAsync(CancellationToken cancellationToken = default); + + //Releases all held resourses + public void Dispose(); //Inherrited from VnDisposeable + + ICacheHolder.CacheClear(); //Inherited member, clears cached FBMRequest objects + ICacheHolder.CacheHardClear(); //Inherited member, clears cached FBMRequest objects + } +``` + +### Example usage +``` + FBMClientConfig config = new() + { + //The size (in bytes) of the internal buffer to use when receiving messages from the server + RecvBufferSize = 1024, + + //FBMRequest buffer size (expected size of buffers, required for negotiation) + RequestBufferSize = 1024, + + //The size (in chars) of headers the FBMResponse should expect to buffer from the server + ResponseHeaderBufSize = 1024, + + //The absolute maximum message size to buffer from the server + MaxMessageSize = 10 * 1024 * 1024, //10KiB + + //The unmanaged heap the allocate buffers from + BufferHeap = Memory.Shared, + + //Web-socket keepalive frame interval + KeepAliveInterval = TimeSpan.FromSeconds(30), + + //Web-socket sub-protocol header value + SubProtocol = null + }; + + //Craete client from the config + using (FBMClient client = new(config)) + { + //Usually set some type of authentication headers before connecting + + /* + client.ClientSocket.SetHeader("Authorization", "Authorization token"); + */ + + //Connect to server + Uri address = new Uri("wss://localhost:8080/some/fbm/endpoint"); + await client.ConnectAsync(address, CancellationToken.None); + + do + { + //Rent request message + FBMRequest request = client.RentRequest(); + //Some arbitrary header value (or preconfigured header) + request.WriteHeader(0x10, "Hello"); + //Some arbitrary payload + request.WriteBody(new byte[] { 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A }, ContentType.Binary); + //Send request message + using (FBMResponse response = await client.SendAsync(request, CancellationToken.None)) + { + //Extension method to raise exception if an invalid response was received (also use the response.IsSet flag) + response.ThrowIfNotSet(); + + //Check headers (using Linq to get first header) + string header1 = response.Headers.First().Value.ToString(); + + //Get payload, data is valid until the response is disposed + ReadOnlySpan body = response.ResponseBody; + } + //Return request + client.ReturnRequest(request); + //request.Dispose(); //Alternativly dispose message + + await Task.Delay(1000); + } + while(true); + } +``` + +## Final Notes + +XML Documentation is or will be provided for almost all public exports. APIs are intended to +be sensibly public and immutable to allow for easy extensability (via extension methods). I +often use extension libraries to provide additional functionality. (See cache library) + +This library is likely a niche use case, and is probably not for everyone. Unless you care +about reasonably efficient high frequency request/response messaging, this probably isnt +for you. This library provides a reasonable building block for distributed lock mechanisms +and small data caching. \ No newline at end of file diff --git a/lib/Net.Messaging.FBM/src/Exceptions/FBMException.cs b/lib/Net.Messaging.FBM/src/Exceptions/FBMException.cs new file mode 100644 index 0000000..1d5c7db --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Exceptions/FBMException.cs @@ -0,0 +1,52 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: FBMException.cs +* +* FBMException.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Runtime.Serialization; + +namespace VNLib.Net.Messaging.FBM +{ + /// + /// A base exception class for all FBM Library exceptions + /// + public class FBMException : Exception + { + /// + public FBMException() + { + } + /// + public FBMException(string message) : base(message) + { + } + /// + public FBMException(string message, Exception innerException) : base(message, innerException) + { + } + /// + protected FBMException(SerializationInfo info, StreamingContext context) : base(info, context) + { + } + } +} diff --git a/lib/Net.Messaging.FBM/src/Exceptions/FBMInvalidRequestException.cs b/lib/Net.Messaging.FBM/src/Exceptions/FBMInvalidRequestException.cs new file mode 100644 index 0000000..ae42797 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Exceptions/FBMInvalidRequestException.cs @@ -0,0 +1,47 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: FBMInvalidRequestException.cs +* +* FBMInvalidRequestException.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; + + +namespace VNLib.Net.Messaging.FBM +{ + /// + /// Raised when a request message is not in a valid state and cannot be sent + /// + public class FBMInvalidRequestException : FBMException + { + public FBMInvalidRequestException() + { + } + + public FBMInvalidRequestException(string message) : base(message) + { + } + + public FBMInvalidRequestException(string message, Exception innerException) : base(message, innerException) + { + } + } +} diff --git a/lib/Net.Messaging.FBM/src/Exceptions/InvalidResponseException.cs b/lib/Net.Messaging.FBM/src/Exceptions/InvalidResponseException.cs new file mode 100644 index 0000000..3f0b970 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Exceptions/InvalidResponseException.cs @@ -0,0 +1,52 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: InvalidResponseException.cs +* +* InvalidResponseException.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Runtime.Serialization; + +namespace VNLib.Net.Messaging.FBM +{ + /// + /// Raised when a response to an FBM request is not in a valid state + /// + public class InvalidResponseException : FBMException + { + /// + public InvalidResponseException() + { + } + /// + public InvalidResponseException(string message) : base(message) + { + } + /// + public InvalidResponseException(string message, Exception innerException) : base(message, innerException) + { + } + /// + protected InvalidResponseException(SerializationInfo info, StreamingContext context) : base(info, context) + { + } + } +} diff --git a/lib/Net.Messaging.FBM/src/Server/FBMContext.cs b/lib/Net.Messaging.FBM/src/Server/FBMContext.cs new file mode 100644 index 0000000..fb39d1b --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Server/FBMContext.cs @@ -0,0 +1,85 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: FBMContext.cs +* +* FBMContext.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Text; + +using VNLib.Utils.IO; +using VNLib.Utils.Memory.Caching; + +namespace VNLib.Net.Messaging.FBM.Server +{ + /// + /// A request/response pair message context + /// + public sealed class FBMContext : IReusable + { + private readonly Encoding _headerEncoding; + + /// + /// The request message to process + /// + public FBMRequestMessage Request { get; } + /// + /// The response message + /// + public FBMResponseMessage Response { get; } + /// + /// Creates a new reusable + /// for use within a + /// cache + /// + /// The size in characters of the request header buffer + /// The size in characters of the response header buffer + /// The message header encoding instance + public FBMContext(int requestHeaderBufferSize, int responseBufferSize, Encoding headerEncoding) + { + Request = new(requestHeaderBufferSize); + Response = new(responseBufferSize, headerEncoding); + _headerEncoding = headerEncoding; + } + + /// + /// Initializes the context with the buffered request data + /// + /// The request data buffer positioned at the begining of the request data + /// The unique id of the connection + internal void Prepare(VnMemoryStream requestData, string connectionId) + { + Request.Prepare(requestData, connectionId, _headerEncoding); + //Message id is set after the request parses the incoming message + Response.Prepare(Request.MessageId); + } + + void IReusable.Prepare() + { + (Request as IReusable).Prepare(); + (Response as IReusable).Prepare(); + } + + bool IReusable.Release() + { + return (Request as IReusable).Release() & (Response as IReusable).Release(); + } + } +} diff --git a/lib/Net.Messaging.FBM/src/Server/FBMListener.cs b/lib/Net.Messaging.FBM/src/Server/FBMListener.cs new file mode 100644 index 0000000..6cca2a9 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Server/FBMListener.cs @@ -0,0 +1,388 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: FBMListener.cs +* +* FBMListener.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.IO; +using System.Buffers; +using System.Threading; +using System.Net.WebSockets; +using System.Threading.Tasks; + +using VNLib.Utils.IO; +using VNLib.Utils.Async; +using VNLib.Utils.Memory; +using VNLib.Utils.Extensions; +using VNLib.Utils.Memory.Caching; +using VNLib.Plugins.Essentials; + +namespace VNLib.Net.Messaging.FBM.Server +{ + + /// + /// Method delegate for processing FBM messages from an + /// when messages are received + /// + /// The message/connection context + /// The state parameter passed on client connected + /// A token that reflects the state of the listener + /// A that resolves when processing is complete + public delegate Task RequestHandler(FBMContext context, object? userState, CancellationToken cancellationToken); + + /// + /// A FBM protocol listener. Listens for messages on a + /// and raises events on requests. + /// + public class FBMListener + { + private sealed class ListeningSession + { + private readonly ReusableStore CtxStore; + private readonly CancellationTokenSource Cancellation; + private readonly CancellationTokenRegistration Registration; + private readonly FBMListenerSessionParams Params; + + + public readonly object? UserState; + + public readonly SemaphoreSlim ResponseLock; + + public readonly WebSocketSession Socket; + + public readonly RequestHandler OnRecieved; + + public CancellationToken CancellationToken => Cancellation.Token; + + + public ListeningSession(WebSocketSession session, RequestHandler onRecieved, in FBMListenerSessionParams args, object? userState) + { + Params = args; + Socket = session; + UserState = userState; + OnRecieved = onRecieved; + + //Create cancellation and register for session close + Cancellation = new(); + Registration = session.Token.Register(Cancellation.Cancel); + + + ResponseLock = new(1); + CtxStore = ObjectRental.CreateReusable(ContextCtor); + } + + private FBMContext ContextCtor() => new(Params.MaxHeaderBufferSize, Params.ResponseBufferSize, Params.HeaderEncoding); + + /// + /// Cancels any pending opreations relating to the current session + /// + public void CancelSession() + { + Cancellation.Cancel(); + + //If dispose happens without any outstanding requests, we can dispose the session + if (_counter == 0) + { + CleanupInternal(); + } + } + + private void CleanupInternal() + { + Registration.Dispose(); + CtxStore.Dispose(); + Cancellation.Dispose(); + ResponseLock.Dispose(); + } + + + private uint _counter; + + /// + /// Rents a new instance from the pool + /// and increments the counter + /// + /// The rented instance + /// + public FBMContext RentContext() + { + + if (Cancellation.IsCancellationRequested) + { + throw new ObjectDisposedException("The instance has been disposed"); + } + + //Rent context + FBMContext ctx = CtxStore.Rent(); + //Increment counter + Interlocked.Increment(ref _counter); + + return ctx; + } + + /// + /// Returns a previously rented context to the pool + /// and decrements the counter. If the session has been + /// cancelled, when the counter reaches 0, cleanup occurs + /// + /// The context to return + public void ReturnContext(FBMContext ctx) + { + //Return the context + CtxStore.Return(ctx); + + uint current = Interlocked.Decrement(ref _counter); + + //No more contexts in use, dispose internals + if (Cancellation.IsCancellationRequested && current == 0) + { + ResponseLock.Dispose(); + Cancellation.Dispose(); + CtxStore.Dispose(); + } + } + } + + public const int SEND_SEMAPHORE_TIMEOUT_MS = 10 * 1000; + + private readonly IUnmangedHeap Heap; + + /// + /// Raised when a response processing error occured + /// + public event EventHandler? OnProcessError; + + /// + /// Creates a new instance ready for + /// processing connections + /// + /// The heap to alloc buffers from + public FBMListener(IUnmangedHeap heap) + { + Heap = heap; + } + + /// + /// Begins listening for requests on the current websocket until + /// a close message is received or an error occurs + /// + /// The to receive messages on + /// The callback method to handle incoming requests + /// The arguments used to configured this listening session + /// A state parameter + /// A that completes when the connection closes + public async Task ListenAsync(WebSocketSession wss, RequestHandler handler, FBMListenerSessionParams args, object? userState) + { + ListeningSession session = new(wss, handler, args, userState); + //Alloc a recieve buffer + using IMemoryOwner recvBuffer = Heap.DirectAlloc(args.RecvBufferSize); + + //Init new queue for dispatching work + AsyncQueue workQueue = new(true, true); + + //Start a task to process the queue + Task queueWorker = QueueWorkerDoWork(workQueue, session); + + try + { + //Listen for incoming messages + while (true) + { + //Receive a message + ValueWebSocketReceiveResult result = await wss.ReceiveAsync(recvBuffer.Memory); + //If a close message has been received, we can gracefully exit + if (result.MessageType == WebSocketMessageType.Close) + { + //Return close message + await wss.CloseSocketAsync(WebSocketCloseStatus.NormalClosure, "Goodbye"); + //break listen loop + break; + } + //create buffer for storing data + VnMemoryStream request = new(Heap); + //Copy initial data + request.Write(recvBuffer.Memory.Span[..result.Count]); + //Streaming read + while (!result.EndOfMessage) + { + //Read more data + result = await wss.ReceiveAsync(recvBuffer.Memory); + //Make sure the request is small enough to buffer + if (request.Length + result.Count > args.MaxMessageSize) + { + //dispose the buffer + request.Dispose(); + //close the socket with a message too big + await wss.CloseSocketAsync(WebSocketCloseStatus.MessageTooBig, "Buffer space exceeded for message. Goodbye"); + //break listen loop + goto Exit; + } + //write to buffer + request.Write(recvBuffer.Memory.Span[..result.Count]); + } + //Make sure data is available + if (request.Length == 0) + { + request.Dispose(); + continue; + } + //reset buffer position + _ = request.Seek(0, SeekOrigin.Begin); + //Enqueue the request + await workQueue.EnqueueAsync(request); + } + + Exit: + ; + } + finally + { + session.CancelSession(); + await queueWorker.ConfigureAwait(false); + } + } + + private async Task QueueWorkerDoWork(AsyncQueue queue, ListeningSession session) + { + try + { + while (true) + { + //Get work from queue + VnMemoryStream request = await queue.DequeueAsync(session.CancellationToken); + //Process request without waiting + _ = ProcessAsync(request, session).ConfigureAwait(false); + } + } + catch (OperationCanceledException) + { } + finally + { + //Cleanup any queued requests + while (queue.TryDequeue(out VnMemoryStream? stream)) + { + stream.Dispose(); + } + } + } + + private async Task ProcessAsync(VnMemoryStream data, ListeningSession session) + { + //Rent a new request object + FBMContext context = session.RentContext(); + try + { + //Prepare the request/response + context.Prepare(data, session.Socket.SocketID); + + if ((context.Request.ParseStatus & HeaderParseError.InvalidId) > 0) + { + OnProcessError?.Invoke(this, new FBMException($"Invalid messageid {context.Request.MessageId}, message length {data.Length}")); + return; + } + + //Check parse status flags + if ((context.Request.ParseStatus & HeaderParseError.HeaderOutOfMem) > 0) + { + OnProcessError?.Invoke(this, new FBMException("Packet received with not enough space to store headers")); + } + //Determine if request is an out-of-band message + else if (context.Request.MessageId == Helpers.CONTROL_FRAME_MID) + { + //Process control frame + await ProcessOOBAsync(context); + } + else + { + //Invoke normal message handler + await session.OnRecieved.Invoke(context, session.UserState, session.CancellationToken); + } + + //Get response data + await using IAsyncMessageReader messageEnumerator = await context.Response.GetResponseDataAsync(session.CancellationToken); + + //Load inital segment + if (await messageEnumerator.MoveNextAsync() && !session.CancellationToken.IsCancellationRequested) + { + ValueTask sendTask; + + //Syncrhonize access to send data because we may need to stream data to the client + await session.ResponseLock.WaitAsync(SEND_SEMAPHORE_TIMEOUT_MS); + try + { + do + { + bool eof = !messageEnumerator.DataRemaining; + + //Send first segment + sendTask = session.Socket.SendAsync(messageEnumerator.Current, WebSocketMessageType.Binary, eof); + + /* + * WARNING! + * this code relies on the managed websocket impl that the websocket will read + * the entire buffer before returning. If this is not the case, this code will + * overwrite the memory buffer on the next call to move next. + */ + + //Move to next segment + if (!await messageEnumerator.MoveNextAsync()) + { + break; + } + + //Await previous send + await sendTask; + + } while (true); + } + finally + { + //release semaphore + session.ResponseLock.Release(); + } + + await sendTask; + } + + //No data to send + } + catch (Exception ex) + { + OnProcessError?.Invoke(this, ex); + } + finally + { + session.ReturnContext(context); + } + } + + /// + /// Processes an out-of-band request message (internal communications) + /// + /// The containing the OOB message + /// A that completes when the operation completes + protected virtual Task ProcessOOBAsync(FBMContext outOfBandContext) + { + return Task.CompletedTask; + } + } +} diff --git a/lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs b/lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs new file mode 100644 index 0000000..3e9fde2 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Server/FBMListenerBase.cs @@ -0,0 +1,113 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: FBMListenerBase.cs +* +* FBMListenerBase.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Utils.Logging; +using VNLib.Utils.Memory; +using VNLib.Plugins.Essentials; + +namespace VNLib.Net.Messaging.FBM.Server +{ + /// + /// Provides a simple base class for an + /// processor + /// + public abstract class FBMListenerBase + { + + /// + /// The initialzied listener + /// + protected FBMListener? Listener { get; private set; } + /// + /// A provider to write log information to + /// + protected abstract ILogProvider Log { get; } + + /// + /// Initializes the + /// + /// The heap to alloc buffers from + protected void InitListener(IUnmangedHeap heap) + { + Listener = new(heap); + //Attach service handler + Listener.OnProcessError += Listener_OnProcessError; + } + + /// + /// A single event service routine for servicing errors that occur within + /// the listener loop + /// + /// + /// The exception that was raised + protected virtual void Listener_OnProcessError(object? sender, Exception e) + { + //Write the error to the log + Log.Error(e); + } + + private async Task OnReceivedAsync(FBMContext context, object? userState, CancellationToken token) + { + try + { + await ProcessAsync(context, userState, token); + } + catch (OperationCanceledException) + { + Log.Debug("Async operation cancelled"); + } + catch(Exception ex) + { + Log.Error(ex); + } + } + + /// + /// Begins listening for requests on the current websocket until + /// a close message is received or an error occurs + /// + /// The to receive messages on + /// The arguments used to configured this listening session + /// A state token to use for processing events for this connection + /// A that completes when the connection closes + public virtual async Task ListenAsync(WebSocketSession wss, FBMListenerSessionParams args, object? userState) + { + _ = Listener ?? throw new InvalidOperationException("The listener has not been intialized"); + await Listener.ListenAsync(wss, OnReceivedAsync, args, userState); + } + + /// + /// A method to service an incoming message + /// + /// The context containing the message to be serviced + /// A state token passed on client connected + /// A token that reflects the state of the listener + /// A task that completes when the message has been serviced + protected abstract Task ProcessAsync(FBMContext context, object? userState, CancellationToken exitToken); + } +} diff --git a/lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs b/lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs new file mode 100644 index 0000000..c327475 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs @@ -0,0 +1,62 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: FBMListenerSessionParams.cs +* +* FBMListenerSessionParams.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Text; + +namespace VNLib.Net.Messaging.FBM.Server +{ + /// + /// Represents a configuration structure for an + /// listening session + /// + public readonly struct FBMListenerSessionParams + { + /// + /// The size of the buffer to use while reading data from the websocket + /// in the listener loop + /// + public readonly int RecvBufferSize { get; init; } + /// + /// The size of the character buffer to store FBMheader values in + /// the + /// + public readonly int MaxHeaderBufferSize { get; init; } + /// + /// The size of the internal message response buffer when + /// not streaming + /// + public readonly int ResponseBufferSize { get; init; } + /// + /// The FMB message header character encoding + /// + public readonly Encoding HeaderEncoding { get; init; } + + /// + /// The absolute maxium size (in bytes) message to process before + /// closing the websocket connection. This value should be negotiaed + /// by clients or hard-coded to avoid connection issues + /// + public readonly int MaxMessageSize { get; init; } + } +} diff --git a/lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs b/lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs new file mode 100644 index 0000000..ed36571 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs @@ -0,0 +1,196 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: FBMRequestMessage.cs +* +* FBMRequestMessage.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Text; +using System.Buffers; +using System.Text.Json; +using System.Collections.Generic; + +using VNLib.Utils; +using VNLib.Utils.IO; +using VNLib.Utils.Memory; +using VNLib.Utils.Extensions; +using VNLib.Utils.Memory.Caching; + +namespace VNLib.Net.Messaging.FBM.Server +{ + /// + /// Represents a client request message to be serviced + /// + public sealed class FBMRequestMessage : IReusable + { + private readonly List>> _headers; + private readonly int HeaderCharBufferSize; + /// + /// Creates a new resusable + /// + /// The size of the buffer to alloc during initialization + internal FBMRequestMessage(int headerBufferSize) + { + HeaderCharBufferSize = headerBufferSize; + _headers = new(); + } + + private char[]? _headerBuffer; + + /// + /// The ID of the current message + /// + public int MessageId { get; private set; } + /// + /// Gets the underlying socket-id fot the current connection + /// + public string? ConnectionId { get; private set; } + /// + /// The raw request message, positioned to the body section of the message data + /// + public VnMemoryStream? RequestBody { get; private set; } + /// + /// A collection of headers for the current request + /// + public IReadOnlyList>> Headers => _headers; + /// + /// Status flags set during the message parsing + /// + public HeaderParseError ParseStatus { get; private set; } + /// + /// The message body data as a + /// + public ReadOnlySpan BodyData => Helpers.GetRemainingData(RequestBody!); + + /// + /// Determines if the current message is considered a control frame + /// + public bool IsControlFrame { get; private set; } + + /// + /// Prepares the request to be serviced + /// + /// The request data packet + /// The unique id of the connection + /// The data encoding used to decode header values + internal void Prepare(VnMemoryStream vms, string socketId, Encoding dataEncoding) + { + //Store request body + RequestBody = vms; + //Store message id + MessageId = Helpers.GetMessageId(Helpers.ReadLine(vms)); + //Check mid for control frame + if(MessageId == Helpers.CONTROL_FRAME_MID) + { + IsControlFrame = true; + } + else if (MessageId < 1) + { + ParseStatus |= HeaderParseError.InvalidId; + return; + } + + ConnectionId = socketId; + + //sliding window over remaining data from internal buffer + ForwardOnlyMemoryWriter writer = new(_headerBuffer); + + //Accumulate headers + while (true) + { + //Read the next line from the current stream + ReadOnlySpan line = Helpers.ReadLine(vms); + if (line.IsEmpty) + { + //Done reading headers + break; + } + HeaderCommand cmd = Helpers.GetHeaderCommand(line); + //Get header value + ERRNO charsRead = Helpers.GetHeaderValue(line, writer.Remaining.Span, dataEncoding); + if (charsRead < 0) + { + //Out of buffer space + ParseStatus |= HeaderParseError.HeaderOutOfMem; + break; + } + else if (!charsRead) + { + //Invalid header + ParseStatus |= HeaderParseError.InvalidHeaderRead; + } + else + { + //Store header as a read-only sequence + _headers.Add(new(cmd, writer.Remaining[..(int)charsRead])); + //Shift buffer window + writer.Advance(charsRead); + } + } + } + + /// + /// Deserializes the request body into a new specified object type + /// + /// The type of the object to deserialize + /// The to use while deserializing data + /// The deserialized object from the request body + /// + public T? DeserializeBody(JsonSerializerOptions? jso = default) + { + return BodyData.IsEmpty ? default : BodyData.AsJsonObject(jso); + } + /// + /// Gets a of the request body + /// + /// The parsed if parsed successfully, or null otherwise + /// + public JsonDocument? GetBodyAsJson() + { + Utf8JsonReader reader = new(BodyData); + return JsonDocument.TryParseValue(ref reader, out JsonDocument? jdoc) ? jdoc : default; + } + + void IReusable.Prepare() + { + ParseStatus = HeaderParseError.None; + //Alloc header buffer + _headerBuffer = ArrayPool.Shared.Rent(HeaderCharBufferSize); + } + + + bool IReusable.Release() + { + //Dispose the request message + RequestBody?.Dispose(); + RequestBody = null; + //Clear headers before freeing buffer + _headers.Clear(); + //Free header-buffer + ArrayPool.Shared.Return(_headerBuffer!); + _headerBuffer = null; + ConnectionId = null; + MessageId = 0; + IsControlFrame = false; + return true; + } + } +} diff --git a/lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs b/lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs new file mode 100644 index 0000000..ac34dda --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs @@ -0,0 +1,226 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: FBMResponseMessage.cs +* +* FBMResponseMessage.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Net.Http; +using VNLib.Utils.IO; +using VNLib.Utils.Extensions; +using VNLib.Utils.Memory.Caching; +using VNLib.Net.Messaging.FBM.Client; + +namespace VNLib.Net.Messaging.FBM.Server +{ + + /// + /// Represents an FBM request response container. + /// + public sealed class FBMResponseMessage : IReusable, IFBMMessage + { + internal FBMResponseMessage(int internalBufferSize, Encoding headerEncoding) + { + _headerAccumulator = new HeaderDataAccumulator(internalBufferSize); + _headerEncoding = headerEncoding; + _messageEnumerator = new(this); + } + + private readonly MessageSegmentEnumerator _messageEnumerator; + private readonly ISlindingWindowBuffer _headerAccumulator; + private readonly Encoding _headerEncoding; + + private IAsyncMessageBody? _messageBody; + + /// + public int MessageId { get; private set; } + + void IReusable.Prepare() + { + (_headerAccumulator as HeaderDataAccumulator)!.Prepare(); + } + + bool IReusable.Release() + { + //Release header accumulator + _headerAccumulator.Close(); + + _messageBody = null; + + MessageId = 0; + + return true; + } + + /// + /// Initializes the response message with the specified message-id + /// to respond with + /// + /// The message id of the context to respond to + internal void Prepare(int messageId) + { + //Reset accumulator when message id is written + _headerAccumulator.Reset(); + //Write the messageid to the begining of the headers buffer + MessageId = messageId; + _headerAccumulator.Append((byte)HeaderCommand.MessageId); + _headerAccumulator.Append(messageId); + _headerAccumulator.WriteTermination(); + } + + /// + public void WriteHeader(HeaderCommand header, ReadOnlySpan value) + { + WriteHeader((byte)header, value); + } + /// + public void WriteHeader(byte header, ReadOnlySpan value) + { + _headerAccumulator.WriteHeader(header, value, _headerEncoding); + } + + /// + public void WriteBody(ReadOnlySpan body, ContentType contentType = ContentType.Binary) + { + //Append content type header + WriteHeader(HeaderCommand.ContentType, HttpHelpers.GetContentTypeString(contentType)); + //end header segment + _headerAccumulator.WriteTermination(); + //Write message body + _headerAccumulator.Append(body); + } + + /// + /// Sets the response message body + /// + /// The to stream data from + /// + public void AddMessageBody(IAsyncMessageBody messageBody) + { + if(_messageBody != null) + { + throw new InvalidOperationException("The message body is already set"); + } + + //Append message content type header + WriteHeader(HeaderCommand.ContentType, HttpHelpers.GetContentTypeString(messageBody.ContentType)); + + //end header segment + _headerAccumulator.WriteTermination(); + + //Store message body + _messageBody = messageBody; + + } + + /// + /// Gets the internal message body enumerator and prepares the message for sending + /// + /// A cancellation token + /// A value task that returns the message body enumerator + internal async ValueTask GetResponseDataAsync(CancellationToken cancellationToken) + { + //try to buffer as much data in the header segment first + if(_messageBody?.RemainingSize > 0 && _headerAccumulator.RemainingSize > 0) + { + //Read data from the message + int read = await _messageBody.ReadAsync(_headerAccumulator.RemainingBuffer, cancellationToken); + //Advance accumulator to the read bytes + _headerAccumulator.Advance(read); + } + //return reusable enumerator + return _messageEnumerator; + } + + private sealed class MessageSegmentEnumerator : IAsyncMessageReader + { + private readonly FBMResponseMessage _message; + + bool HeadersRead; + + public MessageSegmentEnumerator(FBMResponseMessage message) + { + _message = message; + } + + public ReadOnlyMemory Current { get; private set; } + + public bool DataRemaining { get; private set; } + + public async ValueTask MoveNextAsync() + { + //Attempt to read header segment first + if (!HeadersRead) + { + //Set the accumulated buffer + Current = _message._headerAccumulator.AccumulatedBuffer; + + //Update data remaining flag + DataRemaining = _message._messageBody?.RemainingSize > 0; + + //Set headers read flag + HeadersRead = true; + + return true; + } + else if (_message._messageBody?.RemainingSize > 0) + { + //Use the header buffer as the buffer for the message body + Memory buffer = _message._headerAccumulator.Buffer; + + //Read body segment + int read = await _message._messageBody.ReadAsync(buffer); + + //Update data remaining flag + DataRemaining = _message._messageBody.RemainingSize > 0; + + if (read > 0) + { + //Store the read segment + Current = buffer[..read]; + return true; + } + } + return false; + } + + public async ValueTask DisposeAsync() + { + //Clear current segment + Current = default; + + //Reset headers read flag + HeadersRead = false; + + //Dispose the message body if set + if (_message._messageBody != null) + { + await _message._messageBody.DisposeAsync(); + } + } + } + } + +} diff --git a/lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs b/lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs new file mode 100644 index 0000000..423a26e --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs @@ -0,0 +1,89 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: HeaderDataAccumulator.cs +* +* HeaderDataAccumulator.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Buffers; + +using VNLib.Utils.IO; + + +namespace VNLib.Net.Messaging.FBM.Server +{ + /// + /// Reusable sliding window impl + /// + internal sealed class HeaderDataAccumulator : ISlindingWindowBuffer + { + private readonly int BufferSize; + + private byte[]? _memHandle; + + public HeaderDataAccumulator(int bufferSize) + { + BufferSize = bufferSize; + } + + /// + public int WindowStartPos { get; private set; } + /// + public int WindowEndPos { get; private set; } + /// + public Memory Buffer => _memHandle.AsMemory(); + + /// + public void Advance(int count) => WindowEndPos += count; + + /// + public void AdvanceStart(int count) => WindowEndPos += count; + + /// + public void Reset() + { + WindowStartPos = 0; + WindowEndPos = 0; + } + + /// + /// Allocates the internal message buffer + /// + public void Prepare() + { + _memHandle ??= ArrayPool.Shared.Rent(BufferSize); + } + + /// + public void Close() + { + Reset(); + + if (_memHandle != null) + { + //Return the buffer to the pool + ArrayPool.Shared.Return(_memHandle); + _memHandle = null; + } + } + } + +} diff --git a/lib/Net.Messaging.FBM/src/Server/IAsyncMessageBody.cs b/lib/Net.Messaging.FBM/src/Server/IAsyncMessageBody.cs new file mode 100644 index 0000000..5566520 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Server/IAsyncMessageBody.cs @@ -0,0 +1,57 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: IAsyncMessageBody.cs +* +* IAsyncMessageBody.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Net.Http; + +namespace VNLib.Net.Messaging.FBM +{ + /// + /// A disposable message body container for asynchronously reading a variable length message body + /// + public interface IAsyncMessageBody : IAsyncDisposable + { + /// + /// The message body content type + /// + ContentType ContentType { get; } + + /// + /// The number of bytes remaining to be read from the message body + /// + int RemainingSize { get; } + + /// + /// Reads the next chunk of data from the message body + /// + /// The buffer to copy output data to + /// A token to cancel the operation + /// + ValueTask ReadAsync(Memory buffer, CancellationToken token = default); + } + +} diff --git a/lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs b/lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs new file mode 100644 index 0000000..b2abe8d --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs @@ -0,0 +1,42 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: IAsyncMessageReader.cs +* +* IAsyncMessageReader.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Collections.Generic; + + +namespace VNLib.Net.Messaging.FBM.Server +{ + /// + /// Internal message body reader/enumerator for FBM messages + /// + internal interface IAsyncMessageReader : IAsyncEnumerator> + { + /// + /// A value that indicates if there is data remaining after a + /// + bool DataRemaining { get; } + } + +} diff --git a/lib/Net.Messaging.FBM/src/Server/readme.md b/lib/Net.Messaging.FBM/src/Server/readme.md new file mode 100644 index 0000000..489e58f --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Server/readme.md @@ -0,0 +1,35 @@ +# VNLib.Net.Messaging.FBM.Server + +Fixed Buffer Messaging Protocol server library. High performance statful messaging +protocol built on top of HTTP web-sockets. Low/no allocation, completely asynchronous +while providing a TPL API. This library provides a simple asynchronous request/response +architecture to web-sockets. This was initially designed to provide an alternative to +complete HTTP request/response overhead, but allow a simple control flow for work +across a network. + +Messages consist of a 4 byte message id, a collection of headers, and a message body. +The first 4 bytes of a message is the ID (for normal messages a signed integer greater than 0), +0 is reserved for error conditions, and negative numbers are reserved for internal +messages. Headers are identified by a single byte, followed by a variable length UTF8 +encoded character sequence, followed by a termination of 0xFF, 0xF1 (may change). + +### Message structure + 4 byte positive (signed 32-bit integer) message id + 2 byte termination + 1 byte header-id + variable length UTF8 value + 2 byte termination + -- other headers -- + 2 byte termination (extra termination, ie: empty header) + variable length payload + (end of message is the end of the payload) + + +XML Documentation is or will be provided for almost all public exports. APIs are intended to +be sensibly public and immutable to allow for easy extensability (via extension methods). I +often use extension libraries to provide additional functionality. (See cache library) + +This library is likely a niche use case, and is probably not for everyone. Unless you care +about reasonably efficient high frequency request/response messaging, this probably isnt +for you. This library provides a reasonable building block for distributed lock mechanisms +and small data caching. \ No newline at end of file diff --git a/lib/Net.Messaging.FBM/src/VNLib.Net.Messaging.FBM.csproj b/lib/Net.Messaging.FBM/src/VNLib.Net.Messaging.FBM.csproj new file mode 100644 index 0000000..d91fb0a --- /dev/null +++ b/lib/Net.Messaging.FBM/src/VNLib.Net.Messaging.FBM.csproj @@ -0,0 +1,33 @@ + + + + net6.0 + Vaughn Nugent + 1.0.1.1 + Copyright © 2022 Vaughn Nugent + enable + www.vaughnnugent.com/resources + latest-all + True + \\vaughnnugent.com\Internal\Folder Redirection\vman\Documents\Programming\Software\StrongNameingKey.snk + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + + + -- cgit