aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src/Cache
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-07-15 13:06:00 -0400
committerLibravatar vnugent <public@vaughnnugent.com>2023-07-15 13:06:00 -0400
commit8b4fb26473256da5eaa89f3e9d2ac5d44f1e9b88 (patch)
tree6ff979b6110b9e6c61ff9f22bb0dbdd2094e08cf /plugins/ObjectCacheServer/src/Cache
parent2f674e79d42e7d36225fa9ac7ecefbc5bc62d325 (diff)
Latest working draft
Diffstat (limited to 'plugins/ObjectCacheServer/src/Cache')
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs57
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs263
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs139
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheStore.cs125
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs242
5 files changed, 826 insertions, 0 deletions
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs b/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs
new file mode 100644
index 0000000..86e1f5a
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs
@@ -0,0 +1,57 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: CacheConfiguration.cs
+*
+* CacheConfiguration.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Text.Json.Serialization;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Cache
+{
+ internal sealed class CacheConfiguration
+ {
+ [JsonPropertyName("buffer_recv_max")]
+ public int MaxRecvBufferSize { get; set; } = 1000 * 1024;
+ [JsonPropertyName("buffer_recv_min")]
+ public int MinRecvBufferSize { get; set; } = 8 * 1024;
+
+
+ [JsonPropertyName("buffer_header_max")]
+ public int MaxHeaderBufferSize { get; set; } = 2 * 1024;
+ [JsonPropertyName("buffer_header_min")]
+ public int MinHeaderBufferSize { get; set; } = 128;
+
+
+ [JsonPropertyName("max_message_size")]
+ public int MaxMessageSize { get; set; } = 1000 * 1024;
+
+
+ [JsonPropertyName("change_queue_max_depth")]
+ public int MaxEventQueueDepth { get; set; } = 10 * 1000;
+
+
+ [JsonPropertyName("max_cache")]
+ public uint MaxCacheEntries { get; set; } = 10000;
+
+ [JsonPropertyName("buckets")]
+ public uint BucketCount { get; set; } = 10;
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs b/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs
new file mode 100644
index 0000000..ad0eb5a
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs
@@ -0,0 +1,263 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: CacheEventQueueManager.cs
+*
+* CacheEventQueueManager.cs is part of ObjectCacheServer which is
+* part of the larger VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Threading.Channels;
+using System.Collections.Generic;
+
+using VNLib.Plugins;
+using VNLib.Utils.Async;
+using VNLib.Utils.Logging;
+using VNLib.Utils.Extensions;
+using VNLib.Plugins.Extensions.Loading;
+using VNLib.Plugins.Extensions.Loading.Events;
+
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Cache
+{
+ internal sealed class CacheEventQueueManager : ICacheEventQueueManager, IDisposable, IIntervalScheduleable
+ {
+ private readonly int MaxQueueDepth;
+
+ private readonly object SubLock;
+ private readonly LinkedList<NodeQueue> Subscribers;
+
+ private readonly object StoreLock;
+ private readonly Dictionary<string, NodeQueue> QueueStore;
+
+
+ public CacheEventQueueManager(PluginBase plugin)
+ {
+ //Get node config
+ NodeConfig config = plugin.GetOrCreateSingleton<NodeConfig>();
+
+ //Get max queue depth
+ MaxQueueDepth = config.MaxQueueDepth;
+
+ /*
+ * Schedule purge interval to clean up stale queues
+ */
+ plugin.ScheduleInterval(this, config.EventQueuePurgeInterval);
+
+ SubLock = new();
+ Subscribers = new();
+
+ StoreLock = new();
+ QueueStore = new(StringComparer.OrdinalIgnoreCase);
+ }
+
+ ///<inheritdoc/>
+ public IPeerEventQueue Subscribe(ICachePeer peer)
+ {
+ NodeQueue? nq;
+
+ bool isNew = false;
+
+ //Enter sync lock
+ lock (StoreLock)
+ {
+ //Try to recover the queue for the node
+ if (!QueueStore.TryGetValue(peer.NodeId, out nq))
+ {
+ //Create new queue
+ nq = new(peer.NodeId, MaxQueueDepth);
+ QueueStore.Add(peer.NodeId, nq);
+ isNew = true;
+ }
+
+ //Increment listener count
+ nq.Listeners++;
+ }
+
+ //Publish new peer to subscribers list
+ if (isNew)
+ {
+ lock (SubLock)
+ {
+ //Add peer to subscribers list
+ Subscribers.AddLast(nq);
+ }
+ }
+
+ //Return the node's queue
+ return nq;
+ }
+
+ ///<inheritdoc/>
+ public void Unsubscribe(ICachePeer peer)
+ {
+ //Detach a listener for a node
+ lock (StoreLock)
+ {
+ //Get the queue and decrement the listener count
+ NodeQueue nq = QueueStore[peer.NodeId];
+ nq.Listeners--;
+ }
+ }
+
+ ///<inheritdoc/>
+ public void PublishSingle(ChangeEvent change)
+ {
+ //Wait to enter the sub lock
+ lock (SubLock)
+ {
+ //Loop through ll the fast way
+ LinkedListNode<NodeQueue>? q = Subscribers.First;
+
+ while (q != null)
+ {
+ //Pub single event node
+ q.Value.PublishChange(change);
+
+ //Get next queue
+ q = q.Next;
+ }
+ }
+ }
+
+ ///<inheritdoc/>
+ public void PublishMultiple(Span<ChangeEvent> changes)
+ {
+ //Wait to enter the sub lock
+ lock (SubLock)
+ {
+ //Loop through ll the fast way
+ LinkedListNode<NodeQueue>? q = Subscribers.First;
+
+ while (q != null)
+ {
+ //Publish multiple
+ q.Value.PublishChanges(changes);
+
+ //Get next queue
+ q = q.Next;
+ }
+ }
+ }
+
+ ///<inheritdoc/>
+ public void PurgeStaleSubscribers()
+ {
+ //Enter locks
+ lock (SubLock)
+ lock (StoreLock)
+ {
+ //Get all stale queues (queues without listeners)
+ NodeQueue[] staleQueues = QueueStore.Values.Where(static nq => nq.Listeners == 0).ToArray();
+
+ foreach (NodeQueue nq in staleQueues)
+ {
+ //Remove from store
+ QueueStore.Remove(nq.NodeId);
+
+ //remove from subscribers
+ Subscribers.Remove(nq);
+ }
+ }
+ }
+
+ //Interval to purge stale subscribers
+ Task IIntervalScheduleable.OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken)
+ {
+ log.Debug("Purging stale peer event queues");
+
+ PurgeStaleSubscribers();
+
+ return Task.CompletedTask;
+ }
+
+ void IDisposable.Dispose()
+ {
+ QueueStore.Clear();
+ Subscribers.Clear();
+ }
+
+ /*
+ * Holds queues for each node and keeps track of the number of listeners
+ * attached to the queue
+ */
+
+ private sealed class NodeQueue : IPeerEventQueue
+ {
+ public int Listeners;
+
+ public string NodeId { get; }
+
+ public AsyncQueue<ChangeEvent> Queue { get; }
+
+ public NodeQueue(string nodeId, int maxDepth)
+ {
+ NodeId = nodeId;
+
+ /*
+ * Create a bounded channel that acts as a lru and evicts
+ * the oldest item when the queue is full
+ *
+ * There will also only ever be a single thread writing events
+ * to the queue
+ */
+
+ BoundedChannelOptions queueOptions = new(maxDepth)
+ {
+ AllowSynchronousContinuations = true,
+ SingleReader = false,
+ SingleWriter = true,
+ //Drop oldest item in queue if full
+ FullMode = BoundedChannelFullMode.DropOldest,
+ };
+
+ //Init queue/channel
+ Queue = new(queueOptions);
+ }
+
+ public void PublishChange(ChangeEvent change)
+ {
+ Queue.TryEnque(change);
+ }
+
+ public void PublishChanges(Span<ChangeEvent> changes)
+ {
+ for (int i = 0; i < changes.Length; i++)
+ {
+ Queue.TryEnque(changes[i]);
+ }
+ }
+
+ ///<inheritdoc/>
+ public ValueTask<ChangeEvent> DequeueAsync(CancellationToken cancellation)
+ {
+ return Queue.DequeueAsync(cancellation);
+ }
+
+ ///<inheritdoc/>
+ public bool TryDequeue(out ChangeEvent change)
+ {
+ return Queue.TryDequeue(out change);
+ }
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs
new file mode 100644
index 0000000..ba39db6
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs
@@ -0,0 +1,139 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: CacheListenerPubQueue.cs
+*
+* CacheListenerPubQueue.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Threading.Channels;
+using System.Diagnostics.CodeAnalysis;
+
+using VNLib.Utils.Async;
+using VNLib.Utils.Logging;
+using VNLib.Plugins;
+using VNLib.Plugins.Extensions.Loading;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Cache
+{
+ /*
+ * Implements the event queue for the cache listener. Captures changes from the cache store
+ * and publishes them to subscribers.
+ *
+ * It also allows clients that are listening for changes to wait for events to
+ * their individual queues.
+ */
+
+ internal sealed class CacheListenerPubQueue : ICacheListenerEventQueue, IAsyncBackgroundWork
+ {
+ private const int MAX_LOCAL_QUEUE_ITEMS = 10000;
+ private const string LOG_SCOPE_NAME = "QUEUE";
+
+ private readonly AsyncQueue<ChangeEvent> _listenerQueue;
+ private readonly ILogProvider _logProvider;
+ private readonly ICacheEventQueueManager _queueManager;
+
+ public CacheListenerPubQueue(PluginBase plugin)
+ {
+ _queueManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>();
+ _logProvider = plugin.Log.CreateScope(LOG_SCOPE_NAME);
+
+ //Init local queue to store published events
+ _listenerQueue = new(new BoundedChannelOptions(MAX_LOCAL_QUEUE_ITEMS)
+ {
+ AllowSynchronousContinuations = true,
+ FullMode = BoundedChannelFullMode.DropOldest,
+ SingleReader = true,
+ SingleWriter = false,
+ });
+ }
+
+ ///<inheritdoc/>
+ async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
+ {
+ const int accumulatorSize = 64;
+
+ //Create scope
+ pluginLog = pluginLog.CreateScope(LOG_SCOPE_NAME);
+
+ try
+ {
+ pluginLog.Debug("Change queue worker listening for local cache changes");
+
+ //Accumulator for events
+ ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize];
+ int index = 0;
+
+ //Listen for changes
+ while (true)
+ {
+ //Wait for next event
+ accumulator[index++] = await _listenerQueue.DequeueAsync(exitToken);
+
+ //try to accumulate more events until we can't anymore
+ while (_listenerQueue.TryDequeue(out ChangeEvent? ev) && index < accumulatorSize)
+ {
+ accumulator[index++] = ev;
+ }
+
+ //Publish all events to subscribers
+ _queueManager.PublishMultiple(accumulator.AsSpan(0, index));
+
+ //Reset pointer
+ index = 0;
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ //Normal exit
+ pluginLog.Debug("Change queue listener worker exited");
+ }
+ }
+
+ ///<inheritdoc/>
+ public bool IsEnabled([NotNullWhen(true)] object? userState)
+ {
+ return userState is IPeerEventQueue;
+ }
+
+ ///<inheritdoc/>
+ public void PublishEvent(ChangeEvent changeEvent)
+ {
+ if (!_listenerQueue.TryEnque(changeEvent))
+ {
+ _logProvider.Warn("Cache listener event queue is overflowing");
+ }
+ }
+
+ ///<inheritdoc/>
+ public bool TryDequeue(object userState, out ChangeEvent changeEvent)
+ {
+ return (userState as IPeerEventQueue)!.TryDequeue(out changeEvent);
+ }
+
+ ///<inheritdoc/>
+ public ValueTask<ChangeEvent> DequeueAsync(object userState, CancellationToken cancellation)
+ {
+ return (userState as IPeerEventQueue)!.DequeueAsync(cancellation);
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs
new file mode 100644
index 0000000..f94a3f5
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs
@@ -0,0 +1,125 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: CacheStore.cs
+*
+* CacheStore.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+using VNLib.Utils.Logging;
+using VNLib.Plugins;
+using VNLib.Plugins.Extensions.Loading;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Cache
+{
+ /*
+ * Implements the blob cache store, which is an abstraction around the blob cache listener.
+ * This allows for publishing local events (say from other nodes) to keep caches in sync.
+ */
+
+ [ConfigurationName("cache")]
+ internal sealed class CacheStore : ICacheStore, IDisposable
+ {
+ /// <summary>
+ /// Gets the underlying cache listener
+ /// </summary>
+ public BlobCacheListener Listener { get; }
+
+
+ public CacheStore(PluginBase plugin, IConfigScope config)
+ {
+ //Init cache
+ Listener = InitializeCache((ObjectCacheServerEntry)plugin, config);
+ }
+
+ ///<inheritdoc/>
+ ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token)
+ {
+ return Listener.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token);
+ }
+
+ ///<inheritdoc/>
+ void ICacheStore.Clear()
+ {
+ throw new NotImplementedException();
+ }
+
+ ///<inheritdoc/>
+ ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token)
+ {
+ return Listener.Cache.DeleteObjectAsync(id, token);
+ }
+
+ private static BlobCacheListener InitializeCache(ObjectCacheServerEntry plugin, IConfigScope config)
+ {
+ const string CacheConfigTemplate =
+@"
+Cache Configuration:
+ Max memory: {max} Mb
+ Buckets: {bc}
+ Entries per-bucket: {mc}
+";
+
+ //Deserialize the cache config
+ CacheConfiguration cacheConf = config.Deserialze<CacheConfiguration>();
+
+ if (cacheConf.MaxCacheEntries < 2)
+ {
+ throw new ArgumentException("You must configure a 'max_cache' size larger than 1 item");
+ }
+
+ //Suggestion
+ if (cacheConf.MaxCacheEntries < 200)
+ {
+ plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache");
+ }
+
+ //calculate the max memory usage
+ ulong maxByteSize = cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize;
+
+ //Log the cache config
+ plugin.Log.Information(CacheConfigTemplate,
+ maxByteSize / (1024 * 1000),
+ cacheConf.BucketCount,
+ cacheConf.MaxCacheEntries
+ );
+
+ //Get the event listener
+ ICacheListenerEventQueue queue = plugin.GetOrCreateSingleton<CacheListenerPubQueue>();
+
+ //Load the blob cache table system
+ IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, plugin.CacheHeap, cacheConf);
+
+ //Endpoint only allows for a single reader
+ return new(bc, queue, plugin.Log, plugin.CacheHeap);
+ }
+
+ /*
+ * Cleaned up by the plugin on exit
+ */
+ public void Dispose()
+ {
+ Listener.Dispose();
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs
new file mode 100644
index 0000000..2071d2b
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs
@@ -0,0 +1,242 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: CacheSystemUtil.cs
+*
+* CacheSystemUtil.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.IO;
+using System.Text.Json;
+using System.Collections;
+using System.Collections.Generic;
+using System.Runtime.CompilerServices;
+
+using VNLib.Plugins;
+using VNLib.Utils.Memory;
+using VNLib.Plugins.Extensions.Loading;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Cache
+{
+ internal static class CacheSystemUtil
+ {
+ const string PERSISTANT_ASM_CONFIF_KEY = "persistant_cache_asm";
+ const string USER_CACHE_ASM_CONFIG_KEY = "custom_cache_impl_asm";
+ const string LOAD_METHOD_NAME = "OnRuntimeLoad";
+ const string TEARDOWN_METHOD_NAME = "OnSystemDetach";
+
+ /// <summary>
+ /// Loads the <see cref="IBlobCacheTable"/> implementation (dynamic or default) into the process
+ /// and initializes it and it's backing store.
+ /// </summary>
+ /// <param name="plugin"></param>
+ /// <param name="config">The configuration object that contains loading variables</param>
+ /// <param name="heap">The heap for memory cache table to allocate buffers from</param>
+ /// <param name="cacheConf">The cache configuration object</param>
+ /// <returns>The loaded <see cref="IBlobCacheTable"/> implementation</returns>
+ /// <exception cref="FileNotFoundException"></exception>
+ public static IBlobCacheTable LoadMemoryCacheSystem(this PluginBase plugin, IConfigScope config, IUnmangedHeap heap, CacheConfiguration cacheConf)
+ {
+ //First, try to load persitant cache store
+ PersistantCacheManager? pCManager = GetPersistantStore(plugin, config);
+
+ IBlobCacheTable table;
+
+ //See if the user defined a custom cache table implementation
+ if (config.TryGetValue(USER_CACHE_ASM_CONFIG_KEY, out JsonElement customEl))
+ {
+ string asmName = customEl.GetString() ?? throw new FileNotFoundException("User defined a custom blob cache assembly but the file name was null");
+
+ //Return the runtime loaded table
+ table = LoadCustomMemCacheTable(plugin, asmName, pCManager);
+ }
+ else
+ {
+ //Default type
+ table = GetInternalBlobCache(heap, cacheConf, pCManager);
+ }
+
+ //Initialize the subsystem from the cache table
+ pCManager?.InitializeSubsystem(table);
+
+ return table;
+ }
+
+ private static IBlobCacheTable GetInternalBlobCache(IUnmangedHeap heap, CacheConfiguration config, IPersistantCacheStore? store)
+ {
+ return new BlobCacheTable(config.BucketCount, config.MaxCacheEntries, heap, store);
+ }
+
+ private static IBlobCacheTable LoadCustomMemCacheTable(PluginBase plugin, string asmName, IPersistantCacheStore? store)
+ {
+ //Load the custom assembly
+ AssemblyLoader<IBlobCacheTable> customTable = plugin.LoadAssembly<IBlobCacheTable>(asmName);
+
+ try
+ {
+ //Try get onload method and pass the persistant cache instance
+ Action<PluginBase, IPersistantCacheStore?>? onLoad = customTable.TryGetMethod<Action<PluginBase, IPersistantCacheStore?>>(LOAD_METHOD_NAME);
+ onLoad?.Invoke(plugin, store);
+ }
+ catch
+ {
+ customTable.Dispose();
+ throw;
+ }
+
+ return new RuntimeBlobCacheTable(customTable);
+ }
+
+ private static PersistantCacheManager? GetPersistantStore(PluginBase plugin, IConfigScope config)
+ {
+ //Get the persistant assembly
+ if (!config.TryGetValue(PERSISTANT_ASM_CONFIF_KEY, out JsonElement asmEl))
+ {
+ return null;
+ }
+
+ string? asmName = asmEl.GetString();
+ if (asmName == null)
+ {
+ return null;
+ }
+
+ //Load the dynamic assembly into the alc
+ AssemblyLoader<IPersistantCacheStore> loader = plugin.LoadAssembly<IPersistantCacheStore>(asmName);
+ try
+ {
+ //Call the OnLoad method
+ Action<PluginBase, IConfigScope>? loadMethod = loader.TryGetMethod<Action<PluginBase, IConfigScope>>(LOAD_METHOD_NAME);
+
+ loadMethod?.Invoke(plugin, config);
+ }
+ catch
+ {
+ loader.Dispose();
+ throw;
+ }
+
+ //Return the
+ return new(loader);
+ }
+
+
+ private sealed class RuntimeBlobCacheTable : IBlobCacheTable
+ {
+
+ private readonly IBlobCacheTable _table;
+ private readonly Action? OnDetatch;
+
+ public RuntimeBlobCacheTable(AssemblyLoader<IBlobCacheTable> loader)
+ {
+ OnDetatch = loader.TryGetMethod<Action>(TEARDOWN_METHOD_NAME);
+ _table = loader.Resource;
+ }
+
+ public void Dispose()
+ {
+ //We can let the loader dispose the cache table, but we can notify of detatch
+ OnDetatch?.Invoke();
+ }
+
+
+ ///<inheritdoc/>
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ IBlobCacheBucket IBlobCacheTable.GetBucket(ReadOnlySpan<char> objectId) => _table.GetBucket(objectId);
+
+ ///<inheritdoc/>
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public IEnumerator<IBlobCacheBucket> GetEnumerator() => _table.GetEnumerator();
+
+ ///<inheritdoc/>
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ IEnumerator IEnumerable.GetEnumerator() => ((IEnumerable)_table).GetEnumerator();
+ }
+
+ internal sealed class PersistantCacheManager : IPersistantCacheStore
+ {
+ const string INITIALIZE_METHOD_NAME = "OnInitializeForBucket";
+
+
+ /*
+ * Our referrence can be technically unloaded, but so will
+ * this instance, since its loaded into the current ALC, so
+ * this referrence may exist for the lifetime of this instance.
+ *
+ * It also implements IDisposable, which the assembly loader class
+ * will call when this plugin is unloaded, we dont need to call
+ * it here, but we can signal a detach.
+ *
+ * Since the store implements IDisposable, its likely going to
+ * check for dispose on each call, so we don't need to add
+ * and additional disposed check since the method calls must be fast.
+ */
+
+ private readonly IPersistantCacheStore store;
+
+ private readonly Action<uint>? InitMethod;
+ private readonly Action? OnServiceDetatch;
+
+ public PersistantCacheManager(AssemblyLoader<IPersistantCacheStore> loader)
+ {
+ //Try to get the Initialize method
+ InitMethod = loader.TryGetMethod<Action<uint>>(INITIALIZE_METHOD_NAME);
+
+ //Get the optional detatch method
+ OnServiceDetatch = loader.TryGetMethod<Action>(TEARDOWN_METHOD_NAME);
+
+ store = loader.Resource;
+ }
+
+ /// <summary>
+ /// Optionally initializes the backing store by publishing the table's bucket
+ /// id's so it's made aware of the memory cache bucket system.
+ /// </summary>
+ /// <param name="table">The table containing buckets to publish</param>
+ public void InitializeSubsystem(IBlobCacheTable table)
+ {
+ //Itterate all buckets
+ foreach (IBlobCacheBucket bucket in table)
+ {
+ InitMethod?.Invoke(bucket.Id);
+ }
+ }
+
+ void IDisposable.Dispose()
+ {
+ //Assembly loader will dispose the type, we can just signal a detach
+
+ OnServiceDetatch?.Invoke();
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ bool IPersistantCacheStore.OnCacheMiss(uint bucketId, string key, IMemoryCacheEntryFactory factory, out CacheEntry entry)
+ {
+ return store.OnCacheMiss(bucketId, key, factory, out entry);
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ void IPersistantCacheStore.OnEntryDeleted(uint bucketId, string key) => store.OnEntryDeleted(bucketId, key);
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ void IPersistantCacheStore.OnEntryEvicted(uint bucketId, string key, in CacheEntry entry) => store.OnEntryEvicted(bucketId, key, in entry);
+ }
+ }
+}