diff options
Diffstat (limited to 'plugins/ObjectCacheServer/src/Cache')
-rw-r--r-- | plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs | 31 | ||||
-rw-r--r-- | plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs (renamed from plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs) | 15 | ||||
-rw-r--r-- | plugins/ObjectCacheServer/src/Cache/CacheStore.cs | 131 | ||||
-rw-r--r-- | plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs | 8 | ||||
-rw-r--r-- | plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs (renamed from plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs) | 132 |
5 files changed, 82 insertions, 235 deletions
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs index 6942828..aef0255 100644 --- a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs +++ b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -45,39 +45,33 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache internal sealed class CacheListenerPubQueue : ICacheListenerEventQueue<IPeerEventQueue>, IAsyncBackgroundWork { - private const int MAX_LOCAL_QUEUE_ITEMS = 10000; - private const string LOG_SCOPE_NAME = "QUEUE"; - private readonly AsyncQueue<ChangeEvent> _listenerQueue; private readonly ILogProvider _logProvider; - private readonly ICacheEventQueueManager _queueManager; + private readonly PeerEventQueueManager _queueManager; - public CacheListenerPubQueue(PluginBase plugin) + public CacheListenerPubQueue(PluginBase plugin, PeerEventQueueManager queueMan) { - _queueManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>(); - _logProvider = plugin.Log.CreateScope(LOG_SCOPE_NAME); + _queueManager = queueMan; + _logProvider = plugin.Log.CreateScope(CacheConstants.LogScopes.CacheListenerPubQueue); //Init local queue to store published events - _listenerQueue = new(new BoundedChannelOptions(MAX_LOCAL_QUEUE_ITEMS) + _listenerQueue = new(new BoundedChannelOptions(CacheConstants.CacheListenerChangeQueueSize) { AllowSynchronousContinuations = true, FullMode = BoundedChannelFullMode.DropOldest, - SingleReader = true, + SingleReader = true, //Always a singe thread reading events SingleWriter = false, }); } ///<inheritdoc/> - async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) + async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider _, CancellationToken exitToken) { const int accumulatorSize = 64; - //Create scope - pluginLog = pluginLog.CreateScope(LOG_SCOPE_NAME); - try { - pluginLog.Debug("Change queue worker listening for local cache changes"); + _logProvider.Debug("Change queue worker listening for local cache changes"); //Accumulator for events ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize]; @@ -105,15 +99,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache catch (OperationCanceledException) { //Normal exit - pluginLog.Debug("Change queue listener worker exited"); + _logProvider.Debug("Change queue listener worker exited"); } } ///<inheritdoc/> - public bool IsEnabled([NotNullWhen(true)] IPeerEventQueue? userState) - { - return userState is IPeerEventQueue; - } + public bool IsEnabled([NotNullWhen(true)] IPeerEventQueue? userState) => userState is not null; ///<inheritdoc/> public void PublishEvent(ChangeEvent changeEvent) diff --git a/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs b/plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs index bd15d24..c404cc5 100644 --- a/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs +++ b/plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs @@ -1,12 +1,12 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer -* File: CacheConfiguration.cs +* File: CacheMemoryConfiguration.cs * -* CacheConfiguration.cs is part of ObjectCacheServer which is part of the larger -* VNLib collection of libraries and utilities. +* CacheMemoryConfiguration.cs is part of ObjectCacheServer which +* is part of the larger VNLib collection of libraries and utilities. * * ObjectCacheServer is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as @@ -26,7 +26,7 @@ using System.Text.Json.Serialization; namespace VNLib.Data.Caching.ObjectCache.Server.Cache { - internal sealed class CacheConfiguration + internal sealed class CacheMemoryConfiguration { [JsonPropertyName("buffer_recv_max")] public int MaxRecvBufferSize { get; set; } = 1000 * 1024; @@ -36,6 +36,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache [JsonPropertyName("buffer_header_max")] public int MaxHeaderBufferSize { get; set; } = 2 * 1024; + [JsonPropertyName("buffer_header_min")] public int MinHeaderBufferSize { get; set; } = 128; @@ -49,5 +50,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache [JsonPropertyName("buckets")] public uint BucketCount { get; set; } = 10; + + + [JsonPropertyName("memory_lib_path")] + public string? ExternLibPath { get; set; } } } diff --git a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs deleted file mode 100644 index 75abe37..0000000 --- a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs +++ /dev/null @@ -1,131 +0,0 @@ -/* -* Copyright (c) 2024 Vaughn Nugent -* -* Library: VNLib -* Package: ObjectCacheServer -* File: CacheStore.cs -* -* CacheStore.cs is part of ObjectCacheServer which is part of the larger -* VNLib collection of libraries and utilities. -* -* ObjectCacheServer is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* ObjectCacheServer is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Threading; -using System.Threading.Tasks; - -using VNLib.Utils.Logging; -using VNLib.Net.Messaging.FBM; -using VNLib.Plugins; -using VNLib.Plugins.Extensions.Loading; - -namespace VNLib.Data.Caching.ObjectCache.Server.Cache -{ - /* - * Implements the blob cache store, which is an abstraction around the blob cache listener. - * This allows for publishing local events (say from other nodes) to keep caches in sync. - */ - - [ConfigurationName("cache")] - internal sealed class CacheStore : ICacheStore, IDisposable - { - /// <summary> - /// Gets the underlying cache listener - /// </summary> - public BlobCacheListener<IPeerEventQueue> Listener { get; } - - - public CacheStore(PluginBase plugin, IConfigScope config) - { - //Init cache - Listener = InitializeCache((ObjectCacheServerEntry)plugin, config); - } - - ///<inheritdoc/> - ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, ObjectDataGet<T> bodyData, T state, CancellationToken token) - { - return Listener.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token); - } - - ///<inheritdoc/> - void ICacheStore.Clear() - { - throw new NotImplementedException(); - } - - ///<inheritdoc/> - ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token) - { - return Listener.Cache.DeleteObjectAsync(id, token); - } - - private static BlobCacheListener<IPeerEventQueue> InitializeCache(ObjectCacheServerEntry plugin, IConfigScope config) - { - const string CacheConfigTemplate = -@" -Cache Configuration: - Max memory: {max} Mb - Buckets: {bc} - Entries per-bucket: {mc} -"; - - //Deserialize the cache config - CacheConfiguration cacheConf = config.Deserialze<CacheConfiguration>(); - - if (cacheConf.MaxCacheEntries < 2) - { - throw new ArgumentException("You must configure a 'max_cache' size larger than 1 item"); - } - - //Suggestion - if (cacheConf.MaxCacheEntries < 200) - { - plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache"); - } - - //calculate the max memory usage - ulong maxByteSize = cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize; - - //Log the cache config - plugin.Log.Information(CacheConfigTemplate, - maxByteSize / (1024 * 1000), - cacheConf.BucketCount, - cacheConf.MaxCacheEntries - ); - - //Get the event listener - ICacheListenerEventQueue<IPeerEventQueue> queue = plugin.GetOrCreateSingleton<CacheListenerPubQueue>(); - - //Get the memory manager - ICacheMemoryManagerFactory manager = plugin.GetOrCreateSingleton<BucketLocalManagerFactory>(); - - //Load the blob cache table system - IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, manager, cacheConf); - - FallbackFBMMemoryManager fbmMemManager = new(plugin.ListenerHeap); - - //Endpoint only allows for a single reader - return new(bc, queue, plugin.Log, fbmMemManager); - } - - /* - * Cleaned up by the plugin on exit - */ - public void Dispose() - { - Listener.Dispose(); - } - } -} diff --git a/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs index b7bf83f..8f196b0 100644 --- a/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs +++ b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs @@ -29,6 +29,7 @@ using System.Text.Json; using VNLib.Utils.Resources; using VNLib.Plugins; using VNLib.Plugins.Extensions.Loading; +using VNLib.Utils.Extensions; namespace VNLib.Data.Caching.ObjectCache.Server.Cache { @@ -49,7 +50,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache /// <param name="cacheConf">The cache configuration object</param> /// <returns>The loaded <see cref="IBlobCacheTable"/> implementation</returns> /// <exception cref="FileNotFoundException"></exception> - public static IBlobCacheTable LoadMemoryCacheSystem(this PluginBase plugin, IConfigScope config, ICacheMemoryManagerFactory heap, CacheConfiguration cacheConf) + public static IBlobCacheTable LoadMemoryCacheSystem(this PluginBase plugin, IConfigScope config, ICacheMemoryManagerFactory heap, CacheMemoryConfiguration cacheConf) { #pragma warning disable CA2000 // Dispose objects before losing scope @@ -94,10 +95,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache if(initMethod != null) { //Itterate all buckets - foreach (IBlobCacheBucket bucket in table) - { - initMethod.Invoke(bucket.Id); - } + table.ForEach(bucket => initMethod(bucket.Id)); } } diff --git a/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs b/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs index e3c613d..4b76a9b 100644 --- a/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs +++ b/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs @@ -1,11 +1,11 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer -* File: CacheEventQueueManager.cs +* File: PeerEventQueueManager.cs * -* CacheEventQueueManager.cs is part of ObjectCacheServer which is +* PeerEventQueueManager.cs is part of ObjectCacheServer which is * part of the larger VNLib collection of libraries and utilities. * * ObjectCacheServer is free software: you can redistribute it and/or modify @@ -38,41 +38,37 @@ using VNLib.Plugins.Extensions.Loading.Events; namespace VNLib.Data.Caching.ObjectCache.Server.Cache { - internal sealed class CacheEventQueueManager : ICacheEventQueueManager, IDisposable, IIntervalScheduleable + internal sealed class PeerEventQueueManager : ICacheEventQueueManager, IIntervalScheduleable { private readonly int MaxQueueDepth; - private readonly object SubLock; - private readonly LinkedList<NodeQueue> Subscribers; + private readonly object SubLock = new(); + private readonly LinkedList<PeerEventListenerQueue> Subscribers = []; - private readonly object StoreLock; - private readonly Dictionary<string, NodeQueue> QueueStore; + private readonly object StoreLock = new(); + private readonly Dictionary<string, PeerEventListenerQueue> QueueStore = new(StringComparer.OrdinalIgnoreCase); - - public CacheEventQueueManager(PluginBase plugin) + public PeerEventQueueManager(PluginBase plugin, ServerClusterConfig config) { - //Get node config - NodeConfig config = plugin.GetOrCreateSingleton<NodeConfig>(); - - //Get max queue depth MaxQueueDepth = config.MaxQueueDepth; /* - * Schedule purge interval to clean up stale queues - */ + * Schedule purge interval to clean up stale queues + */ plugin.ScheduleInterval(this, config.EventQueuePurgeInterval); - - SubLock = new(); - Subscribers = new(); - - StoreLock = new(); - QueueStore = new(StringComparer.OrdinalIgnoreCase); + + //Cleanup disposeables on unload + _ = plugin.RegisterForUnload(() => + { + QueueStore.Clear(); + Subscribers.Clear(); + }); } ///<inheritdoc/> public IPeerEventQueue Subscribe(ICachePeer peer) { - NodeQueue? nq; + PeerEventListenerQueue? nq; bool isNew = false; @@ -82,13 +78,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache //Try to recover the queue for the node if (!QueueStore.TryGetValue(peer.NodeId, out nq)) { - //Create new queue + //Create new queue since an existing queue was not found nq = new(peer.NodeId, MaxQueueDepth); QueueStore.Add(peer.NodeId, nq); isNew = true; } - //Increment listener count + //Increment listener count since a new listener has attached nq.Listeners++; } @@ -109,11 +105,20 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache ///<inheritdoc/> public void Unsubscribe(ICachePeer peer) { + /* + * The reason I am not purging queues that no longer have listeners + * now is because it is possible that a listener needed to detach because of + * a network issue and will be reconnecting shortly. If the node doesnt + * come back before the next purge interval, it's events will be purged. + * + * Point is: there is a reason for the garbage collection style purging + */ + //Detach a listener for a node lock (StoreLock) { //Get the queue and decrement the listener count - NodeQueue nq = QueueStore[peer.NodeId]; + PeerEventListenerQueue nq = QueueStore[peer.NodeId]; nq.Listeners--; } } @@ -125,7 +130,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache lock (SubLock) { //Loop through ll the fast way - LinkedListNode<NodeQueue>? q = Subscribers.First; + LinkedListNode<PeerEventListenerQueue>? q = Subscribers.First; while (q != null) { @@ -145,7 +150,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache lock (SubLock) { //Loop through ll the fast way - LinkedListNode<NodeQueue>? q = Subscribers.First; + LinkedListNode<PeerEventListenerQueue>? q = Subscribers.First; while (q != null) { @@ -167,9 +172,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache lock (StoreLock) { //Get all stale queues (queues without listeners) - NodeQueue[] staleQueues = QueueStore.Values.Where(static nq => nq.Listeners == 0).ToArray(); + PeerEventListenerQueue[] staleQueues = QueueStore.Values.Where(static nq => nq.Listeners == 0).ToArray(); - foreach (NodeQueue nq in staleQueues) + foreach (PeerEventListenerQueue nq in staleQueues) { //Remove from store QueueStore.Remove(nq.NodeId); @@ -191,54 +196,39 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache return Task.CompletedTask; } - void IDisposable.Dispose() - { - QueueStore.Clear(); - Subscribers.Clear(); - } /* * Holds queues for each node and keeps track of the number of listeners * attached to the queue + * + * The role of this class is to store change events for a given peer node, + * and return them when the peer requests them. It also keeps track of the + * number of active listeners (server connections) to the queue. */ - private sealed class NodeQueue : IPeerEventQueue + private sealed class PeerEventListenerQueue(string nodeId, int maxDepth) : IPeerEventQueue { public int Listeners; - public string NodeId { get; } - - public AsyncQueue<ChangeEvent> Queue { get; } + public string NodeId => nodeId; - public NodeQueue(string nodeId, int maxDepth) + /* + * Create a bounded channel that acts as a lru and evicts + * the oldest item when the queue is full + * + * There will also only ever be a single thread writing events + * to the queue + */ + private readonly AsyncQueue<ChangeEvent> Queue = new(new BoundedChannelOptions(maxDepth) { - NodeId = nodeId; - - /* - * Create a bounded channel that acts as a lru and evicts - * the oldest item when the queue is full - * - * There will also only ever be a single thread writing events - * to the queue - */ - - BoundedChannelOptions queueOptions = new(maxDepth) - { - AllowSynchronousContinuations = true, - SingleReader = false, - SingleWriter = true, - //Drop oldest item in queue if full - FullMode = BoundedChannelFullMode.DropOldest, - }; - - //Init queue/channel - Queue = new(queueOptions); - } + AllowSynchronousContinuations = true, + SingleReader = false, + SingleWriter = true, + //Drop oldest item in queue if full + FullMode = BoundedChannelFullMode.DropOldest, + }); - public void PublishChange(ChangeEvent change) - { - Queue.TryEnque(change); - } + public void PublishChange(ChangeEvent change) => Queue.TryEnque(change); public void PublishChanges(Span<ChangeEvent> changes) { @@ -249,16 +239,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache } ///<inheritdoc/> - public ValueTask<ChangeEvent> DequeueAsync(CancellationToken cancellation) - { - return Queue.DequeueAsync(cancellation); - } + public ValueTask<ChangeEvent> DequeueAsync(CancellationToken cancellation) => Queue.DequeueAsync(cancellation); ///<inheritdoc/> - public bool TryDequeue(out ChangeEvent change) - { - return Queue.TryDequeue(out change); - } + public bool TryDequeue(out ChangeEvent change) => Queue.TryDequeue(out change); } } } |