diff options
author | vnugent <public@vaughnnugent.com> | 2023-01-08 16:01:54 -0500 |
---|---|---|
committer | vnugent <public@vaughnnugent.com> | 2023-01-08 16:01:54 -0500 |
commit | de94d788e9a47432a7630a8215896b8dd3628599 (patch) | |
tree | 666dec06eef861d101cb6948aff52a3d354c8d73 /Net.Messaging.FBM/src | |
parent | be6dc557a3b819248b014992eb96c1cb21f8112b (diff) |
Reorder + analyzer cleanup
Diffstat (limited to 'Net.Messaging.FBM/src')
28 files changed, 0 insertions, 3537 deletions
diff --git a/Net.Messaging.FBM/src/Client/ClientExtensions.cs b/Net.Messaging.FBM/src/Client/ClientExtensions.cs deleted file mode 100644 index 102b6c9..0000000 --- a/Net.Messaging.FBM/src/Client/ClientExtensions.cs +++ /dev/null @@ -1,69 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: ClientExtensions.cs -* -* ClientExtensions.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Runtime.CompilerServices; - -namespace VNLib.Net.Messaging.FBM.Client -{ - - public static class ClientExtensions - { - /// <summary> - /// Writes the location header of the requested resource - /// </summary> - /// <param name="request"></param> - /// <param name="location">The location address</param> - /// <exception cref="OutOfMemoryException"></exception> - public static void WriteLocation(this FBMRequest request, ReadOnlySpan<char> location) - { - request.WriteHeader(HeaderCommand.Location, location); - } - - /// <summary> - /// Writes the location header of the requested resource - /// </summary> - /// <param name="request"></param> - /// <param name="location">The location address</param> - /// <exception cref="OutOfMemoryException"></exception> - public static void WriteLocation(this FBMRequest request, Uri location) - { - request.WriteHeader(HeaderCommand.Location, location.ToString()); - } - - /// <summary> - /// If the <see cref="FBMResponse.IsSet"/> property is false, raises an <see cref="InvalidResponseException"/> - /// </summary> - /// <param name="response"></param> - /// <exception cref="InvalidResponseException"></exception> - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static void ThrowIfNotSet(this in FBMResponse response) - { - if (!response.IsSet) - { - throw new InvalidResponseException("The response state is undefined (no response received)"); - } - } - } -} diff --git a/Net.Messaging.FBM/src/Client/FBMClient.cs b/Net.Messaging.FBM/src/Client/FBMClient.cs deleted file mode 100644 index 5353087..0000000 --- a/Net.Messaging.FBM/src/Client/FBMClient.cs +++ /dev/null @@ -1,475 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: FBMClient.cs -* -* FBMClient.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.IO; -using System.Buffers; -using System.Threading; -using System.Net.WebSockets; -using System.Threading.Tasks; -using System.Collections.Generic; -using System.Collections.Concurrent; - -using VNLib.Net.Http; -using VNLib.Utils; -using VNLib.Utils.IO; -using VNLib.Utils.Logging; -using VNLib.Utils.Extensions; -using VNLib.Utils.Memory.Caching; - -namespace VNLib.Net.Messaging.FBM.Client -{ - /// <summary> - /// A Fixed Buffer Message Protocol client. Allows for high performance client-server messaging - /// with minimal memory overhead. - /// </summary> - public class FBMClient : VnDisposeable, IStatefulConnection, ICacheHolder - { - /// <summary> - /// The WS connection query arguments to specify a receive buffer size - /// </summary> - public const string REQ_RECV_BUF_QUERY_ARG = "b"; - /// <summary> - /// The WS connection query argument to suggest a maximum response header buffer size - /// </summary> - public const string REQ_HEAD_BUF_QUERY_ARG = "hb"; - /// <summary> - /// The WS connection query argument to suggest a maximum message size - /// </summary> - public const string REQ_MAX_MESS_QUERY_ARG = "mx"; - - /// <summary> - /// Raised when the websocket has been closed because an error occured. - /// You may inspect the event args to determine the cause of the error. - /// </summary> - public event EventHandler<FMBClientErrorEventArgs>? ConnectionClosedOnError; - /// <summary> - /// Raised when the client listener operaiton has completed as a normal closure - /// </summary> - public event EventHandler? ConnectionClosed; - - private readonly SemaphoreSlim SendLock; - private readonly ConcurrentDictionary<int, FBMRequest> ActiveRequests; - private readonly ReusableStore<FBMRequest> RequestRental; - private readonly FBMRequest _controlFrame; - /// <summary> - /// The configuration for the current client - /// </summary> - public FBMClientConfig Config { get; } - /// <summary> - /// A handle that is reset when a connection has been successfully set, and is set - /// when the connection exists - /// </summary> - public ManualResetEvent ConnectionStatusHandle { get; } - /// <summary> - /// The <see cref="ClientWebSocket"/> to send/recieve message on - /// </summary> - public ManagedClientWebSocket ClientSocket { get; } - /// <summary> - /// Gets the shared control frame for the current instance. The request is reset when - /// this property is called. (Not thread safe) - /// </summary> - protected FBMRequest ControlFrame - { - get - { - _controlFrame.Reset(); - return _controlFrame; - } - } - - /// <summary> - /// Creates a new <see cref="FBMClient"/> in a closed state - /// </summary> - /// <param name="config">The client configuration</param> - public FBMClient(FBMClientConfig config) - { - RequestRental = ObjectRental.CreateReusable(ReuseableRequestConstructor); - SendLock = new(1); - ConnectionStatusHandle = new(true); - ActiveRequests = new(Environment.ProcessorCount, 100); - - Config = config; - //Init control frame - _controlFrame = new (Helpers.CONTROL_FRAME_MID, in config); - //Init the new client socket - ClientSocket = new(config.RecvBufferSize, config.RecvBufferSize, config.KeepAliveInterval, config.SubProtocol); - } - - private void Debug(string format, params string[] strings) - { - if(Config.DebugLog != null) - { - Config.DebugLog.Debug($"[DEBUG] FBM Client: {format}", strings); - } - } - private void Debug(string format, long value, long other) - { - if (Config.DebugLog != null) - { - Config.DebugLog.Debug($"[DEBUG] FBM Client: {format}", value, other); - } - } - - /// <summary> - /// Allocates and configures a new <see cref="FBMRequest"/> message object for use within the reusable store - /// </summary> - /// <returns>The configured <see cref="FBMRequest"/></returns> - protected virtual FBMRequest ReuseableRequestConstructor() => new(Config); - - /// <summary> - /// Asynchronously opens a websocket connection with the specifed remote server - /// </summary> - /// <param name="address">The address of the server to connect to</param> - /// <param name="cancellation">A cancellation token</param> - /// <returns></returns> - public async Task ConnectAsync(Uri address, CancellationToken cancellation = default) - { - //Uribuilder to send config parameters to the server - UriBuilder urib = new(address); - urib.Query += - $"{REQ_RECV_BUF_QUERY_ARG}={Config.RecvBufferSize}" + - $"&{REQ_HEAD_BUF_QUERY_ARG}={Config.MaxHeaderBufferSize}" + - $"&{REQ_MAX_MESS_QUERY_ARG}={Config.MaxMessageSize}"; - Debug("Connection string {con}", urib.Uri.ToString()); - //Connect to server - await ClientSocket.ConnectAsync(urib.Uri, cancellation); - //Reset wait handle before return - ConnectionStatusHandle.Reset(); - //Begin listeing for requets in a background task - _ = Task.Run(ProcessContinuousRecvAsync, cancellation); - } - - /// <summary> - /// Rents a new <see cref="FBMRequest"/> from the internal <see cref="ReusableStore{T}"/>. - /// Use <see cref="ReturnRequest(FBMRequest)"/> when request is no longer in use - /// </summary> - /// <returns>The configured (rented or new) <see cref="FBMRequest"/> ready for use</returns> - public FBMRequest RentRequest() => RequestRental.Rent(); - /// <summary> - /// Stores (or returns) the reusable request in cache for use with <see cref="RentRequest"/> - /// </summary> - /// <param name="request">The request to return to the store</param> - /// <exception cref="InvalidOperationException"></exception> - public void ReturnRequest(FBMRequest request) => RequestRental.Return(request); - - /// <summary> - /// Sends a <see cref="FBMRequest"/> to the connected server - /// </summary> - /// <param name="request">The request message to send to the server</param> - /// <param name="cancellationToken"></param> - /// <returns>When awaited, yields the server response</returns> - /// <exception cref="ArgumentException"></exception> - /// <exception cref="ObjectDisposedException"></exception> - /// <exception cref="InvalidOperationException"></exception> - /// <exception cref="FBMInvalidRequestException"></exception> - public async Task<FBMResponse> SendAsync(FBMRequest request, CancellationToken cancellationToken = default) - { - Check(); - //Length of the request must contains at least 1 int and header byte - if (request.Length < 1 + sizeof(int)) - { - throw new FBMInvalidRequestException("Message is not initialized"); - } - //Store a null value in the request queue so the response can store a buffer - if (!ActiveRequests.TryAdd(request.MessageId, request)) - { - throw new ArgumentException("Message with the same ID is already being processed"); - } - try - { - Debug("Sending {bytes} with id {id}", request.RequestData.Length, request.MessageId); - - //Reset the wait handle - request.ResponseWaitEvent.Reset(); - - //Wait for send-lock - using (SemSlimReleaser releaser = await SendLock.GetReleaserAsync(cancellationToken)) - { - //Send the data to the server - await ClientSocket.SendAsync(request.RequestData, WebSocketMessageType.Binary, true, cancellationToken); - } - - //wait for the response to be set - await request.WaitForResponseAsync(cancellationToken); - - Debug("Received {size} bytes for message {id}", request.Response?.Length ?? 0, request.MessageId); - - return request.GetResponse(); - } - catch - { - //Remove the request since packet was never sent - ActiveRequests.Remove(request.MessageId, out _); - //Clear waiting flag - request.ResponseWaitEvent.Set(); - throw; - } - } - /// <summary> - /// Streams arbitrary binary data to the server with the initial request message - /// </summary> - /// <param name="request">The request message to send to the server</param> - /// <param name="payload">Data to stream to the server</param> - /// <param name="ct">The content type of the stream of data</param> - /// <param name="cancellationToken"></param> - /// <returns>When awaited, yields the server response</returns> - /// <exception cref="ArgumentException"></exception> - /// <exception cref="ObjectDisposedException"></exception> - /// <exception cref="InvalidOperationException"></exception> - public async Task StreamDataAsync(FBMRequest request, Stream payload, ContentType ct, CancellationToken cancellationToken = default) - { - Check(); - //Length of the request must contains at least 1 int and header byte - if(request.Length < 1 + sizeof(int)) - { - throw new FBMInvalidRequestException("Message is not initialized"); - } - //Store a null value in the request queue so the response can store a buffer - if (!ActiveRequests.TryAdd(request.MessageId, request)) - { - throw new ArgumentException("Message with the same ID is already being processed"); - } - try - { - Debug("Streaming {bytes} with id {id}", request.RequestData.Length, request.MessageId); - //Reset the wait handle - request.ResponseWaitEvent.Reset(); - //Write an empty body in the request - request.WriteBody(ReadOnlySpan<byte>.Empty, ct); - //Wait for send-lock - using (SemSlimReleaser releaser = await SendLock.GetReleaserAsync(cancellationToken)) - { - //Send the initial request packet - await ClientSocket.SendAsync(request.RequestData, WebSocketMessageType.Binary, false, cancellationToken); - //Calc buffer size - int bufSize = (int)Math.Clamp(payload.Length, Config.MessageBufferSize, Config.MaxMessageSize); - //Alloc a streaming buffer - using IMemoryOwner<byte> buffer = Config.BufferHeap.DirectAlloc<byte>(bufSize); - //Stream mesage body - do - { - //Read data - int read = await payload.ReadAsync(buffer.Memory, cancellationToken); - if (read == 0) - { - //No more data avialable - break; - } - //write message to socket, if the read data was smaller than the buffer, we can send the last packet - await ClientSocket.SendAsync(buffer.Memory[..read], WebSocketMessageType.Binary, read < bufSize, cancellationToken); - - } while (true); - } - //wait for the server to respond - await request.WaitForResponseAsync(cancellationToken); - - Debug("Response recieved {size} bytes for message {id}", request.Response?.Length ?? 0, request.MessageId); - } - catch - { - //Remove the request since packet was never sent or cancelled - ActiveRequests.Remove(request.MessageId, out _); - //Clear wait lock so the request state is reset - request.ResponseWaitEvent.Set(); - throw; - } - } - - /// <summary> - /// Begins listening for messages from the server on the internal socket (must be connected), - /// until the socket is closed, or canceled - /// </summary> - /// <returns></returns> - protected async Task ProcessContinuousRecvAsync() - { - Debug("Begining receive loop"); - //Alloc recv buffer - IMemoryOwner<byte> recvBuffer = Config.BufferHeap.DirectAlloc<byte>(Config.RecvBufferSize); - try - { - //Recv event loop - while (true) - { - //Listen for incoming packets with the intial data buffer - ValueWebSocketReceiveResult result = await ClientSocket.ReceiveAsync(recvBuffer.Memory, CancellationToken.None); - //If the message is a close message, its time to exit - if (result.MessageType == WebSocketMessageType.Close) - { - //Notify the event handler that the connection was closed - ConnectionClosed?.Invoke(this, EventArgs.Empty); - break; - } - if (result.Count <= 4) - { - Debug("Empty message recieved from server"); - continue; - } - //Alloc data buffer and write initial data - VnMemoryStream responseBuffer = new(Config.BufferHeap); - //Copy initial data - responseBuffer.Write(recvBuffer.Memory.Span[..result.Count]); - //Receive packets until the EOF is reached - while (!result.EndOfMessage) - { - //recive more data - result = await ClientSocket.ReceiveAsync(recvBuffer.Memory, CancellationToken.None); - //Make sure the buffer is not too large - if ((responseBuffer.Length + result.Count) > Config.MaxMessageSize) - { - //Dispose the buffer before exiting - responseBuffer.Dispose(); - Debug("Recieved a message that was too large, skipped"); - goto Skip; - } - //Copy continuous data - responseBuffer.Write(recvBuffer.Memory.Span[..result.Count]); - } - //Reset the buffer stream position - _ = responseBuffer.Seek(0, SeekOrigin.Begin); - ProcessResponse(responseBuffer); - //Goto skip statment to cleanup resources - Skip:; - } - } - catch (OperationCanceledException) - { - //Normal closeure, do nothing - } - catch (Exception ex) - { - //Error event args - FMBClientErrorEventArgs wsEventArgs = new() - { - Cause = ex, - ErrorClient = this - }; - //Invoke error handler - ConnectionClosedOnError?.Invoke(this, wsEventArgs); - } - finally - { - //Dispose the recv buffer - recvBuffer.Dispose(); - //Set all pending events - foreach (FBMRequest request in ActiveRequests.Values) - { - request.ResponseWaitEvent.Set(); - } - //Clear dict - ActiveRequests.Clear(); - //Cleanup the socket when exiting - ClientSocket.Cleanup(); - //Set status handle as unset - ConnectionStatusHandle.Set(); - //Invoke connection closed - ConnectionClosed?.Invoke(this, EventArgs.Empty); - } - Debug("Receive loop exited"); - } - - /// <summary> - /// Syncrhonously processes a buffered response packet - /// </summary> - /// <param name="responseMessage">The buffered response body recieved from the server</param> - /// <remarks>This method blocks the listening task. So operations should be quick</remarks> - protected virtual void ProcessResponse(VnMemoryStream responseMessage) - { - //read first response line - ReadOnlySpan<byte> line = Helpers.ReadLine(responseMessage); - //get the id of the message - int messageId = Helpers.GetMessageId(line); - //Finalze control frame - if(messageId == Helpers.CONTROL_FRAME_MID) - { - Debug("Control frame received"); - ProcessControlFrame(responseMessage); - return; - } - else if (messageId < 0) - { - //Cannot process request - responseMessage.Dispose(); - Debug("Invalid messageid"); - return; - } - //Search for the request that has the same id - if (ActiveRequests.TryRemove(messageId, out FBMRequest? request)) - { - //Set the new response message - request.SetResponse(responseMessage); - } - else - { - Debug("Message {id} was not found in the waiting message queue", messageId, 0); - - //Cleanup no request was waiting - responseMessage.Dispose(); - } - } - /// <summary> - /// Processes a control frame response from the server - /// </summary> - /// <param name="vms">The raw response packet from the server</param> - private void ProcessControlFrame(VnMemoryStream vms) - { - vms.Dispose(); - } - /// <summary> - /// Processes a control frame response from the server - /// </summary> - /// <param name="response">The parsed response-packet</param> - protected virtual void ProcessControlFrame(in FBMResponse response) - { - - } - - /// <summary> - /// Closes the underlying <see cref="WebSocket"/> and cancels all pending operations - /// </summary> - /// <param name="cancellationToken"></param> - /// <returns></returns> - /// <exception cref="ObjectDisposedException"></exception> - public async Task DisconnectAsync(CancellationToken cancellationToken = default) - { - Check(); - //Close the connection - await ClientSocket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", cancellationToken); - } - ///<inheritdoc/> - protected override void Free() - { - //Dispose socket - ClientSocket.Dispose(); - //Dispose client buffer - RequestRental.Dispose(); - SendLock.Dispose(); - ConnectionStatusHandle.Dispose(); - } - ///<inheritdoc/> - public void CacheClear() => RequestRental.CacheClear(); - ///<inheritdoc/> - public void CacheHardClear() => RequestRental.CacheHardClear(); - } -} diff --git a/Net.Messaging.FBM/src/Client/FBMClientConfig.cs b/Net.Messaging.FBM/src/Client/FBMClientConfig.cs deleted file mode 100644 index 229eb76..0000000 --- a/Net.Messaging.FBM/src/Client/FBMClientConfig.cs +++ /dev/null @@ -1,81 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: FBMClientConfig.cs -* -* FBMClientConfig.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Text; - -using VNLib.Utils.Memory; -using VNLib.Utils.Logging; - -namespace VNLib.Net.Messaging.FBM.Client -{ - /// <summary> - /// A structure that defines readonly constants for the <see cref="FBMClient"/> to use - /// </summary> - public readonly struct FBMClientConfig - { - /// <summary> - /// The size (in bytes) of the internal buffer to use when receiving messages from the server - /// </summary> - public readonly int RecvBufferSize { get; init; } - /// <summary> - /// The size (in bytes) of the <see cref="FBMRequest"/> internal buffer size, when requests are rented from the client - /// </summary> - /// <remarks> - /// This is the entire size of the request buffer including headers and payload data, unless - /// data is streamed to the server - /// </remarks> - public readonly int MessageBufferSize { get; init; } - /// <summary> - /// The size (in chars) of the client/server message header buffer - /// </summary> - public readonly int MaxHeaderBufferSize { get; init; } - /// <summary> - /// The maximum size (in bytes) of messages sent or recieved from the server - /// </summary> - public readonly int MaxMessageSize { get; init; } - /// <summary> - /// The heap to allocate interal (and message) buffers from - /// </summary> - public readonly IUnmangedHeap BufferHeap { get; init; } - /// <summary> - /// The websocket keepalive interval to use (leaving this property default disables keepalives) - /// </summary> - public readonly TimeSpan KeepAliveInterval { get; init; } - /// <summary> - /// The websocket sub-protocol to use - /// </summary> - public readonly string? SubProtocol { get; init; } - /// <summary> - /// The encoding instance used to encode header values - /// </summary> - public readonly Encoding HeaderEncoding { get; init; } - - /// <summary> - /// An optional log provider to write debug logs to. If this propery is not null, - /// debugging information will be logged with the debug log-level - /// </summary> - public readonly ILogProvider? DebugLog { get; init; } - } -} diff --git a/Net.Messaging.FBM/src/Client/FBMClientWorkerBase.cs b/Net.Messaging.FBM/src/Client/FBMClientWorkerBase.cs deleted file mode 100644 index b4056dc..0000000 --- a/Net.Messaging.FBM/src/Client/FBMClientWorkerBase.cs +++ /dev/null @@ -1,125 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: FBMClientWorkerBase.cs -* -* FBMClientWorkerBase.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Threading; -using System.Threading.Tasks; - -using VNLib.Utils; - -namespace VNLib.Net.Messaging.FBM.Client -{ - /// <summary> - /// A base class for objects that implement <see cref="FBMClient"/> - /// operations - /// </summary> - public abstract class FBMClientWorkerBase : VnDisposeable, IStatefulConnection - { - /// <summary> - /// Allows configuration of websocket configuration options - /// </summary> - public ManagedClientWebSocket SocketOptions => Client.ClientSocket; - -#nullable disable - /// <summary> - /// The <see cref="FBMClient"/> to sent requests from - /// </summary> - public FBMClient Client { get; private set; } - - /// <summary> - /// Raised when the client has connected successfully - /// </summary> - public event Action<FBMClient, FBMClientWorkerBase> Connected; -#nullable enable - - ///<inheritdoc/> - public event EventHandler ConnectionClosed - { - add => Client.ConnectionClosed += value; - remove => Client.ConnectionClosed -= value; - } - - /// <summary> - /// Creates and initializes a the internal <see cref="FBMClient"/> - /// </summary> - /// <param name="config">The client config</param> - protected void InitClient(in FBMClientConfig config) - { - Client = new(config); - Client.ConnectionClosedOnError += Client_ConnectionClosedOnError; - Client.ConnectionClosed += Client_ConnectionClosed; - } - - private void Client_ConnectionClosed(object? sender, EventArgs e) => OnDisconnected(); - private void Client_ConnectionClosedOnError(object? sender, FMBClientErrorEventArgs e) => OnError(e); - - /// <summary> - /// Asynchronously connects to a remote server by the specified uri - /// </summary> - /// <param name="serverUri">The remote uri of a server to connect to</param> - /// <param name="cancellationToken">A token to cancel the connect operation</param> - /// <returns>A task that compeltes when the client has connected to the remote server</returns> - public virtual async Task ConnectAsync(Uri serverUri, CancellationToken cancellationToken = default) - { - //Connect to server - await Client.ConnectAsync(serverUri, cancellationToken).ConfigureAwait(true); - //Invoke child on-connected event - OnConnected(); - Connected?.Invoke(Client, this); - } - - /// <summary> - /// Asynchronously disonnects a client only if the client is currently connected, - /// returns otherwise - /// </summary> - /// <param name="cancellationToken"></param> - /// <returns>A task that compeltes when the client has disconnected</returns> - public virtual Task DisconnectAsync(CancellationToken cancellationToken = default) - { - return Client.DisconnectAsync(cancellationToken); - } - - /// <summary> - /// Invoked when a client has successfully connected to the remote server - /// </summary> - protected abstract void OnConnected(); - /// <summary> - /// Invoked when the client has disconnected cleanly - /// </summary> - protected abstract void OnDisconnected(); - /// <summary> - /// Invoked when the connected client is closed because of a connection error - /// </summary> - /// <param name="e">A <see cref="EventArgs"/> that contains the client error data</param> - protected abstract void OnError(FMBClientErrorEventArgs e); - - ///<inheritdoc/> - protected override void Free() - { - Client.ConnectionClosedOnError -= Client_ConnectionClosedOnError; - Client.ConnectionClosed -= Client_ConnectionClosed; - Client.Dispose(); - } - } -} diff --git a/Net.Messaging.FBM/src/Client/FBMRequest.cs b/Net.Messaging.FBM/src/Client/FBMRequest.cs deleted file mode 100644 index 9d8af42..0000000 --- a/Net.Messaging.FBM/src/Client/FBMRequest.cs +++ /dev/null @@ -1,302 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: FBMRequest.cs -* -* FBMRequest.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Text; -using System.Buffers; -using System.Threading; -using System.Threading.Tasks; -using System.Collections.Generic; - -using VNLib.Net.Http; -using VNLib.Utils; -using VNLib.Utils.IO; -using VNLib.Utils.Memory; -using VNLib.Utils.Extensions; -using VNLib.Utils.Memory.Caching; - -namespace VNLib.Net.Messaging.FBM.Client -{ - /// <summary> - /// A reusable Fixed Buffer Message request container. This class is not thread-safe - /// </summary> - public sealed class FBMRequest : VnDisposeable, IReusable, IFBMMessage, IStringSerializeable - { - private class BufferWriter : IBufferWriter<byte> - { - private readonly FBMRequest _request; - - public BufferWriter(FBMRequest request) - { - _request = request; - } - - public void Advance(int count) - { - _request.Position += count; - } - - public Memory<byte> GetMemory(int sizeHint = 0) - { - return sizeHint > 0 ? _request.RemainingBuffer[0..sizeHint] : _request.RemainingBuffer; - } - - public Span<byte> GetSpan(int sizeHint = 0) - { - return sizeHint > 0 ? _request.RemainingBuffer.Span[0..sizeHint] : _request.RemainingBuffer.Span; - } - } - - private readonly IMemoryOwner<byte> HeapBuffer; - - - private readonly BufferWriter _writer; - private int Position; - - private readonly Encoding HeaderEncoding; - private readonly int ResponseHeaderBufferSize; - private readonly List<KeyValuePair<HeaderCommand, ReadOnlyMemory<char>>> ResponseHeaderList = new(); - private char[]? ResponseHeaderBuffer; - - /// <summary> - /// The size (in bytes) of the request message - /// </summary> - public int Length => Position; - private Memory<byte> RemainingBuffer => HeapBuffer.Memory[Position..]; - - /// <summary> - /// The id of the current request message - /// </summary> - public int MessageId { get; } - /// <summary> - /// The request message packet - /// </summary> - public ReadOnlyMemory<byte> RequestData => HeapBuffer.Memory[..Position]; - /// <summary> - /// An <see cref="ManualResetEvent"/> to signal request/response - /// event completion - /// </summary> - internal ManualResetEvent ResponseWaitEvent { get; } - - internal VnMemoryStream? Response { get; private set; } - /// <summary> - /// Initializes a new <see cref="FBMRequest"/> with the sepcified message buffer size, - /// and a random messageid - /// </summary> - /// <param name="config">The fbm client config storing required config variables</param> - public FBMRequest(in FBMClientConfig config) : this(Helpers.RandomMessageId, in config) - { } - /// <summary> - /// Initializes a new <see cref="FBMRequest"/> with the sepcified message buffer size and a custom MessageId - /// </summary> - /// <param name="messageId">The custom message id</param> - /// <param name="config">The fbm client config storing required config variables</param> - public FBMRequest(int messageId, in FBMClientConfig config) - { - //Setup response wait handle but make sure the contuation runs async - ResponseWaitEvent = new(true); - - //Alloc the buffer as a memory owner so a memory buffer can be used - HeapBuffer = config.BufferHeap.DirectAlloc<byte>(config.MessageBufferSize); - - MessageId = messageId; - - HeaderEncoding = config.HeaderEncoding; - ResponseHeaderBufferSize = config.MaxHeaderBufferSize; - - WriteMessageId(); - _writer = new(this); - } - - /// <summary> - /// Resets the internal buffer and writes the message-id header to the begining - /// of the buffer - /// </summary> - private void WriteMessageId() - { - //Get writer over buffer - ForwardOnlyWriter<byte> buffer = new(HeapBuffer.Memory.Span); - //write messageid header to the buffer - buffer.Append((byte)HeaderCommand.MessageId); - buffer.Append(MessageId); - buffer.WriteTermination(); - //Store intial position - Position = buffer.Written; - } - - ///<inheritdoc/> - public void WriteHeader(HeaderCommand header, ReadOnlySpan<char> value) => WriteHeader((byte)header, value); - ///<inheritdoc/> - public void WriteHeader(byte header, ReadOnlySpan<char> value) - { - ForwardOnlyWriter<byte> buffer = new(RemainingBuffer.Span); - buffer.WriteHeader(header, value, Helpers.DefaultEncoding); - //Update position - Position += buffer.Written; - } - ///<inheritdoc/> - public void WriteBody(ReadOnlySpan<byte> body, ContentType contentType = ContentType.Binary) - { - //Write content type header - WriteHeader(HeaderCommand.ContentType, HttpHelpers.GetContentTypeString(contentType)); - //Get writer over buffer - ForwardOnlyWriter<byte> buffer = new(RemainingBuffer.Span); - //Now safe to write body - buffer.WriteBody(body); - //Update position - Position += buffer.Written; - } - /// <summary> - /// Returns buffer writer for writing the body data to the internal message buffer - /// </summary> - /// <returns>A <see cref="BufferWriter"/> to write message body to</returns> - public IBufferWriter<byte> GetBodyWriter() - { - //Write body termination - Helpers.Termination.CopyTo(RemainingBuffer); - Position += Helpers.Termination.Length; - //Return buffer writer - return _writer; - } - - /// <summary> - /// Resets the internal buffer and allows for writing a new message with - /// the same message-id - /// </summary> - public void Reset() - { - //Re-writing the message-id will reset the buffer - WriteMessageId(); - } - - internal void SetResponse(VnMemoryStream? vms) - { - Response = vms; - ResponseWaitEvent.Set(); - } - - internal Task WaitForResponseAsync(CancellationToken token) - { - return ResponseWaitEvent.WaitAsync().WaitAsync(token); - } - - ///<inheritdoc/> - protected override void Free() - { - HeapBuffer.Dispose(); - ResponseWaitEvent.Dispose(); - OnResponseDisposed(); - } - void IReusable.Prepare() => Reset(); - bool IReusable.Release() - { - //Clear old response data if error occured - Response?.Dispose(); - Response = null; - - return true; - } - - /// <summary> - /// Gets the response of the sent message - /// </summary> - /// <returns>The response message for the current request</returns> - internal FBMResponse GetResponse() - { - if (Response != null) - { - /* - * NOTICE - * - * The FBM Client will position the response stream to the start - * of the header section (missing the message-id header) - * - * The message id belongs to this request so it cannot be mismatched - * - * The headers are read into a list of key-value pairs and the stream - * is positioned to the start of the message body - */ - - - //Alloc rseponse buffer - ResponseHeaderBuffer ??= ArrayPool<char>.Shared.Rent(ResponseHeaderBufferSize); - - //Parse message headers - HeaderParseError statusFlags = Helpers.ParseHeaders(Response, ResponseHeaderBuffer, ResponseHeaderList, HeaderEncoding); - - //return response structure - return new(Response, statusFlags, ResponseHeaderList, OnResponseDisposed); - } - else - { - return new(); - } - } - - //Called when a response message is disposed to cleanup resources held by the response - private void OnResponseDisposed() - { - //Clear response header list - ResponseHeaderList.Clear(); - - //Clear old response - Response?.Dispose(); - Response = null; - - if (ResponseHeaderBuffer != null) - { - //Free response buffer - ArrayPool<char>.Shared.Return(ResponseHeaderBuffer!); - ResponseHeaderBuffer = null; - } - } - - ///<inheritdoc/> - public string Compile() - { - int charSize = Helpers.DefaultEncoding.GetCharCount(RequestData.Span); - using UnsafeMemoryHandle<char> buffer = Memory.UnsafeAlloc<char>(charSize + 128); - ERRNO count = Compile(buffer.Span); - return buffer.AsSpan(0, count).ToString(); - } - ///<inheritdoc/> - public void Compile(ref ForwardOnlyWriter<char> writer) - { - writer.Append("Message ID:"); - writer.Append(MessageId); - writer.Append(Environment.NewLine); - Helpers.DefaultEncoding.GetChars(RequestData.Span, ref writer); - } - ///<inheritdoc/> - public ERRNO Compile(in Span<char> buffer) - { - ForwardOnlyWriter<char> writer = new(buffer); - Compile(ref writer); - return writer.Written; - } - ///<inheritdoc/> - public override string ToString() => Compile(); - - } -} diff --git a/Net.Messaging.FBM/src/Client/FBMResponse.cs b/Net.Messaging.FBM/src/Client/FBMResponse.cs deleted file mode 100644 index da36956..0000000 --- a/Net.Messaging.FBM/src/Client/FBMResponse.cs +++ /dev/null @@ -1,106 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: FBMResponse.cs -* -* FBMResponse.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Collections.Generic; - -using VNLib.Utils.IO; - -namespace VNLib.Net.Messaging.FBM.Client -{ - /// <summary> - /// A Fixed Buffer Message client response linked to the request that generated it. - /// Once the request is disposed or returned this message state is invalid - /// </summary> - public readonly struct FBMResponse : IDisposable, IEquatable<FBMResponse> - { - private readonly Action? _onDispose; - - /// <summary> - /// True when a response body was recieved and properly parsed - /// </summary> - public readonly bool IsSet { get; } - /// <summary> - /// The raw response message packet - /// </summary> - public readonly VnMemoryStream? MessagePacket { get; } - /// <summary> - /// A collection of response message headers - /// </summary> - public readonly IReadOnlyList<KeyValuePair<HeaderCommand, ReadOnlyMemory<char>>> Headers { get; } - /// <summary> - /// Status flags of the message parse operation - /// </summary> - public readonly HeaderParseError StatusFlags { get; } - /// <summary> - /// The body segment of the response message - /// </summary> - public readonly ReadOnlySpan<byte> ResponseBody => IsSet ? Helpers.GetRemainingData(MessagePacket!) : ReadOnlySpan<byte>.Empty; - - /// <summary> - /// Initailzies a response message structure and parses response - /// packet structure - /// </summary> - /// <param name="vms">The message buffer (message packet)</param> - /// <param name="status">The size of the buffer to alloc for header value storage</param> - /// <param name="headerList">The collection of headerse</param> - /// <param name="onDispose">A method that will be invoked when the message response body is disposed</param> - public FBMResponse(VnMemoryStream? vms, HeaderParseError status, IReadOnlyList<KeyValuePair<HeaderCommand, ReadOnlyMemory<char>>> headerList, Action onDispose) - { - MessagePacket = vms; - StatusFlags = status; - Headers = headerList; - IsSet = true; - _onDispose = onDispose; - } - - /// <summary> - /// Creates an unset response structure - /// </summary> - public FBMResponse() - { - MessagePacket = null; - StatusFlags = HeaderParseError.InvalidHeaderRead; - Headers = Array.Empty<KeyValuePair<HeaderCommand, ReadOnlyMemory<char>>>(); - IsSet = false; - _onDispose = null; - } - - /// <summary> - /// Releases any resources associated with the response message - /// </summary> - public void Dispose() => _onDispose?.Invoke(); - ///<inheritdoc/> - public override bool Equals(object? obj) => obj is FBMResponse response && Equals(response); - ///<inheritdoc/> - public override int GetHashCode() => IsSet ? MessagePacket!.GetHashCode() : 0; - ///<inheritdoc/> - public static bool operator ==(FBMResponse left, FBMResponse right) => left.Equals(right); - ///<inheritdoc/> - public static bool operator !=(FBMResponse left, FBMResponse right) => !(left == right); - ///<inheritdoc/> - public bool Equals(FBMResponse other) => IsSet && other.IsSet && MessagePacket == other.MessagePacket; - - } -} diff --git a/Net.Messaging.FBM/src/Client/FMBClientErrorEventArgs.cs b/Net.Messaging.FBM/src/Client/FMBClientErrorEventArgs.cs deleted file mode 100644 index 96e9414..0000000 --- a/Net.Messaging.FBM/src/Client/FMBClientErrorEventArgs.cs +++ /dev/null @@ -1,46 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: FMBClientErrorEventArgs.cs -* -* FMBClientErrorEventArgs.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; - -#nullable disable - -namespace VNLib.Net.Messaging.FBM.Client -{ - /// <summary> - /// <see cref="EventArgs"/> that is raised when an error occurs - /// in the background listener loop - /// </summary> - public class FMBClientErrorEventArgs : EventArgs - { - /// <summary> - /// The client that the exception was raised from - /// </summary> - public FBMClient ErrorClient { get; init; } - /// <summary> - /// The exception that was raised - /// </summary> - public Exception Cause { get; init; } - } -} diff --git a/Net.Messaging.FBM/src/Client/HeaderCommand.cs b/Net.Messaging.FBM/src/Client/HeaderCommand.cs deleted file mode 100644 index 5a57d85..0000000 --- a/Net.Messaging.FBM/src/Client/HeaderCommand.cs +++ /dev/null @@ -1,57 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: HeaderCommand.cs -* -* HeaderCommand.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -namespace VNLib.Net.Messaging.FBM -{ - /// <summary> - /// A Fixed Buffer Message header command value - /// </summary> - public enum HeaderCommand : byte - { - /// <summary> - /// Default, do not use - /// </summary> - NOT_USED, - /// <summary> - /// Specifies the header for a message-id - /// </summary> - MessageId, - /// <summary> - /// Specifies a resource location - /// </summary> - Location, - /// <summary> - /// Specifies a standard MIME content type header - /// </summary> - ContentType, - /// <summary> - /// Specifies an action on a request - /// </summary> - Action, - /// <summary> - /// Specifies a status header - /// </summary> - Status - } -} diff --git a/Net.Messaging.FBM/src/Client/HeaderParseStatus.cs b/Net.Messaging.FBM/src/Client/HeaderParseStatus.cs deleted file mode 100644 index d38df26..0000000 --- a/Net.Messaging.FBM/src/Client/HeaderParseStatus.cs +++ /dev/null @@ -1,40 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: HeaderParseStatus.cs -* -* HeaderParseStatus.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; - -namespace VNLib.Net.Messaging.FBM -{ - /// <summary> - /// Specifies the results of a response parsing operation - /// </summary> - [Flags] - public enum HeaderParseError - { - None = 0, - InvalidId = 1, - HeaderOutOfMem = 2, - InvalidHeaderRead = 4 - } -} diff --git a/Net.Messaging.FBM/src/Client/Helpers.cs b/Net.Messaging.FBM/src/Client/Helpers.cs deleted file mode 100644 index 8f895fa..0000000 --- a/Net.Messaging.FBM/src/Client/Helpers.cs +++ /dev/null @@ -1,272 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: Helpers.cs -* -* Helpers.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.IO; -using System.Text; -using System.Collections.Generic; -using System.Security.Cryptography; - -using VNLib.Utils; -using VNLib.Utils.IO; -using VNLib.Utils.Memory; -using VNLib.Utils.Extensions; - - -namespace VNLib.Net.Messaging.FBM -{ - /// <summary> - /// Contains FBM library helper methods - /// </summary> - public static class Helpers - { - /// <summary> - /// The message-id of a connection control frame / out of band message - /// </summary> - public const int CONTROL_FRAME_MID = -500; - - public static readonly Encoding DefaultEncoding = Encoding.UTF8; - public static readonly ReadOnlyMemory<byte> Termination = new byte[] { 0xFF, 0xF1 }; - - /// <summary> - /// Parses the header line for a message-id - /// </summary> - /// <param name="line">A sequence of bytes that make up a header line</param> - /// <returns>The message-id if parsed, -1 if message-id is not valid</returns> - public static int GetMessageId(ReadOnlySpan<byte> line) - { - //Make sure the message line is large enough to contain a message-id - if (line.Length < 1 + sizeof(int)) - { - return -1; - } - //The first byte should be the header id - HeaderCommand headerId = (HeaderCommand)line[0]; - //Make sure the headerid is set - if (headerId != HeaderCommand.MessageId) - { - return -2; - } - //Get the messageid after the header byte - ReadOnlySpan<byte> messageIdSegment = line.Slice(1, sizeof(int)); - //get the messageid from the messageid segment - return BitConverter.ToInt32(messageIdSegment); - } - - /// <summary> - /// Alloctes a random integer to use as a message id - /// </summary> - public static int RandomMessageId => RandomNumberGenerator.GetInt32(1, int.MaxValue); - /// <summary> - /// Gets the remaining data after the current position of the stream. - /// </summary> - /// <param name="response">The stream to segment</param> - /// <returns>The remaining data segment</returns> - public static ReadOnlySpan<byte> GetRemainingData(VnMemoryStream response) - { - return response.AsSpan()[(int)response.Position..]; - } - - /// <summary> - /// Reads the next available line from the response message - /// </summary> - /// <param name="response"></param> - /// <returns>The read line</returns> - public static ReadOnlySpan<byte> ReadLine(VnMemoryStream response) - { - //Get the span from the current stream position to end of the stream - ReadOnlySpan<byte> line = GetRemainingData(response); - //Search for next line termination - int index = line.IndexOf(Termination.Span); - if (index == -1) - { - return ReadOnlySpan<byte>.Empty; - } - //Update stream position to end of termination - response.Seek(index + Termination.Length, SeekOrigin.Current); - //slice up line and exclude the termination - return line[..index]; - } - /// <summary> - /// Parses headers from the request stream, stores headers from the buffer into the - /// header collection - /// </summary> - /// <param name="vms">The FBM packet buffer</param> - /// <param name="buffer">The header character buffer to write headers to</param> - /// <param name="headers">The collection to store headers in</param> - /// <param name="encoding">The encoding type used to deocde header values</param> - /// <returns>The results of the parse operation</returns> - public static HeaderParseError ParseHeaders(VnMemoryStream vms, char[] buffer, ICollection<KeyValuePair<HeaderCommand, ReadOnlyMemory<char>>> headers, Encoding encoding) - { - HeaderParseError status = HeaderParseError.None; - //sliding window - Memory<char> currentWindow = buffer; - //Accumulate headers - while (true) - { - //Read the next line from the current stream - ReadOnlySpan<byte> line = ReadLine(vms); - if (line.IsEmpty) - { - //Done reading headers - break; - } - HeaderCommand cmd = GetHeaderCommand(line); - //Get header value - ERRNO charsRead = GetHeaderValue(line, currentWindow.Span, encoding); - if (charsRead < 0) - { - //Out of buffer space - status |= HeaderParseError.HeaderOutOfMem; - break; - } - else if (!charsRead) - { - //Invalid header - status |= HeaderParseError.InvalidHeaderRead; - } - else - { - //Store header as a read-only sequence - headers.Add(new(cmd, currentWindow[..(int)charsRead])); - //Shift buffer window - currentWindow = currentWindow[(int)charsRead..]; - } - } - return status; - } - - /// <summary> - /// Gets a <see cref="HeaderCommand"/> enum from the first byte of the message - /// </summary> - /// <param name="line"></param> - /// <returns>The <see cref="HeaderCommand"/> enum value from hte first byte of the message</returns> - public static HeaderCommand GetHeaderCommand(ReadOnlySpan<byte> line) - { - return (HeaderCommand)line[0]; - } - /// <summary> - /// Gets the value of the header following the colon bytes in the specifed - /// data message data line - /// </summary> - /// <param name="line">The message header line to get the value of</param> - /// <param name="output">The output character buffer to write characters to</param> - /// <param name="encoding">The encoding to decode the specified data with</param> - /// <returns>The number of characters encoded</returns> - public static ERRNO GetHeaderValue(ReadOnlySpan<byte> line, Span<char> output, Encoding encoding) - { - //Get the data following the header byte - ReadOnlySpan<byte> value = line[1..]; - //Calculate the character account - int charCount = encoding.GetCharCount(value); - //Determine if the output buffer is large enough - if (charCount > output.Length) - { - return -1; - } - //Decode the characters and return the char count - _ = encoding.GetChars(value, output); - return charCount; - } - - /// <summary> - /// Appends an arbitrary header to the current request buffer - /// </summary> - /// <param name="buffer"></param> - /// <param name="header">The <see cref="HeaderCommand"/> of the header</param> - /// <param name="value">The value of the header</param> - /// <param name="encoding">Encoding to use when writing character message</param> - /// <exception cref="OutOfMemoryException"></exception> - public static void WriteHeader(ref this ForwardOnlyWriter<byte> buffer, byte header, ReadOnlySpan<char> value, Encoding encoding) - { - //get char count - int byteCount = encoding.GetByteCount(value); - //make sure there is enough room in the buffer - if (buffer.RemainingSize < byteCount) - { - throw new OutOfMemoryException(); - } - //Write header command enum value - buffer.Append(header); - //Convert the characters to binary and write to the buffer - encoding.GetBytes(value, ref buffer); - //Write termination (0) - buffer.WriteTermination(); - } - - /// <summary> - /// Ends the header section of the request and appends the message body to - /// the end of the request - /// </summary> - /// <param name="buffer"></param> - /// <param name="body">The message body to send with request</param> - /// <exception cref="OutOfMemoryException"></exception> - public static void WriteBody(ref this ForwardOnlyWriter<byte> buffer, ReadOnlySpan<byte> body) - { - //start with termination - buffer.WriteTermination(); - //Write the body - buffer.Append(body); - } - /// <summary> - /// Writes a line termination to the message buffer - /// </summary> - /// <param name="buffer"></param> - public static void WriteTermination(ref this ForwardOnlyWriter<byte> buffer) - { - //write termination - buffer.Append(Termination.Span); - } - - /// <summary> - /// Writes a line termination to the message buffer - /// </summary> - /// <param name="buffer"></param> - public static void WriteTermination(this IDataAccumulator<byte> buffer) - { - //write termination - buffer.Append(Termination.Span); - } - - /// <summary> - /// Appends an arbitrary header to the current request buffer - /// </summary> - /// <param name="buffer"></param> - /// <param name="header">The <see cref="HeaderCommand"/> of the header</param> - /// <param name="value">The value of the header</param> - /// <param name="encoding">Encoding to use when writing character message</param> - /// <exception cref="ArgumentException"></exception> - public static void WriteHeader(this IDataAccumulator<byte> buffer, byte header, ReadOnlySpan<char> value, Encoding encoding) - { - //Write header command enum value - buffer.Append(header); - //Convert the characters to binary and write to the buffer - int written = encoding.GetBytes(value, buffer.Remaining); - //Advance the buffer - buffer.Advance(written); - //Write termination (0) - buffer.WriteTermination(); - } - } -} diff --git a/Net.Messaging.FBM/src/Client/IFBMMessage.cs b/Net.Messaging.FBM/src/Client/IFBMMessage.cs deleted file mode 100644 index 18f19ec..0000000 --- a/Net.Messaging.FBM/src/Client/IFBMMessage.cs +++ /dev/null @@ -1,62 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: IFBMMessage.cs -* -* IFBMMessage.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; - -using VNLib.Net.Http; - -namespace VNLib.Net.Messaging.FBM.Client -{ - /// <summary> - /// Represents basic Fixed Buffer Message protocol operations - /// </summary> - public interface IFBMMessage - { - /// <summary> - /// The unique id of the message (nonzero) - /// </summary> - int MessageId { get; } - /// <summary> - /// Writes a data body to the message of the specified content type - /// </summary> - /// <param name="body">The body of the message to copy</param> - /// <param name="contentType">The content type of the message body</param> - /// <exception cref="OutOfMemoryException"></exception> - void WriteBody(ReadOnlySpan<byte> body, ContentType contentType = ContentType.Binary); - /// <summary> - /// Appends an arbitrary header to the current request buffer - /// </summary> - /// <param name="header">The header id</param> - /// <param name="value">The value of the header</param> - /// <exception cref="OutOfMemoryException"></exception> - void WriteHeader(byte header, ReadOnlySpan<char> value); - /// <summary> - /// Appends an arbitrary header to the current request buffer - /// </summary> - /// <param name="header">The <see cref="HeaderCommand"/> of the header</param> - /// <param name="value">The value of the header</param> - /// <exception cref="OutOfMemoryException"></exception> - void WriteHeader(HeaderCommand header, ReadOnlySpan<char> value); - } -}
\ No newline at end of file diff --git a/Net.Messaging.FBM/src/Client/IStatefulConnection.cs b/Net.Messaging.FBM/src/Client/IStatefulConnection.cs deleted file mode 100644 index 3b9dd3b..0000000 --- a/Net.Messaging.FBM/src/Client/IStatefulConnection.cs +++ /dev/null @@ -1,54 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: IStatefulConnection.cs -* -* IStatefulConnection.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Threading; -using System.Threading.Tasks; - -namespace VNLib.Net.Messaging.FBM.Client -{ - /// <summary> - /// An abstraction for a stateful connection client that reports its status - /// </summary> - public interface IStatefulConnection - { - /// <summary> - /// An event that is raised when the connection state has transition from connected to disconnected - /// </summary> - event EventHandler ConnectionClosed; - /// <summary> - /// Connects the client to the remote resource - /// </summary> - /// <param name="serverUri">The resource location to connect to</param> - /// <param name="cancellationToken">A token to cancel the connect opreation</param> - /// <returns>A task that compeltes when the connection has succedded</returns> - Task ConnectAsync(Uri serverUri, CancellationToken cancellationToken = default); - /// <summary> - /// Gracefully disconnects the client from the remote resource - /// </summary> - /// <param name="cancellationToken">A token to cancel the disconnect operation</param> - /// <returns>A task that completes when the client has been disconnected</returns> - Task DisconnectAsync(CancellationToken cancellationToken = default); - } -} diff --git a/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs b/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs deleted file mode 100644 index acac369..0000000 --- a/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs +++ /dev/null @@ -1,201 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: ManagedClientWebSocket.cs -* -* ManagedClientWebSocket.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Net; -using System.Threading; -using System.Net.Security; -using System.Net.WebSockets; -using System.Threading.Tasks; -using System.Security.Cryptography.X509Certificates; - -using VNLib.Utils.Memory; - -#nullable enable - -namespace VNLib.Net.Messaging.FBM.Client -{ - - /// <summary> - /// A wrapper container to manage client websockets - /// </summary> - public class ManagedClientWebSocket : WebSocket - { - private readonly int TxBufferSize; - private readonly int RxBufferSize; - private readonly TimeSpan KeepAliveInterval; - private readonly VnTempBuffer<byte> _dataBuffer; - private readonly string? _subProtocol; - - /// <summary> - /// A collection of headers to add to the client - /// </summary> - public WebHeaderCollection Headers { get; } - public X509CertificateCollection Certificates { get; } - public IWebProxy? Proxy { get; set; } - public CookieContainer? Cookies { get; set; } - public RemoteCertificateValidationCallback? RemoteCertificateValidationCallback { get; set; } - - - private ClientWebSocket? _socket; - - /// <summary> - /// Initiaizes a new <see cref="ManagedClientWebSocket"/> that accepts an optional sub-protocol for connections - /// </summary> - /// <param name="txBufferSize">The size (in bytes) of the send buffer size</param> - /// <param name="rxBufferSize">The size (in bytes) of the receive buffer size to use</param> - /// <param name="keepAlive">The WS keepalive interval</param> - /// <param name="subProtocol">The optional sub-protocol to use</param> - public ManagedClientWebSocket(int txBufferSize, int rxBufferSize, TimeSpan keepAlive, string? subProtocol) - { - //Init header collection - Headers = new(); - Certificates = new(); - //Alloc buffer - _dataBuffer = new(rxBufferSize); - TxBufferSize = txBufferSize; - RxBufferSize = rxBufferSize; - KeepAliveInterval = keepAlive; - _subProtocol = subProtocol; - } - - /// <summary> - /// Asyncrhonously prepares a new client web-socket and connects to the remote endpoint - /// </summary> - /// <param name="serverUri">The endpoint to connect to</param> - /// <param name="token">A token to cancel the connect operation</param> - /// <returns>A task that compeltes when the client has connected</returns> - public async Task ConnectAsync(Uri serverUri, CancellationToken token) - { - //Init new socket - _socket = new(); - try - { - //Set buffer - _socket.Options.SetBuffer(RxBufferSize, TxBufferSize, _dataBuffer); - //Set remaining stored options - _socket.Options.ClientCertificates = Certificates; - _socket.Options.KeepAliveInterval = KeepAliveInterval; - _socket.Options.Cookies = Cookies; - _socket.Options.Proxy = Proxy; - _socket.Options.RemoteCertificateValidationCallback = RemoteCertificateValidationCallback; - //Specify sub-protocol - if (!string.IsNullOrEmpty(_subProtocol)) - { - _socket.Options.AddSubProtocol(_subProtocol); - } - //Set headers - for (int i = 0; i < Headers.Count; i++) - { - string name = Headers.GetKey(i); - string? value = Headers.Get(i); - //Set header - _socket.Options.SetRequestHeader(name, value); - } - //Connect to server - await _socket.ConnectAsync(serverUri, token); - } - catch - { - //Cleanup the socket - Cleanup(); - throw; - } - } - - /// <summary> - /// Cleans up internal resources to prepare for another connection - /// </summary> - public void Cleanup() - { - //Dispose old socket if set - _socket?.Dispose(); - _socket = null; - } - ///<inheritdoc/> - public override WebSocketCloseStatus? CloseStatus => _socket?.CloseStatus; - ///<inheritdoc/> - public override string CloseStatusDescription => _socket?.CloseStatusDescription ?? string.Empty; - ///<inheritdoc/> - public override WebSocketState State => _socket?.State ?? WebSocketState.Closed; - ///<inheritdoc/> - public override string SubProtocol => _subProtocol ?? string.Empty; - - - ///<inheritdoc/> - public override void Abort() - { - _socket?.Abort(); - } - ///<inheritdoc/> - public override Task CloseAsync(WebSocketCloseStatus closeStatus, string? statusDescription, CancellationToken cancellationToken) - { - return _socket?.CloseAsync(closeStatus, statusDescription, cancellationToken) ?? Task.CompletedTask; - } - ///<inheritdoc/> - public override Task CloseOutputAsync(WebSocketCloseStatus closeStatus, string? statusDescription, CancellationToken cancellationToken) - { - if (_socket?.State == WebSocketState.Open || _socket?.State == WebSocketState.CloseSent) - { - return _socket.CloseOutputAsync(closeStatus, statusDescription, cancellationToken); - } - return Task.CompletedTask; - } - ///<inheritdoc/> - public override ValueTask<ValueWebSocketReceiveResult> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken) - { - _ = _socket ?? throw new WebSocketException(WebSocketError.ConnectionClosedPrematurely, "The connected socket has been disconnected"); - - return _socket.ReceiveAsync(buffer, cancellationToken); - } - ///<inheritdoc/> - public override Task<WebSocketReceiveResult> ReceiveAsync(ArraySegment<byte> buffer, CancellationToken cancellationToken) - { - _ = _socket ?? throw new WebSocketException(WebSocketError.ConnectionClosedPrematurely, "The connected socket has been disconnected"); - - return _socket.ReceiveAsync(buffer, cancellationToken); - } - ///<inheritdoc/> - public override ValueTask SendAsync(ReadOnlyMemory<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken) - { - _ = _socket ?? throw new WebSocketException(WebSocketError.ConnectionClosedPrematurely, "The connected socket has been disconnected"); - return _socket.SendAsync(buffer, messageType, endOfMessage, cancellationToken); - } - ///<inheritdoc/> - public override Task SendAsync(ArraySegment<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken) - { - _ = _socket ?? throw new WebSocketException(WebSocketError.ConnectionClosedPrematurely, "The connected socket has been disconnected"); - return _socket.SendAsync(buffer, messageType, endOfMessage, cancellationToken); - } - - ///<inheritdoc/> - public override void Dispose() - { - //Free buffer - _dataBuffer.Dispose(); - _socket?.Dispose(); - GC.SuppressFinalize(this); - } - } -} diff --git a/Net.Messaging.FBM/src/Client/README.md b/Net.Messaging.FBM/src/Client/README.md deleted file mode 100644 index 5aa8e76..0000000 --- a/Net.Messaging.FBM/src/Client/README.md +++ /dev/null @@ -1,169 +0,0 @@ -# VNLib.Net.Messaging.FBM.Client - -Fixed Buffer Messaging Protocol client library. High performance statful messaging -protocol built on top of HTTP web-sockets. Low/no allocation, completely asynchronous -while providing a TPL API. This library provides a simple asynchronous request/response -architecture to web-sockets. This was initially designed to provide an alternative to -complete HTTP request/response overhead, but allow a simple control flow for work -across a network. - -The base of the library relies on creating message objects that allocate fixed size -buffers are configured when the IFBMMessageis constructed. All data is written to the -internal buffer adhering to the format below. - -Messages consist of a 4 byte message id, a collection of headers, and a message body. -The first 4 bytes of a message is the ID (for normal messages a signed integer greater than 0), -0 is reserved for error conditions, and negative numbers are reserved for internal -messages. Headers are identified by a single byte, followed by a variable length UTF8 -encoded character sequence, followed by a termination of 0xFF, 0xF1 (may change). - -### Message structure - 4 byte positive (signed 32-bit integer) message id - 2 byte termination - 1 byte header-id - variable length UTF8 value - 2 byte termination - -- other headers -- - 2 byte termination (extra termination, ie: empty header) - variable length payload - (end of message is the end of the payload) - -Buffer sizes are generally negotiated on initial web-socket upgrade, so to buffer entire messages -in a single read/write from the web-socket. Received messages are read into memory until -the web-socket has no more data available. The message is then parsed and passed for processing -on the server side, or complete a pending request on the client side. Servers may drop the -web-socket connection and return an error if messages exceed the size of the pre-negotiated -buffer. Servers should validate buffer sizes before accepting a connection. - -This client library allows for messages to be streamed to the server, however this library -is optimized for fixed buffers, so streaming will not be the most efficient, and will likely -cause slow-downs in message transmission. However, since FBM relies on a streaming protocol, -so it was silly not to provide it. Streams add overhead of additional buffer allocation, -additional copy, and message fragmentation (multiple writes to the web-socket). Since frames -written to the web-socket must be synchronized, a mutex is held during transmission, which -means the more message overhead, the longer the blocking period on new messages. Mutex -acquisition will wait asynchronously when necessary. - -The goal of the FBM protocol for is to provide efficient use of resources (memory, network, -and minimize GC load) to transfer small messages truly asynchronously, at wire speeds, with -only web-socket and transport overhead. Using web-sockets simplifies implementation, and allows -comparability across platforms, languages, and versions. - -## fundamentals - -The main implementation is the FBMClient class. This class provides the means for creating -the stateful connection to the remote server. It also provides an internal FBMRequest message -rental (object cache) that created initialized FBMRequest messages. This class may be derrived -to provide additional functionality, such as handling control frames that may dynamically -alter the state of the connection (negotiation etc). A mechanism to do so is provided. - -### FBMClient layout - -``` - public class FBMClient : VnDisposeable, IStatefulConnection, ICacheHolder - { - //Raised when an error occurs during receiving or parsing - public event EventHandler<FMBClientErrorEventArgs>? ConnectionClosedOnError; - - //Raised when connection is closed, regardless of the cause - public event EventHandler? ConnectionClosed; - - //Connects to the remote server at the specified websocket address (ws:// or wss://) - public async Task ConnectAsync(Uri address, CancellationToken cancellation = default); - - //When connected, sends the specified message to the remote server - public async Task<FBMResponse> SendAsync(FBMRequest request, CancellationToken cancellationToken = default); - - //When connected, streams a message to the remote server, * the message payload must not be configured * - public async Task<FBMResponse> StreamDataAsync(FBMRequest request, Stream payload, ContentType ct, CancellationToken cancellationToken = default); - - //Disconnects from the remote server - public async Task DisconnectAsync(CancellationToken cancellationToken = default); - - //Releases all held resourses - public void Dispose(); //Inherrited from VnDisposeable - - ICacheHolder.CacheClear(); //Inherited member, clears cached FBMRequest objects - ICacheHolder.CacheHardClear(); //Inherited member, clears cached FBMRequest objects - } -``` - -### Example usage -``` - FBMClientConfig config = new() - { - //The size (in bytes) of the internal buffer to use when receiving messages from the server - RecvBufferSize = 1024, - - //FBMRequest buffer size (expected size of buffers, required for negotiation) - RequestBufferSize = 1024, - - //The size (in chars) of headers the FBMResponse should expect to buffer from the server - ResponseHeaderBufSize = 1024, - - //The absolute maximum message size to buffer from the server - MaxMessageSize = 10 * 1024 * 1024, //10KiB - - //The unmanaged heap the allocate buffers from - BufferHeap = Memory.Shared, - - //Web-socket keepalive frame interval - KeepAliveInterval = TimeSpan.FromSeconds(30), - - //Web-socket sub-protocol header value - SubProtocol = null - }; - - //Craete client from the config - using (FBMClient client = new(config)) - { - //Usually set some type of authentication headers before connecting - - /* - client.ClientSocket.SetHeader("Authorization", "Authorization token"); - */ - - //Connect to server - Uri address = new Uri("wss://localhost:8080/some/fbm/endpoint"); - await client.ConnectAsync(address, CancellationToken.None); - - do - { - //Rent request message - FBMRequest request = client.RentRequest(); - //Some arbitrary header value (or preconfigured header) - request.WriteHeader(0x10, "Hello"); - //Some arbitrary payload - request.WriteBody(new byte[] { 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A }, ContentType.Binary); - //Send request message - using (FBMResponse response = await client.SendAsync(request, CancellationToken.None)) - { - //Extension method to raise exception if an invalid response was received (also use the response.IsSet flag) - response.ThrowIfNotSet(); - - //Check headers (using Linq to get first header) - string header1 = response.Headers.First().Value.ToString(); - - //Get payload, data is valid until the response is disposed - ReadOnlySpan<byte> body = response.ResponseBody; - } - //Return request - client.ReturnRequest(request); - //request.Dispose(); //Alternativly dispose message - - await Task.Delay(1000); - } - while(true); - } -``` - -## Final Notes - -XML Documentation is or will be provided for almost all public exports. APIs are intended to -be sensibly public and immutable to allow for easy extensability (via extension methods). I -often use extension libraries to provide additional functionality. (See cache library) - -This library is likely a niche use case, and is probably not for everyone. Unless you care -about reasonably efficient high frequency request/response messaging, this probably isnt -for you. This library provides a reasonable building block for distributed lock mechanisms -and small data caching.
\ No newline at end of file diff --git a/Net.Messaging.FBM/src/Exceptions/FBMException.cs b/Net.Messaging.FBM/src/Exceptions/FBMException.cs deleted file mode 100644 index 1d5c7db..0000000 --- a/Net.Messaging.FBM/src/Exceptions/FBMException.cs +++ /dev/null @@ -1,52 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: FBMException.cs -* -* FBMException.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Runtime.Serialization; - -namespace VNLib.Net.Messaging.FBM -{ - /// <summary> - /// A base exception class for all FBM Library exceptions - /// </summary> - public class FBMException : Exception - { - ///<inheritdoc/> - public FBMException() - { - } - ///<inheritdoc/> - public FBMException(string message) : base(message) - { - } - ///<inheritdoc/> - public FBMException(string message, Exception innerException) : base(message, innerException) - { - } - ///<inheritdoc/> - protected FBMException(SerializationInfo info, StreamingContext context) : base(info, context) - { - } - } -} diff --git a/Net.Messaging.FBM/src/Exceptions/FBMInvalidRequestException.cs b/Net.Messaging.FBM/src/Exceptions/FBMInvalidRequestException.cs deleted file mode 100644 index ae42797..0000000 --- a/Net.Messaging.FBM/src/Exceptions/FBMInvalidRequestException.cs +++ /dev/null @@ -1,47 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: FBMInvalidRequestException.cs -* -* FBMInvalidRequestException.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; - - -namespace VNLib.Net.Messaging.FBM -{ - /// <summary> - /// Raised when a request message is not in a valid state and cannot be sent - /// </summary> - public class FBMInvalidRequestException : FBMException - { - public FBMInvalidRequestException() - { - } - - public FBMInvalidRequestException(string message) : base(message) - { - } - - public FBMInvalidRequestException(string message, Exception innerException) : base(message, innerException) - { - } - } -} diff --git a/Net.Messaging.FBM/src/Exceptions/InvalidResponseException.cs b/Net.Messaging.FBM/src/Exceptions/InvalidResponseException.cs deleted file mode 100644 index 3f0b970..0000000 --- a/Net.Messaging.FBM/src/Exceptions/InvalidResponseException.cs +++ /dev/null @@ -1,52 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: InvalidResponseException.cs -* -* InvalidResponseException.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Runtime.Serialization; - -namespace VNLib.Net.Messaging.FBM -{ - /// <summary> - /// Raised when a response to an FBM request is not in a valid state - /// </summary> - public class InvalidResponseException : FBMException - { - ///<inheritdoc/> - public InvalidResponseException() - { - } - ///<inheritdoc/> - public InvalidResponseException(string message) : base(message) - { - } - ///<inheritdoc/> - public InvalidResponseException(string message, Exception innerException) : base(message, innerException) - { - } - ///<inheritdoc/> - protected InvalidResponseException(SerializationInfo info, StreamingContext context) : base(info, context) - { - } - } -} diff --git a/Net.Messaging.FBM/src/Server/FBMContext.cs b/Net.Messaging.FBM/src/Server/FBMContext.cs deleted file mode 100644 index fb39d1b..0000000 --- a/Net.Messaging.FBM/src/Server/FBMContext.cs +++ /dev/null @@ -1,85 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: FBMContext.cs -* -* FBMContext.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System.Text; - -using VNLib.Utils.IO; -using VNLib.Utils.Memory.Caching; - -namespace VNLib.Net.Messaging.FBM.Server -{ - /// <summary> - /// A request/response pair message context - /// </summary> - public sealed class FBMContext : IReusable - { - private readonly Encoding _headerEncoding; - - /// <summary> - /// The request message to process - /// </summary> - public FBMRequestMessage Request { get; } - /// <summary> - /// The response message - /// </summary> - public FBMResponseMessage Response { get; } - /// <summary> - /// Creates a new reusable <see cref="FBMContext"/> - /// for use within a <see cref="ObjectRental{T}"/> - /// cache - /// </summary> - /// <param name="requestHeaderBufferSize">The size in characters of the request header buffer</param> - /// <param name="responseBufferSize">The size in characters of the response header buffer</param> - /// <param name="headerEncoding">The message header encoding instance</param> - public FBMContext(int requestHeaderBufferSize, int responseBufferSize, Encoding headerEncoding) - { - Request = new(requestHeaderBufferSize); - Response = new(responseBufferSize, headerEncoding); - _headerEncoding = headerEncoding; - } - - /// <summary> - /// Initializes the context with the buffered request data - /// </summary> - /// <param name="requestData">The request data buffer positioned at the begining of the request data</param> - /// <param name="connectionId">The unique id of the connection</param> - internal void Prepare(VnMemoryStream requestData, string connectionId) - { - Request.Prepare(requestData, connectionId, _headerEncoding); - //Message id is set after the request parses the incoming message - Response.Prepare(Request.MessageId); - } - - void IReusable.Prepare() - { - (Request as IReusable).Prepare(); - (Response as IReusable).Prepare(); - } - - bool IReusable.Release() - { - return (Request as IReusable).Release() & (Response as IReusable).Release(); - } - } -} diff --git a/Net.Messaging.FBM/src/Server/FBMListener.cs b/Net.Messaging.FBM/src/Server/FBMListener.cs deleted file mode 100644 index 1d50953..0000000 --- a/Net.Messaging.FBM/src/Server/FBMListener.cs +++ /dev/null @@ -1,389 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: FBMListener.cs -* -* FBMListener.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.IO; -using System.Buffers; -using System.Threading; -using System.Net.WebSockets; -using System.Threading.Tasks; - -using VNLib.Utils; -using VNLib.Utils.IO; -using VNLib.Utils.Async; -using VNLib.Utils.Memory; -using VNLib.Utils.Extensions; -using VNLib.Utils.Memory.Caching; -using VNLib.Plugins.Essentials; - -namespace VNLib.Net.Messaging.FBM.Server -{ - - /// <summary> - /// Method delegate for processing FBM messages from an <see cref="FBMListener"/> - /// when messages are received - /// </summary> - /// <param name="context">The message/connection context</param> - /// <param name="userState">The state parameter passed on client connected</param> - /// <param name="cancellationToken">A token that reflects the state of the listener</param> - /// <returns>A <see cref="Task"/> that resolves when processing is complete</returns> - public delegate Task RequestHandler(FBMContext context, object? userState, CancellationToken cancellationToken); - - /// <summary> - /// A FBM protocol listener. Listens for messages on a <see cref="WebSocketSession"/> - /// and raises events on requests. - /// </summary> - public class FBMListener - { - class ListeningSession - { - private readonly ReusableStore<FBMContext> CtxStore; - private readonly CancellationTokenSource Cancellation; - private readonly CancellationTokenRegistration Registration; - private readonly FBMListenerSessionParams Params; - - - public readonly object? UserState; - - public readonly SemaphoreSlim ResponseLock; - - public readonly WebSocketSession Socket; - - public readonly RequestHandler OnRecieved; - - public CancellationToken CancellationToken => Cancellation.Token; - - - public ListeningSession(WebSocketSession session, RequestHandler onRecieved, in FBMListenerSessionParams args, object? userState) - { - Params = args; - Socket = session; - UserState = userState; - OnRecieved = onRecieved; - - //Create cancellation and register for session close - Cancellation = new(); - Registration = session.Token.Register(Cancellation.Cancel); - - - ResponseLock = new(1); - CtxStore = ObjectRental.CreateReusable(ContextCtor); - } - - private FBMContext ContextCtor() => new(Params.MaxHeaderBufferSize, Params.ResponseBufferSize, Params.HeaderEncoding); - - /// <summary> - /// Cancels any pending opreations relating to the current session - /// </summary> - public void CancelSession() - { - Cancellation.Cancel(); - - //If dispose happens without any outstanding requests, we can dispose the session - if (_counter == 0) - { - CleanupInternal(); - } - } - - private void CleanupInternal() - { - Registration.Dispose(); - CtxStore.Dispose(); - Cancellation.Dispose(); - ResponseLock.Dispose(); - } - - - private uint _counter; - - /// <summary> - /// Rents a new <see cref="FBMContext"/> instance from the pool - /// and increments the counter - /// </summary> - /// <returns>The rented instance</returns> - /// <exception cref="ObjectDisposedException"></exception> - public FBMContext RentContext() - { - - if (Cancellation.IsCancellationRequested) - { - throw new ObjectDisposedException("The instance has been disposed"); - } - - //Rent context - FBMContext ctx = CtxStore.Rent(); - //Increment counter - Interlocked.Increment(ref _counter); - - return ctx; - } - - /// <summary> - /// Returns a previously rented context to the pool - /// and decrements the counter. If the session has been - /// cancelled, when the counter reaches 0, cleanup occurs - /// </summary> - /// <param name="ctx">The context to return</param> - public void ReturnContext(FBMContext ctx) - { - //Return the context - CtxStore.Return(ctx); - - uint current = Interlocked.Decrement(ref _counter); - - //No more contexts in use, dispose internals - if (Cancellation.IsCancellationRequested && current == 0) - { - ResponseLock.Dispose(); - Cancellation.Dispose(); - CtxStore.Dispose(); - } - } - } - - public const int SEND_SEMAPHORE_TIMEOUT_MS = 10 * 1000; - - private readonly IUnmangedHeap Heap; - - /// <summary> - /// Raised when a response processing error occured - /// </summary> - public event EventHandler<Exception>? OnProcessError; - - /// <summary> - /// Creates a new <see cref="FBMListener"/> instance ready for - /// processing connections - /// </summary> - /// <param name="heap">The heap to alloc buffers from</param> - public FBMListener(IUnmangedHeap heap) - { - Heap = heap; - } - - /// <summary> - /// Begins listening for requests on the current websocket until - /// a close message is received or an error occurs - /// </summary> - /// <param name="wss">The <see cref="WebSocketSession"/> to receive messages on</param> - /// <param name="handler">The callback method to handle incoming requests</param> - /// <param name="args">The arguments used to configured this listening session</param> - /// <param name="userState">A state parameter</param> - /// <returns>A <see cref="Task"/> that completes when the connection closes</returns> - public async Task ListenAsync(WebSocketSession wss, RequestHandler handler, FBMListenerSessionParams args, object? userState) - { - ListeningSession session = new(wss, handler, args, userState); - //Alloc a recieve buffer - using IMemoryOwner<byte> recvBuffer = Heap.DirectAlloc<byte>(args.RecvBufferSize); - - //Init new queue for dispatching work - AsyncQueue<VnMemoryStream> workQueue = new(true, true); - - //Start a task to process the queue - Task queueWorker = QueueWorkerDoWork(workQueue, session); - - try - { - //Listen for incoming messages - while (true) - { - //Receive a message - ValueWebSocketReceiveResult result = await wss.ReceiveAsync(recvBuffer.Memory); - //If a close message has been received, we can gracefully exit - if (result.MessageType == WebSocketMessageType.Close) - { - //Return close message - await wss.CloseSocketAsync(WebSocketCloseStatus.NormalClosure, "Goodbye"); - //break listen loop - break; - } - //create buffer for storing data - VnMemoryStream request = new(Heap); - //Copy initial data - request.Write(recvBuffer.Memory.Span[..result.Count]); - //Streaming read - while (!result.EndOfMessage) - { - //Read more data - result = await wss.ReceiveAsync(recvBuffer.Memory); - //Make sure the request is small enough to buffer - if (request.Length + result.Count > args.MaxMessageSize) - { - //dispose the buffer - request.Dispose(); - //close the socket with a message too big - await wss.CloseSocketAsync(WebSocketCloseStatus.MessageTooBig, "Buffer space exceeded for message. Goodbye"); - //break listen loop - goto Exit; - } - //write to buffer - request.Write(recvBuffer.Memory.Span[..result.Count]); - } - //Make sure data is available - if (request.Length == 0) - { - request.Dispose(); - continue; - } - //reset buffer position - _ = request.Seek(0, SeekOrigin.Begin); - //Enqueue the request - await workQueue.EnqueueAsync(request); - } - - Exit: - ; - } - finally - { - session.CancelSession(); - await queueWorker.ConfigureAwait(false); - } - } - - private async Task QueueWorkerDoWork(AsyncQueue<VnMemoryStream> queue, ListeningSession session) - { - try - { - while (true) - { - //Get work from queue - VnMemoryStream request = await queue.DequeueAsync(session.CancellationToken); - //Process request without waiting - _ = ProcessAsync(request, session).ConfigureAwait(false); - } - } - catch (OperationCanceledException) - { } - finally - { - //Cleanup any queued requests - while (queue.TryDequeue(out VnMemoryStream? stream)) - { - stream.Dispose(); - } - } - } - - private async Task ProcessAsync(VnMemoryStream data, ListeningSession session) - { - //Rent a new request object - FBMContext context = session.RentContext(); - try - { - //Prepare the request/response - context.Prepare(data, session.Socket.SocketID); - - if ((context.Request.ParseStatus & HeaderParseError.InvalidId) > 0) - { - OnProcessError?.Invoke(this, new FBMException($"Invalid messageid {context.Request.MessageId}, message length {data.Length}")); - return; - } - - //Check parse status flags - if ((context.Request.ParseStatus & HeaderParseError.HeaderOutOfMem) > 0) - { - OnProcessError?.Invoke(this, new OutOfMemoryException("Packet received with not enough space to store headers")); - } - //Determine if request is an out-of-band message - else if (context.Request.MessageId == Helpers.CONTROL_FRAME_MID) - { - //Process control frame - await ProcessOOBAsync(context); - } - else - { - //Invoke normal message handler - await session.OnRecieved.Invoke(context, session.UserState, session.CancellationToken); - } - - //Get response data - await using IAsyncMessageReader messageEnumerator = await context.Response.GetResponseDataAsync(session.CancellationToken); - - //Load inital segment - if (await messageEnumerator.MoveNextAsync() && !session.CancellationToken.IsCancellationRequested) - { - ValueTask sendTask; - - //Syncrhonize access to send data because we may need to stream data to the client - await session.ResponseLock.WaitAsync(SEND_SEMAPHORE_TIMEOUT_MS); - try - { - do - { - bool eof = !messageEnumerator.DataRemaining; - - //Send first segment - sendTask = session.Socket.SendAsync(messageEnumerator.Current, WebSocketMessageType.Binary, eof); - - /* - * WARNING! - * this code relies on the managed websocket impl that the websocket will read - * the entire buffer before returning. If this is not the case, this code will - * overwrite the memory buffer on the next call to move next. - */ - - //Move to next segment - if (!await messageEnumerator.MoveNextAsync()) - { - break; - } - - //Await previous send - await sendTask; - - } while (true); - } - finally - { - //release semaphore - session.ResponseLock.Release(); - } - - await sendTask; - } - - //No data to send - } - catch (Exception ex) - { - OnProcessError?.Invoke(this, ex); - } - finally - { - session.ReturnContext(context); - } - } - - /// <summary> - /// Processes an out-of-band request message (internal communications) - /// </summary> - /// <param name="outOfBandContext">The <see cref="FBMContext"/> containing the OOB message</param> - /// <returns>A <see cref="Task"/> that completes when the operation completes</returns> - protected virtual Task ProcessOOBAsync(FBMContext outOfBandContext) - { - return Task.CompletedTask; - } - } -} diff --git a/Net.Messaging.FBM/src/Server/FBMListenerBase.cs b/Net.Messaging.FBM/src/Server/FBMListenerBase.cs deleted file mode 100644 index 3e9fde2..0000000 --- a/Net.Messaging.FBM/src/Server/FBMListenerBase.cs +++ /dev/null @@ -1,113 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: FBMListenerBase.cs -* -* FBMListenerBase.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Threading; -using System.Threading.Tasks; - -using VNLib.Utils.Logging; -using VNLib.Utils.Memory; -using VNLib.Plugins.Essentials; - -namespace VNLib.Net.Messaging.FBM.Server -{ - /// <summary> - /// Provides a simple base class for an <see cref="FBMListener"/> - /// processor - /// </summary> - public abstract class FBMListenerBase - { - - /// <summary> - /// The initialzied listener - /// </summary> - protected FBMListener? Listener { get; private set; } - /// <summary> - /// A provider to write log information to - /// </summary> - protected abstract ILogProvider Log { get; } - - /// <summary> - /// Initializes the <see cref="FBMListener"/> - /// </summary> - /// <param name="heap">The heap to alloc buffers from</param> - protected void InitListener(IUnmangedHeap heap) - { - Listener = new(heap); - //Attach service handler - Listener.OnProcessError += Listener_OnProcessError; - } - - /// <summary> - /// A single event service routine for servicing errors that occur within - /// the listener loop - /// </summary> - /// <param name="sender"></param> - /// <param name="e">The exception that was raised</param> - protected virtual void Listener_OnProcessError(object? sender, Exception e) - { - //Write the error to the log - Log.Error(e); - } - - private async Task OnReceivedAsync(FBMContext context, object? userState, CancellationToken token) - { - try - { - await ProcessAsync(context, userState, token); - } - catch (OperationCanceledException) - { - Log.Debug("Async operation cancelled"); - } - catch(Exception ex) - { - Log.Error(ex); - } - } - - /// <summary> - /// Begins listening for requests on the current websocket until - /// a close message is received or an error occurs - /// </summary> - /// <param name="wss">The <see cref="WebSocketSession"/> to receive messages on</param> - /// <param name="args">The arguments used to configured this listening session</param> - /// <param name="userState">A state token to use for processing events for this connection</param> - /// <returns>A <see cref="Task"/> that completes when the connection closes</returns> - public virtual async Task ListenAsync(WebSocketSession wss, FBMListenerSessionParams args, object? userState) - { - _ = Listener ?? throw new InvalidOperationException("The listener has not been intialized"); - await Listener.ListenAsync(wss, OnReceivedAsync, args, userState); - } - - /// <summary> - /// A method to service an incoming message - /// </summary> - /// <param name="context">The context containing the message to be serviced</param> - /// <param name="userState">A state token passed on client connected</param> - /// <param name="exitToken">A token that reflects the state of the listener</param> - /// <returns>A task that completes when the message has been serviced</returns> - protected abstract Task ProcessAsync(FBMContext context, object? userState, CancellationToken exitToken); - } -} diff --git a/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs b/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs deleted file mode 100644 index c327475..0000000 --- a/Net.Messaging.FBM/src/Server/FBMListenerSessionParams.cs +++ /dev/null @@ -1,62 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: FBMListenerSessionParams.cs -* -* FBMListenerSessionParams.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System.Text; - -namespace VNLib.Net.Messaging.FBM.Server -{ - /// <summary> - /// Represents a configuration structure for an <see cref="FBMListener"/> - /// listening session - /// </summary> - public readonly struct FBMListenerSessionParams - { - /// <summary> - /// The size of the buffer to use while reading data from the websocket - /// in the listener loop - /// </summary> - public readonly int RecvBufferSize { get; init; } - /// <summary> - /// The size of the character buffer to store FBMheader values in - /// the <see cref="FBMRequestMessage"/> - /// </summary> - public readonly int MaxHeaderBufferSize { get; init; } - /// <summary> - /// The size of the internal message response buffer when - /// not streaming - /// </summary> - public readonly int ResponseBufferSize { get; init; } - /// <summary> - /// The FMB message header character encoding - /// </summary> - public readonly Encoding HeaderEncoding { get; init; } - - /// <summary> - /// The absolute maxium size (in bytes) message to process before - /// closing the websocket connection. This value should be negotiaed - /// by clients or hard-coded to avoid connection issues - /// </summary> - public readonly int MaxMessageSize { get; init; } - } -} diff --git a/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs b/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs deleted file mode 100644 index ed36571..0000000 --- a/Net.Messaging.FBM/src/Server/FBMRequestMessage.cs +++ /dev/null @@ -1,196 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: FBMRequestMessage.cs -* -* FBMRequestMessage.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Text; -using System.Buffers; -using System.Text.Json; -using System.Collections.Generic; - -using VNLib.Utils; -using VNLib.Utils.IO; -using VNLib.Utils.Memory; -using VNLib.Utils.Extensions; -using VNLib.Utils.Memory.Caching; - -namespace VNLib.Net.Messaging.FBM.Server -{ - /// <summary> - /// Represents a client request message to be serviced - /// </summary> - public sealed class FBMRequestMessage : IReusable - { - private readonly List<KeyValuePair<HeaderCommand, ReadOnlyMemory<char>>> _headers; - private readonly int HeaderCharBufferSize; - /// <summary> - /// Creates a new resusable <see cref="FBMRequestMessage"/> - /// </summary> - /// <param name="headerBufferSize">The size of the buffer to alloc during initialization</param> - internal FBMRequestMessage(int headerBufferSize) - { - HeaderCharBufferSize = headerBufferSize; - _headers = new(); - } - - private char[]? _headerBuffer; - - /// <summary> - /// The ID of the current message - /// </summary> - public int MessageId { get; private set; } - /// <summary> - /// Gets the underlying socket-id fot the current connection - /// </summary> - public string? ConnectionId { get; private set; } - /// <summary> - /// The raw request message, positioned to the body section of the message data - /// </summary> - public VnMemoryStream? RequestBody { get; private set; } - /// <summary> - /// A collection of headers for the current request - /// </summary> - public IReadOnlyList<KeyValuePair<HeaderCommand, ReadOnlyMemory<char>>> Headers => _headers; - /// <summary> - /// Status flags set during the message parsing - /// </summary> - public HeaderParseError ParseStatus { get; private set; } - /// <summary> - /// The message body data as a <see cref="ReadOnlySpan{T}"/> - /// </summary> - public ReadOnlySpan<byte> BodyData => Helpers.GetRemainingData(RequestBody!); - - /// <summary> - /// Determines if the current message is considered a control frame - /// </summary> - public bool IsControlFrame { get; private set; } - - /// <summary> - /// Prepares the request to be serviced - /// </summary> - /// <param name="vms">The request data packet</param> - /// <param name="socketId">The unique id of the connection</param> - /// <param name="dataEncoding">The data encoding used to decode header values</param> - internal void Prepare(VnMemoryStream vms, string socketId, Encoding dataEncoding) - { - //Store request body - RequestBody = vms; - //Store message id - MessageId = Helpers.GetMessageId(Helpers.ReadLine(vms)); - //Check mid for control frame - if(MessageId == Helpers.CONTROL_FRAME_MID) - { - IsControlFrame = true; - } - else if (MessageId < 1) - { - ParseStatus |= HeaderParseError.InvalidId; - return; - } - - ConnectionId = socketId; - - //sliding window over remaining data from internal buffer - ForwardOnlyMemoryWriter<char> writer = new(_headerBuffer); - - //Accumulate headers - while (true) - { - //Read the next line from the current stream - ReadOnlySpan<byte> line = Helpers.ReadLine(vms); - if (line.IsEmpty) - { - //Done reading headers - break; - } - HeaderCommand cmd = Helpers.GetHeaderCommand(line); - //Get header value - ERRNO charsRead = Helpers.GetHeaderValue(line, writer.Remaining.Span, dataEncoding); - if (charsRead < 0) - { - //Out of buffer space - ParseStatus |= HeaderParseError.HeaderOutOfMem; - break; - } - else if (!charsRead) - { - //Invalid header - ParseStatus |= HeaderParseError.InvalidHeaderRead; - } - else - { - //Store header as a read-only sequence - _headers.Add(new(cmd, writer.Remaining[..(int)charsRead])); - //Shift buffer window - writer.Advance(charsRead); - } - } - } - - /// <summary> - /// Deserializes the request body into a new specified object type - /// </summary> - /// <typeparam name="T">The type of the object to deserialize</typeparam> - /// <param name="jso">The <see cref="JsonSerializerOptions"/> to use while deserializing data</param> - /// <returns>The deserialized object from the request body</returns> - /// <exception cref="JsonException"></exception> - public T? DeserializeBody<T>(JsonSerializerOptions? jso = default) - { - return BodyData.IsEmpty ? default : BodyData.AsJsonObject<T>(jso); - } - /// <summary> - /// Gets a <see cref="JsonDocument"/> of the request body - /// </summary> - /// <returns>The parsed <see cref="JsonDocument"/> if parsed successfully, or null otherwise</returns> - /// <exception cref="JsonException"></exception> - public JsonDocument? GetBodyAsJson() - { - Utf8JsonReader reader = new(BodyData); - return JsonDocument.TryParseValue(ref reader, out JsonDocument? jdoc) ? jdoc : default; - } - - void IReusable.Prepare() - { - ParseStatus = HeaderParseError.None; - //Alloc header buffer - _headerBuffer = ArrayPool<char>.Shared.Rent(HeaderCharBufferSize); - } - - - bool IReusable.Release() - { - //Dispose the request message - RequestBody?.Dispose(); - RequestBody = null; - //Clear headers before freeing buffer - _headers.Clear(); - //Free header-buffer - ArrayPool<char>.Shared.Return(_headerBuffer!); - _headerBuffer = null; - ConnectionId = null; - MessageId = 0; - IsControlFrame = false; - return true; - } - } -} diff --git a/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs b/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs deleted file mode 100644 index 1536c99..0000000 --- a/Net.Messaging.FBM/src/Server/FBMResponseMessage.cs +++ /dev/null @@ -1,226 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: FBMResponseMessage.cs -* -* FBMResponseMessage.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -using VNLib.Net.Http; -using VNLib.Utils.IO; -using VNLib.Utils.Extensions; -using VNLib.Utils.Memory.Caching; -using VNLib.Net.Messaging.FBM.Client; - -namespace VNLib.Net.Messaging.FBM.Server -{ - - /// <summary> - /// Represents an FBM request response container. - /// </summary> - public sealed class FBMResponseMessage : IReusable, IFBMMessage - { - internal FBMResponseMessage(int internalBufferSize, Encoding headerEncoding) - { - _headerAccumulator = new HeaderDataAccumulator(internalBufferSize); - _headerEncoding = headerEncoding; - _messageEnumerator = new(this); - } - - private readonly MessageSegmentEnumerator _messageEnumerator; - private readonly ISlindingWindowBuffer<byte> _headerAccumulator; - private readonly Encoding _headerEncoding; - - private IAsyncMessageBody? _messageBody; - - ///<inheritdoc/> - public int MessageId { get; private set; } - - void IReusable.Prepare() - { - (_headerAccumulator as HeaderDataAccumulator)!.Prepare(); - } - - bool IReusable.Release() - { - //Release header accumulator - _headerAccumulator.Close(); - - _messageBody = null; - - MessageId = 0; - - return true; - } - - /// <summary> - /// Initializes the response message with the specified message-id - /// to respond with - /// </summary> - /// <param name="messageId">The message id of the context to respond to</param> - internal void Prepare(int messageId) - { - //Reset accumulator when message id is written - _headerAccumulator.Reset(); - //Write the messageid to the begining of the headers buffer - MessageId = messageId; - _headerAccumulator.Append((byte)HeaderCommand.MessageId); - _headerAccumulator.Append(messageId); - _headerAccumulator.WriteTermination(); - } - - ///<inheritdoc/> - public void WriteHeader(HeaderCommand header, ReadOnlySpan<char> value) - { - WriteHeader((byte)header, value); - } - ///<inheritdoc/> - public void WriteHeader(byte header, ReadOnlySpan<char> value) - { - _headerAccumulator.WriteHeader(header, value, _headerEncoding); - } - - ///<inheritdoc/> - public void WriteBody(ReadOnlySpan<byte> body, ContentType contentType = ContentType.Binary) - { - //Append content type header - WriteHeader(HeaderCommand.ContentType, HttpHelpers.GetContentTypeString(contentType)); - //end header segment - _headerAccumulator.WriteTermination(); - //Write message body - _headerAccumulator.Append(body); - } - - /// <summary> - /// Sets the response message body - /// </summary> - /// <param name="messageBody">The <see cref="IAsyncMessageBody"/> to stream data from</param> - /// <exception cref="InvalidOperationException"></exception> - public void AddMessageBody(IAsyncMessageBody messageBody) - { - if(_messageBody != null) - { - throw new InvalidOperationException("The message body is already set"); - } - - //Append message content type header - WriteHeader(HeaderCommand.ContentType, HttpHelpers.GetContentTypeString(messageBody.ContentType)); - - //end header segment - _headerAccumulator.WriteTermination(); - - //Store message body - _messageBody = messageBody; - - } - - /// <summary> - /// Gets the internal message body enumerator and prepares the message for sending - /// </summary> - /// <param name="cancellationToken">A cancellation token</param> - /// <returns>A value task that returns the message body enumerator</returns> - internal async ValueTask<IAsyncMessageReader> GetResponseDataAsync(CancellationToken cancellationToken) - { - //try to buffer as much data in the header segment first - if(_messageBody?.RemainingSize > 0 && _headerAccumulator.RemainingSize > 0) - { - //Read data from the message - int read = await _messageBody.ReadAsync(_headerAccumulator.RemainingBuffer, cancellationToken); - //Advance accumulator to the read bytes - _headerAccumulator.Advance(read); - } - //return reusable enumerator - return _messageEnumerator; - } - - private class MessageSegmentEnumerator : IAsyncMessageReader - { - private readonly FBMResponseMessage _message; - - bool HeadersRead; - - public MessageSegmentEnumerator(FBMResponseMessage message) - { - _message = message; - } - - public ReadOnlyMemory<byte> Current { get; private set; } - - public bool DataRemaining { get; private set; } - - public async ValueTask<bool> MoveNextAsync() - { - //Attempt to read header segment first - if (!HeadersRead) - { - //Set the accumulated buffer - Current = _message._headerAccumulator.AccumulatedBuffer; - - //Update data remaining flag - DataRemaining = _message._messageBody?.RemainingSize > 0; - - //Set headers read flag - HeadersRead = true; - - return true; - } - else if (_message._messageBody?.RemainingSize > 0) - { - //Use the header buffer as the buffer for the message body - Memory<byte> buffer = _message._headerAccumulator.Buffer; - - //Read body segment - int read = await _message._messageBody.ReadAsync(buffer); - - //Update data remaining flag - DataRemaining = _message._messageBody.RemainingSize > 0; - - if (read > 0) - { - //Store the read segment - Current = buffer[..read]; - return true; - } - } - return false; - } - - public async ValueTask DisposeAsync() - { - //Clear current segment - Current = default; - - //Reset headers read flag - HeadersRead = false; - - //Dispose the message body if set - if (_message._messageBody != null) - { - await _message._messageBody.DisposeAsync(); - } - } - } - } - -} diff --git a/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs b/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs deleted file mode 100644 index 78b378d..0000000 --- a/Net.Messaging.FBM/src/Server/HeaderDataAccumulator.cs +++ /dev/null @@ -1,89 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: HeaderDataAccumulator.cs -* -* HeaderDataAccumulator.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Buffers; - -using VNLib.Utils.IO; - - -namespace VNLib.Net.Messaging.FBM.Server -{ - /// <summary> - /// Reusable sliding window impl - /// </summary> - internal class HeaderDataAccumulator : ISlindingWindowBuffer<byte> - { - private readonly int BufferSize; - - private byte[]? _memHandle; - - public HeaderDataAccumulator(int bufferSize) - { - BufferSize = bufferSize; - } - - ///<inheritdoc/> - public int WindowStartPos { get; private set; } - ///<inheritdoc/> - public int WindowEndPos { get; private set; } - ///<inheritdoc/> - public Memory<byte> Buffer => _memHandle.AsMemory(); - - ///<inheritdoc/> - public void Advance(int count) => WindowEndPos += count; - - ///<inheritdoc/> - public void AdvanceStart(int count) => WindowEndPos += count; - - ///<inheritdoc/> - public void Reset() - { - WindowStartPos = 0; - WindowEndPos = 0; - } - - /// <summary> - /// Allocates the internal message buffer - /// </summary> - public void Prepare() - { - _memHandle ??= ArrayPool<byte>.Shared.Rent(BufferSize); - } - - ///<inheritdoc/> - public void Close() - { - Reset(); - - if (_memHandle != null) - { - //Return the buffer to the pool - ArrayPool<byte>.Shared.Return(_memHandle); - _memHandle = null; - } - } - } - -} diff --git a/Net.Messaging.FBM/src/Server/IAsyncMessageBody.cs b/Net.Messaging.FBM/src/Server/IAsyncMessageBody.cs deleted file mode 100644 index 5566520..0000000 --- a/Net.Messaging.FBM/src/Server/IAsyncMessageBody.cs +++ /dev/null @@ -1,57 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: IAsyncMessageBody.cs -* -* IAsyncMessageBody.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Threading; -using System.Threading.Tasks; - -using VNLib.Net.Http; - -namespace VNLib.Net.Messaging.FBM -{ - /// <summary> - /// A disposable message body container for asynchronously reading a variable length message body - /// </summary> - public interface IAsyncMessageBody : IAsyncDisposable - { - /// <summary> - /// The message body content type - /// </summary> - ContentType ContentType { get; } - - /// <summary> - /// The number of bytes remaining to be read from the message body - /// </summary> - int RemainingSize { get; } - - /// <summary> - /// Reads the next chunk of data from the message body - /// </summary> - /// <param name="buffer">The buffer to copy output data to</param> - /// <param name="token">A token to cancel the operation</param> - /// <returns></returns> - ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken token = default); - } - -} diff --git a/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs b/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs deleted file mode 100644 index b2abe8d..0000000 --- a/Net.Messaging.FBM/src/Server/IAsyncMessageReader.cs +++ /dev/null @@ -1,42 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: IAsyncMessageReader.cs -* -* IAsyncMessageReader.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Collections.Generic; - - -namespace VNLib.Net.Messaging.FBM.Server -{ - /// <summary> - /// Internal message body reader/enumerator for FBM messages - /// </summary> - internal interface IAsyncMessageReader : IAsyncEnumerator<ReadOnlyMemory<byte>> - { - /// <summary> - /// A value that indicates if there is data remaining after a - /// </summary> - bool DataRemaining { get; } - } - -} diff --git a/Net.Messaging.FBM/src/Server/readme.md b/Net.Messaging.FBM/src/Server/readme.md deleted file mode 100644 index 489e58f..0000000 --- a/Net.Messaging.FBM/src/Server/readme.md +++ /dev/null @@ -1,35 +0,0 @@ -# VNLib.Net.Messaging.FBM.Server - -Fixed Buffer Messaging Protocol server library. High performance statful messaging -protocol built on top of HTTP web-sockets. Low/no allocation, completely asynchronous -while providing a TPL API. This library provides a simple asynchronous request/response -architecture to web-sockets. This was initially designed to provide an alternative to -complete HTTP request/response overhead, but allow a simple control flow for work -across a network. - -Messages consist of a 4 byte message id, a collection of headers, and a message body. -The first 4 bytes of a message is the ID (for normal messages a signed integer greater than 0), -0 is reserved for error conditions, and negative numbers are reserved for internal -messages. Headers are identified by a single byte, followed by a variable length UTF8 -encoded character sequence, followed by a termination of 0xFF, 0xF1 (may change). - -### Message structure - 4 byte positive (signed 32-bit integer) message id - 2 byte termination - 1 byte header-id - variable length UTF8 value - 2 byte termination - -- other headers -- - 2 byte termination (extra termination, ie: empty header) - variable length payload - (end of message is the end of the payload) - - -XML Documentation is or will be provided for almost all public exports. APIs are intended to -be sensibly public and immutable to allow for easy extensability (via extension methods). I -often use extension libraries to provide additional functionality. (See cache library) - -This library is likely a niche use case, and is probably not for everyone. Unless you care -about reasonably efficient high frequency request/response messaging, this probably isnt -for you. This library provides a reasonable building block for distributed lock mechanisms -and small data caching.
\ No newline at end of file diff --git a/Net.Messaging.FBM/src/VNLib.Net.Messaging.FBM.csproj b/Net.Messaging.FBM/src/VNLib.Net.Messaging.FBM.csproj deleted file mode 100644 index 9440e69..0000000 --- a/Net.Messaging.FBM/src/VNLib.Net.Messaging.FBM.csproj +++ /dev/null @@ -1,33 +0,0 @@ -<Project Sdk="Microsoft.NET.Sdk"> - - <PropertyGroup> - <TargetFramework>net6.0</TargetFramework> - <Authors>Vaughn Nugent</Authors> - <Version>1.0.1.1</Version> - <Copyright>Copyright © 2022 Vaughn Nugent</Copyright> - <Nullable>enable</Nullable> - <PackageProjectUrl>www.vaughnnugent.com/resources</PackageProjectUrl> - <AnalysisLevel>latest-all</AnalysisLevel> - <SignAssembly>True</SignAssembly> - <AssemblyOriginatorKeyFile>\\vaughnnugent.com\Internal\Folder Redirection\vman\Documents\Programming\Software\StrongNameingKey.snk</AssemblyOriginatorKeyFile> - </PropertyGroup> - - - <ItemGroup> - <PackageReference Include="ErrorProne.NET.CoreAnalyzers" Version="0.1.2"> - <PrivateAssets>all</PrivateAssets> - <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> - </PackageReference> - <PackageReference Include="ErrorProne.NET.Structs" Version="0.1.2"> - <PrivateAssets>all</PrivateAssets> - <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> - </PackageReference> - </ItemGroup> - - <ItemGroup> - <ProjectReference Include="..\..\Essentials\src\VNLib.Plugins.Essentials.csproj" /> - <ProjectReference Include="..\..\Http\src\VNLib.Net.Http.csproj" /> - <ProjectReference Include="..\..\Utils\src\VNLib.Utils.csproj" /> - </ItemGroup> - -</Project> |