From 24929f4e7acce9847f4cbe813e850ee57d474723 Mon Sep 17 00:00:00 2001 From: vnugent Date: Mon, 15 Jul 2024 19:01:28 -0400 Subject: refactor: Update entity result caching --- .../src/DataModel/EntityCacheExtensions.cs | 20 --- .../src/DataModel/EntityResultCache.cs | 180 ++++++++++++++++++++- .../src/DataModel/IEntityStore.cs | 9 +- .../src/DataModel/TransparentEntityCache.cs | 74 --------- .../src/Clustering/ClusterNodeIndex.cs | 18 ++- .../src/FBMCacheClient.cs | 14 +- .../src/VnCacheClientConfig.cs | 3 +- 7 files changed, 204 insertions(+), 114 deletions(-) delete mode 100644 lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/TransparentEntityCache.cs diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityCacheExtensions.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityCacheExtensions.cs index a26da27..2c77218 100644 --- a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityCacheExtensions.cs +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityCacheExtensions.cs @@ -133,26 +133,6 @@ namespace VNLib.Plugins.Extensions.VNCache.DataModel ArgumentNullException.ThrowIfNull(cache); return new EntityCacheBuilder(cache); } - - /// - /// Builds a transparent entity result cache for a backing store. - /// - /// - /// - /// The backing data store used to fetch results from - /// The new that wraps the entity store - public static TransparentEntityCache BuildTransparent(this EntityCacheBuilder builder, IEntityStore store) - where TEntity : class - { - ArgumentNullException.ThrowIfNull(builder); - ArgumentNullException.ThrowIfNull(store); - - EntityResultCache resultCache = builder.Build(); - - return new TransparentEntityCache(store, resultCache); - } - - private sealed class EntityCacheImpl(IGlobalCacheProvider cache, ICacheObjectDeserializer deserializer, ICacheObjectSerializer serializer) : IEntityCache where T : class { diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityResultCache.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityResultCache.cs index fb75fc6..56311f0 100644 --- a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityResultCache.cs +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityResultCache.cs @@ -44,6 +44,24 @@ namespace VNLib.Plugins.Extensions.VNCache.DataModel where TEntity : class { + /// + /// The backing entity cache store + /// + public IEntityCache Cache => cache; + + /// + /// The task policy for which this result cache will + /// respect + /// + public ICacheTaskPolicy TaskPolicy => taskPolicy; + + /// + /// The expiration policy for which this result cache will + /// respect for entity expiration and refreshing + /// + public ICacheExpirationPolicy ExpirationPolicy => expirationPolicy; + + /// /// Fetchs a result by it's request entity /// @@ -51,7 +69,7 @@ namespace VNLib.Plugins.Extensions.VNCache.DataModel /// A token to canel the operation /// A callback generator function /// A task the returns the result of the requested entity, or null if it was not found or provided by the backing store - public async Task FetchAsync( + public Task FetchAsync( TRequest request, Func> resultFactory, CancellationToken cancellation = default @@ -59,9 +77,48 @@ namespace VNLib.Plugins.Extensions.VNCache.DataModel { ArgumentNullException.ThrowIfNull(request); ArgumentNullException.ThrowIfNull(resultFactory); - cancellation.ThrowIfCancellationRequested(); - string key = request.GetKey(); + return FetchAsync( + key: request.GetKey(), + state: (resultFactory, request), + resultFactory: static (rf, c) => rf.resultFactory(rf.request, c), + cancellation + ); + } + + /// + /// Fetchs a result by it's request entity + /// + /// The fetch request state object + /// A token to canel the operation + /// A callback generator function + /// A task the returns the result of the requested entity, or null if it was not found or provided by the backing store + public Task FetchAsync( + string key, + Func> resultFactory, + CancellationToken cancellation = default + ) + { + ArgumentNullException.ThrowIfNull(resultFactory); + + return FetchAsync( + key, + state: resultFactory, + resultFactory: static (rf, c) => rf(c), + cancellation + ); + } + + private async Task FetchAsync( + string key, + TState state, + Func> resultFactory, + CancellationToken cancellation = default + ) + { + ArgumentException.ThrowIfNullOrWhiteSpace(key); + ArgumentNullException.ThrowIfNull(resultFactory); + cancellation.ThrowIfCancellationRequested(); //try to fetch from cache TEntity? entity = await cache.GetAsync(key, cancellation); @@ -79,7 +136,7 @@ namespace VNLib.Plugins.Extensions.VNCache.DataModel if (entity is null) { //Cache miss, load from factory - entity = await resultFactory(request, cancellation); + entity = await resultFactory(state, cancellation); if (entity is not null) { @@ -127,5 +184,120 @@ namespace VNLib.Plugins.Extensions.VNCache.DataModel return taskPolicy.ObserveOperationAsync(remove); } + + /// + /// Performs a cache replacement operation. That is substitutes an exiting + /// value with a new one, or inserts a new value if the key does not exist. + /// + /// The operation request state object + /// The entity object to store + /// A generic callback function to invoke in parallel with the upsert operation + /// A token to cancel the async operation + /// + /// A task that completes when the upsert operation has completed according to the + /// + /// + /// + /// + public Task UpsertAsync( + TRequest request, + TEntity entity, + Func action, + CancellationToken cancellation = default + ) where TRequest : IEntityCacheKey + { + ArgumentNullException.ThrowIfNull(request); + ArgumentNullException.ThrowIfNull(action); + + return UpsertAsync( + key: request.GetKey(), + entity: entity, + state: (action, request), + callback: static (cb, e, c) => cb.action.Invoke(cb.request, e, c), + cancellation + ); + } + + + /// + /// Performs a cache replacement operation. That is substitutes an exiting + /// value with a new one, or inserts a new value if the key does not exist. + /// + /// The entity's unique id within the cache store + /// The entity object to store + /// A generic callback function to invoke in parallel with the upsert operation + /// A token to cancel the async operation + /// + /// A task that completes when the upsert operation has completed according to the + /// + /// + /// + /// + public Task UpsertAsync( + string key, + TEntity entity, + Func action, + CancellationToken cancellation = default + ) + { + ArgumentNullException.ThrowIfNull(action); + + return UpsertAsync( + key, + entity, + state: action, + callback: static (cb, e, c) => cb.Invoke(e, c), + cancellation + ); + } + + /// + /// Performs a cache replacement operation. That is substitutes an exiting + /// value with a new one, or inserts a new value if the key does not exist. + /// + /// The entity's unique id within the cache store + /// The entity object to store + /// A token to cancel the async operation + /// + /// A task that completes when the upsert operation has completed according to the + /// + /// + /// + public Task UpsertAsync(string key, TEntity entity, CancellationToken cancellation = default) + { + return UpsertAsync( + key, + entity, + state: null, + callback: static (_, _, _) => Task.CompletedTask, + cancellation + ); + } + + private Task UpsertAsync( + string key, + TEntity entity, + TState state, + Func callback, + CancellationToken cancellation = default + ) + { + ArgumentException.ThrowIfNullOrWhiteSpace(key); + ArgumentNullException.ThrowIfNull(callback); + cancellation.ThrowIfCancellationRequested(); + + //Call refresh before storing the entity incase any setup needs to be performed + expirationPolicy.OnRefreshed(entity); + + //Cache task must be observed by the task policy + Task upsert = taskPolicy.ObserveOperationAsync( + operation: cache.UpsertAsync(key, entity, cancellation) + ); + + Task cbResult = callback(state, entity, cancellation); + + //Combine the observed task and the callback function + return Task.WhenAll(cbResult, upsert); + } } } diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/IEntityStore.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/IEntityStore.cs index 29282cf..744ad8a 100644 --- a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/IEntityStore.cs +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/IEntityStore.cs @@ -31,7 +31,8 @@ namespace VNLib.Plugins.Extensions.VNCache.DataModel /// An instance that stores entities which can be fetched, updated, or removed /// /// The entity result type - public interface IEntityStore + /// The entity operation request state object + public interface IEntityStore where TRequest : IEntityCacheKey { /// /// Fetches an entity from the store by it's request entity state object @@ -39,7 +40,7 @@ namespace VNLib.Plugins.Extensions.VNCache.DataModel /// The request state object /// A token to cancel the operation /// A task that yields the entity object if it exists - Task GetAsync(TRequest request, CancellationToken cancellation = default) where TRequest : IEntityCacheKey; + Task GetAsync(TRequest request, CancellationToken cancellation = default); /// /// Updates or inserts an entity into the store @@ -48,7 +49,7 @@ namespace VNLib.Plugins.Extensions.VNCache.DataModel /// The request state object /// A token to cancel the operation /// A task that completes when the upsert operation has completed - Task UpsertAsync(TRequest request, TEntity entity, CancellationToken cancellation = default) where TRequest : IEntityCacheKey; + Task UpsertAsync(TRequest request, TEntity entity, CancellationToken cancellation = default); /// /// Removes an entity from the store @@ -56,6 +57,6 @@ namespace VNLib.Plugins.Extensions.VNCache.DataModel /// The request state object /// A token to cancel the operation /// A task that completes with the result of the delete operation - Task RemoveAsync(TRequest request, CancellationToken cancellation = default) where TRequest : IEntityCacheKey; + Task RemoveAsync(TRequest request, CancellationToken cancellation = default); } } diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/TransparentEntityCache.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/TransparentEntityCache.cs deleted file mode 100644 index 90aeb74..0000000 --- a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/TransparentEntityCache.cs +++ /dev/null @@ -1,74 +0,0 @@ -/* -* Copyright (c) 2024 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Plugins.Extensions.VNCache -* File: EntityCacheExtensions.cs -* -* EntityCacheExtensions.cs is part of VNLib.Plugins.Extensions.VNCache -* which is part of the larger VNLib collection of libraries and utilities. -* -* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System.Linq; -using System.Threading; -using System.Threading.Tasks; - -namespace VNLib.Plugins.Extensions.VNCache.DataModel -{ - /// - /// A cache proxy that sits transparently between a backing store and the caller - /// to cache unique entities - /// - /// - /// The backing entity data store - /// The entity cache used to fetch - public class TransparentEntityCache(IEntityStore store, EntityResultCache cache) : IEntityStore - where TEntity : class - { - - /// - public Task GetAsync(TRequest request, CancellationToken cancellation = default) - where TRequest : IEntityCacheKey - { - return cache.FetchAsync(request, store.GetAsync, cancellation); - } - - /// - public Task RemoveAsync(TRequest request, CancellationToken cancellation = default) - where TRequest : IEntityCacheKey - { - Task _fromCache = cache.RemoveAsync(request, cancellation) - .ContinueWith(static (_) => true, TaskScheduler.Default); - - Task _fromStore = store.RemoveAsync(request, cancellation); - - return Task.WhenAll(_fromCache, _fromStore) - .ContinueWith(static (t) => t.Result.All(static r => r), TaskScheduler.Default); - } - - /// - public Task UpsertAsync(TRequest request, TEntity entity, CancellationToken cancellation = default) - where TRequest : IEntityCacheKey - { - //Remove key from cache but push update to store - Task _fromCache = cache.RemoveAsync(request, cancellation); - - Task _fromStore = store.UpsertAsync(request, entity, cancellation); - - return Task.WhenAll(_fromCache, _fromStore); - } - } -} diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs index e9dcbc5..effa4d7 100644 --- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs @@ -124,7 +124,9 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering //Get all discovered nodes CacheNodeAdvertisment[] ads = cluster.DiscoveredNodes.GetAllNodes(); //Just get a random node from the collection for now - return ads.Length > 0 ? ads.SelectRandom() : null; + return ads.Length > 0 + ? ads.SelectRandom() + : null; } /// @@ -145,22 +147,28 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering internal string? SerializeNextNode() { CacheNodeAdvertisment? nextNode = GetNextNode(); - return nextNode == null ? null : JsonSerializer.Serialize(nextNode); + return nextNode is not null + ? JsonSerializer.Serialize(nextNode) + : null; } } sealed class RemoteHandler(object RemoteIndex) : IClusterNodeIndex { - private readonly Func _remoteSerializer = ManagedLibrary.GetMethod>(RemoteIndex, nameof(LocalHandler.SerializeNextNode), BindingFlags.NonPublic); + private readonly Func _remoteSerializer + = ManagedLibrary.GetMethod>(RemoteIndex, nameof(LocalHandler.SerializeNextNode), BindingFlags.NonPublic); - private readonly Func _waitTask = ManagedLibrary.GetMethod>(RemoteIndex, nameof(WaitForDiscoveryAsync), BindingFlags.Public); + private readonly Func _waitTask + = ManagedLibrary.GetMethod>(RemoteIndex, nameof(LocalHandler.WaitForDiscoveryAsync), BindingFlags.Public); /// public CacheNodeAdvertisment? GetNextNode() { //Deserialize the next node from the remote index string? nexNode = _remoteSerializer(); - return nexNode == null ? null : JsonSerializer.Deserialize(nexNode); + return nexNode is not null + ? JsonSerializer.Deserialize(nexNode) + : null; } /// diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs index af12b32..fc4b111 100644 --- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs @@ -79,7 +79,7 @@ namespace VNLib.Data.Caching.Providers.VNCache public FBMCacheClient(PluginBase plugin, IConfigScope config) : this( config.Deserialze(), - plugin.IsDebug() ? plugin.Log : null, + plugin.IsDebug() ? plugin.Log.CreateScope("FBM-DEBUG") : null, plugin ) { @@ -117,15 +117,17 @@ namespace VNLib.Data.Caching.Providers.VNCache _config = config; //Set a default node delay if null - _initNodeDelay = _config.InitialNodeDelay.HasValue ? TimeSpan.FromSeconds(_config.InitialNodeDelay.Value) : InitialDelay; + _initNodeDelay = _config.InitialNodeDelay.HasValue + ? TimeSpan.FromSeconds(_config.InitialNodeDelay.Value) + : InitialDelay; //Init the client with default settings FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(BufferHeap, (int)config.MaxBlobSize, config.RequestTimeout, debugLog); FBMClientFactory clientFactory = new( in conf, - new FBMFallbackClientWsFactory(), - 10 + webSocketManager: new FBMFallbackClientWsFactory(), + maxClients: 10 ); _cluster = (new CacheClientConfiguration()) @@ -421,10 +423,10 @@ namespace VNLib.Data.Caching.Providers.VNCache private sealed record class DiscoveryErrHAndler(ILogProvider Logger) : ICacheDiscoveryErrorHandler { public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex) - => OnDiscoveryError(errorNode, ex); + => OnDiscoveryError(ex, errorNode, address: null); public void OnDiscoveryError(Uri errorAddress, Exception ex) - => OnDiscoveryError(ex, null, errorAddress); + => OnDiscoveryError(ex, errorNode: null, errorAddress); public void OnDiscoveryError(Exception ex, CacheNodeAdvertisment? errorNode, Uri? address) { diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs index 6ee410c..2731833 100644 --- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs @@ -91,7 +91,8 @@ namespace VNLib.Data.Caching.Providers.VNCache /// public Uri[] GetInitialNodeUris() { - _ = InitialNodes ?? throw new InvalidOperationException("Initial nodes have not been set"); + Validate.NotNull(InitialNodes, "Initial nodes have not been set"); + return InitialNodes.Select(static x => { //Append a default well known endpoint if the path is just a root -- cgit