From 1074db64cc7bae103240ff1220d50d1958b7a900 Mon Sep 17 00:00:00 2001 From: vnugent Date: Thu, 27 Jul 2023 01:39:44 -0400 Subject: Native compression lib first build, managed, and tests --- lib/Net.Messaging.FBM/src/Server/FBMListener.cs | 212 ++++++++++++------------ 1 file changed, 107 insertions(+), 105 deletions(-) (limited to 'lib/Net.Messaging.FBM/src/Server') diff --git a/lib/Net.Messaging.FBM/src/Server/FBMListener.cs b/lib/Net.Messaging.FBM/src/Server/FBMListener.cs index 3417abc..fd8b025 100644 --- a/lib/Net.Messaging.FBM/src/Server/FBMListener.cs +++ b/lib/Net.Messaging.FBM/src/Server/FBMListener.cs @@ -54,110 +54,7 @@ namespace VNLib.Net.Messaging.FBM.Server /// and raises events on requests. /// public class FBMListener - { - private sealed class ListeningSession - { - private readonly ReusableStore CtxStore; - private readonly CancellationTokenSource Cancellation; - private readonly CancellationTokenRegistration Registration; - private readonly FBMListenerSessionParams Params; - - - public readonly object? UserState; - - public readonly SemaphoreSlim ResponseLock; - - public readonly WebSocketSession Socket; - - public readonly RequestHandler OnRecieved; - - public CancellationToken CancellationToken => Cancellation.Token; - - - public ListeningSession(WebSocketSession session, RequestHandler onRecieved, in FBMListenerSessionParams args, object? userState) - { - Params = args; - Socket = session; - UserState = userState; - OnRecieved = onRecieved; - - //Create cancellation and register for session close - Cancellation = new(); - Registration = session.Token.Register(Cancellation.Cancel); - - ResponseLock = new(1); - CtxStore = ObjectRental.CreateReusable(ContextCtor); - } - - private FBMContext ContextCtor() => new(Params.MaxHeaderBufferSize, Params.ResponseBufferSize, Params.HeaderEncoding); - - /// - /// Cancels any pending opreations relating to the current session - /// - public void CancelSession() - { - Cancellation.Cancel(); - - //If dispose happens without any outstanding requests, we can dispose the session - if (_counter == 0) - { - CleanupInternal(); - } - } - - private void CleanupInternal() - { - Registration.Dispose(); - CtxStore.Dispose(); - Cancellation.Dispose(); - ResponseLock.Dispose(); - } - - - private uint _counter; - - /// - /// Rents a new instance from the pool - /// and increments the counter - /// - /// The rented instance - /// - public FBMContext RentContext() - { - - if (Cancellation.IsCancellationRequested) - { - throw new ObjectDisposedException("The instance has been disposed"); - } - - //Rent context - FBMContext ctx = CtxStore.Rent(); - //Increment counter - Interlocked.Increment(ref _counter); - - return ctx; - } - - /// - /// Returns a previously rented context to the pool - /// and decrements the counter. If the session has been - /// cancelled, when the counter reaches 0, cleanup occurs - /// - /// The context to return - public void ReturnContext(FBMContext ctx) - { - //Return the context - CtxStore.Return(ctx); - - uint current = Interlocked.Decrement(ref _counter); - - //No more contexts in use, dispose internals - if (Cancellation.IsCancellationRequested && current == 0) - { - CleanupInternal(); - } - } - } + { public const int SEND_SEMAPHORE_TIMEOUT_MS = 10 * 1000; @@ -189,7 +86,8 @@ namespace VNLib.Net.Messaging.FBM.Server /// A that completes when the connection closes public async Task ListenAsync(WebSocketSession wss, RequestHandler handler, FBMListenerSessionParams args, object? userState) { - ListeningSession session = new(wss, handler, args, userState); + ListeningSession session = new(wss, handler, in args, userState); + //Alloc a recieve buffer using IMemoryOwner recvBuffer = Heap.DirectAlloc(args.RecvBufferSize); @@ -385,5 +283,109 @@ namespace VNLib.Net.Messaging.FBM.Server { return Task.CompletedTask; } + + private sealed class ListeningSession + { + private readonly ReusableStore CtxStore; + private readonly CancellationTokenSource Cancellation; + private readonly CancellationTokenRegistration Registration; + private readonly FBMListenerSessionParams Params; + + + public readonly object? UserState; + + public readonly SemaphoreSlim ResponseLock; + + public readonly WebSocketSession Socket; + + public readonly RequestHandler OnRecieved; + + public CancellationToken CancellationToken => Cancellation.Token; + + + public ListeningSession(WebSocketSession session, RequestHandler onRecieved, in FBMListenerSessionParams args, object? userState) + { + Params = args; + Socket = session; + UserState = userState; + OnRecieved = onRecieved; + + //Create cancellation and register for session close + Cancellation = new(); + Registration = session.Token.Register(Cancellation.Cancel); + + ResponseLock = new(1); + CtxStore = ObjectRental.CreateReusable(ContextCtor); + } + + private FBMContext ContextCtor() => new(Params.MaxHeaderBufferSize, Params.ResponseBufferSize, Params.HeaderEncoding); + + /// + /// Cancels any pending opreations relating to the current session + /// + public void CancelSession() + { + Cancellation.Cancel(); + + //If dispose happens without any outstanding requests, we can dispose the session + if (_counter == 0) + { + CleanupInternal(); + } + } + + private void CleanupInternal() + { + Registration.Dispose(); + CtxStore.Dispose(); + Cancellation.Dispose(); + ResponseLock.Dispose(); + } + + + private uint _counter; + + /// + /// Rents a new instance from the pool + /// and increments the counter + /// + /// The rented instance + /// + public FBMContext RentContext() + { + + if (Cancellation.IsCancellationRequested) + { + throw new ObjectDisposedException("The instance has been disposed"); + } + + //Rent context + FBMContext ctx = CtxStore.Rent(); + //Increment counter + Interlocked.Increment(ref _counter); + + return ctx; + } + + /// + /// Returns a previously rented context to the pool + /// and decrements the counter. If the session has been + /// cancelled, when the counter reaches 0, cleanup occurs + /// + /// The context to return + public void ReturnContext(FBMContext ctx) + { + //Return the context + CtxStore.Return(ctx); + + uint current = Interlocked.Decrement(ref _counter); + + //No more contexts in use, dispose internals + if (Cancellation.IsCancellationRequested && current == 0) + { + CleanupInternal(); + } + } + } } } -- cgit