/*
* Copyright (c) 2022 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Plugins.Sessions.Cache.Client
* File: SessionCacheClient.cs
*
* SessionCacheClient.cs is part of VNLib.Plugins.Sessions.Cache.Client which is part of the larger
* VNLib collection of libraries and utilities.
*
* VNLib.Plugins.Sessions.Cache.Client is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* VNLib.Plugins.Sessions.Cache.Client is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see https://www.gnu.org/licenses/.
*/

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;

using VNLib.Net.Http;
using VNLib.Utils.Async;
using VNLib.Utils.Logging;
using VNLib.Utils.Memory.Caching;
using VNLib.Plugins.Essentials.Sessions;

namespace VNLib.Plugins.Sessions.Cache.Client
{
    /// <summary>
    /// A client that allows access to sessions located on external servers
    /// </summary>
    public abstract class SessionCacheClient : ICacheHolder
    {
        /// <summary>
        /// An LRU table of sessions keyed by an ordinal string. Every entry that is
        /// evicted (or cleared) is pushed onto <see cref="ExpiredSessions"/> so it can
        /// be cleaned up outside of the table.
        /// </summary>
        /// <typeparam name="T">The session type held by the store</typeparam>
        public class LRUSessionStore<T> : LRUCache<string, T>, ICacheHolder where T : ISession
        {
            /// <summary>
            /// Queue that receives every value evicted from this store
            /// </summary>
            internal AsyncQueue<T> ExpiredSessions { get; }

            ///<inheritdoc/>
            public override bool IsReadOnly => false;

            ///<inheritdoc/>
            protected override int MaxCapacity { get; }

            /// <summary>
            /// Initializes a new store with the given capacity
            /// </summary>
            /// <param name="maxCapacity">The value reported by <see cref="MaxCapacity"/></param>
            public LRUSessionStore(int maxCapacity) : base(StringComparer.Ordinal)
            {
                MaxCapacity = maxCapacity;
                //NOTE(review): the two flags are AsyncQueue constructor options whose meaning is not visible here - confirm against AsyncQueue
                ExpiredSessions = new(true, true);
            }

            /// <summary>
            /// Called on a lookup miss. This store never resolves a missing key itself,
            /// so the value is always defaulted and the result is always false.
            /// </summary>
            /// <param name="key">The key that was not found</param>
            /// <param name="value">Always set to the default value</param>
            /// <returns>Always false</returns>
            protected override bool CacheMiss(string key, [NotNullWhen(true)] out T? value)
            {
                value = default;
                return false;
            }

            ///<inheritdoc/>
            protected override void Evicted(ref KeyValuePair<string, T> evicted)
            {
                //Add to queue, the list lock should be held during this operation.
                //The enqueue result is intentionally discarded.
                _ = ExpiredSessions.TryEnque(evicted.Value);
            }

            /// <summary>
            /// Runs the eviction handler for every stored entry, then empties the table
            /// </summary>
            public void CacheClear()
            {
                foreach (KeyValuePair<string, T> value in List)
                {
                    //Evicted takes its argument by ref, so copy the iteration variable to a local first
                    KeyValuePair<string, T> onStack = value;
                    Evicted(ref onStack);
                }

                Clear();
            }

            /// <summary>
            /// Same as <see cref="CacheClear"/>
            /// </summary>
            public void CacheHardClear()
            {
                CacheClear();
            }
        }

        /// <summary>
        /// The in-memory table of loaded sessions. Access is guarded by <see cref="CacheLock"/>
        /// </summary>
        protected readonly LRUSessionStore<RemoteSession> CacheTable;

        /// <summary>
        /// The lock object held while reading or modifying <see cref="CacheTable"/>
        /// </summary>
        protected readonly object CacheLock;

        /// <summary>
        /// The maximum number of cache items passed to the constructor
        /// </summary>
        protected readonly int MaxLoadedEntires;

        /// <summary>
        /// The client used to communicate with the cache server
        /// </summary>
        protected IRemoteCacheStore Store { get; }

        /// <summary>
        /// Gets a value that determines if the backing <see cref="Store"/> is connected
        /// to a server
        /// </summary>
        public bool IsConnected => Store.IsConnected;

        /// <summary>
        /// Initializes a new <see cref="SessionCacheClient"/>
        /// </summary>
        /// <param name="client">The remote cache store used to communicate with the cache server</param>
        /// <param name="maxCacheItems">The maximum number of sessions to keep in memory</param>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="client"/> is null</exception>
        protected SessionCacheClient(IRemoteCacheStore client, int maxCacheItems)
        {
            //Fail fast instead of deferring to a NullReferenceException on first use of Store
            Store = client ?? throw new ArgumentNullException(nameof(client));

            MaxLoadedEntires = maxCacheItems;
            CacheLock = new();
            CacheTable = new(maxCacheItems);
        }

        private ulong _waitingCount;

        /// <summary>
        /// The number of pending connections waiting for results from the cache server
        /// </summary>
        /// <remarks>
        /// Read atomically because the counter is 64 bits wide and is updated with
        /// <see cref="Interlocked"/> operations; a plain read may tear on 32-bit runtimes.
        /// </remarks>
        public ulong WaitingConnections => Interlocked.Read(ref _waitingCount);

        /// <summary>
        /// Attempts to get a session from the cache identified by its sessionId asynchronously
        /// </summary>
        /// <param name="entity">The connection/request to attach the session to</param>
        /// <param name="sessionId">The ID of the session to retrieve</param>
        /// <param name="cancellationToken">A token to cancel the operation</param>
        /// <returns>A <see cref="ValueTask{TResult}"/> that resolves the remote session</returns>
        /// <exception cref="SessionException">
        /// Rethrown as-is when raised by the load, or thrown as a wrapper around any other
        /// exception that is not an <see cref="OperationCanceledException"/>
        /// </exception>
        /// <exception cref="OperationCanceledException">Propagated unchanged</exception>
        public virtual async ValueTask<RemoteSession> GetSessionAsync(IHttpEvent entity, string sessionId, CancellationToken cancellationToken)
        {
            try
            {
                RemoteSession? session;

                //Acquire lock on cache
                lock (CacheLock)
                {
                    //See if session is loaded into cache
                    if (!CacheTable.TryGetValue(sessionId, out session))
                    {
                        //Init new record and add it to the cache
                        session = SessionCtor(sessionId);
                        CacheTable.Add(session.SessionID, session);
                    }
                    //Otherwise a valid entry was found in cache
                }

                //Inc waiting count, it is always decremented by the finally block below
                Interlocked.Increment(ref _waitingCount);

                try
                {
                    //Load session-data
                    await session.WaitAndLoadAsync(entity, cancellationToken);
                    return session;
                }
                catch
                {
                    //Remove the invalid cached session so the next request starts fresh
                    lock (CacheLock)
                    {
                        _ = CacheTable.Remove(sessionId);
                    }
                    throw;
                }
                finally
                {
                    //Dec waiting count
                    Interlocked.Decrement(ref _waitingCount);
                }
            }
            //Session and cancellation exceptions propagate untouched, everything else is wrapped
            catch (Exception ex) when (ex is not SessionException and not OperationCanceledException)
            {
                throw new SessionException("An unhandled exception was raised", ex);
            }
        }

        /// <summary>
        /// Gets a new <see cref="RemoteSession"/> instance for the given sessionId.
        /// The caller adds the result to the internal cache
        /// </summary>
        /// <param name="sessionId">The session identifier</param>
        /// <returns>The new session for the given ID</returns>
        protected abstract RemoteSession SessionCtor(string sessionId);

        /// <summary>
        /// Begins waiting for expired sessions to be evicted from the cache table that
        /// may have pending synchronization operations
        /// </summary>
        /// <param name="log">The log that removals and errors are written to</param>
        /// <param name="token">A token that stops the loop when cancelled</param>
        /// <returns>A task that completes once an <see cref="OperationCanceledException"/> ends the loop</returns>
        public async Task CleanupExpiredSessionsAsync(ILogProvider log, CancellationToken token)
        {
            while (true)
            {
                try
                {
                    //Wait for an expired session, it is disposed when this iteration's scope exits
                    using RemoteSession session = await CacheTable.ExpiredSessions.DequeueAsync(token);

                    //Obtain lock on session. The wait deliberately ignores the loop token.
                    await session.WaitOneAsync(CancellationToken.None);

                    log.Verbose("Removed expired session {id}", session.SessionID);
                }
                catch (OperationCanceledException)
                {
                    //Cancellation is the only way out of the loop
                    break;
                }
                catch (Exception ex)
                {
                    //Any other failure is logged and the loop keeps running
                    log.Error(ex);
                }
            }
        }

        /// <summary>
        /// Does nothing; only <see cref="CacheHardClear"/> empties the table
        /// </summary>
        public void CacheClear()
        { }

        /// <summary>
        /// Clears the whole session table while holding <see cref="CacheLock"/>
        /// </summary>
        public void CacheHardClear()
        {
            //Cleanup cache when disconnected
            lock (CacheLock)
            {
                CacheTable.CacheHardClear();
            }
        }
    }
}