aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src/Cache
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2024-03-10 16:46:50 -0400
committerLibravatar vnugent <public@vaughnnugent.com>2024-03-10 16:46:50 -0400
commite5bb0ee302e789cb96e7ecfe839cbbcc8e3fd5d7 (patch)
tree6f4d824eaea0f4c433f98c0685bf66c06b30e16a /plugins/ObjectCacheServer/src/Cache
parent6b87785026ca57d6f41cff87ddbd066362f3cacc (diff)
Squashed commit of the following:
commit 2f7565976472f0f056db60520bf253a776112c10 Merge: 323ff67 6b87785 Author: vnugent <public@vaughnnugent.com> Date: Sun Mar 10 16:45:23 2024 -0400 merge master commit 323ff67badfc46ad638d75f059d60d9425ccb2fa Author: vnugent <public@vaughnnugent.com> Date: Sun Mar 10 15:50:07 2024 -0400 ci(server): Conainerize and add vncache server packages commit 5d4192880654fd6e00e587814169415b42621327 Author: vnugent <public@vaughnnugent.com> Date: Sat Mar 9 19:13:21 2024 -0500 chore: #2 Minor fixes and polish before release commit a4b3504bb891829074d1efde0433eae010862181 Author: vnugent <public@vaughnnugent.com> Date: Sat Mar 9 16:30:44 2024 -0500 package updates commit 4d8cfc10382105b0acbd94df93ad3d05ff91db54 Author: vnugent <public@vaughnnugent.com> Date: Wed Mar 6 21:30:58 2024 -0500 refactor: #2 Centralize server state, default discovery endpoints & more commit 016a96a80cce025a86c6cf26707738f6a2eb2658 Author: vnugent <public@vaughnnugent.com> Date: Thu Feb 29 21:22:38 2024 -0500 feat: add future support for memory diagnostics, and some docs commit 456ead9bc8b0f61357bae93152ad0403c4940101 Author: vnugent <public@vaughnnugent.com> Date: Tue Feb 13 14:46:35 2024 -0500 fix: #1 shared cluster index on linux & latested core updates commit a481d63f964a5d5204cac2e95141f37f9a28d573 Author: vnugent <public@vaughnnugent.com> Date: Tue Jan 23 15:43:50 2024 -0500 cache extension api tweaks
Diffstat (limited to 'plugins/ObjectCacheServer/src/Cache')
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs31
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs (renamed from plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs)15
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheStore.cs131
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs8
-rw-r--r--plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs (renamed from plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs)132
5 files changed, 82 insertions, 235 deletions
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs
index 6942828..aef0255 100644
--- a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs
+++ b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -45,39 +45,33 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
internal sealed class CacheListenerPubQueue : ICacheListenerEventQueue<IPeerEventQueue>, IAsyncBackgroundWork
{
- private const int MAX_LOCAL_QUEUE_ITEMS = 10000;
- private const string LOG_SCOPE_NAME = "QUEUE";
-
private readonly AsyncQueue<ChangeEvent> _listenerQueue;
private readonly ILogProvider _logProvider;
- private readonly ICacheEventQueueManager _queueManager;
+ private readonly PeerEventQueueManager _queueManager;
- public CacheListenerPubQueue(PluginBase plugin)
+ public CacheListenerPubQueue(PluginBase plugin, PeerEventQueueManager queueMan)
{
- _queueManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>();
- _logProvider = plugin.Log.CreateScope(LOG_SCOPE_NAME);
+ _queueManager = queueMan;
+ _logProvider = plugin.Log.CreateScope(CacheConstants.LogScopes.CacheListenerPubQueue);
//Init local queue to store published events
- _listenerQueue = new(new BoundedChannelOptions(MAX_LOCAL_QUEUE_ITEMS)
+ _listenerQueue = new(new BoundedChannelOptions(CacheConstants.CacheListenerChangeQueueSize)
{
AllowSynchronousContinuations = true,
FullMode = BoundedChannelFullMode.DropOldest,
- SingleReader = true,
+ SingleReader = true, //Always a singe thread reading events
SingleWriter = false,
});
}
///<inheritdoc/>
- async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
+ async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider _, CancellationToken exitToken)
{
const int accumulatorSize = 64;
- //Create scope
- pluginLog = pluginLog.CreateScope(LOG_SCOPE_NAME);
-
try
{
- pluginLog.Debug("Change queue worker listening for local cache changes");
+ _logProvider.Debug("Change queue worker listening for local cache changes");
//Accumulator for events
ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize];
@@ -105,15 +99,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
catch (OperationCanceledException)
{
//Normal exit
- pluginLog.Debug("Change queue listener worker exited");
+ _logProvider.Debug("Change queue listener worker exited");
}
}
///<inheritdoc/>
- public bool IsEnabled([NotNullWhen(true)] IPeerEventQueue? userState)
- {
- return userState is IPeerEventQueue;
- }
+ public bool IsEnabled([NotNullWhen(true)] IPeerEventQueue? userState) => userState is not null;
///<inheritdoc/>
public void PublishEvent(ChangeEvent changeEvent)
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs b/plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs
index bd15d24..c404cc5 100644
--- a/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs
+++ b/plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs
@@ -1,12 +1,12 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
-* File: CacheConfiguration.cs
+* File: CacheMemoryConfiguration.cs
*
-* CacheConfiguration.cs is part of ObjectCacheServer which is part of the larger
-* VNLib collection of libraries and utilities.
+* CacheMemoryConfiguration.cs is part of ObjectCacheServer which
+* is part of the larger VNLib collection of libraries and utilities.
*
* ObjectCacheServer is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
@@ -26,7 +26,7 @@ using System.Text.Json.Serialization;
namespace VNLib.Data.Caching.ObjectCache.Server.Cache
{
- internal sealed class CacheConfiguration
+ internal sealed class CacheMemoryConfiguration
{
[JsonPropertyName("buffer_recv_max")]
public int MaxRecvBufferSize { get; set; } = 1000 * 1024;
@@ -36,6 +36,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
[JsonPropertyName("buffer_header_max")]
public int MaxHeaderBufferSize { get; set; } = 2 * 1024;
+
[JsonPropertyName("buffer_header_min")]
public int MinHeaderBufferSize { get; set; } = 128;
@@ -49,5 +50,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
[JsonPropertyName("buckets")]
public uint BucketCount { get; set; } = 10;
+
+
+ [JsonPropertyName("memory_lib_path")]
+ public string? ExternLibPath { get; set; }
}
}
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs
deleted file mode 100644
index 75abe37..0000000
--- a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
-* Copyright (c) 2024 Vaughn Nugent
-*
-* Library: VNLib
-* Package: ObjectCacheServer
-* File: CacheStore.cs
-*
-* CacheStore.cs is part of ObjectCacheServer which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* ObjectCacheServer is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* ObjectCacheServer is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
-using VNLib.Utils.Logging;
-using VNLib.Net.Messaging.FBM;
-using VNLib.Plugins;
-using VNLib.Plugins.Extensions.Loading;
-
-namespace VNLib.Data.Caching.ObjectCache.Server.Cache
-{
- /*
- * Implements the blob cache store, which is an abstraction around the blob cache listener.
- * This allows for publishing local events (say from other nodes) to keep caches in sync.
- */
-
- [ConfigurationName("cache")]
- internal sealed class CacheStore : ICacheStore, IDisposable
- {
- /// <summary>
- /// Gets the underlying cache listener
- /// </summary>
- public BlobCacheListener<IPeerEventQueue> Listener { get; }
-
-
- public CacheStore(PluginBase plugin, IConfigScope config)
- {
- //Init cache
- Listener = InitializeCache((ObjectCacheServerEntry)plugin, config);
- }
-
- ///<inheritdoc/>
- ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, ObjectDataGet<T> bodyData, T state, CancellationToken token)
- {
- return Listener.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token);
- }
-
- ///<inheritdoc/>
- void ICacheStore.Clear()
- {
- throw new NotImplementedException();
- }
-
- ///<inheritdoc/>
- ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token)
- {
- return Listener.Cache.DeleteObjectAsync(id, token);
- }
-
- private static BlobCacheListener<IPeerEventQueue> InitializeCache(ObjectCacheServerEntry plugin, IConfigScope config)
- {
- const string CacheConfigTemplate =
-@"
-Cache Configuration:
- Max memory: {max} Mb
- Buckets: {bc}
- Entries per-bucket: {mc}
-";
-
- //Deserialize the cache config
- CacheConfiguration cacheConf = config.Deserialze<CacheConfiguration>();
-
- if (cacheConf.MaxCacheEntries < 2)
- {
- throw new ArgumentException("You must configure a 'max_cache' size larger than 1 item");
- }
-
- //Suggestion
- if (cacheConf.MaxCacheEntries < 200)
- {
- plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache");
- }
-
- //calculate the max memory usage
- ulong maxByteSize = cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize;
-
- //Log the cache config
- plugin.Log.Information(CacheConfigTemplate,
- maxByteSize / (1024 * 1000),
- cacheConf.BucketCount,
- cacheConf.MaxCacheEntries
- );
-
- //Get the event listener
- ICacheListenerEventQueue<IPeerEventQueue> queue = plugin.GetOrCreateSingleton<CacheListenerPubQueue>();
-
- //Get the memory manager
- ICacheMemoryManagerFactory manager = plugin.GetOrCreateSingleton<BucketLocalManagerFactory>();
-
- //Load the blob cache table system
- IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, manager, cacheConf);
-
- FallbackFBMMemoryManager fbmMemManager = new(plugin.ListenerHeap);
-
- //Endpoint only allows for a single reader
- return new(bc, queue, plugin.Log, fbmMemManager);
- }
-
- /*
- * Cleaned up by the plugin on exit
- */
- public void Dispose()
- {
- Listener.Dispose();
- }
- }
-}
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs
index b7bf83f..8f196b0 100644
--- a/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs
+++ b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs
@@ -29,6 +29,7 @@ using System.Text.Json;
using VNLib.Utils.Resources;
using VNLib.Plugins;
using VNLib.Plugins.Extensions.Loading;
+using VNLib.Utils.Extensions;
namespace VNLib.Data.Caching.ObjectCache.Server.Cache
{
@@ -49,7 +50,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
/// <param name="cacheConf">The cache configuration object</param>
/// <returns>The loaded <see cref="IBlobCacheTable"/> implementation</returns>
/// <exception cref="FileNotFoundException"></exception>
- public static IBlobCacheTable LoadMemoryCacheSystem(this PluginBase plugin, IConfigScope config, ICacheMemoryManagerFactory heap, CacheConfiguration cacheConf)
+ public static IBlobCacheTable LoadMemoryCacheSystem(this PluginBase plugin, IConfigScope config, ICacheMemoryManagerFactory heap, CacheMemoryConfiguration cacheConf)
{
#pragma warning disable CA2000 // Dispose objects before losing scope
@@ -94,10 +95,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
if(initMethod != null)
{
//Itterate all buckets
- foreach (IBlobCacheBucket bucket in table)
- {
- initMethod.Invoke(bucket.Id);
- }
+ table.ForEach(bucket => initMethod(bucket.Id));
}
}
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs b/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs
index e3c613d..4b76a9b 100644
--- a/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs
+++ b/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs
@@ -1,11 +1,11 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
-* File: CacheEventQueueManager.cs
+* File: PeerEventQueueManager.cs
*
-* CacheEventQueueManager.cs is part of ObjectCacheServer which is
+* PeerEventQueueManager.cs is part of ObjectCacheServer which is
* part of the larger VNLib collection of libraries and utilities.
*
* ObjectCacheServer is free software: you can redistribute it and/or modify
@@ -38,41 +38,37 @@ using VNLib.Plugins.Extensions.Loading.Events;
namespace VNLib.Data.Caching.ObjectCache.Server.Cache
{
- internal sealed class CacheEventQueueManager : ICacheEventQueueManager, IDisposable, IIntervalScheduleable
+ internal sealed class PeerEventQueueManager : ICacheEventQueueManager, IIntervalScheduleable
{
private readonly int MaxQueueDepth;
- private readonly object SubLock;
- private readonly LinkedList<NodeQueue> Subscribers;
+ private readonly object SubLock = new();
+ private readonly LinkedList<PeerEventListenerQueue> Subscribers = [];
- private readonly object StoreLock;
- private readonly Dictionary<string, NodeQueue> QueueStore;
+ private readonly object StoreLock = new();
+ private readonly Dictionary<string, PeerEventListenerQueue> QueueStore = new(StringComparer.OrdinalIgnoreCase);
-
- public CacheEventQueueManager(PluginBase plugin)
+ public PeerEventQueueManager(PluginBase plugin, ServerClusterConfig config)
{
- //Get node config
- NodeConfig config = plugin.GetOrCreateSingleton<NodeConfig>();
-
- //Get max queue depth
MaxQueueDepth = config.MaxQueueDepth;
/*
- * Schedule purge interval to clean up stale queues
- */
+ * Schedule purge interval to clean up stale queues
+ */
plugin.ScheduleInterval(this, config.EventQueuePurgeInterval);
-
- SubLock = new();
- Subscribers = new();
-
- StoreLock = new();
- QueueStore = new(StringComparer.OrdinalIgnoreCase);
+
+ //Cleanup disposeables on unload
+ _ = plugin.RegisterForUnload(() =>
+ {
+ QueueStore.Clear();
+ Subscribers.Clear();
+ });
}
///<inheritdoc/>
public IPeerEventQueue Subscribe(ICachePeer peer)
{
- NodeQueue? nq;
+ PeerEventListenerQueue? nq;
bool isNew = false;
@@ -82,13 +78,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
//Try to recover the queue for the node
if (!QueueStore.TryGetValue(peer.NodeId, out nq))
{
- //Create new queue
+ //Create new queue since an existing queue was not found
nq = new(peer.NodeId, MaxQueueDepth);
QueueStore.Add(peer.NodeId, nq);
isNew = true;
}
- //Increment listener count
+ //Increment listener count since a new listener has attached
nq.Listeners++;
}
@@ -109,11 +105,20 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
///<inheritdoc/>
public void Unsubscribe(ICachePeer peer)
{
+ /*
+ * The reason I am not purging queues that no longer have listeners
+ * now is because it is possible that a listener needed to detach because of
+ * a network issue and will be reconnecting shortly. If the node doesnt
+ * come back before the next purge interval, it's events will be purged.
+ *
+ * Point is: there is a reason for the garbage collection style purging
+ */
+
//Detach a listener for a node
lock (StoreLock)
{
//Get the queue and decrement the listener count
- NodeQueue nq = QueueStore[peer.NodeId];
+ PeerEventListenerQueue nq = QueueStore[peer.NodeId];
nq.Listeners--;
}
}
@@ -125,7 +130,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
lock (SubLock)
{
//Loop through ll the fast way
- LinkedListNode<NodeQueue>? q = Subscribers.First;
+ LinkedListNode<PeerEventListenerQueue>? q = Subscribers.First;
while (q != null)
{
@@ -145,7 +150,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
lock (SubLock)
{
//Loop through ll the fast way
- LinkedListNode<NodeQueue>? q = Subscribers.First;
+ LinkedListNode<PeerEventListenerQueue>? q = Subscribers.First;
while (q != null)
{
@@ -167,9 +172,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
lock (StoreLock)
{
//Get all stale queues (queues without listeners)
- NodeQueue[] staleQueues = QueueStore.Values.Where(static nq => nq.Listeners == 0).ToArray();
+ PeerEventListenerQueue[] staleQueues = QueueStore.Values.Where(static nq => nq.Listeners == 0).ToArray();
- foreach (NodeQueue nq in staleQueues)
+ foreach (PeerEventListenerQueue nq in staleQueues)
{
//Remove from store
QueueStore.Remove(nq.NodeId);
@@ -191,54 +196,39 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
return Task.CompletedTask;
}
- void IDisposable.Dispose()
- {
- QueueStore.Clear();
- Subscribers.Clear();
- }
/*
* Holds queues for each node and keeps track of the number of listeners
* attached to the queue
+ *
+ * The role of this class is to store change events for a given peer node,
+ * and return them when the peer requests them. It also keeps track of the
+ * number of active listeners (server connections) to the queue.
*/
- private sealed class NodeQueue : IPeerEventQueue
+ private sealed class PeerEventListenerQueue(string nodeId, int maxDepth) : IPeerEventQueue
{
public int Listeners;
- public string NodeId { get; }
-
- public AsyncQueue<ChangeEvent> Queue { get; }
+ public string NodeId => nodeId;
- public NodeQueue(string nodeId, int maxDepth)
+ /*
+ * Create a bounded channel that acts as a lru and evicts
+ * the oldest item when the queue is full
+ *
+ * There will also only ever be a single thread writing events
+ * to the queue
+ */
+ private readonly AsyncQueue<ChangeEvent> Queue = new(new BoundedChannelOptions(maxDepth)
{
- NodeId = nodeId;
-
- /*
- * Create a bounded channel that acts as a lru and evicts
- * the oldest item when the queue is full
- *
- * There will also only ever be a single thread writing events
- * to the queue
- */
-
- BoundedChannelOptions queueOptions = new(maxDepth)
- {
- AllowSynchronousContinuations = true,
- SingleReader = false,
- SingleWriter = true,
- //Drop oldest item in queue if full
- FullMode = BoundedChannelFullMode.DropOldest,
- };
-
- //Init queue/channel
- Queue = new(queueOptions);
- }
+ AllowSynchronousContinuations = true,
+ SingleReader = false,
+ SingleWriter = true,
+ //Drop oldest item in queue if full
+ FullMode = BoundedChannelFullMode.DropOldest,
+ });
- public void PublishChange(ChangeEvent change)
- {
- Queue.TryEnque(change);
- }
+ public void PublishChange(ChangeEvent change) => Queue.TryEnque(change);
public void PublishChanges(Span<ChangeEvent> changes)
{
@@ -249,16 +239,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
}
///<inheritdoc/>
- public ValueTask<ChangeEvent> DequeueAsync(CancellationToken cancellation)
- {
- return Queue.DequeueAsync(cancellation);
- }
+ public ValueTask<ChangeEvent> DequeueAsync(CancellationToken cancellation) => Queue.DequeueAsync(cancellation);
///<inheritdoc/>
- public bool TryDequeue(out ChangeEvent change)
- {
- return Queue.TryDequeue(out change);
- }
+ public bool TryDequeue(out ChangeEvent change) => Queue.TryDequeue(out change);
}
}
}