aboutsummaryrefslogtreecommitdiff
path: root/lib/Net.Messaging.FBM
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-11-29 00:15:28 -0500
committerLibravatar vnugent <public@vaughnnugent.com>2023-11-29 00:15:28 -0500
commit29371caa9c29fd6cfdfb238d98f53fda59e2e8a7 (patch)
tree47bb4d4726f2cafb1af41460d3356519b4074198 /lib/Net.Messaging.FBM
parent07824a130c7608337a36382dbfa40198a8c70297 (diff)
immutable fbm clients, websocket abstractions, prep for monocypher/argon2 bindings
Diffstat (limited to 'lib/Net.Messaging.FBM')
-rw-r--r--lib/Net.Messaging.FBM/src/Client/FBMClient.cs145
-rw-r--r--lib/Net.Messaging.FBM/src/Client/FBMClientFactory.cs85
-rw-r--r--lib/Net.Messaging.FBM/src/Client/FBMClientWorkerBase.cs125
-rw-r--r--lib/Net.Messaging.FBM/src/Client/FBMFallbackClientWsFactory.cs117
-rw-r--r--lib/Net.Messaging.FBM/src/Client/IFbmClientWebsocket.cs76
-rw-r--r--lib/Net.Messaging.FBM/src/Client/IFbmWebsocketFactory.cs40
-rw-r--r--lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs203
7 files changed, 407 insertions, 384 deletions
diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClient.cs b/lib/Net.Messaging.FBM/src/Client/FBMClient.cs
index ee64d53..9628313 100644
--- a/lib/Net.Messaging.FBM/src/Client/FBMClient.cs
+++ b/lib/Net.Messaging.FBM/src/Client/FBMClient.cs
@@ -47,7 +47,7 @@ namespace VNLib.Net.Messaging.FBM.Client
/// A Fixed Buffer Message Protocol client. Allows for high performance client-server messaging
/// with minimal memory overhead.
/// </summary>
- public class FBMClient : VnDisposeable, IStatefulConnection, ICacheHolder
+ public class FBMClient : VnDisposeable, IStatefulConnection
{
/// <summary>
/// The WS connection query arguments to specify a receive buffer size
@@ -77,9 +77,12 @@ namespace VNLib.Net.Messaging.FBM.Client
private readonly SemaphoreSlim SendLock;
private readonly ConcurrentDictionary<int, FBMRequest> ActiveRequests;
- private readonly ObjectRental<FBMRequest> RequestRental;
+ private readonly IFBMMemoryHandle _streamBuffer;
+ private readonly IFbmClientWebsocket _socket;
private readonly FBMClientConfig _config;
- private readonly byte[] _streamBuffer;
+
+ private readonly IObjectRental<FBMRequest> _requestRental;
+ private readonly bool _ownsObjectRenal;
/// <summary>
/// The configuration for the current client
@@ -90,34 +93,63 @@ namespace VNLib.Net.Messaging.FBM.Client
/// A handle that is reset when a connection has been successfully set, and is set
/// when the connection exists
/// </summary>
- public ManualResetEvent ConnectionStatusHandle { get; }
-
+ public ManualResetEvent ConnectionStatusHandle { get; }
+
/// <summary>
- /// The <see cref="ClientWebSocket"/> to send/recieve message on
+ /// The client's http header collection used when making connections
/// </summary>
- public ManagedClientWebSocket ClientSocket { get; }
-
+ public VnWebHeaderCollection Headers { get; }
/// <summary>
- /// Creates a new <see cref="FBMClient"/> in a closed state
+ /// Creates an immutable FBMClient that wraps the supplied web socket using the
+ /// supplied config.
/// </summary>
- /// <param name="config">The client configuration</param>
- public FBMClient(FBMClientConfig config)
+ /// <param name="config">The client config</param>
+ /// <param name="websocket">The websocket instance used to comunicate with an FBMServer</param>
+ public FBMClient(in FBMClientConfig config, IFbmClientWebsocket websocket)
+ :this(in config, websocket, null)
{
- RequestRental = ObjectRental.CreateReusable(ReuseableRequestConstructor, 200);
+ }
+
+ internal FBMClient(in FBMClientConfig config, IFbmClientWebsocket websocket, IObjectRental<FBMRequest>? requestRental)
+ {
+ _config = config;
+ _socket = websocket ?? throw new ArgumentNullException(nameof(websocket));
+
+ _ = config.MemoryManager ?? throw new ArgumentException("FBM memory manager is required", nameof(config));
+
+ //Create new request rental if none supplied
+ if(requestRental is null)
+ {
+ _ownsObjectRenal = true;
+ _requestRental = ObjectRental.CreateReusable(ReuseableRequestConstructor, 100);
+ }
+ else
+ {
+ _requestRental = requestRental;
+ }
+
+ Headers = new();
SendLock = new(1);
ConnectionStatusHandle = new(true);
ActiveRequests = new(Environment.ProcessorCount, 100);
- _config = config;
- //Init the new client socket
- ClientSocket = new(config.RecvBufferSize, config.RecvBufferSize, config.KeepAliveInterval, config.SubProtocol);
-
- //Init the stream buffer
+ /*
+ * We can use the pool to allocate a single stream buffer that will be shared.
+ * This is because there is only 1 thread allowed to send/copy data at a time
+ * so it can be allocated once and shared
+ */
int maxStrmBufSize = Math.Min(config.MaxMessageSize, MAX_STREAM_BUFFER_SIZE);
- _streamBuffer = new byte[maxStrmBufSize];
+ _streamBuffer = config.MemoryManager.InitHandle();
+ config.MemoryManager.AllocBuffer(_streamBuffer, maxStrmBufSize);
}
+ /// <summary>
+ /// Allocates and configures a new <see cref="FBMRequest"/> message object for use within the reusable store
+ /// </summary>
+ /// <returns>The configured <see cref="FBMRequest"/></returns>
+ protected virtual FBMRequest ReuseableRequestConstructor() => new(in _config);
+
private void Debug(string format, params string[] strings)
{
if(Config.DebugLog != null)
@@ -125,6 +157,7 @@ namespace VNLib.Net.Messaging.FBM.Client
Config.DebugLog.Debug($"[DEBUG] FBM Client: {format}", strings);
}
}
+
private void Debug(string format, long value, long other)
{
if (Config.DebugLog != null)
@@ -132,12 +165,7 @@ namespace VNLib.Net.Messaging.FBM.Client
Config.DebugLog.Debug($"[DEBUG] FBM Client: {format}", value, other);
}
}
-
- /// <summary>
- /// Allocates and configures a new <see cref="FBMRequest"/> message object for use within the reusable store
- /// </summary>
- /// <returns>The configured <see cref="FBMRequest"/></returns>
- protected virtual FBMRequest ReuseableRequestConstructor() => new(in _config);
+
/// <summary>
/// Asynchronously opens a websocket connection with the specifed remote server
@@ -157,28 +185,28 @@ namespace VNLib.Net.Messaging.FBM.Client
Debug("Connection string {con}", urib.Uri.ToString());
//Connect to server
- await ClientSocket.ConnectAsync(urib.Uri, cancellationToken);
+ await _socket.ConnectAsync(urib.Uri, Headers, cancellationToken);
//Reset wait handle before return
ConnectionStatusHandle.Reset();
- //Begin listeing for requets in a background task
- _ = Task.Run(() => ProcessContinuousRecvAsync(ClientSocket), cancellationToken);
+ //Begin listeing for requests in a background task
+ _ = Task.Run(ProcessContinuousRecvAsync, cancellationToken);
}
/// <summary>
- /// Rents a new <see cref="FBMRequest"/> from the internal <see cref="ReusableStore{T}"/>.
+ /// Rents a new <see cref="FBMRequest"/> from the internal <see cref="ObjectRental{T}"/>.
/// Use <see cref="ReturnRequest(FBMRequest)"/> when request is no longer in use
/// </summary>
/// <returns>The configured (rented or new) <see cref="FBMRequest"/> ready for use</returns>
- public FBMRequest RentRequest() => RequestRental.Rent();
+ public FBMRequest RentRequest() => _requestRental.Rent();
/// <summary>
/// Stores (or returns) the reusable request in cache for use with <see cref="RentRequest"/>
/// </summary>
/// <param name="request">The request to return to the store</param>
/// <exception cref="InvalidOperationException"></exception>
- public void ReturnRequest(FBMRequest request) => RequestRental.Return(request);
+ public void ReturnRequest(FBMRequest request) => _requestRental.Return(request);
/// <summary>
/// Sends a <see cref="FBMRequest"/> to the connected server
@@ -190,7 +218,7 @@ namespace VNLib.Net.Messaging.FBM.Client
/// <exception cref="ObjectDisposedException"></exception>
/// <exception cref="InvalidOperationException"></exception>
/// <exception cref="FBMInvalidRequestException"></exception>
- public Task<FBMResponse> SendAsync(FBMRequest request, CancellationToken cancellationToken = default) => SendAsync(request, _config.RequestTimeout, cancellationToken);
+ public Task<FBMResponse> SendAsync(FBMRequest request, CancellationToken cancellationToken = default) => SendAsync(request, Config.RequestTimeout, cancellationToken);
/// <summary>
/// Sends a <see cref="FBMRequest"/> to the connected server
@@ -229,7 +257,7 @@ namespace VNLib.Net.Messaging.FBM.Client
using (SemSlimReleaser releaser = await SendLock.GetReleaserAsync(cancellationToken))
{
//Send the data to the server
- await ClientSocket.SendAsync(requestData, WebSocketMessageType.Binary, true, cancellationToken);
+ await _socket.SendAsync(requestData, WebSocketMessageType.Binary, true, cancellationToken);
}
//wait for the response to be set
@@ -267,7 +295,7 @@ namespace VNLib.Net.Messaging.FBM.Client
/// <exception cref="ObjectDisposedException"></exception>
/// <exception cref="InvalidOperationException"></exception>
public Task<FBMResponse> StreamDataAsync(FBMRequest request, Stream payload, ContentType contentType, CancellationToken cancellationToken = default)
- => StreamDataAsync(request, payload, contentType, _config.RequestTimeout, cancellationToken);
+ => StreamDataAsync(request, payload, contentType, Config.RequestTimeout, cancellationToken);
/// <summary>
/// Streams arbitrary binary data to the server with the initial request message
@@ -283,6 +311,8 @@ namespace VNLib.Net.Messaging.FBM.Client
/// <exception cref="InvalidOperationException"></exception>
public async Task<FBMResponse> StreamDataAsync(FBMRequest request, Stream payload, ContentType contentType, TimeSpan timeout, CancellationToken cancellationToken = default)
{
+ _ = payload ?? throw new ArgumentNullException(nameof(payload));
+
Check();
cancellationToken.ThrowIfCancellationRequested();
@@ -303,17 +333,19 @@ namespace VNLib.Net.Messaging.FBM.Client
//Write an empty body in the request so a content type header is writen
request.WriteBody(ReadOnlySpan<byte>.Empty, contentType);
+ Memory<byte> bufferMemory = _streamBuffer.GetMemory();
+
//Wait for send-lock
using (SemSlimReleaser releaser = await SendLock.GetReleaserAsync(cancellationToken))
{
//Send the initial request packet
- await ClientSocket.SendAsync(requestData, WebSocketMessageType.Binary, false, cancellationToken);
+ await _socket.SendAsync(requestData, WebSocketMessageType.Binary, false, cancellationToken);
//Stream mesage body
do
{
//Read data
- int read = await payload.ReadAsync(_streamBuffer, cancellationToken);
+ int read = await payload.ReadAsync(bufferMemory, cancellationToken);
if (read == 0)
{
@@ -322,7 +354,7 @@ namespace VNLib.Net.Messaging.FBM.Client
}
//write message to socket, if the read data was smaller than the buffer, we can send the last packet
- await ClientSocket.SendAsync(_streamBuffer[..read], WebSocketMessageType.Binary, read < _streamBuffer.Length, cancellationToken);
+ await _socket.SendAsync(bufferMemory[..read], WebSocketMessageType.Binary, read < bufferMemory.Length, cancellationToken);
} while (true);
}
@@ -358,15 +390,9 @@ namespace VNLib.Net.Messaging.FBM.Client
{
Check();
//Close the connection
- await ClientSocket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", cancellationToken);
+ await _socket.DisconnectAsync(WebSocketCloseStatus.NormalClosure, cancellationToken);
}
- ///<inheritdoc/>
- public void CacheClear() => RequestRental.CacheClear();
- ///<inheritdoc/>
- public void CacheHardClear() => RequestRental.CacheHardClear();
-
-
private void CheckOrEnqueue(FBMRequest request)
{
/*
@@ -404,25 +430,27 @@ namespace VNLib.Net.Messaging.FBM.Client
/// until the socket is closed, or canceled
/// </summary>
/// <returns></returns>
- protected async Task ProcessContinuousRecvAsync(WebSocket socket)
+ protected async Task ProcessContinuousRecvAsync()
{
Debug("Begining receive loop");
//Alloc recv buffer
- IFBMMemoryHandle recvBuffer = _config.MemoryManager.InitHandle();
- _config.MemoryManager.AllocBuffer(recvBuffer, _config.RecvBufferSize);
+ IFBMMemoryHandle recvBuffer = Config.MemoryManager.InitHandle();
+ Config.MemoryManager.AllocBuffer(recvBuffer, Config.RecvBufferSize);
try
{
- if(!_config.MemoryManager.TryGetHeap(out IUnmangedHeap? heap))
+ if(!Config.MemoryManager.TryGetHeap(out IUnmangedHeap? heap))
{
throw new NotSupportedException("The memory manager must support using IUnmanagedHeaps");
}
+ Memory<byte> rcvMemory = recvBuffer.GetMemory();
+
//Recv event loop
while (true)
{
//Listen for incoming packets with the intial data buffer
- ValueWebSocketReceiveResult result = await socket.ReceiveAsync(recvBuffer.GetMemory(), CancellationToken.None);
+ ValueWebSocketReceiveResult result = await _socket.ReceiveAsync(rcvMemory, CancellationToken.None);
//If the message is a close message, its time to exit
if (result.MessageType == WebSocketMessageType.Close)
@@ -444,10 +472,10 @@ namespace VNLib.Net.Messaging.FBM.Client
while (!result.EndOfMessage)
{
//recive more data
- result = await socket.ReceiveAsync(recvBuffer.GetMemory(), CancellationToken.None);
+ result = await _socket.ReceiveAsync(rcvMemory, CancellationToken.None);
//Make sure the buffer is not too large
- if ((responseBuffer.Length + result.Count) > _config.MaxMessageSize)
+ if ((responseBuffer.Length + result.Count) > Config.MaxMessageSize)
{
//Dispose the buffer before exiting
responseBuffer.Dispose();
@@ -485,9 +513,7 @@ namespace VNLib.Net.Messaging.FBM.Client
finally
{
//Dispose the recv buffer
- _config.MemoryManager.FreeBuffer(recvBuffer);
- //Cleanup the socket when exiting
- ClientSocket.Cleanup();
+ Config.MemoryManager.FreeBuffer(recvBuffer);
//Set status handle as unset
ConnectionStatusHandle.Set();
@@ -573,12 +599,19 @@ namespace VNLib.Net.Messaging.FBM.Client
///<inheritdoc/>
protected override void Free()
{
- //Dispose socket
- ClientSocket.Dispose();
+ //Free stream buffer
+ Config.MemoryManager.FreeBuffer(_streamBuffer);
+
//Dispose client buffer
- RequestRental.Dispose();
+ _socket.Dispose();
SendLock.Dispose();
ConnectionStatusHandle.Dispose();
+
+ //Dispose object rental if we own it
+ if (_ownsObjectRenal && _requestRental is IDisposable disp)
+ {
+ disp.Dispose();
+ }
}
}
}
diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClientFactory.cs b/lib/Net.Messaging.FBM/src/Client/FBMClientFactory.cs
new file mode 100644
index 0000000..51d3768
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Client/FBMClientFactory.cs
@@ -0,0 +1,85 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: FBMClientFactory.cs
+*
+* FBMClientFactory.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+
+using VNLib.Utils.Memory.Caching;
+
+namespace VNLib.Net.Messaging.FBM.Client
+{
+ /// <summary>
+ /// An FBMClient factory that creates immutable clients from fbm
+ /// websockets
+ /// </summary>
+ public sealed class FBMClientFactory: ICacheHolder
+ {
+ private readonly ObjectRental<FBMRequest> _internalRequestPool;
+ private readonly FBMClientConfig _config;
+ private readonly IFbmWebsocketFactory _socketMan;
+
+ /// <summary>
+ /// Initlaizes a new client factory from the websocket manager
+ /// </summary>
+ /// <param name="config">The configuration state</param>
+ /// <param name="webSocketManager">The client websocket factory</param>
+ /// <exception cref="ArgumentNullException"></exception>
+ public FBMClientFactory(in FBMClientConfig config, IFbmWebsocketFactory webSocketManager)
+ {
+ _config = config;
+ _ = config.MemoryManager ?? throw new ArgumentException("The client memory manager must not be null", nameof(config));
+ _socketMan = webSocketManager ?? throw new ArgumentNullException(nameof(webSocketManager));
+ _internalRequestPool = ObjectRental.CreateReusable(ReuseableRequestConstructor, 1000);
+ }
+
+ /// <summary>
+ /// The configuration for the current client
+ /// </summary>
+ public ref readonly FBMClientConfig Config => ref _config;
+
+ /// <summary>
+ /// Allocates and configures a new <see cref="FBMRequest"/> message object for use within the reusable store
+ /// </summary>
+ /// <returns>The configured <see cref="FBMRequest"/></returns>
+ private FBMRequest ReuseableRequestConstructor() => new(in _config);
+
+ /// <summary>
+ /// Initializes a new websocket and creates a new <see cref="FBMClient"/> instance
+ /// </summary>
+ /// <returns>The initialized FBM client instance</returns>
+ public FBMClient CreateClient()
+ {
+ //Init new socket
+ IFbmClientWebsocket socket = _socketMan.CreateWebsocket(in _config);
+
+ //Create client wrapper
+ return new(in _config, socket, _internalRequestPool);
+ }
+
+ ///<inheritdoc/>
+ public void CacheClear() => _internalRequestPool.CacheClear();
+
+ ///<inheritdoc/>
+ public void CacheHardClear() => _internalRequestPool.CacheHardClear();
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClientWorkerBase.cs b/lib/Net.Messaging.FBM/src/Client/FBMClientWorkerBase.cs
deleted file mode 100644
index b4056dc..0000000
--- a/lib/Net.Messaging.FBM/src/Client/FBMClientWorkerBase.cs
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
-* Copyright (c) 2022 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Net.Messaging.FBM
-* File: FBMClientWorkerBase.cs
-*
-* FBMClientWorkerBase.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
-using VNLib.Utils;
-
-namespace VNLib.Net.Messaging.FBM.Client
-{
- /// <summary>
- /// A base class for objects that implement <see cref="FBMClient"/>
- /// operations
- /// </summary>
- public abstract class FBMClientWorkerBase : VnDisposeable, IStatefulConnection
- {
- /// <summary>
- /// Allows configuration of websocket configuration options
- /// </summary>
- public ManagedClientWebSocket SocketOptions => Client.ClientSocket;
-
-#nullable disable
- /// <summary>
- /// The <see cref="FBMClient"/> to sent requests from
- /// </summary>
- public FBMClient Client { get; private set; }
-
- /// <summary>
- /// Raised when the client has connected successfully
- /// </summary>
- public event Action<FBMClient, FBMClientWorkerBase> Connected;
-#nullable enable
-
- ///<inheritdoc/>
- public event EventHandler ConnectionClosed
- {
- add => Client.ConnectionClosed += value;
- remove => Client.ConnectionClosed -= value;
- }
-
- /// <summary>
- /// Creates and initializes a the internal <see cref="FBMClient"/>
- /// </summary>
- /// <param name="config">The client config</param>
- protected void InitClient(in FBMClientConfig config)
- {
- Client = new(config);
- Client.ConnectionClosedOnError += Client_ConnectionClosedOnError;
- Client.ConnectionClosed += Client_ConnectionClosed;
- }
-
- private void Client_ConnectionClosed(object? sender, EventArgs e) => OnDisconnected();
- private void Client_ConnectionClosedOnError(object? sender, FMBClientErrorEventArgs e) => OnError(e);
-
- /// <summary>
- /// Asynchronously connects to a remote server by the specified uri
- /// </summary>
- /// <param name="serverUri">The remote uri of a server to connect to</param>
- /// <param name="cancellationToken">A token to cancel the connect operation</param>
- /// <returns>A task that compeltes when the client has connected to the remote server</returns>
- public virtual async Task ConnectAsync(Uri serverUri, CancellationToken cancellationToken = default)
- {
- //Connect to server
- await Client.ConnectAsync(serverUri, cancellationToken).ConfigureAwait(true);
- //Invoke child on-connected event
- OnConnected();
- Connected?.Invoke(Client, this);
- }
-
- /// <summary>
- /// Asynchronously disonnects a client only if the client is currently connected,
- /// returns otherwise
- /// </summary>
- /// <param name="cancellationToken"></param>
- /// <returns>A task that compeltes when the client has disconnected</returns>
- public virtual Task DisconnectAsync(CancellationToken cancellationToken = default)
- {
- return Client.DisconnectAsync(cancellationToken);
- }
-
- /// <summary>
- /// Invoked when a client has successfully connected to the remote server
- /// </summary>
- protected abstract void OnConnected();
- /// <summary>
- /// Invoked when the client has disconnected cleanly
- /// </summary>
- protected abstract void OnDisconnected();
- /// <summary>
- /// Invoked when the connected client is closed because of a connection error
- /// </summary>
- /// <param name="e">A <see cref="EventArgs"/> that contains the client error data</param>
- protected abstract void OnError(FMBClientErrorEventArgs e);
-
- ///<inheritdoc/>
- protected override void Free()
- {
- Client.ConnectionClosedOnError -= Client_ConnectionClosedOnError;
- Client.ConnectionClosed -= Client_ConnectionClosed;
- Client.Dispose();
- }
- }
-}
diff --git a/lib/Net.Messaging.FBM/src/Client/FBMFallbackClientWsFactory.cs b/lib/Net.Messaging.FBM/src/Client/FBMFallbackClientWsFactory.cs
new file mode 100644
index 0000000..5ee4142
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Client/FBMFallbackClientWsFactory.cs
@@ -0,0 +1,117 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: FBMFallbackClientWsFactory.cs
+*
+* FBMFallbackClientWsFactory.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Buffers;
+using System.Threading;
+using System.Net.WebSockets;
+using System.Threading.Tasks;
+
+using VNLib.Net.Http;
+
+namespace VNLib.Net.Messaging.FBM.Client
+{
+ /// <summary>
+ /// Creates a new <see cref="IFbmWebsocketFactory"/> that builds new client websockets
+ /// on demand using the <see cref="ClientWebSocket"/> .NET default implementation
+ /// </summary>
+ public class FBMFallbackClientWsFactory : IFbmWebsocketFactory
+ {
+ private readonly Action<ClientWebSocketOptions>? _onConfigure;
+
+ /// <summary>
+ /// Initalizes a new <see cref="FBMFallbackClientWsFactory"/> instance
+ /// </summary>
+ /// <param name="onConfigureSocket">A callback function that allows users to configure sockets when created</param>
+ public FBMFallbackClientWsFactory(Action<ClientWebSocketOptions>? onConfigureSocket = null)
+ => _onConfigure = onConfigureSocket;
+
+ ///<inheritdoc/>
+ public IFbmClientWebsocket CreateWebsocket(in FBMClientConfig clientConfig)
+ {
+ ClientWebSocket socket = new();
+
+ socket.Options.KeepAliveInterval = clientConfig.KeepAliveInterval;
+
+ //Setup the socket receive buffer
+ byte[] poolBuffer = ArrayPool<byte>.Shared.Rent(clientConfig.MaxMessageSize);
+ socket.Options.SetBuffer(clientConfig.MaxMessageSize, clientConfig.MaxMessageSize, poolBuffer);
+
+ //If config specifies a sub protocol, set it
+ if (!string.IsNullOrEmpty(clientConfig.SubProtocol))
+ {
+ socket.Options.AddSubProtocol(clientConfig.SubProtocol);
+ }
+
+ //invoke client configuration user callback
+ _onConfigure?.Invoke(socket.Options);
+
+ return new FBMWebsocket(socket, poolBuffer);
+ }
+
+ private sealed record class FBMWebsocket(ClientWebSocket Socket, byte[] Buffer) : IFbmClientWebsocket
+ {
+ ///<inheritdoc/>
+ public async Task ConnectAsync(Uri address, VnWebHeaderCollection headers, CancellationToken cancellation)
+ {
+ //Set headers
+ for (int i = 0; i < headers.Count; i++)
+ {
+ string name = headers.GetKey(i);
+ string? value = headers.Get(i);
+ //Set header
+ Socket.Options.SetRequestHeader(name, value);
+ }
+
+ //Connect to server
+ await Socket.ConnectAsync(address, cancellation);
+ }
+
+ ///<inheritdoc/>
+ public Task DisconnectAsync(WebSocketCloseStatus status, CancellationToken cancellation)
+ {
+ if (Socket?.State == WebSocketState.Open || Socket?.State == WebSocketState.CloseSent)
+ {
+ return Socket.CloseOutputAsync(status, "Socket disconnected", cancellation);
+ }
+ return Task.CompletedTask;
+ }
+
+ ///<inheritdoc/>
+ public ValueTask<ValueWebSocketReceiveResult> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken)
+ => Socket.ReceiveAsync(buffer, cancellationToken);
+
+ ///<inheritdoc/>
+ public ValueTask SendAsync(ReadOnlyMemory<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken)
+ => Socket.SendAsync(buffer, messageType, endOfMessage, cancellationToken);
+
+ public void Dispose()
+ {
+ //Remove buffer refs and return to pool
+ Socket.Dispose();
+ ArrayPool<byte>.Shared.Return(Buffer);
+ }
+ }
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Client/IFbmClientWebsocket.cs b/lib/Net.Messaging.FBM/src/Client/IFbmClientWebsocket.cs
new file mode 100644
index 0000000..bea9c28
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Client/IFbmClientWebsocket.cs
@@ -0,0 +1,76 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: IFbmClientWebsocket.cs
+*
+* IFbmClientWebsocket.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Threading;
+using System.Net.WebSockets;
+using System.Threading.Tasks;
+
+using VNLib.Net.Http;
+
+namespace VNLib.Net.Messaging.FBM.Client
+{
+ /// <summary>
+ /// Represents a client websocket
+ /// </summary>
+ public interface IFbmClientWebsocket: IDisposable
+ {
+ /// <summary>
+ /// Connects the client to the remote server at the specified address,
+ /// with the supplied headers
+ /// </summary>
+ /// <param name="address">The server address to connect to</param>
+ /// <param name="headers">A header collection used when making the initial upgrade request to the server</param>
+ /// <param name="cancellation">A token to cancel the connect operation</param>
+ /// <returns>A task that completes when the socket connection has been established</returns>
+ Task ConnectAsync(Uri address, VnWebHeaderCollection headers, CancellationToken cancellation);
+
+ /// <summary>
+ /// Cleanly disconnects the connected web socket from the
+ /// remote server.
+ /// </summary>
+ /// <param name="status">The websocket status to send on closure</param>
+ /// <param name="cancellation">A token to cancel the operation</param>
+ /// <returns>A task that completes when the operation complets</returns>
+ Task DisconnectAsync(WebSocketCloseStatus status, CancellationToken cancellation);
+
+ /// <summary>
+ /// Sends the supplied memory segment to the connected server
+ /// </summary>
+ /// <param name="buffer">The data buffer to send to the server</param>
+ /// <param name="messageType">A websocket message type</param>
+ /// <param name="endOfMessage">A value that indicates if the segment is the last message in the sequence</param>
+ /// <param name="cancellationToken">A token to cancel the send operation</param>
+ /// <returns>A value task that resovles when the message has been sent</returns>
+ ValueTask SendAsync(ReadOnlyMemory<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken);
+
+ /// <summary>
+ /// Receives data from the connected server and write data to the supplied buffer
+ /// </summary>
+ /// <param name="buffer">The buffer to write data to</param>
+ /// <param name="cancellationToken">A token to cancel the read operation</param>
+ /// <returns>A value task that completes with the receive result</returns>
+ ValueTask<ValueWebSocketReceiveResult> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken);
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Client/IFbmWebsocketFactory.cs b/lib/Net.Messaging.FBM/src/Client/IFbmWebsocketFactory.cs
new file mode 100644
index 0000000..24aff0c
--- /dev/null
+++ b/lib/Net.Messaging.FBM/src/Client/IFbmWebsocketFactory.cs
@@ -0,0 +1,40 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Net.Messaging.FBM
+* File: IFbmWebsocketFactory.cs
+*
+* IFbmWebsocketFactory.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+
+namespace VNLib.Net.Messaging.FBM.Client
+{
+ /// <summary>
+ /// Represents a factory that creates new client websockets on demand
+ /// </summary>
+ public interface IFbmWebsocketFactory
+ {
+ /// <summary>
+ /// Creates a client websocket for new connections, may be used once and discarded
+ /// </summary>
+ /// <param name="clientConfig">The fbm client configuration requesting the new websocket</param>
+ /// <returns>The client websocket</returns>
+ IFbmClientWebsocket CreateWebsocket(in FBMClientConfig clientConfig);
+ }
+}
diff --git a/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs b/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs
deleted file mode 100644
index fc2e417..0000000
--- a/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
-* Copyright (c) 2023 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Net.Messaging.FBM
-* File: ManagedClientWebSocket.cs
-*
-* ManagedClientWebSocket.cs is part of VNLib.Net.Messaging.FBM which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Net;
-using System.Threading;
-using System.Net.Security;
-using System.Net.WebSockets;
-using System.Threading.Tasks;
-using System.Security.Cryptography.X509Certificates;
-
-using VNLib.Utils.Memory;
-
-namespace VNLib.Net.Messaging.FBM.Client
-{
-
- /// <summary>
- /// A wrapper container to manage client websockets
- /// </summary>
- public class ManagedClientWebSocket : WebSocket
- {
- private readonly int TxBufferSize;
- private readonly int RxBufferSize;
- private readonly TimeSpan KeepAliveInterval;
- private readonly ArrayPoolBuffer<byte> _dataBuffer;
- private readonly string? _subProtocol;
-
- /// <summary>
- /// A collection of headers to add to the client
- /// </summary>
- public WebHeaderCollection Headers { get; }
-
- public X509CertificateCollection Certificates { get; }
-
- public IWebProxy? Proxy { get; set; }
-
- public CookieContainer? Cookies { get; set; }
-
- public RemoteCertificateValidationCallback? RemoteCertificateValidationCallback { get; set; }
-
-
- private ClientWebSocket? _socket;
-
- /// <summary>
- /// Initiaizes a new <see cref="ManagedClientWebSocket"/> that accepts an optional sub-protocol for connections
- /// </summary>
- /// <param name="txBufferSize">The size (in bytes) of the send buffer size</param>
- /// <param name="rxBufferSize">The size (in bytes) of the receive buffer size to use</param>
- /// <param name="keepAlive">The WS keepalive interval</param>
- /// <param name="subProtocol">The optional sub-protocol to use</param>
- public ManagedClientWebSocket(int txBufferSize, int rxBufferSize, TimeSpan keepAlive, string? subProtocol)
- {
- //Init header collection
- Headers = new();
- Certificates = new();
- //Alloc buffer
- _dataBuffer = new(rxBufferSize);
- TxBufferSize = txBufferSize;
- RxBufferSize = rxBufferSize;
- KeepAliveInterval = keepAlive;
- _subProtocol = subProtocol;
- }
-
- /// <summary>
- /// Asyncrhonously prepares a new client web-socket and connects to the remote endpoint
- /// </summary>
- /// <param name="serverUri">The endpoint to connect to</param>
- /// <param name="token">A token to cancel the connect operation</param>
- /// <returns>A task that compeltes when the client has connected</returns>
- public async Task ConnectAsync(Uri serverUri, CancellationToken token)
- {
- //Init new socket
- _socket = new();
- try
- {
- //Set buffer
- _socket.Options.SetBuffer(RxBufferSize, TxBufferSize, _dataBuffer.AsArraySegment());
- //Set remaining stored options
- _socket.Options.ClientCertificates = Certificates;
- _socket.Options.KeepAliveInterval = KeepAliveInterval;
- _socket.Options.Cookies = Cookies;
- _socket.Options.Proxy = Proxy;
- _socket.Options.RemoteCertificateValidationCallback = RemoteCertificateValidationCallback;
- //Specify sub-protocol
- if (!string.IsNullOrEmpty(_subProtocol))
- {
- _socket.Options.AddSubProtocol(_subProtocol);
- }
- //Set headers
- for (int i = 0; i < Headers.Count; i++)
- {
- string name = Headers.GetKey(i);
- string? value = Headers.Get(i);
- //Set header
- _socket.Options.SetRequestHeader(name, value);
- }
- //Connect to server
- await _socket.ConnectAsync(serverUri, token);
- }
- catch
- {
- //Cleanup the socket
- Cleanup();
- throw;
- }
- }
-
- /// <summary>
- /// Cleans up internal resources to prepare for another connection
- /// </summary>
- public void Cleanup()
- {
- //Dispose old socket if set
- _socket?.Dispose();
- _socket = null;
- }
- ///<inheritdoc/>
- public override WebSocketCloseStatus? CloseStatus => _socket?.CloseStatus;
- ///<inheritdoc/>
- public override string CloseStatusDescription => _socket?.CloseStatusDescription ?? string.Empty;
- ///<inheritdoc/>
- public override WebSocketState State => _socket?.State ?? WebSocketState.Closed;
- ///<inheritdoc/>
- public override string SubProtocol => _subProtocol ?? string.Empty;
-
-
- ///<inheritdoc/>
- public override void Abort()
- {
- _socket?.Abort();
- }
- ///<inheritdoc/>
- public override Task CloseAsync(WebSocketCloseStatus closeStatus, string? statusDescription, CancellationToken cancellationToken)
- {
- return _socket?.CloseAsync(closeStatus, statusDescription, cancellationToken) ?? Task.CompletedTask;
- }
- ///<inheritdoc/>
- public override Task CloseOutputAsync(WebSocketCloseStatus closeStatus, string? statusDescription, CancellationToken cancellationToken)
- {
- if (_socket?.State == WebSocketState.Open || _socket?.State == WebSocketState.CloseSent)
- {
- return _socket.CloseOutputAsync(closeStatus, statusDescription, cancellationToken);
- }
- return Task.CompletedTask;
- }
- ///<inheritdoc/>
- public override ValueTask<ValueWebSocketReceiveResult> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken)
- {
- _ = _socket ?? throw new WebSocketException(WebSocketError.ConnectionClosedPrematurely, "The connected socket has been disconnected");
-
- return _socket.ReceiveAsync(buffer, cancellationToken);
- }
- ///<inheritdoc/>
- public override Task<WebSocketReceiveResult> ReceiveAsync(ArraySegment<byte> buffer, CancellationToken cancellationToken)
- {
- _ = _socket ?? throw new WebSocketException(WebSocketError.ConnectionClosedPrematurely, "The connected socket has been disconnected");
-
- return _socket.ReceiveAsync(buffer, cancellationToken);
- }
- ///<inheritdoc/>
- public override ValueTask SendAsync(ReadOnlyMemory<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken)
- {
- _ = _socket ?? throw new WebSocketException(WebSocketError.ConnectionClosedPrematurely, "The connected socket has been disconnected");
- return _socket.SendAsync(buffer, messageType, endOfMessage, cancellationToken);
- }
- ///<inheritdoc/>
- public override Task SendAsync(ArraySegment<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken)
- {
- _ = _socket ?? throw new WebSocketException(WebSocketError.ConnectionClosedPrematurely, "The connected socket has been disconnected");
- return _socket.SendAsync(buffer, messageType, endOfMessage, cancellationToken);
- }
-
- ///<inheritdoc/>
- public override void Dispose()
- {
- //Free buffer
- _dataBuffer.Dispose();
- _socket?.Dispose();
- GC.SuppressFinalize(this);
- }
- }
-}