diff options
Diffstat (limited to 'lib/Net.Messaging.FBM/src/Client')
-rw-r--r-- | lib/Net.Messaging.FBM/src/Client/FBMBuffer.cs | 94 | ||||
-rw-r--r-- | lib/Net.Messaging.FBM/src/Client/FBMClient.cs | 117 | ||||
-rw-r--r-- | lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs | 3 | ||||
-rw-r--r-- | lib/Net.Messaging.FBM/src/Client/FBMRequest.cs | 131 | ||||
-rw-r--r-- | lib/Net.Messaging.FBM/src/Client/FBMResponse.cs | 9 | ||||
-rw-r--r-- | lib/Net.Messaging.FBM/src/Client/IFBMMessage.cs | 62 | ||||
-rw-r--r-- | lib/Net.Messaging.FBM/src/Client/IFBMMessageWaiter.cs | 6 | ||||
-rw-r--r-- | lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs | 4 |
8 files changed, 201 insertions, 225 deletions
diff --git a/lib/Net.Messaging.FBM/src/Client/FBMBuffer.cs b/lib/Net.Messaging.FBM/src/Client/FBMBuffer.cs index fac41a6..698a98b 100644 --- a/lib/Net.Messaging.FBM/src/Client/FBMBuffer.cs +++ b/lib/Net.Messaging.FBM/src/Client/FBMBuffer.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Messaging.FBM @@ -28,28 +28,50 @@ using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using VNLib.Utils.IO; +using VNLib.Utils.Memory.Caching; namespace VNLib.Net.Messaging.FBM.Client { /// <summary> /// Represents a shared internal character and bianry buffer for /// </summary> - internal sealed class FBMBuffer : IFBMHeaderBuffer, IDisposable + internal sealed class FBMBuffer : IFBMHeaderBuffer, IDisposable, IReusable { - private readonly IMemoryOwner<byte> Handle; + private readonly IFBMMemoryHandle Handle; + private readonly IFBMMemoryManager _memoryManager; + private readonly int _size; private readonly BufferWriter _writer; private readonly BinaryRequestAccumulator _requestAccumulator; - internal FBMBuffer(IMemoryOwner<byte> handle) + internal FBMBuffer(IFBMMemoryManager manager, int bufferSize) { - Handle = handle; + _memoryManager = manager; + Handle = manager.InitHandle(); _writer = new(this); - _requestAccumulator = new(handle.Memory); + _size = bufferSize; + _requestAccumulator = new(this, bufferSize); } + /* + * Reusable methods will alloc and free buffers during + * normal operation. + */ + + ///<inheritdoc/> + public void Prepare() => _memoryManager.AllocBuffer(Handle, _size); + + ///<inheritdoc/> + public bool Release() + { + _memoryManager.FreeBuffer(Handle); + return true; + } + + public void Dispose() => _ = Release(); + /// <summary> /// Gets the internal request data accumulator /// </summary> @@ -73,13 +95,6 @@ namespace VNLib.Net.Messaging.FBM.Client //Return the internal writer return _writer; } - - - public void Dispose() - { - //Dispose handle - Handle.Dispose(); - } /// <summary> /// Resets the request accumulator and writes the initial message id @@ -91,7 +106,7 @@ namespace VNLib.Net.Messaging.FBM.Client _requestAccumulator.Reset(); //Write message id to accumulator, it should already be reset - Helpers.WriteMessageid(RequestBuffer, messageId); + Helpers.WriteMessageid(_requestAccumulator, messageId); } ///<inheritdoc/> @@ -99,24 +114,24 @@ namespace VNLib.Net.Messaging.FBM.Client Span<char> IFBMHeaderBuffer.GetSpan(int offset, int count) { //Get the character span - Span<char> span = MemoryMarshal.Cast<byte, char>(Handle.Memory.Span); + Span<char> span = MemoryMarshal.Cast<byte, char>(Handle.GetSpan()); return span.Slice(offset, count); } ///<inheritdoc/> [MethodImpl(MethodImplOptions.AggressiveInlining)] - Span<char> IFBMHeaderBuffer.GetSpan() => MemoryMarshal.Cast<byte, char>(Handle.Memory.Span); + Span<char> IFBMHeaderBuffer.GetSpan() => MemoryMarshal.Cast<byte, char>(Handle.GetSpan()); private sealed class BinaryRequestAccumulator : IDataAccumulator<byte> { private readonly int Size; - private readonly Memory<byte> Buffer; + private readonly FBMBuffer Buffer; - internal BinaryRequestAccumulator(Memory<byte> buffer) + internal BinaryRequestAccumulator(FBMBuffer buffer, int size) { Buffer = buffer; - Size = buffer.Length; + Size = size; } ///<inheritdoc/> @@ -126,52 +141,47 @@ namespace VNLib.Net.Messaging.FBM.Client public int RemainingSize => Size - AccumulatedSize; ///<inheritdoc/> - public Span<byte> Remaining => Buffer.Span.Slice(AccumulatedSize, RemainingSize); + public Span<byte> Remaining => Buffer.Handle.GetSpan().Slice(AccumulatedSize, RemainingSize); + ///<inheritdoc/> - public Span<byte> Accumulated => Buffer.Span[..AccumulatedSize]; + public Span<byte> Accumulated => Buffer.Handle.GetSpan()[..AccumulatedSize]; /// <summary> /// Gets the accumulated data as a memory segment /// </summary> - public Memory<byte> AccumulatedMemory => Buffer[..AccumulatedSize]; + public Memory<byte> AccumulatedMemory => Buffer.Handle.GetMemory()[..AccumulatedSize]; /// <summary> /// Gets the remaining buffer segment as a memory segment /// </summary> - public Memory<byte> RemainingMemory => Buffer.Slice(AccumulatedSize, RemainingSize); + public Memory<byte> RemainingMemory => Buffer.Handle.GetMemory().Slice(AccumulatedSize, RemainingSize); ///<inheritdoc/> public void Advance(int count) => AccumulatedSize += count; + ///<inheritdoc/> public void Reset() => AccumulatedSize = 0; } + /* + * A buffer writer that wraps the request accumulator to allow + * a finite amount of data to be written to the accumulator since + * it uses a fixed size internal buffer. + */ private sealed class BufferWriter : IBufferWriter<byte> { private readonly FBMBuffer Buffer; - public BufferWriter(FBMBuffer buffer) - { - Buffer = buffer; - } + public BufferWriter(FBMBuffer buffer) => Buffer = buffer; - public void Advance(int count) - { - //Advance the writer - Buffer.RequestBuffer.Advance(count); - } + ///<inheritdoc/> + public void Advance(int count) => Buffer._requestAccumulator.Advance(count); - public Memory<byte> GetMemory(int sizeHint = 0) - { - //Get the remaining memory segment - return Buffer._requestAccumulator.RemainingMemory; - } + ///<inheritdoc/> + public Memory<byte> GetMemory(int sizeHint = 0) => Buffer._requestAccumulator.RemainingMemory; - public Span<byte> GetSpan(int sizeHint = 0) - { - //Get the remaining data segment - return Buffer.RequestBuffer.Remaining; - } + ///<inheritdoc/> + public Span<byte> GetSpan(int sizeHint = 0) => Buffer._requestAccumulator.Remaining; } } } diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClient.cs b/lib/Net.Messaging.FBM/src/Client/FBMClient.cs index 5184c38..c8319fa 100644 --- a/lib/Net.Messaging.FBM/src/Client/FBMClient.cs +++ b/lib/Net.Messaging.FBM/src/Client/FBMClient.cs @@ -24,7 +24,6 @@ using System; using System.IO; -using System.Buffers; using System.Threading; using System.Net.WebSockets; using System.Threading.Tasks; @@ -34,9 +33,12 @@ using System.Collections.Concurrent; using VNLib.Net.Http; using VNLib.Utils; using VNLib.Utils.IO; +using VNLib.Utils.Memory; +using VNLib.Utils.Memory.Caching; using VNLib.Utils.Logging; using VNLib.Utils.Extensions; -using VNLib.Utils.Memory.Caching; + +#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task namespace VNLib.Net.Messaging.FBM.Client { @@ -60,6 +62,8 @@ namespace VNLib.Net.Messaging.FBM.Client /// </summary> public const string REQ_MAX_MESS_QUERY_ARG = "mx"; + public const int MAX_STREAM_BUFFER_SIZE = 128 * 1024; + /// <summary> /// Raised when the websocket has been closed because an error occured. /// You may inspect the event args to determine the cause of the error. @@ -74,12 +78,14 @@ namespace VNLib.Net.Messaging.FBM.Client private readonly SemaphoreSlim SendLock; private readonly ConcurrentDictionary<int, FBMRequest> ActiveRequests; private readonly ObjectRental<FBMRequest> RequestRental; + private readonly FBMClientConfig _config; + private readonly byte[] _streamBuffer; /// <summary> /// The configuration for the current client /// </summary> - public FBMClientConfig Config { get; } - + public ref readonly FBMClientConfig Config => ref _config; + /// <summary> /// A handle that is reset when a connection has been successfully set, and is set /// when the connection exists @@ -103,9 +109,13 @@ namespace VNLib.Net.Messaging.FBM.Client ConnectionStatusHandle = new(true); ActiveRequests = new(Environment.ProcessorCount, 100); - Config = config; + _config = config; //Init the new client socket ClientSocket = new(config.RecvBufferSize, config.RecvBufferSize, config.KeepAliveInterval, config.SubProtocol); + + //Init the stream buffer + int maxStrmBufSize = Math.Min(config.MaxMessageSize, MAX_STREAM_BUFFER_SIZE); + _streamBuffer = new byte[maxStrmBufSize]; } private void Debug(string format, params string[] strings) @@ -127,7 +137,7 @@ namespace VNLib.Net.Messaging.FBM.Client /// Allocates and configures a new <see cref="FBMRequest"/> message object for use within the reusable store /// </summary> /// <returns>The configured <see cref="FBMRequest"/></returns> - protected virtual FBMRequest ReuseableRequestConstructor() => new(Config); + protected virtual FBMRequest ReuseableRequestConstructor() => new(in _config); /// <summary> /// Asynchronously opens a websocket connection with the specifed remote server @@ -180,10 +190,7 @@ namespace VNLib.Net.Messaging.FBM.Client /// <exception cref="ObjectDisposedException"></exception> /// <exception cref="InvalidOperationException"></exception> /// <exception cref="FBMInvalidRequestException"></exception> - public Task<FBMResponse> SendAsync(FBMRequest request, CancellationToken cancellationToken = default) - { - return SendAsync(request, Config.RequestTimeout, cancellationToken); - } + public Task<FBMResponse> SendAsync(FBMRequest request, CancellationToken cancellationToken = default) => SendAsync(request, _config.RequestTimeout, cancellationToken); /// <summary> /// Sends a <see cref="FBMRequest"/> to the connected server @@ -211,6 +218,8 @@ namespace VNLib.Net.Messaging.FBM.Client try { + FBMResponse response = new(); + //Get the request data segment ReadOnlyMemory<byte> requestData = request.GetRequestData(); @@ -224,22 +233,26 @@ namespace VNLib.Net.Messaging.FBM.Client } //wait for the response to be set - await request.Waiter.WaitAsync(timeout, cancellationToken).ConfigureAwait(true); + await request.Waiter.GetTask(timeout, cancellationToken).ConfigureAwait(true); Debug("Received {size} bytes for message {id}", request.Response?.Length ?? 0, request.MessageId); - return request.GetResponse(); + //Get the response data + request.GetResponse(ref response); + + return response; } catch { //Remove the request since packet was never sent ActiveRequests.Remove(request.MessageId, out _); - - //Cleanup waiter - request.Waiter.OnEndRequest(); - throw; } + finally + { + //Always cleanup waiter + request.Waiter.OnEndRequest(); + } } /// <summary> @@ -247,30 +260,28 @@ namespace VNLib.Net.Messaging.FBM.Client /// </summary> /// <param name="request">The request message to send to the server</param> /// <param name="payload">Data to stream to the server</param> - /// <param name="ct">The content type of the stream of data</param> + /// <param name="contentType">The content type of the stream of data</param> /// <param name="cancellationToken">A token to cancel the operation</param> /// <returns>A task that resolves when the data is sent and the resonse is received</returns> /// <exception cref="ArgumentException"></exception> /// <exception cref="ObjectDisposedException"></exception> /// <exception cref="InvalidOperationException"></exception> - public Task StreamDataAsync(FBMRequest request, Stream payload, ContentType ct, CancellationToken cancellationToken = default) - { - return StreamDataAsync(request, payload, ct, Config.RequestTimeout, cancellationToken); - } + public Task<FBMResponse> StreamDataAsync(FBMRequest request, Stream payload, ContentType contentType, CancellationToken cancellationToken = default) + => StreamDataAsync(request, payload, contentType, _config.RequestTimeout, cancellationToken); /// <summary> /// Streams arbitrary binary data to the server with the initial request message /// </summary> /// <param name="request">The request message to send to the server</param> /// <param name="payload">Data to stream to the server</param> - /// <param name="ct">The content type of the stream of data</param> + /// <param name="contentType">The content type of the stream of data</param> /// <param name="cancellationToken">A token to cancel the operation</param> /// <param name="timeout">A maxium wait timeout period. If -1 or 0 the timeout is disabled</param> /// <returns>A task that resolves when the data is sent and the resonse is received</returns> /// <exception cref="ArgumentException"></exception> /// <exception cref="ObjectDisposedException"></exception> /// <exception cref="InvalidOperationException"></exception> - public async Task StreamDataAsync(FBMRequest request, Stream payload, ContentType ct, TimeSpan timeout, CancellationToken cancellationToken = default) + public async Task<FBMResponse> StreamDataAsync(FBMRequest request, Stream payload, ContentType contentType, TimeSpan timeout, CancellationToken cancellationToken = default) { Check(); @@ -282,19 +293,15 @@ namespace VNLib.Net.Messaging.FBM.Client try { + FBMResponse response = new(); + //Get the request data segment ReadOnlyMemory<byte> requestData = request.GetRequestData(); Debug("Streaming {bytes} with id {id}", requestData.Length, request.MessageId); - //Write an empty body in the request - request.WriteBody(ReadOnlySpan<byte>.Empty, ct); - - //Calc buffer size - int bufSize = (int)Math.Clamp(payload.Length, Config.MessageBufferSize, Config.MaxMessageSize); - - //Alloc a streaming buffer - using IMemoryOwner<byte> buffer = Config.BufferHeap.DirectAlloc<byte>(bufSize); + //Write an empty body in the request so a content type header is writen + request.WriteBody(ReadOnlySpan<byte>.Empty, contentType); //Wait for send-lock using (SemSlimReleaser releaser = await SendLock.GetReleaserAsync(cancellationToken)) @@ -306,7 +313,7 @@ namespace VNLib.Net.Messaging.FBM.Client do { //Read data - int read = await payload.ReadAsync(buffer.Memory, cancellationToken); + int read = await payload.ReadAsync(_streamBuffer, cancellationToken); if (read == 0) { @@ -315,29 +322,32 @@ namespace VNLib.Net.Messaging.FBM.Client } //write message to socket, if the read data was smaller than the buffer, we can send the last packet - await ClientSocket.SendAsync(buffer.Memory[..read], WebSocketMessageType.Binary, read < bufSize, cancellationToken); + await ClientSocket.SendAsync(_streamBuffer[..read], WebSocketMessageType.Binary, read < _streamBuffer.Length, cancellationToken); } while (true); } //wait for the server to respond - await request.Waiter.WaitAsync(timeout, cancellationToken).ConfigureAwait(true); + await request.Waiter.GetTask(timeout, cancellationToken).ConfigureAwait(true); Debug("Response recieved {size} bytes for message {id}", request.Response?.Length ?? 0, request.MessageId); + + request.GetResponse(ref response); + return response; } catch { //Remove the request since packet was never sent or cancelled _ = ActiveRequests.TryRemove(request.MessageId, out _); - - //Cleanup request waiter - request.Waiter.OnEndRequest(); - throw; } + finally + { + //Always cleanup waiter + request.Waiter.OnEndRequest(); + } } - /// <summary> /// Closes the underlying <see cref="WebSocket"/> and cancels all pending operations /// </summary> @@ -399,14 +409,20 @@ namespace VNLib.Net.Messaging.FBM.Client Debug("Begining receive loop"); //Alloc recv buffer - IMemoryOwner<byte> recvBuffer = Config.BufferHeap.DirectAlloc<byte>(Config.RecvBufferSize); + IFBMMemoryHandle recvBuffer = _config.MemoryManager.InitHandle(); + _config.MemoryManager.AllocBuffer(recvBuffer, _config.RecvBufferSize); try { + if(!_config.MemoryManager.TryGetHeap(out IUnmangedHeap? heap)) + { + throw new NotSupportedException("The memory manager must support using IUnmanagedHeaps"); + } + //Recv event loop while (true) { //Listen for incoming packets with the intial data buffer - ValueWebSocketReceiveResult result = await socket.ReceiveAsync(recvBuffer.Memory, CancellationToken.None); + ValueWebSocketReceiveResult result = await socket.ReceiveAsync(recvBuffer.GetMemory(), CancellationToken.None); //If the message is a close message, its time to exit if (result.MessageType == WebSocketMessageType.Close) @@ -422,19 +438,19 @@ namespace VNLib.Net.Messaging.FBM.Client } //Alloc data buffer and write initial data - VnMemoryStream responseBuffer = new(Config.BufferHeap); + VnMemoryStream responseBuffer = new(heap); //Copy initial data - responseBuffer.Write(recvBuffer.Memory.Span[..result.Count]); + responseBuffer.Write(recvBuffer.GetSpan()[..result.Count]); //Receive packets until the EOF is reached while (!result.EndOfMessage) { //recive more data - result = await socket.ReceiveAsync(recvBuffer.Memory, CancellationToken.None); + result = await socket.ReceiveAsync(recvBuffer.GetMemory(), CancellationToken.None); //Make sure the buffer is not too large - if ((responseBuffer.Length + result.Count) > Config.MaxMessageSize) + if ((responseBuffer.Length + result.Count) > _config.MaxMessageSize) { //Dispose the buffer before exiting responseBuffer.Dispose(); @@ -443,7 +459,7 @@ namespace VNLib.Net.Messaging.FBM.Client } //Copy continuous data - responseBuffer.Write(recvBuffer.Memory.Span[..result.Count]); + responseBuffer.Write(recvBuffer.GetSpan()[..result.Count]); } //Reset the buffer stream position @@ -472,7 +488,7 @@ namespace VNLib.Net.Messaging.FBM.Client finally { //Dispose the recv buffer - recvBuffer.Dispose(); + _config.MemoryManager.FreeBuffer(recvBuffer); //Cleanup the socket when exiting ClientSocket.Cleanup(); //Set status handle as unset @@ -522,7 +538,12 @@ namespace VNLib.Net.Messaging.FBM.Client if (ActiveRequests.TryRemove(messageId, out FBMRequest? request)) { //Set the new response message - request.Waiter.Complete(responseMessage); + if (!request.Waiter.Complete(responseMessage)) + { + //Falied to complete, dispose the message data + responseMessage.Dispose(); + Debug("Failed to transition waiting request {id}. Message was dropped", messageId, 0); + } } else { diff --git a/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs b/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs index 30e9a95..735a0a8 100644 --- a/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs +++ b/lib/Net.Messaging.FBM/src/Client/FBMClientConfig.cs @@ -25,7 +25,6 @@ using System; using System.Text; -using VNLib.Utils.Memory; using VNLib.Utils.Logging; namespace VNLib.Net.Messaging.FBM.Client @@ -59,7 +58,7 @@ namespace VNLib.Net.Messaging.FBM.Client /// <summary> /// The heap to allocate internal (and message) buffers from /// </summary> - public readonly IUnmangedHeap BufferHeap { get; init; } + public readonly IFBMMemoryManager MemoryManager { get; init; } /// <summary> /// The websocket keepalive interval to use (leaving this property default disables keepalives) /// </summary> diff --git a/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs b/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs index c4fb493..418a9ec 100644 --- a/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs +++ b/lib/Net.Messaging.FBM/src/Client/FBMRequest.cs @@ -26,6 +26,7 @@ using System; using System.Text; using System.Buffers; using System.Threading; +using System.Diagnostics; using System.Threading.Tasks; using System.Collections.Generic; using System.Runtime.CompilerServices; @@ -37,7 +38,6 @@ using VNLib.Utils.Memory; using VNLib.Utils.Extensions; using VNLib.Utils.Memory.Caching; - namespace VNLib.Net.Messaging.FBM.Client { /// <summary> @@ -97,29 +97,29 @@ namespace VNLib.Net.Messaging.FBM.Client /// <param name="messageId">The custom message id</param> /// <param name="config">The fbm client config storing required config variables</param> public FBMRequest(int messageId, in FBMClientConfig config) - :this(messageId, config.BufferHeap, config.MessageBufferSize, config.HeaderEncoding) + :this(messageId, config.MemoryManager, config.MessageBufferSize, config.HeaderEncoding) { } /// <summary> /// Initializes a new <see cref="FBMRequest"/> with the sepcified message buffer size and a custom MessageId /// </summary> /// <param name="messageId">The custom message id</param> - /// <param name="heap">The heap to allocate the internal buffer from</param> + /// <param name="manager">The memory manager used to allocate the internal buffers</param> /// <param name="bufferSize">The size of the internal buffer</param> /// <param name="headerEncoding">The encoding instance used for header character encoding</param> - public FBMRequest(int messageId, IUnmangedHeap heap, int bufferSize, Encoding headerEncoding) + public FBMRequest(int messageId, IFBMMemoryManager manager, int bufferSize, Encoding headerEncoding) { MessageId = messageId; - HeaderEncoding = headerEncoding; + HeaderEncoding = headerEncoding ?? throw new ArgumentNullException(nameof(headerEncoding)); + _ = manager ?? throw new ArgumentNullException(nameof(manager)); //Configure waiter Waiter = new FBMMessageWaiter(this); - - //Alloc the buffer as a memory owner so a memory buffer can be used - IMemoryOwner<byte> HeapBuffer = heap.DirectAlloc<byte>(bufferSize); - Buffer = new(HeapBuffer); + + Buffer = new(manager, bufferSize); //Prepare the message incase the request is fresh + Buffer.Prepare(); Reset(); } @@ -152,19 +152,13 @@ namespace VNLib.Net.Messaging.FBM.Client /// The request message packet, this may cause side effects /// </summary> [MethodImpl(MethodImplOptions.AggressiveInlining)] - public ReadOnlyMemory<byte> GetRequestData() - { - return Buffer.RequestData; - } + public ReadOnlyMemory<byte> GetRequestData() => Buffer.RequestData; /// <summary> /// Resets the internal buffer and allows for writing a new message with /// the same message-id /// </summary> - public void Reset() - { - Buffer.Reset(MessageId); - } + public void Reset() => Buffer.Reset(MessageId); ///<inheritdoc/> protected override void Free() @@ -175,7 +169,12 @@ namespace VNLib.Net.Messaging.FBM.Client (Waiter as FBMMessageWaiter)!.Dispose(); } - void IReusable.Prepare() => Reset(); + void IReusable.Prepare() + { + //MUST BE CALLED FIRST! + Buffer.Prepare(); + Reset(); + } bool IReusable.Release() { @@ -186,6 +185,9 @@ namespace VNLib.Net.Messaging.FBM.Client Response?.Dispose(); Response = null; + //Free buffer + Buffer.Release(); + return true; } @@ -195,7 +197,7 @@ namespace VNLib.Net.Messaging.FBM.Client /// Gets the response of the sent message /// </summary> /// <returns>The response message for the current request</returns> - internal FBMResponse GetResponse() + internal void GetResponse(ref FBMResponse response) { if (Response != null) { @@ -215,11 +217,7 @@ namespace VNLib.Net.Messaging.FBM.Client HeaderParseError statusFlags = Helpers.ParseHeaders(Response, Buffer, ResponseHeaderList, HeaderEncoding); //return response structure - return new(Response, statusFlags, ResponseHeaderList, OnResponseDisposed); - } - else - { - return new(); + response = new(Response, statusFlags, ResponseHeaderList, OnResponseDisposed); } } @@ -272,12 +270,13 @@ namespace VNLib.Net.Messaging.FBM.Client #endregion #region waiter - private sealed class FBMMessageWaiter : IFBMMessageWaiter, IDisposable + private sealed class FBMMessageWaiter : IFBMMessageWaiter, IDisposable, IThreadPoolWorkItem { private readonly Timer _timer; private readonly FBMRequest _request; private TaskCompletionSource? _tcs; + private CancellationTokenRegistration _token; public FBMMessageWaiter(FBMRequest request) { @@ -298,79 +297,89 @@ namespace VNLib.Net.Messaging.FBM.Client public void OnEndRequest() { //Cleanup tcs ref - _ = Interlocked.Exchange(ref _tcs, null); + _tcs = null; - //Stop timer if set + //Always stop timer if set _timer.Stop(); + + //Cleanup cancellation token + _token.Dispose(); } ///<inheritdoc/> - public void Complete(VnMemoryStream ms) + public bool Complete(VnMemoryStream ms) { - //Read the current state of the tcs TaskCompletionSource? tcs = _tcs; - if (tcs == null) + //Work is done/cancelled + if (tcs != null && tcs.Task.IsCompleted) { - //Work is done/cancelled, dispose the ms and leave - ms.Dispose(); + return false; } //Store response _request.Response = ms; - //Transition to completed state in background thread - static void OnTpCallback(object? state) - { - _ = (state as TaskCompletionSource)!.TrySetResult(); - } - /* * The calling thread may be a TP thread proccessing an async event loop. * We do not want to block this worker thread. */ - ThreadPool.UnsafeQueueUserWorkItem(OnTpCallback, tcs); + return ThreadPool.UnsafeQueueUserWorkItem(this, true); } + /* + * Called when scheduled on the TP thread pool + */ ///<inheritdoc/> - public async Task WaitAsync(TimeSpan timeout, CancellationToken cancellation) + public void Execute() => _tcs?.TrySetResult(); + + + ///<inheritdoc/> + public Task GetTask(TimeSpan timeout, CancellationToken cancellation) { - if (timeout.Ticks > 0) - { - //Restart timer if timeout is configured - _timer.Restart(timeout); - } + TaskCompletionSource? tcs = _tcs; - //Confim the token may be cancelled - if (cancellation.CanBeCanceled) - { - //Register cancellation - using CancellationTokenRegistration reg = cancellation.Register(OnCancelled, this, false); + Debug.Assert(tcs != null, "A call to GetTask was made outside of the request flow, the TaskCompletionSource was null"); - //await task that may be canclled - await _tcs.Task.ConfigureAwait(false); - } - else + /* + * Get task will only be called after the message has been sent. + * The Complete method may have already scheduled a completion by + * the time this method is called, so we may avoid setting up the + * timer and cancellation if possible. Also since this mthod is + * called from the request side, we know the tcs cannot be null + */ + + if (!tcs.Task.IsCompleted) { - //await the task directly - await _tcs.Task.ConfigureAwait(false); + if (timeout.Ticks > 0) + { + //Restart timer if timeout is configured + _timer.Restart(timeout); + } + + if (cancellation.CanBeCanceled) + { + //Register cancellation + _token = cancellation.Register(OnCancelled, this); + } } + + return tcs.Task; } ///<inheritdoc/> public void ManualCancellation() => OnCancelled(this); - private void OnCancelled(object? state) - { - //Set cancelled state if exists - _ = _tcs?.TrySetCanceled(); - } + //Set cancelled state if exists, the task may have already completed + private void OnCancelled(object? state) => _tcs?.TrySetCanceled(); ///<inheritdoc/> public void Dispose() { _timer.Dispose(); + _token.Dispose(); } + } #endregion diff --git a/lib/Net.Messaging.FBM/src/Client/FBMResponse.cs b/lib/Net.Messaging.FBM/src/Client/FBMResponse.cs index 6f8fec4..f1148f1 100644 --- a/lib/Net.Messaging.FBM/src/Client/FBMResponse.cs +++ b/lib/Net.Messaging.FBM/src/Client/FBMResponse.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Net.Messaging.FBM @@ -90,17 +90,16 @@ namespace VNLib.Net.Messaging.FBM.Client /// <summary> /// Releases any resources associated with the response message /// </summary> - public void Dispose() => _onDispose?.Invoke(); + public readonly void Dispose() => _onDispose?.Invoke(); ///<inheritdoc/> public override bool Equals(object? obj) => obj is FBMResponse response && Equals(response); ///<inheritdoc/> + public bool Equals(FBMResponse other) => IsSet && other.IsSet && ReferenceEquals(MessagePacket, other.MessagePacket); + ///<inheritdoc/> public override int GetHashCode() => IsSet ? MessagePacket!.GetHashCode() : 0; ///<inheritdoc/> public static bool operator ==(FBMResponse left, FBMResponse right) => left.Equals(right); ///<inheritdoc/> public static bool operator !=(FBMResponse left, FBMResponse right) => !(left == right); - ///<inheritdoc/> - public bool Equals(FBMResponse other) => IsSet && other.IsSet && MessagePacket == other.MessagePacket; - } } diff --git a/lib/Net.Messaging.FBM/src/Client/IFBMMessage.cs b/lib/Net.Messaging.FBM/src/Client/IFBMMessage.cs deleted file mode 100644 index 18f19ec..0000000 --- a/lib/Net.Messaging.FBM/src/Client/IFBMMessage.cs +++ /dev/null @@ -1,62 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Net.Messaging.FBM -* File: IFBMMessage.cs -* -* IFBMMessage.cs is part of VNLib.Net.Messaging.FBM which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Net.Messaging.FBM is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Net.Messaging.FBM is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; - -using VNLib.Net.Http; - -namespace VNLib.Net.Messaging.FBM.Client -{ - /// <summary> - /// Represents basic Fixed Buffer Message protocol operations - /// </summary> - public interface IFBMMessage - { - /// <summary> - /// The unique id of the message (nonzero) - /// </summary> - int MessageId { get; } - /// <summary> - /// Writes a data body to the message of the specified content type - /// </summary> - /// <param name="body">The body of the message to copy</param> - /// <param name="contentType">The content type of the message body</param> - /// <exception cref="OutOfMemoryException"></exception> - void WriteBody(ReadOnlySpan<byte> body, ContentType contentType = ContentType.Binary); - /// <summary> - /// Appends an arbitrary header to the current request buffer - /// </summary> - /// <param name="header">The header id</param> - /// <param name="value">The value of the header</param> - /// <exception cref="OutOfMemoryException"></exception> - void WriteHeader(byte header, ReadOnlySpan<char> value); - /// <summary> - /// Appends an arbitrary header to the current request buffer - /// </summary> - /// <param name="header">The <see cref="HeaderCommand"/> of the header</param> - /// <param name="value">The value of the header</param> - /// <exception cref="OutOfMemoryException"></exception> - void WriteHeader(HeaderCommand header, ReadOnlySpan<char> value); - } -}
\ No newline at end of file diff --git a/lib/Net.Messaging.FBM/src/Client/IFBMMessageWaiter.cs b/lib/Net.Messaging.FBM/src/Client/IFBMMessageWaiter.cs index 5000711..cc8e1c4 100644 --- a/lib/Net.Messaging.FBM/src/Client/IFBMMessageWaiter.cs +++ b/lib/Net.Messaging.FBM/src/Client/IFBMMessageWaiter.cs @@ -48,9 +48,9 @@ namespace VNLib.Net.Messaging.FBM.Client /// or a timeout /// </summary> /// <param name="timeout">The maxium time to wait for the server to respond (may be default/0)</param> - /// <param name="cancellation">The cancellation token to observe</param> + /// <param name="cancellation">A token to cancel the wait task</param> /// <returns>A task that completes when the server responds</returns> - Task WaitAsync(TimeSpan timeout, CancellationToken cancellation); + Task GetTask(TimeSpan timeout, CancellationToken cancellation); /// <summary> /// Called by the client to cleanup the waiter when the request is completed @@ -63,7 +63,7 @@ namespace VNLib.Net.Messaging.FBM.Client /// Set by the client when the response has been successfully received by the client /// </summary> /// <param name="ms">The response data to pass to the response</param> - void Complete(VnMemoryStream ms); + bool Complete(VnMemoryStream ms); /// <summary> /// Called to invoke a manual cancellation of a request waiter. This method should diff --git a/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs b/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs index d0352d3..fc2e417 100644 --- a/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs +++ b/lib/Net.Messaging.FBM/src/Client/ManagedClientWebSocket.cs @@ -43,7 +43,7 @@ namespace VNLib.Net.Messaging.FBM.Client private readonly int TxBufferSize; private readonly int RxBufferSize; private readonly TimeSpan KeepAliveInterval; - private readonly VnTempBuffer<byte> _dataBuffer; + private readonly ArrayPoolBuffer<byte> _dataBuffer; private readonly string? _subProtocol; /// <summary> @@ -95,7 +95,7 @@ namespace VNLib.Net.Messaging.FBM.Client try { //Set buffer - _socket.Options.SetBuffer(RxBufferSize, TxBufferSize, _dataBuffer); + _socket.Options.SetBuffer(RxBufferSize, TxBufferSize, _dataBuffer.AsArraySegment()); //Set remaining stored options _socket.Options.ClientCertificates = Certificates; _socket.Options.KeepAliveInterval = KeepAliveInterval; |