diff options
Diffstat (limited to 'lib/Net.Messaging.FBM/src/Server')
-rw-r--r-- | lib/Net.Messaging.FBM/src/Server/FBMListener.cs | 212 |
1 files changed, 107 insertions, 105 deletions
diff --git a/lib/Net.Messaging.FBM/src/Server/FBMListener.cs b/lib/Net.Messaging.FBM/src/Server/FBMListener.cs index 3417abc..fd8b025 100644 --- a/lib/Net.Messaging.FBM/src/Server/FBMListener.cs +++ b/lib/Net.Messaging.FBM/src/Server/FBMListener.cs @@ -54,110 +54,7 @@ namespace VNLib.Net.Messaging.FBM.Server /// and raises events on requests. /// </summary> public class FBMListener - { - private sealed class ListeningSession - { - private readonly ReusableStore<FBMContext> CtxStore; - private readonly CancellationTokenSource Cancellation; - private readonly CancellationTokenRegistration Registration; - private readonly FBMListenerSessionParams Params; - - - public readonly object? UserState; - - public readonly SemaphoreSlim ResponseLock; - - public readonly WebSocketSession Socket; - - public readonly RequestHandler OnRecieved; - - public CancellationToken CancellationToken => Cancellation.Token; - - - public ListeningSession(WebSocketSession session, RequestHandler onRecieved, in FBMListenerSessionParams args, object? userState) - { - Params = args; - Socket = session; - UserState = userState; - OnRecieved = onRecieved; - - //Create cancellation and register for session close - Cancellation = new(); - Registration = session.Token.Register(Cancellation.Cancel); - - ResponseLock = new(1); - CtxStore = ObjectRental.CreateReusable(ContextCtor); - } - - private FBMContext ContextCtor() => new(Params.MaxHeaderBufferSize, Params.ResponseBufferSize, Params.HeaderEncoding); - - /// <summary> - /// Cancels any pending opreations relating to the current session - /// </summary> - public void CancelSession() - { - Cancellation.Cancel(); - - //If dispose happens without any outstanding requests, we can dispose the session - if (_counter == 0) - { - CleanupInternal(); - } - } - - private void CleanupInternal() - { - Registration.Dispose(); - CtxStore.Dispose(); - Cancellation.Dispose(); - ResponseLock.Dispose(); - } - - - private uint _counter; - - /// <summary> - /// Rents a new <see cref="FBMContext"/> instance from the pool - /// and increments the counter - /// </summary> - /// <returns>The rented instance</returns> - /// <exception cref="ObjectDisposedException"></exception> - public FBMContext RentContext() - { - - if (Cancellation.IsCancellationRequested) - { - throw new ObjectDisposedException("The instance has been disposed"); - } - - //Rent context - FBMContext ctx = CtxStore.Rent(); - //Increment counter - Interlocked.Increment(ref _counter); - - return ctx; - } - - /// <summary> - /// Returns a previously rented context to the pool - /// and decrements the counter. If the session has been - /// cancelled, when the counter reaches 0, cleanup occurs - /// </summary> - /// <param name="ctx">The context to return</param> - public void ReturnContext(FBMContext ctx) - { - //Return the context - CtxStore.Return(ctx); - - uint current = Interlocked.Decrement(ref _counter); - - //No more contexts in use, dispose internals - if (Cancellation.IsCancellationRequested && current == 0) - { - CleanupInternal(); - } - } - } + { public const int SEND_SEMAPHORE_TIMEOUT_MS = 10 * 1000; @@ -189,7 +86,8 @@ namespace VNLib.Net.Messaging.FBM.Server /// <returns>A <see cref="Task"/> that completes when the connection closes</returns> public async Task ListenAsync(WebSocketSession wss, RequestHandler handler, FBMListenerSessionParams args, object? userState) { - ListeningSession session = new(wss, handler, args, userState); + ListeningSession session = new(wss, handler, in args, userState); + //Alloc a recieve buffer using IMemoryOwner<byte> recvBuffer = Heap.DirectAlloc<byte>(args.RecvBufferSize); @@ -385,5 +283,109 @@ namespace VNLib.Net.Messaging.FBM.Server { return Task.CompletedTask; } + + private sealed class ListeningSession + { + private readonly ReusableStore<FBMContext> CtxStore; + private readonly CancellationTokenSource Cancellation; + private readonly CancellationTokenRegistration Registration; + private readonly FBMListenerSessionParams Params; + + + public readonly object? UserState; + + public readonly SemaphoreSlim ResponseLock; + + public readonly WebSocketSession Socket; + + public readonly RequestHandler OnRecieved; + + public CancellationToken CancellationToken => Cancellation.Token; + + + public ListeningSession(WebSocketSession session, RequestHandler onRecieved, in FBMListenerSessionParams args, object? userState) + { + Params = args; + Socket = session; + UserState = userState; + OnRecieved = onRecieved; + + //Create cancellation and register for session close + Cancellation = new(); + Registration = session.Token.Register(Cancellation.Cancel); + + ResponseLock = new(1); + CtxStore = ObjectRental.CreateReusable(ContextCtor); + } + + private FBMContext ContextCtor() => new(Params.MaxHeaderBufferSize, Params.ResponseBufferSize, Params.HeaderEncoding); + + /// <summary> + /// Cancels any pending opreations relating to the current session + /// </summary> + public void CancelSession() + { + Cancellation.Cancel(); + + //If dispose happens without any outstanding requests, we can dispose the session + if (_counter == 0) + { + CleanupInternal(); + } + } + + private void CleanupInternal() + { + Registration.Dispose(); + CtxStore.Dispose(); + Cancellation.Dispose(); + ResponseLock.Dispose(); + } + + + private uint _counter; + + /// <summary> + /// Rents a new <see cref="FBMContext"/> instance from the pool + /// and increments the counter + /// </summary> + /// <returns>The rented instance</returns> + /// <exception cref="ObjectDisposedException"></exception> + public FBMContext RentContext() + { + + if (Cancellation.IsCancellationRequested) + { + throw new ObjectDisposedException("The instance has been disposed"); + } + + //Rent context + FBMContext ctx = CtxStore.Rent(); + //Increment counter + Interlocked.Increment(ref _counter); + + return ctx; + } + + /// <summary> + /// Returns a previously rented context to the pool + /// and decrements the counter. If the session has been + /// cancelled, when the counter reaches 0, cleanup occurs + /// </summary> + /// <param name="ctx">The context to return</param> + public void ReturnContext(FBMContext ctx) + { + //Return the context + CtxStore.Return(ctx); + + uint current = Interlocked.Decrement(ref _counter); + + //No more contexts in use, dispose internals + if (Cancellation.IsCancellationRequested && current == 0) + { + CleanupInternal(); + } + } + } } } |