diff options
author | vnugent <public@vaughnnugent.com> | 2023-07-15 13:06:00 -0400 |
---|---|---|
committer | vnugent <public@vaughnnugent.com> | 2023-07-15 13:06:00 -0400 |
commit | 8b4fb26473256da5eaa89f3e9d2ac5d44f1e9b88 (patch) | |
tree | 6ff979b6110b9e6c61ff9f22bb0dbdd2094e08cf /plugins/ObjectCacheServer/src/Cache | |
parent | 2f674e79d42e7d36225fa9ac7ecefbc5bc62d325 (diff) |
Latest working draft
Diffstat (limited to 'plugins/ObjectCacheServer/src/Cache')
5 files changed, 826 insertions, 0 deletions
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs b/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs new file mode 100644 index 0000000..86e1f5a --- /dev/null +++ b/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs @@ -0,0 +1,57 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: CacheConfiguration.cs +* +* CacheConfiguration.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Text.Json.Serialization; + +namespace VNLib.Data.Caching.ObjectCache.Server.Cache +{ + internal sealed class CacheConfiguration + { + [JsonPropertyName("buffer_recv_max")] + public int MaxRecvBufferSize { get; set; } = 1000 * 1024; + [JsonPropertyName("buffer_recv_min")] + public int MinRecvBufferSize { get; set; } = 8 * 1024; + + + [JsonPropertyName("buffer_header_max")] + public int MaxHeaderBufferSize { get; set; } = 2 * 1024; + [JsonPropertyName("buffer_header_min")] + public int MinHeaderBufferSize { get; set; } = 128; + + + [JsonPropertyName("max_message_size")] + public int MaxMessageSize { get; set; } = 1000 * 1024; + + + [JsonPropertyName("change_queue_max_depth")] + public int MaxEventQueueDepth { get; set; } = 10 * 1000; + + + [JsonPropertyName("max_cache")] + public uint MaxCacheEntries { get; set; } = 10000; + + [JsonPropertyName("buckets")] + public uint BucketCount { get; set; } = 10; + } +} diff --git a/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs b/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs new file mode 100644 index 0000000..ad0eb5a --- /dev/null +++ b/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs @@ -0,0 +1,263 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: CacheEventQueueManager.cs +* +* CacheEventQueueManager.cs is part of ObjectCacheServer which is +* part of the larger VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Channels; +using System.Collections.Generic; + +using VNLib.Plugins; +using VNLib.Utils.Async; +using VNLib.Utils.Logging; +using VNLib.Utils.Extensions; +using VNLib.Plugins.Extensions.Loading; +using VNLib.Plugins.Extensions.Loading.Events; + + +namespace VNLib.Data.Caching.ObjectCache.Server.Cache +{ + internal sealed class CacheEventQueueManager : ICacheEventQueueManager, IDisposable, IIntervalScheduleable + { + private readonly int MaxQueueDepth; + + private readonly object SubLock; + private readonly LinkedList<NodeQueue> Subscribers; + + private readonly object StoreLock; + private readonly Dictionary<string, NodeQueue> QueueStore; + + + public CacheEventQueueManager(PluginBase plugin) + { + //Get node config + NodeConfig config = plugin.GetOrCreateSingleton<NodeConfig>(); + + //Get max queue depth + MaxQueueDepth = config.MaxQueueDepth; + + /* + * Schedule purge interval to clean up stale queues + */ + plugin.ScheduleInterval(this, config.EventQueuePurgeInterval); + + SubLock = new(); + Subscribers = new(); + + StoreLock = new(); + QueueStore = new(StringComparer.OrdinalIgnoreCase); + } + + ///<inheritdoc/> + public IPeerEventQueue Subscribe(ICachePeer peer) + { + NodeQueue? nq; + + bool isNew = false; + + //Enter sync lock + lock (StoreLock) + { + //Try to recover the queue for the node + if (!QueueStore.TryGetValue(peer.NodeId, out nq)) + { + //Create new queue + nq = new(peer.NodeId, MaxQueueDepth); + QueueStore.Add(peer.NodeId, nq); + isNew = true; + } + + //Increment listener count + nq.Listeners++; + } + + //Publish new peer to subscribers list + if (isNew) + { + lock (SubLock) + { + //Add peer to subscribers list + Subscribers.AddLast(nq); + } + } + + //Return the node's queue + return nq; + } + + ///<inheritdoc/> + public void Unsubscribe(ICachePeer peer) + { + //Detach a listener for a node + lock (StoreLock) + { + //Get the queue and decrement the listener count + NodeQueue nq = QueueStore[peer.NodeId]; + nq.Listeners--; + } + } + + ///<inheritdoc/> + public void PublishSingle(ChangeEvent change) + { + //Wait to enter the sub lock + lock (SubLock) + { + //Loop through ll the fast way + LinkedListNode<NodeQueue>? q = Subscribers.First; + + while (q != null) + { + //Pub single event node + q.Value.PublishChange(change); + + //Get next queue + q = q.Next; + } + } + } + + ///<inheritdoc/> + public void PublishMultiple(Span<ChangeEvent> changes) + { + //Wait to enter the sub lock + lock (SubLock) + { + //Loop through ll the fast way + LinkedListNode<NodeQueue>? q = Subscribers.First; + + while (q != null) + { + //Publish multiple + q.Value.PublishChanges(changes); + + //Get next queue + q = q.Next; + } + } + } + + ///<inheritdoc/> + public void PurgeStaleSubscribers() + { + //Enter locks + lock (SubLock) + lock (StoreLock) + { + //Get all stale queues (queues without listeners) + NodeQueue[] staleQueues = QueueStore.Values.Where(static nq => nq.Listeners == 0).ToArray(); + + foreach (NodeQueue nq in staleQueues) + { + //Remove from store + QueueStore.Remove(nq.NodeId); + + //remove from subscribers + Subscribers.Remove(nq); + } + } + } + + //Interval to purge stale subscribers + Task IIntervalScheduleable.OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken) + { + log.Debug("Purging stale peer event queues"); + + PurgeStaleSubscribers(); + + return Task.CompletedTask; + } + + void IDisposable.Dispose() + { + QueueStore.Clear(); + Subscribers.Clear(); + } + + /* + * Holds queues for each node and keeps track of the number of listeners + * attached to the queue + */ + + private sealed class NodeQueue : IPeerEventQueue + { + public int Listeners; + + public string NodeId { get; } + + public AsyncQueue<ChangeEvent> Queue { get; } + + public NodeQueue(string nodeId, int maxDepth) + { + NodeId = nodeId; + + /* + * Create a bounded channel that acts as a lru and evicts + * the oldest item when the queue is full + * + * There will also only ever be a single thread writing events + * to the queue + */ + + BoundedChannelOptions queueOptions = new(maxDepth) + { + AllowSynchronousContinuations = true, + SingleReader = false, + SingleWriter = true, + //Drop oldest item in queue if full + FullMode = BoundedChannelFullMode.DropOldest, + }; + + //Init queue/channel + Queue = new(queueOptions); + } + + public void PublishChange(ChangeEvent change) + { + Queue.TryEnque(change); + } + + public void PublishChanges(Span<ChangeEvent> changes) + { + for (int i = 0; i < changes.Length; i++) + { + Queue.TryEnque(changes[i]); + } + } + + ///<inheritdoc/> + public ValueTask<ChangeEvent> DequeueAsync(CancellationToken cancellation) + { + return Queue.DequeueAsync(cancellation); + } + + ///<inheritdoc/> + public bool TryDequeue(out ChangeEvent change) + { + return Queue.TryDequeue(out change); + } + } + } +} diff --git a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs new file mode 100644 index 0000000..ba39db6 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs @@ -0,0 +1,139 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: CacheListenerPubQueue.cs +* +* CacheListenerPubQueue.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Channels; +using System.Diagnostics.CodeAnalysis; + +using VNLib.Utils.Async; +using VNLib.Utils.Logging; +using VNLib.Plugins; +using VNLib.Plugins.Extensions.Loading; + +namespace VNLib.Data.Caching.ObjectCache.Server.Cache +{ + /* + * Implements the event queue for the cache listener. Captures changes from the cache store + * and publishes them to subscribers. + * + * It also allows clients that are listening for changes to wait for events to + * their individual queues. + */ + + internal sealed class CacheListenerPubQueue : ICacheListenerEventQueue, IAsyncBackgroundWork + { + private const int MAX_LOCAL_QUEUE_ITEMS = 10000; + private const string LOG_SCOPE_NAME = "QUEUE"; + + private readonly AsyncQueue<ChangeEvent> _listenerQueue; + private readonly ILogProvider _logProvider; + private readonly ICacheEventQueueManager _queueManager; + + public CacheListenerPubQueue(PluginBase plugin) + { + _queueManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>(); + _logProvider = plugin.Log.CreateScope(LOG_SCOPE_NAME); + + //Init local queue to store published events + _listenerQueue = new(new BoundedChannelOptions(MAX_LOCAL_QUEUE_ITEMS) + { + AllowSynchronousContinuations = true, + FullMode = BoundedChannelFullMode.DropOldest, + SingleReader = true, + SingleWriter = false, + }); + } + + ///<inheritdoc/> + async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) + { + const int accumulatorSize = 64; + + //Create scope + pluginLog = pluginLog.CreateScope(LOG_SCOPE_NAME); + + try + { + pluginLog.Debug("Change queue worker listening for local cache changes"); + + //Accumulator for events + ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize]; + int index = 0; + + //Listen for changes + while (true) + { + //Wait for next event + accumulator[index++] = await _listenerQueue.DequeueAsync(exitToken); + + //try to accumulate more events until we can't anymore + while (_listenerQueue.TryDequeue(out ChangeEvent? ev) && index < accumulatorSize) + { + accumulator[index++] = ev; + } + + //Publish all events to subscribers + _queueManager.PublishMultiple(accumulator.AsSpan(0, index)); + + //Reset pointer + index = 0; + } + } + catch (OperationCanceledException) + { + //Normal exit + pluginLog.Debug("Change queue listener worker exited"); + } + } + + ///<inheritdoc/> + public bool IsEnabled([NotNullWhen(true)] object? userState) + { + return userState is IPeerEventQueue; + } + + ///<inheritdoc/> + public void PublishEvent(ChangeEvent changeEvent) + { + if (!_listenerQueue.TryEnque(changeEvent)) + { + _logProvider.Warn("Cache listener event queue is overflowing"); + } + } + + ///<inheritdoc/> + public bool TryDequeue(object userState, out ChangeEvent changeEvent) + { + return (userState as IPeerEventQueue)!.TryDequeue(out changeEvent); + } + + ///<inheritdoc/> + public ValueTask<ChangeEvent> DequeueAsync(object userState, CancellationToken cancellation) + { + return (userState as IPeerEventQueue)!.DequeueAsync(cancellation); + } + } +} diff --git a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs new file mode 100644 index 0000000..f94a3f5 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs @@ -0,0 +1,125 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: CacheStore.cs +* +* CacheStore.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Utils.Logging; +using VNLib.Plugins; +using VNLib.Plugins.Extensions.Loading; + +namespace VNLib.Data.Caching.ObjectCache.Server.Cache +{ + /* + * Implements the blob cache store, which is an abstraction around the blob cache listener. + * This allows for publishing local events (say from other nodes) to keep caches in sync. + */ + + [ConfigurationName("cache")] + internal sealed class CacheStore : ICacheStore, IDisposable + { + /// <summary> + /// Gets the underlying cache listener + /// </summary> + public BlobCacheListener Listener { get; } + + + public CacheStore(PluginBase plugin, IConfigScope config) + { + //Init cache + Listener = InitializeCache((ObjectCacheServerEntry)plugin, config); + } + + ///<inheritdoc/> + ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token) + { + return Listener.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token); + } + + ///<inheritdoc/> + void ICacheStore.Clear() + { + throw new NotImplementedException(); + } + + ///<inheritdoc/> + ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token) + { + return Listener.Cache.DeleteObjectAsync(id, token); + } + + private static BlobCacheListener InitializeCache(ObjectCacheServerEntry plugin, IConfigScope config) + { + const string CacheConfigTemplate = +@" +Cache Configuration: + Max memory: {max} Mb + Buckets: {bc} + Entries per-bucket: {mc} +"; + + //Deserialize the cache config + CacheConfiguration cacheConf = config.Deserialze<CacheConfiguration>(); + + if (cacheConf.MaxCacheEntries < 2) + { + throw new ArgumentException("You must configure a 'max_cache' size larger than 1 item"); + } + + //Suggestion + if (cacheConf.MaxCacheEntries < 200) + { + plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache"); + } + + //calculate the max memory usage + ulong maxByteSize = cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize; + + //Log the cache config + plugin.Log.Information(CacheConfigTemplate, + maxByteSize / (1024 * 1000), + cacheConf.BucketCount, + cacheConf.MaxCacheEntries + ); + + //Get the event listener + ICacheListenerEventQueue queue = plugin.GetOrCreateSingleton<CacheListenerPubQueue>(); + + //Load the blob cache table system + IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, plugin.CacheHeap, cacheConf); + + //Endpoint only allows for a single reader + return new(bc, queue, plugin.Log, plugin.CacheHeap); + } + + /* + * Cleaned up by the plugin on exit + */ + public void Dispose() + { + Listener.Dispose(); + } + } +} diff --git a/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs new file mode 100644 index 0000000..2071d2b --- /dev/null +++ b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs @@ -0,0 +1,242 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: CacheSystemUtil.cs +* +* CacheSystemUtil.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.IO; +using System.Text.Json; +using System.Collections; +using System.Collections.Generic; +using System.Runtime.CompilerServices; + +using VNLib.Plugins; +using VNLib.Utils.Memory; +using VNLib.Plugins.Extensions.Loading; + +namespace VNLib.Data.Caching.ObjectCache.Server.Cache +{ + internal static class CacheSystemUtil + { + const string PERSISTANT_ASM_CONFIF_KEY = "persistant_cache_asm"; + const string USER_CACHE_ASM_CONFIG_KEY = "custom_cache_impl_asm"; + const string LOAD_METHOD_NAME = "OnRuntimeLoad"; + const string TEARDOWN_METHOD_NAME = "OnSystemDetach"; + + /// <summary> + /// Loads the <see cref="IBlobCacheTable"/> implementation (dynamic or default) into the process + /// and initializes it and it's backing store. + /// </summary> + /// <param name="plugin"></param> + /// <param name="config">The configuration object that contains loading variables</param> + /// <param name="heap">The heap for memory cache table to allocate buffers from</param> + /// <param name="cacheConf">The cache configuration object</param> + /// <returns>The loaded <see cref="IBlobCacheTable"/> implementation</returns> + /// <exception cref="FileNotFoundException"></exception> + public static IBlobCacheTable LoadMemoryCacheSystem(this PluginBase plugin, IConfigScope config, IUnmangedHeap heap, CacheConfiguration cacheConf) + { + //First, try to load persitant cache store + PersistantCacheManager? pCManager = GetPersistantStore(plugin, config); + + IBlobCacheTable table; + + //See if the user defined a custom cache table implementation + if (config.TryGetValue(USER_CACHE_ASM_CONFIG_KEY, out JsonElement customEl)) + { + string asmName = customEl.GetString() ?? throw new FileNotFoundException("User defined a custom blob cache assembly but the file name was null"); + + //Return the runtime loaded table + table = LoadCustomMemCacheTable(plugin, asmName, pCManager); + } + else + { + //Default type + table = GetInternalBlobCache(heap, cacheConf, pCManager); + } + + //Initialize the subsystem from the cache table + pCManager?.InitializeSubsystem(table); + + return table; + } + + private static IBlobCacheTable GetInternalBlobCache(IUnmangedHeap heap, CacheConfiguration config, IPersistantCacheStore? store) + { + return new BlobCacheTable(config.BucketCount, config.MaxCacheEntries, heap, store); + } + + private static IBlobCacheTable LoadCustomMemCacheTable(PluginBase plugin, string asmName, IPersistantCacheStore? store) + { + //Load the custom assembly + AssemblyLoader<IBlobCacheTable> customTable = plugin.LoadAssembly<IBlobCacheTable>(asmName); + + try + { + //Try get onload method and pass the persistant cache instance + Action<PluginBase, IPersistantCacheStore?>? onLoad = customTable.TryGetMethod<Action<PluginBase, IPersistantCacheStore?>>(LOAD_METHOD_NAME); + onLoad?.Invoke(plugin, store); + } + catch + { + customTable.Dispose(); + throw; + } + + return new RuntimeBlobCacheTable(customTable); + } + + private static PersistantCacheManager? GetPersistantStore(PluginBase plugin, IConfigScope config) + { + //Get the persistant assembly + if (!config.TryGetValue(PERSISTANT_ASM_CONFIF_KEY, out JsonElement asmEl)) + { + return null; + } + + string? asmName = asmEl.GetString(); + if (asmName == null) + { + return null; + } + + //Load the dynamic assembly into the alc + AssemblyLoader<IPersistantCacheStore> loader = plugin.LoadAssembly<IPersistantCacheStore>(asmName); + try + { + //Call the OnLoad method + Action<PluginBase, IConfigScope>? loadMethod = loader.TryGetMethod<Action<PluginBase, IConfigScope>>(LOAD_METHOD_NAME); + + loadMethod?.Invoke(plugin, config); + } + catch + { + loader.Dispose(); + throw; + } + + //Return the + return new(loader); + } + + + private sealed class RuntimeBlobCacheTable : IBlobCacheTable + { + + private readonly IBlobCacheTable _table; + private readonly Action? OnDetatch; + + public RuntimeBlobCacheTable(AssemblyLoader<IBlobCacheTable> loader) + { + OnDetatch = loader.TryGetMethod<Action>(TEARDOWN_METHOD_NAME); + _table = loader.Resource; + } + + public void Dispose() + { + //We can let the loader dispose the cache table, but we can notify of detatch + OnDetatch?.Invoke(); + } + + + ///<inheritdoc/> + [MethodImpl(MethodImplOptions.AggressiveInlining)] + IBlobCacheBucket IBlobCacheTable.GetBucket(ReadOnlySpan<char> objectId) => _table.GetBucket(objectId); + + ///<inheritdoc/> + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public IEnumerator<IBlobCacheBucket> GetEnumerator() => _table.GetEnumerator(); + + ///<inheritdoc/> + [MethodImpl(MethodImplOptions.AggressiveInlining)] + IEnumerator IEnumerable.GetEnumerator() => ((IEnumerable)_table).GetEnumerator(); + } + + internal sealed class PersistantCacheManager : IPersistantCacheStore + { + const string INITIALIZE_METHOD_NAME = "OnInitializeForBucket"; + + + /* + * Our referrence can be technically unloaded, but so will + * this instance, since its loaded into the current ALC, so + * this referrence may exist for the lifetime of this instance. + * + * It also implements IDisposable, which the assembly loader class + * will call when this plugin is unloaded, we dont need to call + * it here, but we can signal a detach. + * + * Since the store implements IDisposable, its likely going to + * check for dispose on each call, so we don't need to add + * and additional disposed check since the method calls must be fast. + */ + + private readonly IPersistantCacheStore store; + + private readonly Action<uint>? InitMethod; + private readonly Action? OnServiceDetatch; + + public PersistantCacheManager(AssemblyLoader<IPersistantCacheStore> loader) + { + //Try to get the Initialize method + InitMethod = loader.TryGetMethod<Action<uint>>(INITIALIZE_METHOD_NAME); + + //Get the optional detatch method + OnServiceDetatch = loader.TryGetMethod<Action>(TEARDOWN_METHOD_NAME); + + store = loader.Resource; + } + + /// <summary> + /// Optionally initializes the backing store by publishing the table's bucket + /// id's so it's made aware of the memory cache bucket system. + /// </summary> + /// <param name="table">The table containing buckets to publish</param> + public void InitializeSubsystem(IBlobCacheTable table) + { + //Itterate all buckets + foreach (IBlobCacheBucket bucket in table) + { + InitMethod?.Invoke(bucket.Id); + } + } + + void IDisposable.Dispose() + { + //Assembly loader will dispose the type, we can just signal a detach + + OnServiceDetatch?.Invoke(); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + bool IPersistantCacheStore.OnCacheMiss(uint bucketId, string key, IMemoryCacheEntryFactory factory, out CacheEntry entry) + { + return store.OnCacheMiss(bucketId, key, factory, out entry); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + void IPersistantCacheStore.OnEntryDeleted(uint bucketId, string key) => store.OnEntryDeleted(bucketId, key); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + void IPersistantCacheStore.OnEntryEvicted(uint bucketId, string key, in CacheEntry entry) => store.OnEntryEvicted(bucketId, key, in entry); + } + } +} |