aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2024-07-15 19:01:28 -0400
committerLibravatar vnugent <public@vaughnnugent.com>2024-07-15 19:01:28 -0400
commit24929f4e7acce9847f4cbe813e850ee57d474723 (patch)
tree3764dfcc9212975be7b5238349412f9cc45e4b3c
parent5e8f5f87b243930e17bfe99a34e170161204b4e1 (diff)
refactor: Update entity result caching
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityCacheExtensions.cs20
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityResultCache.cs180
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/IEntityStore.cs9
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/TransparentEntityCache.cs74
-rw-r--r--plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs18
-rw-r--r--plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs14
-rw-r--r--plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs3
7 files changed, 204 insertions, 114 deletions
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityCacheExtensions.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityCacheExtensions.cs
index a26da27..2c77218 100644
--- a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityCacheExtensions.cs
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityCacheExtensions.cs
@@ -133,26 +133,6 @@ namespace VNLib.Plugins.Extensions.VNCache.DataModel
ArgumentNullException.ThrowIfNull(cache);
return new EntityCacheBuilder<TEntity>(cache);
}
-
- /// <summary>
- /// Builds a transparent entity result cache for a backing store.
- /// </summary>
- /// <typeparam name="TEntity"></typeparam>
- /// <param name="builder"></param>
- /// <param name="store">The backing data store used to fetch results from</param>
- /// <returns>The new <see cref="TransparentEntityCache{TEntity}"/> that wraps the entity store</returns>
- public static TransparentEntityCache<TEntity> BuildTransparent<TEntity>(this EntityCacheBuilder<TEntity> builder, IEntityStore<TEntity> store)
- where TEntity : class
- {
- ArgumentNullException.ThrowIfNull(builder);
- ArgumentNullException.ThrowIfNull(store);
-
- EntityResultCache<TEntity> resultCache = builder.Build();
-
- return new TransparentEntityCache<TEntity>(store, resultCache);
- }
-
-
private sealed class EntityCacheImpl<T>(IGlobalCacheProvider cache, ICacheObjectDeserializer deserializer, ICacheObjectSerializer serializer)
: IEntityCache<T> where T : class
{
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityResultCache.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityResultCache.cs
index fb75fc6..56311f0 100644
--- a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityResultCache.cs
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityResultCache.cs
@@ -45,13 +45,31 @@ namespace VNLib.Plugins.Extensions.VNCache.DataModel
{
/// <summary>
+ /// The backing entity cache store
+ /// </summary>
+ public IEntityCache<TEntity> Cache => cache;
+
+ /// <summary>
+ /// The task policy for which this result cache will
+ /// respect
+ /// </summary>
+ public ICacheTaskPolicy TaskPolicy => taskPolicy;
+
+ /// <summary>
+ /// The expiration policy for which this result cache will
+ /// respect for entity expiration and refreshing
+ /// </summary>
+ public ICacheExpirationPolicy<TEntity> ExpirationPolicy => expirationPolicy;
+
+
+ /// <summary>
/// Fetchs a result by it's request entity
/// </summary>
/// <param name="request">The fetch request state object</param>
/// <param name="cancellation">A token to canel the operation</param>
/// <param name="resultFactory">A callback generator function</param>
/// <returns>A task the returns the result of the requested entity, or null if it was not found or provided by the backing store</returns>
- public async Task<TEntity?> FetchAsync<TRequest>(
+ public Task<TEntity?> FetchAsync<TRequest>(
TRequest request,
Func<TRequest, CancellationToken, Task<TEntity?>> resultFactory,
CancellationToken cancellation = default
@@ -59,9 +77,48 @@ namespace VNLib.Plugins.Extensions.VNCache.DataModel
{
ArgumentNullException.ThrowIfNull(request);
ArgumentNullException.ThrowIfNull(resultFactory);
- cancellation.ThrowIfCancellationRequested();
- string key = request.GetKey();
+ return FetchAsync(
+ key: request.GetKey(),
+ state: (resultFactory, request),
+ resultFactory: static (rf, c) => rf.resultFactory(rf.request, c),
+ cancellation
+ );
+ }
+
+ /// <summary>
+ /// Fetchs a result by it's request entity
+ /// </summary>
+ /// <param name="key">The fetch request state object</param>
+ /// <param name="cancellation">A token to canel the operation</param>
+ /// <param name="resultFactory">A callback generator function</param>
+ /// <returns>A task the returns the result of the requested entity, or null if it was not found or provided by the backing store</returns>
+ public Task<TEntity?> FetchAsync(
+ string key,
+ Func<CancellationToken, Task<TEntity?>> resultFactory,
+ CancellationToken cancellation = default
+ )
+ {
+ ArgumentNullException.ThrowIfNull(resultFactory);
+
+ return FetchAsync(
+ key,
+ state: resultFactory,
+ resultFactory: static (rf, c) => rf(c),
+ cancellation
+ );
+ }
+
+ private async Task<TEntity?> FetchAsync<TState>(
+ string key,
+ TState state,
+ Func<TState, CancellationToken, Task<TEntity?>> resultFactory,
+ CancellationToken cancellation = default
+ )
+ {
+ ArgumentException.ThrowIfNullOrWhiteSpace(key);
+ ArgumentNullException.ThrowIfNull(resultFactory);
+ cancellation.ThrowIfCancellationRequested();
//try to fetch from cache
TEntity? entity = await cache.GetAsync(key, cancellation);
@@ -79,7 +136,7 @@ namespace VNLib.Plugins.Extensions.VNCache.DataModel
if (entity is null)
{
//Cache miss, load from factory
- entity = await resultFactory(request, cancellation);
+ entity = await resultFactory(state, cancellation);
if (entity is not null)
{
@@ -127,5 +184,120 @@ namespace VNLib.Plugins.Extensions.VNCache.DataModel
return taskPolicy.ObserveOperationAsync(remove);
}
+
+ /// <summary>
+ /// Performs a cache replacement operation. That is substitutes an exiting
+ /// value with a new one, or inserts a new value if the key does not exist.
+ /// </summary>
+ /// <param name="request">The operation request state object</param>
+ /// <param name="entity">The entity object to store</param>
+ /// <param name="action">A generic callback function to invoke in parallel with the upsert operation</param>
+ /// <param name="cancellation">A token to cancel the async operation</param>
+ /// <returns>
+ /// A task that completes when the upsert operation has completed according to the
+ /// <see cref="TaskPolicy"/>
+ /// </returns>
+ /// <exception cref="ArgumentException"></exception>
+ /// <exception cref="ArgumentNullException"></exception>
+ public Task UpsertAsync<TRequest>(
+ TRequest request,
+ TEntity entity,
+ Func<TRequest, TEntity, CancellationToken, Task> action,
+ CancellationToken cancellation = default
+ ) where TRequest : IEntityCacheKey
+ {
+ ArgumentNullException.ThrowIfNull(request);
+ ArgumentNullException.ThrowIfNull(action);
+
+ return UpsertAsync(
+ key: request.GetKey(),
+ entity: entity,
+ state: (action, request),
+ callback: static (cb, e, c) => cb.action.Invoke(cb.request, e, c),
+ cancellation
+ );
+ }
+
+
+ /// <summary>
+ /// Performs a cache replacement operation. That is substitutes an exiting
+ /// value with a new one, or inserts a new value if the key does not exist.
+ /// </summary>
+ /// <param name="key">The entity's unique id within the cache store</param>
+ /// <param name="entity">The entity object to store</param>
+ /// <param name="action">A generic callback function to invoke in parallel with the upsert operation</param>
+ /// <param name="cancellation">A token to cancel the async operation</param>
+ /// <returns>
+ /// A task that completes when the upsert operation has completed according to the
+ /// <see cref="TaskPolicy"/>
+ /// </returns>
+ /// <exception cref="ArgumentException"></exception>
+ /// <exception cref="ArgumentNullException"></exception>
+ public Task UpsertAsync(
+ string key,
+ TEntity entity,
+ Func<TEntity, CancellationToken, Task> action,
+ CancellationToken cancellation = default
+ )
+ {
+ ArgumentNullException.ThrowIfNull(action);
+
+ return UpsertAsync(
+ key,
+ entity,
+ state: action,
+ callback: static (cb, e, c) => cb.Invoke(e, c),
+ cancellation
+ );
+ }
+
+ /// <summary>
+ /// Performs a cache replacement operation. That is substitutes an exiting
+ /// value with a new one, or inserts a new value if the key does not exist.
+ /// </summary>
+ /// <param name="key">The entity's unique id within the cache store</param>
+ /// <param name="entity">The entity object to store</param>
+ /// <param name="cancellation">A token to cancel the async operation</param>
+ /// <returns>
+ /// A task that completes when the upsert operation has completed according to the
+ /// <see cref="TaskPolicy"/>
+ /// </returns>
+ /// <exception cref="ArgumentNullException"></exception>
+ public Task UpsertAsync(string key, TEntity entity, CancellationToken cancellation = default)
+ {
+ return UpsertAsync<object?>(
+ key,
+ entity,
+ state: null,
+ callback: static (_, _, _) => Task.CompletedTask,
+ cancellation
+ );
+ }
+
+ private Task UpsertAsync<TState>(
+ string key,
+ TEntity entity,
+ TState state,
+ Func<TState, TEntity, CancellationToken, Task> callback,
+ CancellationToken cancellation = default
+ )
+ {
+ ArgumentException.ThrowIfNullOrWhiteSpace(key);
+ ArgumentNullException.ThrowIfNull(callback);
+ cancellation.ThrowIfCancellationRequested();
+
+ //Call refresh before storing the entity incase any setup needs to be performed
+ expirationPolicy.OnRefreshed(entity);
+
+ //Cache task must be observed by the task policy
+ Task upsert = taskPolicy.ObserveOperationAsync(
+ operation: cache.UpsertAsync(key, entity, cancellation)
+ );
+
+ Task cbResult = callback(state, entity, cancellation);
+
+ //Combine the observed task and the callback function
+ return Task.WhenAll(cbResult, upsert);
+ }
}
}
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/IEntityStore.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/IEntityStore.cs
index 29282cf..744ad8a 100644
--- a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/IEntityStore.cs
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/IEntityStore.cs
@@ -31,7 +31,8 @@ namespace VNLib.Plugins.Extensions.VNCache.DataModel
/// An instance that stores entities which can be fetched, updated, or removed
/// </summary>
/// <typeparam name="TEntity">The entity result type</typeparam>
- public interface IEntityStore<TEntity>
+ /// <typeparam name="TRequest">The entity operation request state object</typeparam>
+ public interface IEntityStore<TEntity, TRequest> where TRequest : IEntityCacheKey
{
/// <summary>
/// Fetches an entity from the store by it's request entity state object
@@ -39,7 +40,7 @@ namespace VNLib.Plugins.Extensions.VNCache.DataModel
/// <param name="request">The request state object</param>
/// <param name="cancellation">A token to cancel the operation</param>
/// <returns>A task that yields the entity object if it exists</returns>
- Task<TEntity?> GetAsync<TRequest>(TRequest request, CancellationToken cancellation = default) where TRequest : IEntityCacheKey;
+ Task<TEntity?> GetAsync(TRequest request, CancellationToken cancellation = default);
/// <summary>
/// Updates or inserts an entity into the store
@@ -48,7 +49,7 @@ namespace VNLib.Plugins.Extensions.VNCache.DataModel
/// <param name="request">The request state object</param>
/// <param name="cancellation">A token to cancel the operation</param>
/// <returns>A task that completes when the upsert operation has completed</returns>
- Task UpsertAsync<TRequest>(TRequest request, TEntity entity, CancellationToken cancellation = default) where TRequest : IEntityCacheKey;
+ Task UpsertAsync(TRequest request, TEntity entity, CancellationToken cancellation = default);
/// <summary>
/// Removes an entity from the store
@@ -56,6 +57,6 @@ namespace VNLib.Plugins.Extensions.VNCache.DataModel
/// <param name="request">The request state object</param>
/// <param name="cancellation">A token to cancel the operation</param>
/// <returns>A task that completes with the result of the delete operation</returns>
- Task<bool> RemoveAsync<TRequest>(TRequest request, CancellationToken cancellation = default) where TRequest : IEntityCacheKey;
+ Task<bool> RemoveAsync(TRequest request, CancellationToken cancellation = default);
}
}
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/TransparentEntityCache.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/TransparentEntityCache.cs
deleted file mode 100644
index 90aeb74..0000000
--- a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/TransparentEntityCache.cs
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
-* Copyright (c) 2024 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Plugins.Extensions.VNCache
-* File: EntityCacheExtensions.cs
-*
-* EntityCacheExtensions.cs is part of VNLib.Plugins.Extensions.VNCache
-* which is part of the larger VNLib collection of libraries and utilities.
-*
-* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System.Linq;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace VNLib.Plugins.Extensions.VNCache.DataModel
-{
- /// <summary>
- /// A cache proxy that sits transparently between a backing store and the caller
- /// to cache unique entities
- /// </summary>
- /// <typeparam name="TEntity"></typeparam>
- /// <param name="store">The backing entity data store</param>
- /// <param name="cache">The entity cache used to fetch</param>
- public class TransparentEntityCache<TEntity>(IEntityStore<TEntity> store, EntityResultCache<TEntity> cache) : IEntityStore<TEntity>
- where TEntity : class
- {
-
- ///<inheritdoc/>
- public Task<TEntity?> GetAsync<TRequest>(TRequest request, CancellationToken cancellation = default)
- where TRequest : IEntityCacheKey
- {
- return cache.FetchAsync(request, store.GetAsync, cancellation);
- }
-
- ///<inheritdoc/>
- public Task<bool> RemoveAsync<TRequest>(TRequest request, CancellationToken cancellation = default)
- where TRequest : IEntityCacheKey
- {
- Task<bool> _fromCache = cache.RemoveAsync(request, cancellation)
- .ContinueWith(static (_) => true, TaskScheduler.Default);
-
- Task<bool> _fromStore = store.RemoveAsync(request, cancellation);
-
- return Task.WhenAll(_fromCache, _fromStore)
- .ContinueWith(static (t) => t.Result.All(static r => r), TaskScheduler.Default);
- }
-
- ///<inheritdoc/>
- public Task UpsertAsync<TRequest>(TRequest request, TEntity entity, CancellationToken cancellation = default)
- where TRequest : IEntityCacheKey
- {
- //Remove key from cache but push update to store
- Task _fromCache = cache.RemoveAsync(request, cancellation);
-
- Task _fromStore = store.UpsertAsync(request, entity, cancellation);
-
- return Task.WhenAll(_fromCache, _fromStore);
- }
- }
-}
diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs
index e9dcbc5..effa4d7 100644
--- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs
+++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs
@@ -124,7 +124,9 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering
//Get all discovered nodes
CacheNodeAdvertisment[] ads = cluster.DiscoveredNodes.GetAllNodes();
//Just get a random node from the collection for now
- return ads.Length > 0 ? ads.SelectRandom() : null;
+ return ads.Length > 0
+ ? ads.SelectRandom()
+ : null;
}
///<inheritdoc/>
@@ -145,22 +147,28 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering
internal string? SerializeNextNode()
{
CacheNodeAdvertisment? nextNode = GetNextNode();
- return nextNode == null ? null : JsonSerializer.Serialize(nextNode);
+ return nextNode is not null
+ ? JsonSerializer.Serialize(nextNode)
+ : null;
}
}
sealed class RemoteHandler(object RemoteIndex) : IClusterNodeIndex
{
- private readonly Func<string?> _remoteSerializer = ManagedLibrary.GetMethod<Func<string?>>(RemoteIndex, nameof(LocalHandler.SerializeNextNode), BindingFlags.NonPublic);
+ private readonly Func<string?> _remoteSerializer
+ = ManagedLibrary.GetMethod<Func<string?>>(RemoteIndex, nameof(LocalHandler.SerializeNextNode), BindingFlags.NonPublic);
- private readonly Func<CancellationToken, Task> _waitTask = ManagedLibrary.GetMethod<Func<CancellationToken, Task>>(RemoteIndex, nameof(WaitForDiscoveryAsync), BindingFlags.Public);
+ private readonly Func<CancellationToken, Task> _waitTask
+ = ManagedLibrary.GetMethod<Func<CancellationToken, Task>>(RemoteIndex, nameof(LocalHandler.WaitForDiscoveryAsync), BindingFlags.Public);
///<inheritdoc/>
public CacheNodeAdvertisment? GetNextNode()
{
//Deserialize the next node from the remote index
string? nexNode = _remoteSerializer();
- return nexNode == null ? null : JsonSerializer.Deserialize<CacheNodeAdvertisment>(nexNode);
+ return nexNode is not null
+ ? JsonSerializer.Deserialize<CacheNodeAdvertisment>(nexNode)
+ : null;
}
///<inheritdoc/>
diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
index af12b32..fc4b111 100644
--- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
+++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
@@ -79,7 +79,7 @@ namespace VNLib.Data.Caching.Providers.VNCache
public FBMCacheClient(PluginBase plugin, IConfigScope config)
: this(
config.Deserialze<VnCacheClientConfig>(),
- plugin.IsDebug() ? plugin.Log : null,
+ plugin.IsDebug() ? plugin.Log.CreateScope("FBM-DEBUG") : null,
plugin
)
{
@@ -117,15 +117,17 @@ namespace VNLib.Data.Caching.Providers.VNCache
_config = config;
//Set a default node delay if null
- _initNodeDelay = _config.InitialNodeDelay.HasValue ? TimeSpan.FromSeconds(_config.InitialNodeDelay.Value) : InitialDelay;
+ _initNodeDelay = _config.InitialNodeDelay.HasValue
+ ? TimeSpan.FromSeconds(_config.InitialNodeDelay.Value)
+ : InitialDelay;
//Init the client with default settings
FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(BufferHeap, (int)config.MaxBlobSize, config.RequestTimeout, debugLog);
FBMClientFactory clientFactory = new(
in conf,
- new FBMFallbackClientWsFactory(),
- 10
+ webSocketManager: new FBMFallbackClientWsFactory(),
+ maxClients: 10
);
_cluster = (new CacheClientConfiguration())
@@ -421,10 +423,10 @@ namespace VNLib.Data.Caching.Providers.VNCache
private sealed record class DiscoveryErrHAndler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
{
public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
- => OnDiscoveryError(errorNode, ex);
+ => OnDiscoveryError(ex, errorNode, address: null);
public void OnDiscoveryError(Uri errorAddress, Exception ex)
- => OnDiscoveryError(ex, null, errorAddress);
+ => OnDiscoveryError(ex, errorNode: null, errorAddress);
public void OnDiscoveryError(Exception ex, CacheNodeAdvertisment? errorNode, Uri? address)
{
diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs
index 6ee410c..2731833 100644
--- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs
+++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs
@@ -91,7 +91,8 @@ namespace VNLib.Data.Caching.Providers.VNCache
/// <exception cref="InvalidOperationException"></exception>
public Uri[] GetInitialNodeUris()
{
- _ = InitialNodes ?? throw new InvalidOperationException("Initial nodes have not been set");
+ Validate.NotNull(InitialNodes, "Initial nodes have not been set");
+
return InitialNodes.Select(static x =>
{
//Append a default well known endpoint if the path is just a root