aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src/Cache
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/ObjectCacheServer/src/Cache')
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs22
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheStore.cs61
-rw-r--r--plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs2
3 files changed, 9 insertions, 76 deletions
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs
index 16fda39..aef0255 100644
--- a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs
+++ b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs
@@ -45,39 +45,33 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
internal sealed class CacheListenerPubQueue : ICacheListenerEventQueue<IPeerEventQueue>, IAsyncBackgroundWork
{
- private const int MAX_LOCAL_QUEUE_ITEMS = 10000;
- private const string LOG_SCOPE_NAME = "QUEUE";
-
private readonly AsyncQueue<ChangeEvent> _listenerQueue;
private readonly ILogProvider _logProvider;
private readonly PeerEventQueueManager _queueManager;
- public CacheListenerPubQueue(PluginBase plugin)
+ public CacheListenerPubQueue(PluginBase plugin, PeerEventQueueManager queueMan)
{
- _queueManager = plugin.GetOrCreateSingleton<PeerEventQueueManager>();
- _logProvider = plugin.Log.CreateScope(LOG_SCOPE_NAME);
+ _queueManager = queueMan;
+ _logProvider = plugin.Log.CreateScope(CacheConstants.LogScopes.CacheListenerPubQueue);
//Init local queue to store published events
- _listenerQueue = new(new BoundedChannelOptions(MAX_LOCAL_QUEUE_ITEMS)
+ _listenerQueue = new(new BoundedChannelOptions(CacheConstants.CacheListenerChangeQueueSize)
{
AllowSynchronousContinuations = true,
FullMode = BoundedChannelFullMode.DropOldest,
- SingleReader = true,
+ SingleReader = true, //Always a singe thread reading events
SingleWriter = false,
});
}
///<inheritdoc/>
- async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
+ async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider _, CancellationToken exitToken)
{
const int accumulatorSize = 64;
- //Create scope
- pluginLog = pluginLog.CreateScope(LOG_SCOPE_NAME);
-
try
{
- pluginLog.Debug("Change queue worker listening for local cache changes");
+ _logProvider.Debug("Change queue worker listening for local cache changes");
//Accumulator for events
ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize];
@@ -105,7 +99,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
catch (OperationCanceledException)
{
//Normal exit
- pluginLog.Debug("Change queue listener worker exited");
+ _logProvider.Debug("Change queue listener worker exited");
}
}
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs
deleted file mode 100644
index 81f4843..0000000
--- a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
-* Copyright (c) 2024 Vaughn Nugent
-*
-* Library: VNLib
-* Package: ObjectCacheServer
-* File: CacheStore.cs
-*
-* CacheStore.cs is part of ObjectCacheServer which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* ObjectCacheServer is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* ObjectCacheServer is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
-using VNLib.Plugins.Extensions.Loading;
-
-namespace VNLib.Data.Caching.ObjectCache.Server.Cache
-{
-
- /*
- * Implements the blob cache store, which is an abstraction around the blob cache listener.
- * This allows for publishing local events (say from other nodes) to keep caches in sync.
- */
-
- [ConfigurationName("cache")]
- internal sealed class CacheStore(IBlobCacheTable table) : ICacheStore
- {
-
- ///<inheritdoc/>
- ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, ObjectDataGet<T> bodyData, T state, CancellationToken token)
- {
- return table.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token);
- }
-
- ///<inheritdoc/>
- void ICacheStore.Clear()
- {
- throw new NotImplementedException();
- }
-
- ///<inheritdoc/>
- ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token)
- {
- return table.DeleteObjectAsync(id, token);
- }
- }
-}
diff --git a/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs b/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs
index 12cf37a..4b76a9b 100644
--- a/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs
+++ b/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs
@@ -48,7 +48,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
private readonly object StoreLock = new();
private readonly Dictionary<string, PeerEventListenerQueue> QueueStore = new(StringComparer.OrdinalIgnoreCase);
- public PeerEventQueueManager(PluginBase plugin, NodeConfig config)
+ public PeerEventQueueManager(PluginBase plugin, ServerClusterConfig config)
{
MaxQueueDepth = config.MaxQueueDepth;