From 08bfda9fc76d583e6b83f962b4436a5538587f3e Mon Sep 17 00:00:00 2001 From: vnugent Date: Sat, 11 Mar 2023 22:53:10 -0500 Subject: Concrete AsyncAccessSerializer impl and basline tests --- .../src/SessionSerializer.cs | 271 +++------------------ 1 file changed, 35 insertions(+), 236 deletions(-) (limited to 'libs/VNLib.Plugins.Sessions.Cache.Client/src/SessionSerializer.cs') diff --git a/libs/VNLib.Plugins.Sessions.Cache.Client/src/SessionSerializer.cs b/libs/VNLib.Plugins.Sessions.Cache.Client/src/SessionSerializer.cs index 8634c39..479a958 100644 --- a/libs/VNLib.Plugins.Sessions.Cache.Client/src/SessionSerializer.cs +++ b/libs/VNLib.Plugins.Sessions.Cache.Client/src/SessionSerializer.cs @@ -28,36 +28,43 @@ using System.Threading.Tasks; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; -using VNLib.Utils.Memory.Caching; +using VNLib.Utils.Async; namespace VNLib.Plugins.Sessions.Cache.Client { + /// /// Concrete that provides /// access serialization for session types /// /// The session type - public class SessionSerializer : ISessionSerialzer, ICacheHolder where TSession : IRemoteSession + public class SessionSerializer : AsyncAccessSerializer, ISessionSerialzer where TSession : IRemoteSession { - - private readonly object StoreLock; - private readonly Stack _entryPool; - private readonly Dictionary _waitStore; - private readonly int MaxPoolSize; + /* + * This implementation uses an internal store for wait entires + * that uses a string key instead of using the moniker directly, + * but inherits the concrete serialzer class to reuse the built + * in helper types and methods in the AsyncAccessSerializer class. + * + * The utils library is also currently the only lib that has + * automated testing, so that helps us. + * + * This is to allow sessions to be recovered from their session + * id instead of a session instance. All public api method calls + * are intercepted and routed to the internal wait table + */ + + private readonly Dictionary _waitTable; + /// /// Initializes a new /// - /// The maximum number of elements to pool - public SessionSerializer(int poolCapacity) + /// The maximum number of wait entry instances to hold in memory cache + public SessionSerializer(int poolCapacity):base(poolCapacity, 0, null) { - StoreLock = new(); - _entryPool = new Stack(poolCapacity); - //Session-ids are security senstive, we must use ordinal(binary) string comparison - _waitStore = new Dictionary(poolCapacity, StringComparer.Ordinal); - - MaxPoolSize = poolCapacity; + _waitTable = new Dictionary(poolCapacity, StringComparer.Ordinal); } /// @@ -66,14 +73,14 @@ namespace VNLib.Plugins.Sessions.Cache.Client lock (StoreLock) { //Try to see if an entry is loaded, and get the session - bool result = _waitStore.TryGetValue(sessionId, out WaitEntry? entry); - session = result ? entry!.Session : default; + bool result = _waitTable.TryGetValue(sessionId, out WaitEntry? entry); + session = result ? entry!.Moniker : default; return result; } } /// - public virtual Task WaitAsync(TSession moniker, CancellationToken cancellation = default) + public override Task WaitAsync(TSession moniker, CancellationToken cancellation = default) { //Token must not be cancelled cancellation.ThrowIfCancellationRequested(); @@ -83,12 +90,12 @@ namespace VNLib.Plugins.Sessions.Cache.Client lock (StoreLock) { //See if the entry already exists, otherwise get a new wait entry - if (!_waitStore.TryGetValue(moniker.SessionID, out WaitEntry? wait)) + if (!_waitTable.TryGetValue(moniker.SessionID, out WaitEntry? wait)) { GetWaitEntry(ref wait, moniker); //Add entry to store - _waitStore[moniker.SessionID] = wait; + _waitTable[moniker.SessionID] = wait; } //Get waiter before leaving lock @@ -99,32 +106,20 @@ namespace VNLib.Plugins.Sessions.Cache.Client } /// - public virtual void Release(TSession moniker) - { - /* - * When releasing a lock on a moniker, we store entires in an internal table. Wait entires also require mutual - * exclustion to properly track waiters. This happens inside a single lock for lower entry times/complexity. - * The wait's internal semaphore may also cause longer waits within the lock, so wait entires are "prepared" - * by using tokens to access the wait/release mechanisms with proper tracking. - * - * Tokens can be used to control the wait because the call to release may cause thread yielding (if internal - * WaitHandle is being used), so we don't want to block other callers. - * - * When there are no more waiters for a moniker at the time the lock was entered, the WaitEntry is released - * back to the pool. - */ + public override void Release(TSession moniker) + { WaitReleaseToken releaser; lock (StoreLock) { - WaitEntry entry = _waitStore[moniker.SessionID]; + WaitEntry entry = _waitTable[moniker.SessionID]; //Call release while holding store lock if(entry.Release(out releaser) == 0) { //No more waiters - _waitStore.Remove(moniker.SessionID); + _waitTable.Remove(moniker.SessionID); /* * We must release the semaphore before returning to pool, @@ -142,211 +137,15 @@ namespace VNLib.Plugins.Sessions.Cache.Client releaser.Release(); } - private void GetWaitEntry([NotNull] ref WaitEntry? wait, TSession session) - { - //Try to get wait from pool - if(!_entryPool.TryPop(out wait)) - { - wait = new(); - } - - //Init wait with session - wait.Prepare(session); - } - - private void ReturnEntry(WaitEntry entry) - { - //Remove session ref - entry.Prepare(default); - - if(_entryPool.Count < MaxPoolSize) - { - _entryPool.Push(entry); - } - else - { - //Dispose entry since were not storing it - entry.Dispose(); - } - } - - /// - /// NOOP - /// - public void CacheClear() - { } - /// - public void CacheHardClear() + public new void CacheHardClear() { - //Take lock to remove the stored wait entires to dispose of them - WaitEntry[] pooled; - + base.CacheHardClear(); lock (StoreLock) { - pooled = _entryPool.ToArray(); - _entryPool.Clear(); - - //Cleanup the wait store - _waitStore.TrimExcess(MaxPoolSize); - } - - //Dispose entires - Array.ForEach(pooled, static pooled => pooled.Dispose()); - } - - /// - /// An entry within the lock table that - /// - protected sealed class WaitEntry : IDisposable - { - private uint _waitCount; - private readonly SemaphoreSlim _waitHandle; - - /// - /// The session this entry is providing mutual exclusion to - /// - public TSession? Session { get; private set; } - - /// - /// Initializes a new - /// - public WaitEntry() - { - _waitHandle = new(1, 1); - Session = default!; - } - - /// - /// Gets a token used to enter the lock which may block, or yield async - /// outside of a nested lock - /// - /// The waiter used to enter a wait on the moniker - public WaitEnterToken GetWaiter() - { - /* - * Increment wait count before entering the lock - * A cancellation is the only way out, so cover that - * during the async, only if the token is cancelable - */ - _ = Interlocked.Increment(ref _waitCount); - return new(this); - } - - /// - /// Prepares a release - /// - /// - /// The token that should be used to release the exclusive lock held on - /// a moniker - /// - /// The number of remaining waiters - public uint Release(out WaitReleaseToken releaser) - { - releaser = new(_waitHandle); - - //Decrement release count before leaving - return Interlocked.Decrement(ref _waitCount); + _waitTable.TrimExcess(); } - - /// - /// Prepres a new for - /// its new session. - /// - /// The session to hold a referrnce to - public void Prepare(TSession? session) - { - Session = session; - _waitCount = 0; - } - - /* - * Called by WaitEnterToken to enter the lock - * outside a nested lock - */ - - internal Task WaitAsync(CancellationToken cancellation) - { - - //See if lock can be entered synchronously - if (_waitHandle.Wait(0, CancellationToken.None)) - { - //Lock was entered successfully without async yield - return Task.CompletedTask; - } - - //Lock must be entered async - - //Check to confirm cancellation may happen - if (cancellation.CanBeCanceled) - { - //Task may be cancelled, so we need to monitor the results to properly set waiting count - Task wait = _waitHandle.WaitAsync(cancellation); - return WaitForLockEntryWithCancellationAsync(wait); - } - else - { - //Task cannot be canceled, so we dont need to monitor the results - return _waitHandle.WaitAsync(CancellationToken.None); - } - } - - private async Task WaitForLockEntryWithCancellationAsync(Task wait) - { - try - { - await wait.ConfigureAwait(false); - } - catch - { - //Decrement wait count on error entering lock async - _ = Interlocked.Decrement(ref _waitCount); - throw; - } - } - - /// - public void Dispose() - { - _waitHandle.Dispose(); - GC.SuppressFinalize(this); - } - } - - /// - /// A token used to safely release an exclusive lock inside the - /// - /// - protected readonly ref struct WaitReleaseToken - { - private readonly SemaphoreSlim? _sem; - - internal WaitReleaseToken(SemaphoreSlim sem) => _sem = sem; - - /// - /// Releases the exclusive lock held by the token. NOTE: - /// this method may only be called ONCE after a wait has been - /// released - /// - public readonly void Release() => _sem?.Release(); - } - - /// - /// A token used to safely enter a wait for exclusive access to a - /// - protected readonly ref struct WaitEnterToken - { - private readonly WaitEntry _entry; - - internal WaitEnterToken(WaitEntry entry) => _entry = entry; - - /// - /// Enters the wait for the WaitEntry. This method may not block - /// or yield (IE Return ) - /// - /// A token to cancel the wait for the resource - /// - public Task EnterWaitAsync(CancellationToken cancellation) => _entry.WaitAsync(cancellation); } + } } -- cgit