aboutsummaryrefslogtreecommitdiff
path: root/libs/VNLib.Plugins.Sessions.Cache.Client
diff options
context:
space:
mode:
Diffstat (limited to 'libs/VNLib.Plugins.Sessions.Cache.Client')
-rw-r--r--libs/VNLib.Plugins.Sessions.Cache.Client/src/SessionSerializer.cs271
1 files changed, 35 insertions, 236 deletions
diff --git a/libs/VNLib.Plugins.Sessions.Cache.Client/src/SessionSerializer.cs b/libs/VNLib.Plugins.Sessions.Cache.Client/src/SessionSerializer.cs
index 8634c39..479a958 100644
--- a/libs/VNLib.Plugins.Sessions.Cache.Client/src/SessionSerializer.cs
+++ b/libs/VNLib.Plugins.Sessions.Cache.Client/src/SessionSerializer.cs
@@ -28,36 +28,43 @@ using System.Threading.Tasks;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
-using VNLib.Utils.Memory.Caching;
+using VNLib.Utils.Async;
namespace VNLib.Plugins.Sessions.Cache.Client
{
+
/// <summary>
/// Concrete <see cref="ISessionSerialzer{TSession}"/> that provides
/// access serialization for session types
/// </summary>
/// <typeparam name="TSession">The session type</typeparam>
- public class SessionSerializer<TSession> : ISessionSerialzer<TSession>, ICacheHolder where TSession : IRemoteSession
+ public class SessionSerializer<TSession> : AsyncAccessSerializer<TSession>, ISessionSerialzer<TSession> where TSession : IRemoteSession
{
-
- private readonly object StoreLock;
- private readonly Stack<WaitEntry> _entryPool;
- private readonly Dictionary<string, WaitEntry> _waitStore;
- private readonly int MaxPoolSize;
+ /*
+ * This implementation uses an internal store for wait entires
+ * that uses a string key instead of using the moniker directly,
+ * but inherits the concrete serialzer class to reuse the built
+ * in helper types and methods in the AsyncAccessSerializer class.
+ *
+ * The utils library is also currently the only lib that has
+ * automated testing, so that helps us.
+ *
+ * This is to allow sessions to be recovered from their session
+ * id instead of a session instance. All public api method calls
+ * are intercepted and routed to the internal wait table
+ */
+
+ private readonly Dictionary<string, WaitEntry> _waitTable;
+
/// <summary>
/// Initializes a new <see cref="SessionSerializer{TSession}"/>
/// </summary>
- /// <param name="poolCapacity">The maximum number of <see cref="WaitEntry"/> elements to pool</param>
- public SessionSerializer(int poolCapacity)
+ /// <param name="poolCapacity">The maximum number of wait entry instances to hold in memory cache</param>
+ public SessionSerializer(int poolCapacity):base(poolCapacity, 0, null)
{
- StoreLock = new();
- _entryPool = new Stack<WaitEntry>(poolCapacity);
-
//Session-ids are security senstive, we must use ordinal(binary) string comparison
- _waitStore = new Dictionary<string, WaitEntry>(poolCapacity, StringComparer.Ordinal);
-
- MaxPoolSize = poolCapacity;
+ _waitTable = new Dictionary<string, WaitEntry>(poolCapacity, StringComparer.Ordinal);
}
///<inheritdoc/>
@@ -66,14 +73,14 @@ namespace VNLib.Plugins.Sessions.Cache.Client
lock (StoreLock)
{
//Try to see if an entry is loaded, and get the session
- bool result = _waitStore.TryGetValue(sessionId, out WaitEntry? entry);
- session = result ? entry!.Session : default;
+ bool result = _waitTable.TryGetValue(sessionId, out WaitEntry? entry);
+ session = result ? entry!.Moniker : default;
return result;
}
}
///<inheritdoc/>
- public virtual Task WaitAsync(TSession moniker, CancellationToken cancellation = default)
+ public override Task WaitAsync(TSession moniker, CancellationToken cancellation = default)
{
//Token must not be cancelled
cancellation.ThrowIfCancellationRequested();
@@ -83,12 +90,12 @@ namespace VNLib.Plugins.Sessions.Cache.Client
lock (StoreLock)
{
//See if the entry already exists, otherwise get a new wait entry
- if (!_waitStore.TryGetValue(moniker.SessionID, out WaitEntry? wait))
+ if (!_waitTable.TryGetValue(moniker.SessionID, out WaitEntry? wait))
{
GetWaitEntry(ref wait, moniker);
//Add entry to store
- _waitStore[moniker.SessionID] = wait;
+ _waitTable[moniker.SessionID] = wait;
}
//Get waiter before leaving lock
@@ -99,32 +106,20 @@ namespace VNLib.Plugins.Sessions.Cache.Client
}
///<inheritdoc/>
- public virtual void Release(TSession moniker)
- {
- /*
- * When releasing a lock on a moniker, we store entires in an internal table. Wait entires also require mutual
- * exclustion to properly track waiters. This happens inside a single lock for lower entry times/complexity.
- * The wait's internal semaphore may also cause longer waits within the lock, so wait entires are "prepared"
- * by using tokens to access the wait/release mechanisms with proper tracking.
- *
- * Tokens can be used to control the wait because the call to release may cause thread yielding (if internal
- * WaitHandle is being used), so we don't want to block other callers.
- *
- * When there are no more waiters for a moniker at the time the lock was entered, the WaitEntry is released
- * back to the pool.
- */
+ public override void Release(TSession moniker)
+ {
WaitReleaseToken releaser;
lock (StoreLock)
{
- WaitEntry entry = _waitStore[moniker.SessionID];
+ WaitEntry entry = _waitTable[moniker.SessionID];
//Call release while holding store lock
if(entry.Release(out releaser) == 0)
{
//No more waiters
- _waitStore.Remove(moniker.SessionID);
+ _waitTable.Remove(moniker.SessionID);
/*
* We must release the semaphore before returning to pool,
@@ -142,211 +137,15 @@ namespace VNLib.Plugins.Sessions.Cache.Client
releaser.Release();
}
- private void GetWaitEntry([NotNull] ref WaitEntry? wait, TSession session)
- {
- //Try to get wait from pool
- if(!_entryPool.TryPop(out wait))
- {
- wait = new();
- }
-
- //Init wait with session
- wait.Prepare(session);
- }
-
- private void ReturnEntry(WaitEntry entry)
- {
- //Remove session ref
- entry.Prepare(default);
-
- if(_entryPool.Count < MaxPoolSize)
- {
- _entryPool.Push(entry);
- }
- else
- {
- //Dispose entry since were not storing it
- entry.Dispose();
- }
- }
-
- /// <summary>
- /// NOOP
- /// </summary>
- public void CacheClear()
- { }
-
///<inheritdoc/>
- public void CacheHardClear()
+ public new void CacheHardClear()
{
- //Take lock to remove the stored wait entires to dispose of them
- WaitEntry[] pooled;
-
+ base.CacheHardClear();
lock (StoreLock)
{
- pooled = _entryPool.ToArray();
- _entryPool.Clear();
-
- //Cleanup the wait store
- _waitStore.TrimExcess(MaxPoolSize);
- }
-
- //Dispose entires
- Array.ForEach(pooled, static pooled => pooled.Dispose());
- }
-
- /// <summary>
- /// An entry within the lock table that
- /// </summary>
- protected sealed class WaitEntry : IDisposable
- {
- private uint _waitCount;
- private readonly SemaphoreSlim _waitHandle;
-
- /// <summary>
- /// The session this entry is providing mutual exclusion to
- /// </summary>
- public TSession? Session { get; private set; }
-
- /// <summary>
- /// Initializes a new <see cref="WaitEntry"/>
- /// </summary>
- public WaitEntry()
- {
- _waitHandle = new(1, 1);
- Session = default!;
- }
-
- /// <summary>
- /// Gets a token used to enter the lock which may block, or yield async
- /// outside of a nested lock
- /// </summary>
- /// <returns>The waiter used to enter a wait on the moniker</returns>
- public WaitEnterToken GetWaiter()
- {
- /*
- * Increment wait count before entering the lock
- * A cancellation is the only way out, so cover that
- * during the async, only if the token is cancelable
- */
- _ = Interlocked.Increment(ref _waitCount);
- return new(this);
- }
-
- /// <summary>
- /// Prepares a release
- /// </summary>
- /// <param name="releaser">
- /// The token that should be used to release the exclusive lock held on
- /// a moniker
- /// </param>
- /// <returns>The number of remaining waiters</returns>
- public uint Release(out WaitReleaseToken releaser)
- {
- releaser = new(_waitHandle);
-
- //Decrement release count before leaving
- return Interlocked.Decrement(ref _waitCount);
+ _waitTable.TrimExcess();
}
-
- /// <summary>
- /// Prepres a new <see cref="WaitEntry"/> for
- /// its new session.
- /// </summary>
- /// <param name="session">The session to hold a referrnce to</param>
- public void Prepare(TSession? session)
- {
- Session = session;
- _waitCount = 0;
- }
-
- /*
- * Called by WaitEnterToken to enter the lock
- * outside a nested lock
- */
-
- internal Task WaitAsync(CancellationToken cancellation)
- {
-
- //See if lock can be entered synchronously
- if (_waitHandle.Wait(0, CancellationToken.None))
- {
- //Lock was entered successfully without async yield
- return Task.CompletedTask;
- }
-
- //Lock must be entered async
-
- //Check to confirm cancellation may happen
- if (cancellation.CanBeCanceled)
- {
- //Task may be cancelled, so we need to monitor the results to properly set waiting count
- Task wait = _waitHandle.WaitAsync(cancellation);
- return WaitForLockEntryWithCancellationAsync(wait);
- }
- else
- {
- //Task cannot be canceled, so we dont need to monitor the results
- return _waitHandle.WaitAsync(CancellationToken.None);
- }
- }
-
- private async Task WaitForLockEntryWithCancellationAsync(Task wait)
- {
- try
- {
- await wait.ConfigureAwait(false);
- }
- catch
- {
- //Decrement wait count on error entering lock async
- _ = Interlocked.Decrement(ref _waitCount);
- throw;
- }
- }
-
- ///<inheritdoc/>
- public void Dispose()
- {
- _waitHandle.Dispose();
- GC.SuppressFinalize(this);
- }
- }
-
- /// <summary>
- /// A token used to safely release an exclusive lock inside the
- /// <see cref="WaitEntry"/>
- /// </summary>
- protected readonly ref struct WaitReleaseToken
- {
- private readonly SemaphoreSlim? _sem;
-
- internal WaitReleaseToken(SemaphoreSlim sem) => _sem = sem;
-
- /// <summary>
- /// Releases the exclusive lock held by the token. NOTE:
- /// this method may only be called ONCE after a wait has been
- /// released
- /// </summary>
- public readonly void Release() => _sem?.Release();
- }
-
- /// <summary>
- /// A token used to safely enter a wait for exclusive access to a <see cref="WaitEntry"/>
- /// </summary>
- protected readonly ref struct WaitEnterToken
- {
- private readonly WaitEntry _entry;
-
- internal WaitEnterToken(WaitEntry entry) => _entry = entry;
-
- /// <summary>
- /// Enters the wait for the WaitEntry. This method may not block
- /// or yield (IE Return <see cref="Task.CompletedTask"/>)
- /// </summary>
- /// <param name="cancellation">A token to cancel the wait for the resource</param>
- /// <returns></returns>
- public Task EnterWaitAsync(CancellationToken cancellation) => _entry.WaitAsync(cancellation);
}
+
}
}