aboutsummaryrefslogtreecommitdiff
path: root/libs/VNLib.Plugins.Sessions.Cache.Client/src/SessionCacheClient.cs
diff options
context:
space:
mode:
Diffstat (limited to 'libs/VNLib.Plugins.Sessions.Cache.Client/src/SessionCacheClient.cs')
-rw-r--r--libs/VNLib.Plugins.Sessions.Cache.Client/src/SessionCacheClient.cs242
1 files changed, 242 insertions, 0 deletions
diff --git a/libs/VNLib.Plugins.Sessions.Cache.Client/src/SessionCacheClient.cs b/libs/VNLib.Plugins.Sessions.Cache.Client/src/SessionCacheClient.cs
new file mode 100644
index 0000000..800ad66
--- /dev/null
+++ b/libs/VNLib.Plugins.Sessions.Cache.Client/src/SessionCacheClient.cs
@@ -0,0 +1,242 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Plugins.Sessions.Cache.Client
+* File: SessionCacheClient.cs
+*
+* SessionCacheClient.cs is part of VNLib.Plugins.Sessions.Cache.Client which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Plugins.Sessions.Cache.Client is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Plugins.Sessions.Cache.Client is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
+
+using VNLib.Net.Http;
+using VNLib.Utils.Async;
+using VNLib.Utils.Logging;
+using VNLib.Utils.Memory.Caching;
+using VNLib.Plugins.Essentials.Sessions;
+
+namespace VNLib.Plugins.Sessions.Cache.Client
+{
+
+ /// <summary>
+ /// A client that allows access to sessions located on external servers
+ /// </summary>
+ public abstract class SessionCacheClient : ICacheHolder
+ {
+ public class LRUSessionStore<T> : LRUCache<string, T>, ICacheHolder where T : ISession
+ {
+ internal AsyncQueue<T> ExpiredSessions { get; }
+
+ ///<inheritdoc/>
+ public override bool IsReadOnly => false;
+ ///<inheritdoc/>
+ protected override int MaxCapacity { get; }
+
+
+ public LRUSessionStore(int maxCapacity) : base(StringComparer.Ordinal)
+ {
+ MaxCapacity = maxCapacity;
+ ExpiredSessions = new (true, true);
+ }
+
+ ///<inheritdoc/>
+ protected override bool CacheMiss(string key, [NotNullWhen(true)] out T? value)
+ {
+ value = default;
+ return false;
+ }
+
+ ///<inheritdoc/>
+ protected override void Evicted(KeyValuePair<string, T> evicted)
+ {
+ //add to queue, the list lock should be held during this operatio
+ _ = ExpiredSessions.TryEnque(evicted.Value);
+ }
+
+ ///<inheritdoc/>
+ public void CacheClear()
+ {
+ foreach (KeyValuePair<string, T> value in List)
+ {
+ Evicted(value);
+ }
+ Clear();
+ }
+
+ ///<inheritdoc/>
+ public void CacheHardClear()
+ {
+ CacheClear();
+ }
+ }
+
+ protected readonly LRUSessionStore<RemoteSession> CacheTable;
+ protected readonly object CacheLock;
+ protected readonly int MaxLoadedEntires;
+
+ /// <summary>
+ /// The client used to communicate with the cache server
+ /// </summary>
+ protected IRemoteCacheStore Store { get; }
+
+ /// <summary>
+ /// Initializes a new <see cref="SessionCacheClient"/>
+ /// </summary>
+ /// <param name="client"></param>
+ /// <param name="maxCacheItems">The maximum number of sessions to keep in memory</param>
+ protected SessionCacheClient(IRemoteCacheStore client, int maxCacheItems)
+ {
+ MaxLoadedEntires = maxCacheItems;
+ CacheLock = new();
+ CacheTable = new(maxCacheItems);
+ Store = client;
+ }
+
+ private ulong _waitingCount;
+
+ /// <summary>
+ /// The number of pending connections waiting for results from the cache server
+ /// </summary>
+ public ulong WaitingConnections => _waitingCount;
+
+ /// <summary>
+ /// Attempts to get a session from the cache identified by its sessionId asynchronously
+ /// </summary>
+ /// <param name="entity">The connection/request to attach the session to</param>
+ /// <param name="sessionId">The ID of the session to retrieve</param>
+ /// <param name="cancellationToken">A token to cancel the operation</param>
+ /// <returns>A <see cref="ValueTask"/> that resolves the remote session</returns>
+ /// <exception cref="SessionException"></exception>
+ public virtual async ValueTask<RemoteSession> GetSessionAsync(IHttpEvent entity, string sessionId, CancellationToken cancellationToken)
+ {
+ try
+ {
+ RemoteSession? session;
+ //Aquire lock on cache
+ lock (CacheLock)
+ {
+ //See if session is loaded into cache
+ if (!CacheTable.TryGetValue(sessionId, out session))
+ {
+ //Init new record
+ session = SessionCtor(sessionId);
+ //Add to cache
+ CacheTable.Add(session.SessionID, session);
+ }
+ //Valid entry found in cache
+ }
+
+ //Inc waiting count
+ Interlocked.Increment(ref _waitingCount);
+
+ try
+ {
+ //Load session-data
+ await session.WaitAndLoadAsync(entity, cancellationToken);
+ return session;
+ }
+ catch
+ {
+ //Remove the invalid cached session
+ lock (CacheLock)
+ {
+ _ = CacheTable.Remove(sessionId);
+ }
+ throw;
+ }
+ finally
+ {
+ //Dec waiting count
+ Interlocked.Decrement(ref _waitingCount);
+ }
+ }
+ catch (SessionException)
+ {
+ throw;
+ }
+ catch (OperationCanceledException)
+ {
+ throw;
+ }
+ //Wrap exceptions
+ catch (Exception ex)
+ {
+ throw new SessionException("An unhandled exception was raised", ex);
+ }
+ }
+
+ /// <summary>
+ /// Gets a new <see cref="RemoteSession"/> instances for the given sessionId,
+ /// and places it a the head of internal cache
+ /// </summary>
+ /// <param name="sessionId">The session identifier</param>
+ /// <returns>The new session for the given ID</returns>
+ protected abstract RemoteSession SessionCtor(string sessionId);
+
+ /// <summary>
+ /// Begins waiting for expired sessions to be evicted from the cache table that
+ /// may have pending synchronization operations
+ /// </summary>
+ /// <param name="log"></param>
+ /// <param name="token"></param>
+ /// <returns></returns>
+ public async Task CleanupExpiredSessionsAsync(ILogProvider log, CancellationToken token)
+ {
+ while (true)
+ {
+ try
+ {
+ //Wait for expired session and dispose it
+ using RemoteSession session = await CacheTable.ExpiredSessions.DequeueAsync(token);
+
+ //Obtain lock on session
+ await session.WaitOneAsync(CancellationToken.None);
+
+ log.Verbose("Removed expired session {id}", session.SessionID);
+ }
+ catch (OperationCanceledException)
+ {
+ break;
+ }
+ catch(Exception ex)
+ {
+ log.Error(ex);
+ }
+ }
+ }
+
+ ///<inheritdoc/>
+ public void CacheClear()
+ {
+
+ }
+ ///<inheritdoc/>
+ public void CacheHardClear()
+ {
+ //Cleanup cache when disconnected
+ lock (CacheLock)
+ {
+ CacheTable.CacheHardClear();
+ }
+ }
+ }
+}