diff options
author | vnugent <public@vaughnnugent.com> | 2023-11-29 00:15:28 -0500 |
---|---|---|
committer | vnugent <public@vaughnnugent.com> | 2023-11-29 00:15:28 -0500 |
commit | 29371caa9c29fd6cfdfb238d98f53fda59e2e8a7 (patch) | |
tree | 47bb4d4726f2cafb1af41460d3356519b4074198 /lib/Net.Messaging.FBM | |
parent | 07824a130c7608337a36382dbfa40198a8c70297 (diff) |
immutable fbm clients, websocket abstractions, prep for monocypher/argon2 bindings
Diffstat (limited to 'lib/Net.Messaging.FBM')
7 files changed, 407 insertions, 384 deletions
diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClient.cs b/lib/Net.Messaging.FBM/src/Client/FBMClient.cs index ee64d53..9628313 100644 --- a/lib/Net.Messaging.FBM/src/Client/FBMClient.cs +++ b/lib/Net.Messaging.FBM/src/Client/FBMClient.cs @@ -47,7 +47,7 @@ namespace VNLib.Net.Messaging.FBM.Client /// A Fixed Buffer Message Protocol client. Allows for high performance client-server messaging /// with minimal memory overhead. /// </summary> - public class FBMClient : VnDisposeable, IStatefulConnection, ICacheHolder + public class FBMClient : VnDisposeable, IStatefulConnection { /// <summary> /// The WS connection query arguments to specify a receive buffer size @@ -77,9 +77,12 @@ namespace VNLib.Net.Messaging.FBM.Client private readonly SemaphoreSlim SendLock; private readonly ConcurrentDictionary<int, FBMRequest> ActiveRequests; - private readonly ObjectRental<FBMRequest> RequestRental; + private readonly IFBMMemoryHandle _streamBuffer; + private readonly IFbmClientWebsocket _socket; private readonly FBMClientConfig _config; - private readonly byte[] _streamBuffer; + + private readonly IObjectRental<FBMRequest> _requestRental; + private readonly bool _ownsObjectRenal; /// <summary> /// The configuration for the current client @@ -90,34 +93,63 @@ namespace VNLib.Net.Messaging.FBM.Client /// A handle that is reset when a connection has been successfully set, and is set /// when the connection exists /// </summary> - public ManualResetEvent ConnectionStatusHandle { get; } - + public ManualResetEvent ConnectionStatusHandle { get; } + /// <summary> - /// The <see cref="ClientWebSocket"/> to send/recieve message on + /// The client's http header collection used when making connections /// </summary> - public ManagedClientWebSocket ClientSocket { get; } - + public VnWebHeaderCollection Headers { get; } /// <summary> - /// Creates a new <see cref="FBMClient"/> in a closed state + /// Creates an immutable FBMClient that wraps the supplied web socket using the + /// supplied config. /// </summary> - /// <param name="config">The client configuration</param> - public FBMClient(FBMClientConfig config) + /// <param name="config">The client config</param> + /// <param name="websocket">The websocket instance used to comunicate with an FBMServer</param> + public FBMClient(in FBMClientConfig config, IFbmClientWebsocket websocket) + :this(in config, websocket, null) { - RequestRental = ObjectRental.CreateReusable(ReuseableRequestConstructor, 200); + } + + internal FBMClient(in FBMClientConfig config, IFbmClientWebsocket websocket, IObjectRental<FBMRequest>? requestRental) + { + _config = config; + _socket = websocket ?? throw new ArgumentNullException(nameof(websocket)); + + _ = config.MemoryManager ?? throw new ArgumentException("FBM memory manager is required", nameof(config)); + + //Create new request rental if none supplied + if(requestRental is null) + { + _ownsObjectRenal = true; + _requestRental = ObjectRental.CreateReusable(ReuseableRequestConstructor, 100); + } + else + { + _requestRental = requestRental; + } + + Headers = new(); SendLock = new(1); ConnectionStatusHandle = new(true); ActiveRequests = new(Environment.ProcessorCount, 100); - _config = config; - //Init the new client socket - ClientSocket = new(config.RecvBufferSize, config.RecvBufferSize, config.KeepAliveInterval, config.SubProtocol); - - //Init the stream buffer + /* + * We can use the pool to allocate a single stream buffer that will be shared. + * This is because there is only 1 thread allowed to send/copy data at a time + * so it can be allocated once and shared + */ int maxStrmBufSize = Math.Min(config.MaxMessageSize, MAX_STREAM_BUFFER_SIZE); - _streamBuffer = new byte[maxStrmBufSize]; + _streamBuffer = config.MemoryManager.InitHandle(); + config.MemoryManager.AllocBuffer(_streamBuffer, maxStrmBufSize); } + /// <summary> + /// Allocates and configures a new <see cref="FBMRequest"/> message object for use within the reusable store + /// </summary> + /// <returns>The configured <see cref="FBMRequest"/></returns> + protected virtual FBMRequest ReuseableRequestConstructor() => new(in _config); + private void Debug(string format, params string[] strings) { if(Config.DebugLog != null) @@ -125,6 +157,7 @@ namespace VNLib.Net.Messaging.FBM.Client Config.DebugLog.Debug($"[DEBUG] FBM Client: {format}", strings); } } + private void Debug(string format, long value, long other) { if (Config.DebugLog != null) @@ -132,12 +165,7 @@ namespace VNLib.Net.Messaging.FBM.Client Config.DebugLog.Debug($"[DEBUG] FBM Client: {format}", value, other); } } - - /// <summary> - /// Allocates and configures a new <see cref="FBMRequest"/> message object for use within the reusable store - /// </summary> - /// <returns>The configured <see cref="FBMRequest"/></returns> - protected virtual FBMRequest ReuseableRequestConstructor() => new(in _config); + /// <summary> /// Asynchronously opens a websocket connection with the specifed remote server @@ -157,28 +185,28 @@ namespace VNLib.Net.Messaging.FBM.Client Debug("Connection string {con}", urib.Uri.ToString()); //Connect to server - await ClientSocket.ConnectAsync(urib.Uri, cancellationToken); + await _socket.ConnectAsync(urib.Uri, Headers, cancellationToken); //Reset wait handle before return ConnectionStatusHandle.Reset(); - //Begin listeing for requets in a background task - _ = Task.Run(() => ProcessContinuousRecvAsync(ClientSocket), cancellationToken); + //Begin listeing for requests in a background task + _ = Task.Run(ProcessContinuousRecvAsync, cancellationToken); } /// <summary> - /// Rents a new <see cref="FBMRequest"/> from the internal <see cref="ReusableStore{T}"/>. + /// Rents a new <see cref="FBMRequest"/> from the internal <see cref="ObjectRental{T}"/>. /// Use <see cref="ReturnRequest(FBMRequest)"/> when request is no longer in use /// </summary> /// <returns>The configured (rented or new) <see cref="FBMRequest"/> ready for use</returns> - public FBMRequest RentRequest() => RequestRental.Rent(); + public FBMRequest RentRequest() => _requestRental.Rent(); /// <summary> /// Stores (or returns) the reusable request in cache for use with <see cref="RentRequest"/> /// </summary> /// <param name="request">The request to return to the store</param> /// <exception cref="InvalidOperationException"></exception> - public void ReturnRequest(FBMRequest request) => RequestRental.Return(request); + public void ReturnRequest(FBMRequest request) => _requestRental.Return(request); /// <summary> /// Sends a <see cref="FBMRequest"/> to the connected server @@ -190,7 +218,7 @@ namespace VNLib.Net.Messaging.FBM.Client /// <exception cref="ObjectDisposedException"></exception> /// <exception cref="InvalidOperationException"></exception> /// <exception cref="FBMInvalidRequestException"></exception> - public Task<FBMResponse> SendAsync(FBMRequest request, CancellationToken cancellationToken = default) => SendAsync(request, _config.RequestTimeout, cancellationToken); + public Task<FBMResponse> SendAsync(FBMRequest request, CancellationToken cancellationToken = default) => SendAsync(request, Config.RequestTimeout, cancellationToken); /// <summary> /// Sends a <see cref="FBMRequest"/> to the connected server @@ -229,7 +257,7 @@ namespace VNLib.Net.Messaging.FBM.Client using (SemSlimReleaser releaser = await SendLock.GetReleaserAsync(cancellationToken)) { //Send the data to the server - await ClientSocket.SendAsync(requestData, WebSocketMessageType.Binary, true, cancellationToken); + await _socket.SendAsync(requestData, WebSocketMessageType.Binary, true, cancellationToken); } //wait for the response to be set @@ -267,7 +295,7 @@ namespace VNLib.Net.Messaging.FBM.Client /// <exception cref="ObjectDisposedException"></exception> /// <exception cref="InvalidOperationException"></exception> public Task<FBMResponse> StreamDataAsync(FBMRequest request, Stream payload, ContentType contentType, CancellationToken cancellationToken = default) - => StreamDataAsync(request, payload, contentType, _config.RequestTimeout, cancellationToken); + => StreamDataAsync(request, payload, contentType, Config.RequestTimeout, cancellationToken); /// <summary> /// Streams arbitrary binary data to the server with the initial request message @@ -283,6 +311,8 @@ namespace VNLib.Net.Messaging.FBM.Client /// <exception cref="InvalidOperationException"></exception> public async Task<FBMResponse> StreamDataAsync(FBMRequest request, Stream payload, ContentType contentType, TimeSpan timeout, CancellationToken cancellationToken = default) { + _ = payload ?? throw new ArgumentNullException(nameof(payload)); + Check(); cancellationToken.ThrowIfCancellationRequested(); @@ -303,17 +333,19 @@ namespace VNLib.Net.Messaging.FBM.Client //Write an empty body in the request so a content type header is writen request.WriteBody(ReadOnlySpan<byte>.Empty, contentType); + Memory<byte> bufferMemory = _streamBuffer.GetMemory(); + //Wait for send-lock using (SemSlimReleaser releaser = await SendLock.GetReleaserAsync(cancellationToken)) { //Send the initial request packet - await ClientSocket.SendAsync(requestData, WebSocketMessageType.Binary, false, cancellationToken); + await _socket.SendAsync(requestData, WebSocketMessageType.Binary, false, cancellationToken); //Stream mesage body do { //Read data - int read = await payload.ReadAsync(_streamBuffer, cancellationToken); + int read = await payload.ReadAsync(bufferMemory, cancellationToken); if (read == 0) { @@ -322,7 +354,7 @@ namespace VNLib.Net.Messaging.FBM.Client } //write message to socket, if the read data was smaller than the buffer, we can send the last packet - await ClientSocket.SendAsync(_streamBuffer[..read], WebSocketMessageType.Binary, read < _streamBuffer.Length, cancellationToken); + await _socket.SendAsync(bufferMemory[..read], WebSocketMessageType.Binary, read < bufferMemory.Length, cancellationToken); } while (true); } @@ -358,15 +390,9 @@ namespace VNLib.Net.Messaging.FBM.Client { Check(); //Close the connection - await ClientSocket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", cancellationToken); + await _socket.DisconnectAsync(WebSocketCloseStatus.NormalClosure, cancellationToken); } - ///<inheritdoc/> - public void CacheClear() => RequestRental.CacheClear(); - ///<inheritdoc/> - public void CacheHardClear() => RequestRental.CacheHardClear(); - - private void CheckOrEnqueue(FBMRequest request) { /* @@ -404,25 +430,27 @@ namespace VNLib.Net.Messaging.FBM.Client /// until the socket is closed, or canceled /// </summary> /// <returns></returns> - protected async Task ProcessContinuousRecvAsync(WebSocket socket) + protected async Task ProcessContinuousRecvAsync() { Debug("Begining receive loop"); //Alloc recv buffer - IFBMMemoryHandle recvBuffer = _config.MemoryManager.InitHandle(); - _config.MemoryManager.AllocBuffer(recvBuffer, _config.RecvBufferSize); + IFBMMemoryHandle recvBuffer = Config.MemoryManager.InitHandle(); + Config.MemoryManager.AllocBuffer(recvBuffer, Config.RecvBufferSize); try { - if(!_config.MemoryManager.TryGetHeap(out IUnmangedHeap? heap)) + if(!Config.MemoryManager.TryGetHeap(out IUnmangedHeap? heap)) { throw new NotSupportedException("The memory manager must support using IUnmanagedHeaps"); } + Memory<byte> rcvMemory = recvBuffer.GetMemory(); + //Recv event loop while (true) { //Listen for incoming packets with the intial data buffer - ValueWebSocketReceiveResult result = await socket.ReceiveAsync(recvBuffer.GetMemory(), CancellationToken.None); + ValueWebSocketReceiveResult result = await _socket.ReceiveAsync(rcvMemory, CancellationToken.None); //If the message is a close message, its time to exit if (result.MessageType == WebSocketMessageType.Close) @@ -444,10 +472,10 @@ namespace VNLib.Net.Messaging.FBM.Client while (!result.EndOfMessage) { //recive more data - result = await socket.ReceiveAsync(recvBuffer.GetMemory(), CancellationToken.None); + result = await _socket.ReceiveAsync(rcvMemory, CancellationToken.None); //Make sure the buffer is not too large - if ((responseBuffer.Length + result.Count) > _config.MaxMessageSize) + if ((responseBuffer.Length + result.Count) > Config.MaxMessageSize) { //Dispose the buffer before exiting responseBuffer.Dispose(); @@ -485,9 +513,7 @@ namespace VNLib.Net.Messaging.FBM.Client finally { //Dispose the recv buffer - _config.MemoryManager.FreeBuffer(recvBuffer); - //Cleanup the socket when exiting - ClientSocket.Cleanup(); + Config.MemoryManager.FreeBuffer(recvBuffer); //Set status handle as unset ConnectionStatusHandle.Set(); @@ -573,12 +599,19 @@ namespace VNLib.Net.Messaging.FBM.Client ///<inheritdoc/> protected override void Free() { - //Dispose socket - ClientSocket.Dispose(); + //Free stream buffer + Config.MemoryManager.FreeBuffer(_streamBuffer); + //Dispose client buffer - RequestRental.Dispose(); + _socket.Dispose(); SendLock.Dispose(); ConnectionStatusHandle.Dispose(); + + //Dispose object rental if we own it + if (_ownsObjectRenal && _requestRental is IDisposable disp) + { + disp.Dispose(); + } } } } diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClientFactory.cs b/lib/Net.Messaging.FBM/src/Client/FBMClientFactory.cs new file mode 100644 index 0000000..51d3768 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Client/FBMClientFactory.cs @@ -0,0 +1,85 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: FBMClientFactory.cs +* +* FBMClientFactory.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; + +using VNLib.Utils.Memory.Caching; + +namespace VNLib.Net.Messaging.FBM.Client +{ + /// <summary> + /// An FBMClient factory that creates immutable clients from fbm + /// websockets + /// </summary> + public sealed class FBMClientFactory: ICacheHolder + { + private readonly ObjectRental<FBMRequest> _internalRequestPool; + private readonly FBMClientConfig _config; + private readonly IFbmWebsocketFactory _socketMan; + + /// <summary> + /// Initlaizes a new client factory from the websocket manager + /// </summary> + /// <param name="config">The configuration state</param> + /// <param name="webSocketManager">The client websocket factory</param> + /// <exception cref="ArgumentNullException"></exception> + public FBMClientFactory(in FBMClientConfig config, IFbmWebsocketFactory webSocketManager) + { + _config = config; + _ = config.MemoryManager ?? throw new ArgumentException("The client memory manager must not be null", nameof(config)); + _socketMan = webSocketManager ?? throw new ArgumentNullException(nameof(webSocketManager)); + _internalRequestPool = ObjectRental.CreateReusable(ReuseableRequestConstructor, 1000); + } + + /// <summary> + /// The configuration for the current client + /// </summary> + public ref readonly FBMClientConfig Config => ref _config; + + /// <summary> + /// Allocates and configures a new <see cref="FBMRequest"/> message object for use within the reusable store + /// </summary> + /// <returns>The configured <see cref="FBMRequest"/></returns> + private FBMRequest ReuseableRequestConstructor() => new(in _config); + + /// <summary> + /// Initializes a new websocket and creates a new <see cref="FBMClient"/> instance + /// </summary> + /// <returns>The initialized FBM client instance</returns> + public FBMClient CreateClient() + { + //Init new socket + IFbmClientWebsocket socket = _socketMan.CreateWebsocket(in _config); + + //Create client wrapper + return new(in _config, socket, _internalRequestPool); + } + + ///<inheritdoc/> + public void CacheClear() => _internalRequestPool.CacheClear(); + + ///<inheritdoc/> + public void CacheHardClear() => _internalRequestPool.CacheHardClear(); + } +} diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClientWorkerBase.cs b/lib/Net.Messaging.FBM/src/Client/FBMClientWorkerBase.cs deleted file mode 100644 index b4056dc..0000000 --- a/lib/Net.Messaging.FBM/src/Client/FBMClientWorkerBase.cs +++ /dev/null @@ -1,125 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: FBMClientWorkerBase.cs -* -* FBMClientWorkerBase.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Threading; -using System.Threading.Tasks; - -using VNLib.Utils; - -namespace VNLib.Net.Messaging.FBM.Client -{ - /// <summary> - /// A base class for objects that implement <see cref="FBMClient"/> - /// operations - /// </summary> - public abstract class FBMClientWorkerBase : VnDisposeable, IStatefulConnection - { - /// <summary> - /// Allows configuration of websocket configuration options - /// </summary> - public ManagedClientWebSocket SocketOptions => Client.ClientSocket; - -#nullable disable - /// <summary> - /// The <see cref="FBMClient"/> to sent requests from - /// </summary> - public FBMClient Client { get; private set; } - - /// <summary> - /// Raised when the client has connected successfully - /// </summary> - public event Action<FBMClient, FBMClientWorkerBase> Connected; -#nullable enable - - ///<inheritdoc/> - public event EventHandler ConnectionClosed - { - add => Client.ConnectionClosed += value; - remove => Client.ConnectionClosed -= value; - } - - /// <summary> - /// Creates and initializes a the internal <see cref="FBMClient"/> - /// </summary> - /// <param name="config">The client config</param> - protected void InitClient(in FBMClientConfig config) - { - Client = new(config); - Client.ConnectionClosedOnError += Client_ConnectionClosedOnError; - Client.ConnectionClosed += Client_ConnectionClosed; - } - - private void Client_ConnectionClosed(object? sender, EventArgs e) => OnDisconnected(); - private void Client_ConnectionClosedOnError(object? sender, FMBClientErrorEventArgs e) => OnError(e); - - /// <summary> - /// Asynchronously connects to a remote server by the specified uri - /// </summary> - /// <param name="serverUri">The remote uri of a server to connect to</param> - /// <param name="cancellationToken">A token to cancel the connect operation</param> - /// <returns>A task that compeltes when the client has connected to the remote server</returns> - public virtual async Task ConnectAsync(Uri serverUri, CancellationToken cancellationToken = default) - { - //Connect to server - await Client.ConnectAsync(serverUri, cancellationToken).ConfigureAwait(true); - //Invoke child on-connected event - OnConnected(); - Connected?.Invoke(Client, this); - } - - /// <summary> - /// Asynchronously disonnects a client only if the client is currently connected, - /// returns otherwise - /// </summary> - /// <param name="cancellationToken"></param> - /// <returns>A task that compeltes when the client has disconnected</returns> - public virtual Task DisconnectAsync(CancellationToken cancellationToken = default) - { - return Client.DisconnectAsync(cancellationToken); - } - - /// <summary> - /// Invoked when a client has successfully connected to the remote server - /// </summary> - protected abstract void OnConnected(); - /// <summary> - /// Invoked when the client has disconnected cleanly - /// </summary> - protected abstract void OnDisconnected(); - /// <summary> - /// Invoked when the connected client is closed because of a connection error - /// </summary> - /// <param name="e">A <see cref="EventArgs"/> that contains the client error data</param> - protected abstract void OnError(FMBClientErrorEventArgs e); - - ///<inheritdoc/> - protected override void Free() - { - Client.ConnectionClosedOnError -= Client_ConnectionClosedOnError; - Client.ConnectionClosed -= Client_ConnectionClosed; - Client.Dispose(); - } - } -} diff --git a/lib/Net.Messaging.FBM/src/Client/FBMFallbackClientWsFactory.cs b/lib/Net.Messaging.FBM/src/Client/FBMFallbackClientWsFactory.cs new file mode 100644 index 0000000..5ee4142 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Client/FBMFallbackClientWsFactory.cs @@ -0,0 +1,117 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: FBMFallbackClientWsFactory.cs +* +* FBMFallbackClientWsFactory.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Buffers; +using System.Threading; +using System.Net.WebSockets; +using System.Threading.Tasks; + +using VNLib.Net.Http; + +namespace VNLib.Net.Messaging.FBM.Client +{ + /// <summary> + /// Creates a new <see cref="IFbmWebsocketFactory"/> that builds new client websockets + /// on demand using the <see cref="ClientWebSocket"/> .NET default implementation + /// </summary> + public class FBMFallbackClientWsFactory : IFbmWebsocketFactory + { + private readonly Action<ClientWebSocketOptions>? _onConfigure; + + /// <summary> + /// Initalizes a new <see cref="FBMFallbackClientWsFactory"/> instance + /// </summary> + /// <param name="onConfigureSocket">A callback function that allows users to configure sockets when created</param> + public FBMFallbackClientWsFactory(Action<ClientWebSocketOptions>? onConfigureSocket = null) + => _onConfigure = onConfigureSocket; + + ///<inheritdoc/> + public IFbmClientWebsocket CreateWebsocket(in FBMClientConfig clientConfig) + { + ClientWebSocket socket = new(); + + socket.Options.KeepAliveInterval = clientConfig.KeepAliveInterval; + + //Setup the socket receive buffer + byte[] poolBuffer = ArrayPool<byte>.Shared.Rent(clientConfig.MaxMessageSize); + socket.Options.SetBuffer(clientConfig.MaxMessageSize, clientConfig.MaxMessageSize, poolBuffer); + + //If config specifies a sub protocol, set it + if (!string.IsNullOrEmpty(clientConfig.SubProtocol)) + { + socket.Options.AddSubProtocol(clientConfig.SubProtocol); + } + + //invoke client configuration user callback + _onConfigure?.Invoke(socket.Options); + + return new FBMWebsocket(socket, poolBuffer); + } + + private sealed record class FBMWebsocket(ClientWebSocket Socket, byte[] Buffer) : IFbmClientWebsocket + { + ///<inheritdoc/> + public async Task ConnectAsync(Uri address, VnWebHeaderCollection headers, CancellationToken cancellation) + { + //Set headers + for (int i = 0; i < headers.Count; i++) + { + string name = headers.GetKey(i); + string? value = headers.Get(i); + //Set header + Socket.Options.SetRequestHeader(name, value); + } + + //Connect to server + await Socket.ConnectAsync(address, cancellation); + } + + ///<inheritdoc/> + public Task DisconnectAsync(WebSocketCloseStatus status, CancellationToken cancellation) + { + if (Socket?.State == WebSocketState.Open || Socket?.State == WebSocketState.CloseSent) + { + return Socket.CloseOutputAsync(status, "Socket disconnected", cancellation); + } + return Task.CompletedTask; + } + + ///<inheritdoc/> + public ValueTask<ValueWebSocketReceiveResult> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken) + => Socket.ReceiveAsync(buffer, cancellationToken); + + ///<inheritdoc/> + public ValueTask SendAsync(ReadOnlyMemory<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken) + => Socket.SendAsync(buffer, messageType, endOfMessage, cancellationToken); + + public void Dispose() + { + //Remove buffer refs and return to pool + Socket.Dispose(); + ArrayPool<byte>.Shared.Return(Buffer); + } + } + } +} diff --git a/lib/Net.Messaging.FBM/src/Client/IFbmClientWebsocket.cs b/lib/Net.Messaging.FBM/src/Client/IFbmClientWebsocket.cs new file mode 100644 index 0000000..bea9c28 --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Client/IFbmClientWebsocket.cs @@ -0,0 +1,76 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: IFbmClientWebsocket.cs +* +* IFbmClientWebsocket.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Threading; +using System.Net.WebSockets; +using System.Threading.Tasks; + +using VNLib.Net.Http; + +namespace VNLib.Net.Messaging.FBM.Client +{ + /// <summary> + /// Represents a client websocket + /// </summary> + public interface IFbmClientWebsocket: IDisposable + { + /// <summary> + /// Connects the client to the remote server at the specified address, + /// with the supplied headers + /// </summary> + /// <param name="address">The server address to connect to</param> + /// <param name="headers">A header collection used when making the initial upgrade request to the server</param> + /// <param name="cancellation">A token to cancel the connect operation</param> + /// <returns>A task that completes when the socket connection has been established</returns> + Task ConnectAsync(Uri address, VnWebHeaderCollection headers, CancellationToken cancellation); + + /// <summary> + /// Cleanly disconnects the connected web socket from the + /// remote server. + /// </summary> + /// <param name="status">The websocket status to send on closure</param> + /// <param name="cancellation">A token to cancel the operation</param> + /// <returns>A task that completes when the operation complets</returns> + Task DisconnectAsync(WebSocketCloseStatus status, CancellationToken cancellation); + + /// <summary> + /// Sends the supplied memory segment to the connected server + /// </summary> + /// <param name="buffer">The data buffer to send to the server</param> + /// <param name="messageType">A websocket message type</param> + /// <param name="endOfMessage">A value that indicates if the segment is the last message in the sequence</param> + /// <param name="cancellationToken">A token to cancel the send operation</param> + /// <returns>A value task that resovles when the message has been sent</returns> + ValueTask SendAsync(ReadOnlyMemory<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken); + + /// <summary> + /// Receives data from the connected server and write data to the supplied buffer + /// </summary> + /// <param name="buffer">The buffer to write data to</param> + /// <param name="cancellationToken">A token to cancel the read operation</param> + /// <returns>A value task that completes with the receive result</returns> + ValueTask<ValueWebSocketReceiveResult> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken); + } +} diff --git a/lib/Net.Messaging.FBM/src/Client/IFbmWebsocketFactory.cs b/lib/Net.Messaging.FBM/src/Client/IFbmWebsocketFactory.cs new file mode 100644 index 0000000..24aff0c --- /dev/null +++ b/lib/Net.Messaging.FBM/src/Client/IFbmWebsocketFactory.cs @@ -0,0 +1,40 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Net.Messaging.FBM +* File: IFbmWebsocketFactory.cs +* +* IFbmWebsocketFactory.cs is part of VNLib.Net.Messaging.FBM which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + + +namespace VNLib.Net.Messaging.FBM.Client +{ + /// <summary> + /// Represents a factory that creates new client websockets on demand + /// </summary> + public interface IFbmWebsocketFactory + { + /// <summary> + /// Creates a client websocket for new connections, may be used once and discarded + /// </summary> + /// <param name="clientConfig">The fbm client configuration requesting the new websocket</param> + /// <returns>The client websocket</returns> + IFbmClientWebsocket CreateWebsocket(in FBMClientConfig clientConfig); + } +} diff --git a/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs b/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs deleted file mode 100644 index fc2e417..0000000 --- a/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs +++ /dev/null @@ -1,203 +0,0 @@ -/* -* Copyright (c) 2023 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: ManagedClientWebSocket.cs -* -* ManagedClientWebSocket.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Net; -using System.Threading; -using System.Net.Security; -using System.Net.WebSockets; -using System.Threading.Tasks; -using System.Security.Cryptography.X509Certificates; - -using VNLib.Utils.Memory; - -namespace VNLib.Net.Messaging.FBM.Client -{ - - /// <summary> - /// A wrapper container to manage client websockets - /// </summary> - public class ManagedClientWebSocket : WebSocket - { - private readonly int TxBufferSize; - private readonly int RxBufferSize; - private readonly TimeSpan KeepAliveInterval; - private readonly ArrayPoolBuffer<byte> _dataBuffer; - private readonly string? _subProtocol; - - /// <summary> - /// A collection of headers to add to the client - /// </summary> - public WebHeaderCollection Headers { get; } - - public X509CertificateCollection Certificates { get; } - - public IWebProxy? Proxy { get; set; } - - public CookieContainer? Cookies { get; set; } - - public RemoteCertificateValidationCallback? RemoteCertificateValidationCallback { get; set; } - - - private ClientWebSocket? _socket; - - /// <summary> - /// Initiaizes a new <see cref="ManagedClientWebSocket"/> that accepts an optional sub-protocol for connections - /// </summary> - /// <param name="txBufferSize">The size (in bytes) of the send buffer size</param> - /// <param name="rxBufferSize">The size (in bytes) of the receive buffer size to use</param> - /// <param name="keepAlive">The WS keepalive interval</param> - /// <param name="subProtocol">The optional sub-protocol to use</param> - public ManagedClientWebSocket(int txBufferSize, int rxBufferSize, TimeSpan keepAlive, string? subProtocol) - { - //Init header collection - Headers = new(); - Certificates = new(); - //Alloc buffer - _dataBuffer = new(rxBufferSize); - TxBufferSize = txBufferSize; - RxBufferSize = rxBufferSize; - KeepAliveInterval = keepAlive; - _subProtocol = subProtocol; - } - - /// <summary> - /// Asyncrhonously prepares a new client web-socket and connects to the remote endpoint - /// </summary> - /// <param name="serverUri">The endpoint to connect to</param> - /// <param name="token">A token to cancel the connect operation</param> - /// <returns>A task that compeltes when the client has connected</returns> - public async Task ConnectAsync(Uri serverUri, CancellationToken token) - { - //Init new socket - _socket = new(); - try - { - //Set buffer - _socket.Options.SetBuffer(RxBufferSize, TxBufferSize, _dataBuffer.AsArraySegment()); - //Set remaining stored options - _socket.Options.ClientCertificates = Certificates; - _socket.Options.KeepAliveInterval = KeepAliveInterval; - _socket.Options.Cookies = Cookies; - _socket.Options.Proxy = Proxy; - _socket.Options.RemoteCertificateValidationCallback = RemoteCertificateValidationCallback; - //Specify sub-protocol - if (!string.IsNullOrEmpty(_subProtocol)) - { - _socket.Options.AddSubProtocol(_subProtocol); - } - //Set headers - for (int i = 0; i < Headers.Count; i++) - { - string name = Headers.GetKey(i); - string? value = Headers.Get(i); - //Set header - _socket.Options.SetRequestHeader(name, value); - } - //Connect to server - await _socket.ConnectAsync(serverUri, token); - } - catch - { - //Cleanup the socket - Cleanup(); - throw; - } - } - - /// <summary> - /// Cleans up internal resources to prepare for another connection - /// </summary> - public void Cleanup() - { - //Dispose old socket if set - _socket?.Dispose(); - _socket = null; - } - ///<inheritdoc/> - public override WebSocketCloseStatus? CloseStatus => _socket?.CloseStatus; - ///<inheritdoc/> - public override string CloseStatusDescription => _socket?.CloseStatusDescription ?? string.Empty; - ///<inheritdoc/> - public override WebSocketState State => _socket?.State ?? WebSocketState.Closed; - ///<inheritdoc/> - public override string SubProtocol => _subProtocol ?? string.Empty; - - - ///<inheritdoc/> - public override void Abort() - { - _socket?.Abort(); - } - ///<inheritdoc/> - public override Task CloseAsync(WebSocketCloseStatus closeStatus, string? statusDescription, CancellationToken cancellationToken) - { - return _socket?.CloseAsync(closeStatus, statusDescription, cancellationToken) ?? Task.CompletedTask; - } - ///<inheritdoc/> - public override Task CloseOutputAsync(WebSocketCloseStatus closeStatus, string? statusDescription, CancellationToken cancellationToken) - { - if (_socket?.State == WebSocketState.Open || _socket?.State == WebSocketState.CloseSent) - { - return _socket.CloseOutputAsync(closeStatus, statusDescription, cancellationToken); - } - return Task.CompletedTask; - } - ///<inheritdoc/> - public override ValueTask<ValueWebSocketReceiveResult> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken) - { - _ = _socket ?? throw new WebSocketException(WebSocketError.ConnectionClosedPrematurely, "The connected socket has been disconnected"); - - return _socket.ReceiveAsync(buffer, cancellationToken); - } - ///<inheritdoc/> - public override Task<WebSocketReceiveResult> ReceiveAsync(ArraySegment<byte> buffer, CancellationToken cancellationToken) - { - _ = _socket ?? throw new WebSocketException(WebSocketError.ConnectionClosedPrematurely, "The connected socket has been disconnected"); - - return _socket.ReceiveAsync(buffer, cancellationToken); - } - ///<inheritdoc/> - public override ValueTask SendAsync(ReadOnlyMemory<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken) - { - _ = _socket ?? throw new WebSocketException(WebSocketError.ConnectionClosedPrematurely, "The connected socket has been disconnected"); - return _socket.SendAsync(buffer, messageType, endOfMessage, cancellationToken); - } - ///<inheritdoc/> - public override Task SendAsync(ArraySegment<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken) - { - _ = _socket ?? throw new WebSocketException(WebSocketError.ConnectionClosedPrematurely, "The connected socket has been disconnected"); - return _socket.SendAsync(buffer, messageType, endOfMessage, cancellationToken); - } - - ///<inheritdoc/> - public override void Dispose() - { - //Free buffer - _dataBuffer.Dispose(); - _socket?.Dispose(); - GC.SuppressFinalize(this); - } - } -} |