aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/ObjectCacheServer/src')
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs (renamed from plugins/ObjectCacheServer/src/CacheConfiguration.cs)4
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs (renamed from plugins/ObjectCacheServer/src/CacheEventQueueManager.cs)52
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs (renamed from plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs)31
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheStore.cs (renamed from plugins/ObjectCacheServer/src/CacheStore.cs)24
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs (renamed from plugins/ObjectCacheServer/src/CacheSystemUtil.cs)8
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs1
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs99
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs56
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs14
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs27
-rw-r--r--plugins/ObjectCacheServer/src/NodeConfig.cs128
-rw-r--r--plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs8
12 files changed, 273 insertions, 179 deletions
diff --git a/plugins/ObjectCacheServer/src/CacheConfiguration.cs b/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs
index f7adeb3..86e1f5a 100644
--- a/plugins/ObjectCacheServer/src/CacheConfiguration.cs
+++ b/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs
@@ -24,7 +24,7 @@
using System.Text.Json.Serialization;
-namespace VNLib.Data.Caching.ObjectCache.Server
+namespace VNLib.Data.Caching.ObjectCache.Server.Cache
{
internal sealed class CacheConfiguration
{
@@ -42,7 +42,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
[JsonPropertyName("max_message_size")]
public int MaxMessageSize { get; set; } = 1000 * 1024;
-
+
[JsonPropertyName("change_queue_max_depth")]
public int MaxEventQueueDepth { get; set; } = 10 * 1000;
diff --git a/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs b/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs
index 3827121..ad0eb5a 100644
--- a/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs
+++ b/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs
@@ -37,31 +37,31 @@ using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Extensions.Loading.Events;
-namespace VNLib.Data.Caching.ObjectCache.Server
+namespace VNLib.Data.Caching.ObjectCache.Server.Cache
{
-
- [ConfigurationName("event_manager")]
internal sealed class CacheEventQueueManager : ICacheEventQueueManager, IDisposable, IIntervalScheduleable
{
private readonly int MaxQueueDepth;
private readonly object SubLock;
- private readonly LinkedList<NodeQueue> Subscribers;
+ private readonly LinkedList<NodeQueue> Subscribers;
private readonly object StoreLock;
private readonly Dictionary<string, NodeQueue> QueueStore;
-
- public CacheEventQueueManager(PluginBase plugin, IConfigScope config)
+
+ public CacheEventQueueManager(PluginBase plugin)
{
- //Get purge interval
- TimeSpan purgeInterval = config["purge_interval_sec"].GetTimeSpan(TimeParseType.Seconds);
+ //Get node config
+ NodeConfig config = plugin.GetOrCreateSingleton<NodeConfig>();
//Get max queue depth
- MaxQueueDepth = (int)config["max_depth"].GetUInt32();
+ MaxQueueDepth = config.MaxQueueDepth;
- //Create purge interval
- plugin.ScheduleInterval(this, purgeInterval);
+ /*
+ * Schedule purge interval to clean up stale queues
+ */
+ plugin.ScheduleInterval(this, config.EventQueuePurgeInterval);
SubLock = new();
Subscribers = new();
@@ -81,7 +81,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
lock (StoreLock)
{
//Try to recover the queue for the node
- if(!QueueStore.TryGetValue(peer.NodeId, out nq))
+ if (!QueueStore.TryGetValue(peer.NodeId, out nq))
{
//Create new queue
nq = new(peer.NodeId, MaxQueueDepth);
@@ -163,21 +163,21 @@ namespace VNLib.Data.Caching.ObjectCache.Server
public void PurgeStaleSubscribers()
{
//Enter locks
- lock(SubLock)
- lock(StoreLock)
- {
- //Get all stale queues (queues without listeners)
- NodeQueue[] staleQueues = QueueStore.Values.Where(static nq => nq.Listeners == 0).ToArray();
-
- foreach (NodeQueue nq in staleQueues)
+ lock (SubLock)
+ lock (StoreLock)
{
- //Remove from store
- QueueStore.Remove(nq.NodeId);
+ //Get all stale queues (queues without listeners)
+ NodeQueue[] staleQueues = QueueStore.Values.Where(static nq => nq.Listeners == 0).ToArray();
+
+ foreach (NodeQueue nq in staleQueues)
+ {
+ //Remove from store
+ QueueStore.Remove(nq.NodeId);
- //remove from subscribers
- Subscribers.Remove(nq);
+ //remove from subscribers
+ Subscribers.Remove(nq);
+ }
}
- }
}
//Interval to purge stale subscribers
@@ -186,7 +186,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
log.Debug("Purging stale peer event queues");
PurgeStaleSubscribers();
-
+
return Task.CompletedTask;
}
@@ -241,7 +241,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
public void PublishChanges(Span<ChangeEvent> changes)
{
- for(int i = 0; i < changes.Length; i++)
+ for (int i = 0; i < changes.Length; i++)
{
Queue.TryEnque(changes[i]);
}
diff --git a/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs
index 9c7388e..ba39db6 100644
--- a/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs
+++ b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs
@@ -26,16 +26,28 @@ using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Channels;
+using System.Diagnostics.CodeAnalysis;
using VNLib.Utils.Async;
using VNLib.Utils.Logging;
using VNLib.Plugins;
using VNLib.Plugins.Extensions.Loading;
-namespace VNLib.Data.Caching.ObjectCache.Server
+namespace VNLib.Data.Caching.ObjectCache.Server.Cache
{
+ /*
+ * Implements the event queue for the cache listener. Captures changes from the cache store
+ * and publishes them to subscribers.
+ *
+ * It also allows clients that are listening for changes to wait for events to
+ * their individual queues.
+ */
+
internal sealed class CacheListenerPubQueue : ICacheListenerEventQueue, IAsyncBackgroundWork
{
+ private const int MAX_LOCAL_QUEUE_ITEMS = 10000;
+ private const string LOG_SCOPE_NAME = "QUEUE";
+
private readonly AsyncQueue<ChangeEvent> _listenerQueue;
private readonly ILogProvider _logProvider;
private readonly ICacheEventQueueManager _queueManager;
@@ -43,8 +55,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server
public CacheListenerPubQueue(PluginBase plugin)
{
_queueManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>();
- _logProvider = plugin.Log;
- _listenerQueue = new AsyncQueue<ChangeEvent>(new BoundedChannelOptions(10000)
+ _logProvider = plugin.Log.CreateScope(LOG_SCOPE_NAME);
+
+ //Init local queue to store published events
+ _listenerQueue = new(new BoundedChannelOptions(MAX_LOCAL_QUEUE_ITEMS)
{
AllowSynchronousContinuations = true,
FullMode = BoundedChannelFullMode.DropOldest,
@@ -54,12 +68,17 @@ namespace VNLib.Data.Caching.ObjectCache.Server
}
///<inheritdoc/>
- public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
+ async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
{
const int accumulatorSize = 64;
+ //Create scope
+ pluginLog = pluginLog.CreateScope(LOG_SCOPE_NAME);
+
try
{
+ pluginLog.Debug("Change queue worker listening for local cache changes");
+
//Accumulator for events
ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize];
int index = 0;
@@ -89,9 +108,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server
pluginLog.Debug("Change queue listener worker exited");
}
}
-
+
///<inheritdoc/>
- public bool IsEnabled(object userState)
+ public bool IsEnabled([NotNullWhen(true)] object? userState)
{
return userState is IPeerEventQueue;
}
diff --git a/plugins/ObjectCacheServer/src/CacheStore.cs b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs
index c1d47f6..f94a3f5 100644
--- a/plugins/ObjectCacheServer/src/CacheStore.cs
+++ b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs
@@ -30,11 +30,19 @@ using VNLib.Utils.Logging;
using VNLib.Plugins;
using VNLib.Plugins.Extensions.Loading;
-namespace VNLib.Data.Caching.ObjectCache.Server
+namespace VNLib.Data.Caching.ObjectCache.Server.Cache
{
+ /*
+ * Implements the blob cache store, which is an abstraction around the blob cache listener.
+ * This allows for publishing local events (say from other nodes) to keep caches in sync.
+ */
+
[ConfigurationName("cache")]
internal sealed class CacheStore : ICacheStore, IDisposable
{
+ /// <summary>
+ /// Gets the underlying cache listener
+ /// </summary>
public BlobCacheListener Listener { get; }
@@ -44,16 +52,19 @@ namespace VNLib.Data.Caching.ObjectCache.Server
Listener = InitializeCache((ObjectCacheServerEntry)plugin, config);
}
+ ///<inheritdoc/>
ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token)
{
return Listener.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token);
}
+ ///<inheritdoc/>
void ICacheStore.Clear()
{
throw new NotImplementedException();
}
+ ///<inheritdoc/>
ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token)
{
return Listener.Cache.DeleteObjectAsync(id, token);
@@ -81,14 +92,14 @@ Cache Configuration:
if (cacheConf.MaxCacheEntries < 200)
{
plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache");
- }
+ }
//calculate the max memory usage
- ulong maxByteSize = ((ulong)cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize);
+ ulong maxByteSize = cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize;
//Log the cache config
- plugin.Log.Information(CacheConfigTemplate,
- maxByteSize / (ulong)(1024 * 1000),
+ plugin.Log.Information(CacheConfigTemplate,
+ maxByteSize / (1024 * 1000),
cacheConf.BucketCount,
cacheConf.MaxCacheEntries
);
@@ -103,6 +114,9 @@ Cache Configuration:
return new(bc, queue, plugin.Log, plugin.CacheHeap);
}
+ /*
+ * Cleaned up by the plugin on exit
+ */
public void Dispose()
{
Listener.Dispose();
diff --git a/plugins/ObjectCacheServer/src/CacheSystemUtil.cs b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs
index f8aedae..2071d2b 100644
--- a/plugins/ObjectCacheServer/src/CacheSystemUtil.cs
+++ b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs
@@ -33,7 +33,7 @@ using VNLib.Plugins;
using VNLib.Utils.Memory;
using VNLib.Plugins.Extensions.Loading;
-namespace VNLib.Data.Caching.ObjectCache.Server
+namespace VNLib.Data.Caching.ObjectCache.Server.Cache
{
internal static class CacheSystemUtil
{
@@ -136,9 +136,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//Return the
return new(loader);
}
-
- private sealed class RuntimeBlobCacheTable : IBlobCacheTable
+
+ private sealed class RuntimeBlobCacheTable : IBlobCacheTable
{
private readonly IBlobCacheTable _table;
@@ -173,7 +173,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
internal sealed class PersistantCacheManager : IPersistantCacheStore
{
const string INITIALIZE_METHOD_NAME = "OnInitializeForBucket";
-
+
/*
* Our referrence can be technically unloaded, but so will
diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
index ffdd4f4..5a04737 100644
--- a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
@@ -36,6 +36,7 @@ using VNLib.Net.Messaging.FBM;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.Extensions.Clustering;
+using VNLib.Data.Caching.ObjectCache.Server.Cache;
namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
index f132cab..65cc009 100644
--- a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
@@ -31,9 +31,9 @@ using System.Threading.Tasks;
using System.Collections.Generic;
using VNLib.Utils.Logging;
-using VNLib.Data.Caching.Extensions;
using VNLib.Plugins;
using VNLib.Plugins.Extensions.Loading;
+using VNLib.Data.Caching.Extensions;
using VNLib.Data.Caching.Extensions.Clustering;
namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
@@ -46,38 +46,48 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
internal sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter
{
private const string LOG_SCOPE_NAME = "DISC";
+ /*
+ * The initial discovery delay. This allows for the server to initialize before
+ * starting the discovery process. This will probably be a shorter delay
+ * than a usual discovery interval.
+ */
private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(15);
+ private static readonly TimeSpan WhenWellKnownResolveFailed = TimeSpan.FromSeconds(20);
+
private readonly List<CacheNodeAdvertisment> _connectedPeers;
- private readonly NodeConfig _config;
- private readonly CachePeerMonitor _monitor;
+ private readonly NodeConfig Config;
+ private readonly CachePeerMonitor Monitor;
+ private readonly ILogProvider Log;
private readonly bool IsDebug;
- private readonly ILogProvider _log;
+ private readonly bool HasWellKnown;
public PeerDiscoveryManager(PluginBase plugin)
{
//Get config
- _config = plugin.GetOrCreateSingleton<NodeConfig>();
+ Config = plugin.GetOrCreateSingleton<NodeConfig>();
//Get the known peers array from config, its allowed to be null for master nodes
IConfigScope? config = plugin.TryGetConfig("known_peers");
string[] kownPeers = config?.Deserialze<string[]>() ?? Array.Empty<string>();
//Add known peers to the monitor
- _config.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s)));
+ Config.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s)));
- plugin.Log.Information("Inital peer nodes: {nodes}", kownPeers);
+ HasWellKnown = kownPeers.Length > 0;
//Get the peer monitor
- _monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
+ Monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
_connectedPeers = new();
//Create scoped logger
- _log = plugin.Log.CreateScope(LOG_SCOPE_NAME);
+ Log = plugin.Log.CreateScope(LOG_SCOPE_NAME);
+
+ Log.Information("Inital peer nodes: {nodes}", kownPeers);
//Setup discovery error handler
- _config.Config.WithErrorHandler(new ErrorHandler(_log));
+ Config.Config.WithErrorHandler(new ErrorHandler(Log));
IsDebug = plugin.IsDebug();
}
@@ -93,33 +103,29 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//Start the change listener
Task watcher = WatchForPeersAsync(exitToken);
- _log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", InitialDelay);
+ Log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", InitialDelay);
try
{
//Wait for the initial delay
await Task.Delay(InitialDelay, exitToken);
- _log.Debug("Begining discovery loop");
-
- /*
- * To avoid connecting to ourself, we add ourselves to the connected list
- * and it should never get removed. This is because the monitor will never
- * report our own advertisment.
- */
- _connectedPeers.Add(_config.Config.Advertisment);
+ Log.Debug("Begining discovery loop");
while (true)
{
+ bool wellKnownFailed = false;
+
try
{
if (IsDebug)
{
- _log.Debug("Begining node discovery");
+ Log.Debug("Begining node discovery");
}
//Resolve all known peers
- CacheNodeAdvertisment[] wellKnown = await _config.Config.ResolveWellKnownAsync(exitToken);
+ CacheNodeAdvertisment[] wellKnown = await Config.Config.ResolveWellKnownAsync(exitToken);
+ wellKnownFailed = wellKnown.Length == 0;
//Use the monitor to get the initial peers
IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds();
@@ -130,15 +136,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
if (allAds.Length > 0)
{
//Discover all kown nodes
- await _config.Config.DiscoverNodesAsync(allAds, exitToken);
+ await Config.Config.DiscoverNodesAsync(allAds, exitToken);
}
//Log the discovered nodes if verbose logging is enabled
if (IsDebug)
{
- CacheNodeAdvertisment[] found = _config.Config.NodeCollection.GetAllNodes();
+ CacheNodeAdvertisment[] found = Config.Config.NodeCollection.GetAllNodes();
- _log.Debug("Discovered {count} nodes\n\t{nodes}", found.Length, found.Select(static s => s.NodeId));
+ Log.Debug("Discovered {count} nodes\n\t{nodes}", found.Length, found.Select(static s => s.NodeId));
}
}
catch(OperationCanceledException)
@@ -147,17 +153,38 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
}
catch (Exception ex)
{
- _log.Error(ex, "Failed to discover new peer nodes");
+ Log.Error(ex, "Failed to discover new peer nodes");
}
- //Delay the next discovery
- await Task.Delay(_config.DiscoveryInterval, exitToken);
+ /*
+ * If we have well known nodes and the discovery failed, we wait for a shorter
+ * duration before retrying. This is to avoid spamming the network with requests
+ * if the well known nodes are down. But if we don't have any well known nodes
+ * we cannot continue.
+ *
+ * This only matters if we are exepcted to have well known nodes.
+ */
+ if(HasWellKnown && wellKnownFailed)
+ {
+ if (IsDebug)
+ {
+ Log.Debug("Well known node discovery failed, waiting for {idel} before retrying", WhenWellKnownResolveFailed);
+ }
+
+ //Wait for shorter duration
+ await Task.Delay(WhenWellKnownResolveFailed, exitToken);
+ }
+ else
+ {
+ //Delay the next discovery
+ await Task.Delay(Config.DiscoveryInterval, exitToken);
+ }
}
}
catch (OperationCanceledException)
{
//Normal exit
- _log.Information("Node discovery worker exiting");
+ Log.Information("Node discovery worker exiting on plugin exit");
}
finally
{
@@ -170,10 +197,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
private IEnumerable<CacheNodeAdvertisment> GetMonitorAds()
{
- return _monitor.GetAllPeers()
+ return Monitor.GetAllPeers()
.Where(static p => p.Advertisment != null)
//Without us
- .Where(n => n.NodeId != _config.Config.NodeId)
+ .Where(n => n.NodeId != Config.Config.NodeId)
.Select(static p => p.Advertisment!);
}
@@ -182,26 +209,26 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
try
{
- _log.Debug("Discovery worker waiting for new peers to connect");
+ Log.Debug("Discovery worker waiting for new peers to connect");
while (true)
{
//Wait for changes, then get new peers
- await _monitor.WaitForChangeAsync().WaitAsync(cancellation);
+ await Monitor.WaitForChangeAsync().WaitAsync(cancellation);
- _log.Verbose("New peers connected");
+ Log.Verbose("New peers connected");
//Use the monitor to get the initial peers
IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds();
- ((NodeDiscoveryCollection)_config.Config.NodeCollection).AddManualNodes(ads);
+ ((NodeDiscoveryCollection)Config.Config.NodeCollection).AddManualNodes(ads);
}
}
catch (OperationCanceledException)
{
//Normal ext
- _log.Debug("Connected peer listener exited");
+ Log.Debug("Connected peer listener exited");
}
}
@@ -212,7 +239,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
lock (_connectedPeers)
{
//Get all discovered peers
- CacheNodeAdvertisment[] peers = _config.Config.NodeCollection.GetAllNodes();
+ CacheNodeAdvertisment[] peers = Config.Config.NodeCollection.GetAllNodes();
//Get the difference between the discovered peers and the connected peers
return peers.Except(_connectedPeers).ToArray();
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
index 5e794f8..d232fd8 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
@@ -45,13 +45,13 @@ using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Essentials.Endpoints;
using VNLib.Plugins.Essentials.Extensions;
using VNLib.Plugins.Extensions.Loading.Routing;
-using VNLib.Data.Caching.ObjectCache.Server.Distribution;
using VNLib.Data.Caching.Extensions.Clustering;
+using VNLib.Data.Caching.ObjectCache.Server.Cache;
+using VNLib.Data.Caching.ObjectCache.Server.Clustering;
namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
- [ConfigurationName("connect_endpoint")]
internal sealed class ConnectEndpoint : ResourceEndpointBase
{
private const string LOG_SCOPE_NAME = "CONEP";
@@ -62,10 +62,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
private readonly NodeConfig NodeConfiguration;
private readonly ICacheEventQueueManager PubSubManager;
private readonly IPeerMonitor Peers;
-
private readonly BlobCacheListener Store;
-
- private readonly bool VerifyIp;
+
private readonly string AudienceLocalServerId;
private uint _connectedClients;
@@ -88,21 +86,17 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
DisableSessionsRequired = true
};
- public ConnectEndpoint(PluginBase plugin, IConfigScope config)
+ public ConnectEndpoint(PluginBase plugin)
{
- string? path = config["path"].GetString();
-
- InitPathAndLog(path, plugin.Log.CreateScope(LOG_SCOPE_NAME));
+ //Get node configuration
+ NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>();
- //Check for ip-verification flag
- VerifyIp = config.TryGetValue("verify_ip", out JsonElement vIpEl) && vIpEl.GetBoolean();
+ //Init from config and create a new log scope
+ InitPathAndLog(NodeConfiguration.ConnectPath, plugin.Log.CreateScope(LOG_SCOPE_NAME));
//Setup pub/sub manager
PubSubManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>();
- //Get node configuration
- NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>();
-
//Get peer monitor
Peers = plugin.GetOrCreateSingleton<CachePeerMonitor>();
@@ -186,7 +180,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
}
- Log.Debug("Received negotiation request from node {node}", nodeId);
+ if (isPeer)
+ {
+ Log.Debug("Received negotiation request from peer node {node}", nodeId);
+ }
+ else
+ {
+ Log.Debug("Received negotiation request from client {client}", entity.TrustedRemoteIp.ToString());
+ }
//Verified, now we can create an auth message with a short expiration
using JsonWebToken auth = new();
@@ -256,7 +257,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
//Verify audience, expiration
- if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl) || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase))
+ if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl)
+ || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
@@ -270,7 +272,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
//Check node ip address matches if required
- if (VerifyIp)
+ if (NodeConfiguration.VerifyIp)
{
if (!doc.RootElement.TryGetProperty("ip", out JsonElement ipEl))
{
@@ -318,6 +320,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
}
+ WsUserState state;
+
try
{
//Get query config suggestions from the client
@@ -340,7 +344,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
int maxMessageSizeClamp = Math.Clamp(maxMessageSize, CacheConfig.MinRecvBufferSize, CacheConfig.MaxRecvBufferSize);
//Init new ws state object and clamp the suggested buffer sizes
- WsUserState state = new()
+ state = new()
{
RecvBufferSize = Math.Clamp(recvBufSize, CacheConfig.MinRecvBufferSize, CacheConfig.MaxRecvBufferSize),
MaxHeaderBufferSize = Math.Clamp(maxHeadBufSize, CacheConfig.MinHeaderBufferSize, CacheConfig.MaxHeaderBufferSize),
@@ -356,20 +360,18 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
NodeId = nodeId,
Advertisment = discoveryAd
};
-
- Log.Debug("Client recv buffer suggestion {recv}, header buffer size {head}, response buffer size {r}", recvBufCmd, maxHeaderCharCmd, state.MaxResponseBufferSize);
-
- //Print state message to console
- Log.Verbose("Client buffer state {state}", state);
-
- //Accept socket and pass state object
- entity.AcceptWebSocket(WebsocketAcceptedAsync, state);
- return VfReturnType.VirtualSkip;
}
catch (KeyNotFoundException)
{
return VfReturnType.BadRequest;
}
+
+ //Print state message to console
+ Log.Debug("Client buffer state {state}", state);
+
+ //Accept socket and pass state object
+ entity.AcceptWebSocket(WebsocketAcceptedAsync, state);
+ return VfReturnType.VirtualSkip;
}
private async Task WebsocketAcceptedAsync(WebSocketSession wss)
diff --git a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
index 77d59dd..adb83e0 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
@@ -27,19 +27,17 @@ using System.Net;
using System.Linq;
using System.Text.Json;
-using VNLib.Hashing;
using VNLib.Hashing.IdentityUtility;
using VNLib.Plugins;
using VNLib.Plugins.Essentials;
using VNLib.Plugins.Essentials.Endpoints;
using VNLib.Plugins.Essentials.Extensions;
using VNLib.Plugins.Extensions.Loading;
-using VNLib.Data.Caching.ObjectCache.Server.Distribution;
using VNLib.Data.Caching.Extensions.Clustering;
+using VNLib.Data.Caching.ObjectCache.Server.Clustering;
namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
- [ConfigurationName("discovery_endpoint")]
internal sealed class PeerDiscoveryEndpoint : ResourceEndpointBase
{
private readonly IPeerMonitor PeerMonitor;
@@ -53,17 +51,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
DisableSessionsRequired = true
};
- public PeerDiscoveryEndpoint(PluginBase plugin, IConfigScope config)
+ public PeerDiscoveryEndpoint(PluginBase plugin)
{
- string? path = config["path"].GetString();
-
- InitPathAndLog(path, plugin.Log);
-
//Get the peer monitor
PeerMonitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
//Get the node config
Config = plugin.GetOrCreateSingleton<NodeConfig>();
+
+ InitPathAndLog(Config.DiscoveryPath, plugin.Log);
}
protected override VfReturnType Get(HttpEntity entity)
@@ -100,7 +96,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
using JsonDocument payload = jwt.GetPayload();
//Get client info to pass back
- subject = payload.RootElement.GetProperty("sub").GetString() ?? string.Empty;
+ subject = payload.RootElement.TryGetProperty("sub", out JsonElement subEl) ? subEl.GetString() ?? string.Empty : string.Empty;
challenge = payload.RootElement.GetProperty("chl").GetString() ?? string.Empty;
}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs
index 99c7f19..3fcc471 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs
@@ -24,10 +24,7 @@
using System;
using System.Net;
-using System.Text.Json;
-using VNLib.Data.Caching.Extensions;
-using VNLib.Data.Caching.Extensions.Clustering;
using VNLib.Hashing;
using VNLib.Hashing.IdentityUtility;
using VNLib.Plugins;
@@ -35,6 +32,8 @@ using VNLib.Plugins.Essentials;
using VNLib.Plugins.Essentials.Endpoints;
using VNLib.Plugins.Essentials.Extensions;
using VNLib.Plugins.Extensions.Loading;
+using VNLib.Data.Caching.Extensions;
+using VNLib.Data.Caching.Extensions.Clustering;
namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
@@ -44,12 +43,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
* the network. Clients need to know the endpoint layout to be able to
* connect and discover other nodes.
*/
-
- [ConfigurationName("well_known", Required = false)]
internal sealed class WellKnownEndpoint : ResourceEndpointBase
- {
- //Default path for the well known endpoint
- const string DefaultPath = "/.well-known/vncache";
+ {
//Store serialized advertisment
private readonly CacheNodeAdvertisment _advertisment;
@@ -62,10 +57,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
DisableSessionsRequired = true,
};
- public WellKnownEndpoint(PluginBase plugin):this(plugin, null)
- { }
-
- public WellKnownEndpoint(PluginBase plugin, IConfigScope? config)
+ public WellKnownEndpoint(PluginBase plugin)
{
//Get the node config
NodeConfig nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>();
@@ -74,16 +66,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
_advertisment = nodeConfig.Config.Advertisment;
_keyStore = nodeConfig.KeyStore;
- //Default to the well known path
- string path = DefaultPath;
-
- //See if the user configured a path
- if(config != null && config.TryGetValue("path", out JsonElement pathEl))
- {
- path = pathEl.GetString() ?? DefaultPath;
- }
-
- InitPathAndLog(path, plugin.Log);
+ InitPathAndLog(nodeConfig.WellKnownPath, plugin.Log);
}
protected override VfReturnType Get(HttpEntity entity)
diff --git a/plugins/ObjectCacheServer/src/NodeConfig.cs b/plugins/ObjectCacheServer/src/NodeConfig.cs
index a6c5be9..3a2e10e 100644
--- a/plugins/ObjectCacheServer/src/NodeConfig.cs
+++ b/plugins/ObjectCacheServer/src/NodeConfig.cs
@@ -26,29 +26,22 @@ using System;
using System.Net;
using System.Linq;
using System.Text.Json;
+using System.Collections.Generic;
using VNLib.Plugins;
using VNLib.Utils.Logging;
using VNLib.Utils.Extensions;
using VNLib.Plugins.Extensions.Loading;
-using VNLib.Data.Caching.ObjectCache.Server.Endpoints;
using VNLib.Data.Caching.Extensions.Clustering;
+
namespace VNLib.Data.Caching.ObjectCache.Server
{
[ConfigurationName("cluster")]
internal sealed class NodeConfig
{
- const string CacheConfigTemplate =
-@"
-Cluster Configuration:
- Node Id: {id}
- TlsEndabled: {tls}
- Cache Endpoint: {ep}
- Discovery Endpoint: {dep}
- Discovery Interval: {di}
- Max Peers: {mpc}
-";
+ //Default path for the well known endpoint
+ const string DefaultPath = "/.well-known/vncache";
public CacheNodeConfiguration Config { get; }
@@ -56,16 +49,25 @@ Cluster Configuration:
public TimeSpan DiscoveryInterval { get; }
+ public TimeSpan EventQueuePurgeInterval { get; }
+
+ public int MaxQueueDepth { get; }
+
+ public string? DiscoveryPath { get; }
+
+ public string ConnectPath { get; }
+
+ public string WellKnownPath { get; }
+
+ public bool VerifyIp { get; }
+
/// <summary>
/// The maximum number of peer connections to allow
/// </summary>
public uint MaxPeerConnections { get; } = 10;
public NodeConfig(PluginBase plugin, IConfigScope config)
- {
-
- Config = new();
-
+ {
//Get the port of the primary webserver
int port;
bool usingTls;
@@ -86,56 +88,104 @@ Cluster Configuration:
//Server id is just dns name for now
string nodeId = $"{hostname}:{port}";
-
- //The endpoint to advertise to cache clients that allows cache connections
- Uri cacheEndpoint = GetEndpointUri<ConnectEndpoint>(plugin, usingTls, port, hostname);
-
+
//Init key store
KeyStore = new(plugin);
+
+ DiscoveryInterval = config["discovery_interval_sec"].GetTimeSpan(TimeParseType.Seconds);
+
+ //Get the event queue purge interval
+ EventQueuePurgeInterval = config["queue_purge_interval_sec"].GetTimeSpan(TimeParseType.Seconds);
+
+ //Get the max queue depth
+ MaxQueueDepth = (int)config["max_queue_depth"].GetUInt32();
+
+
+ //Get the connect path
+ ConnectPath = config["connect_path"].GetString() ?? throw new KeyNotFoundException("Missing required key 'connect_path' in cluster config");
+
+ //Get the verify ip setting
+ VerifyIp = config["verify_ip"].GetBoolean();
+
+ Uri connectEp = BuildUri(usingTls, hostname, port, ConnectPath);
+ Uri? discoveryEp = null;
+
+ Config = new();
+
//Setup cache node config
- Config.WithCacheEndpoint(cacheEndpoint)
+ Config.WithCacheEndpoint(connectEp)
.WithNodeId(nodeId)
.WithAuthenticator(KeyStore)
.WithTls(usingTls);
- //Check if advertising is enabled
- if(plugin.HasConfigForType<PeerDiscoveryEndpoint>())
+ //Get the discovery path (optional)
+ if (config.TryGetValue("discovery_path", out JsonElement discoveryPathEl))
{
- //Get the the broadcast endpoint
- Uri discoveryEndpoint = GetEndpointUri<PeerDiscoveryEndpoint>(plugin, usingTls, port, hostname);
-
- //Enable advertising
- Config.EnableAdvertisment(discoveryEndpoint);
+ DiscoveryPath = discoveryPathEl.GetString();
+
+ //Enable advertisment if a discovery path is present
+ if (!string.IsNullOrEmpty(DiscoveryPath))
+ {
+ //Build the discovery endpoint, it must be an absolute uri
+ discoveryEp = BuildUri(usingTls, hostname, port, DiscoveryPath);
+ Config.EnableAdvertisment(discoveryEp);
+ }
}
-
- DiscoveryInterval = config["discovery_interval_sec"].GetTimeSpan(TimeParseType.Seconds);
+ //Allow custom well-known path
+ if(config.TryGetValue("well_known_path", out JsonElement wkEl))
+ {
+ WellKnownPath = wkEl.GetString() ?? DefaultPath;
+ }
+ //Default if not set
+ WellKnownPath ??= DefaultPath;
//Get the max peer connections
- if(config.TryGetValue("max_peers", out JsonElement maxPeerEl))
+ if (config.TryGetValue("max_peers", out JsonElement maxPeerEl))
{
MaxPeerConnections = maxPeerEl.GetUInt32();
}
+ const string CacheConfigTemplate =
+@"
+Cluster Configuration:
+ Node Id: {id}
+ TlsEndabled: {tls}
+ Verify Ip: {vi}
+ Well-Known: {wk}
+ Cache Endpoint: {ep}
+ Discovery Endpoint: {dep}
+ Discovery Interval: {di}
+ Max Peer Connections: {mpc}
+ Max Queue Depth: {mqd}
+ Event Queue Purge Interval: {eqpi}
+";
+
//log the config
plugin.Log.Information(CacheConfigTemplate,
nodeId,
usingTls,
- cacheEndpoint,
- Config.DiscoveryEndpoint,
+ VerifyIp,
+ WellKnownPath,
+ connectEp,
+ discoveryEp,
DiscoveryInterval,
- MaxPeerConnections
+ MaxPeerConnections,
+ MaxQueueDepth,
+ EventQueuePurgeInterval
);
}
- private static Uri GetEndpointUri<T>(PluginBase plugin, bool usingTls, int port, string hostName) where T: IEndpoint
+ private static Uri BuildUri(bool tls, string host, int port, string path)
{
- //Get the cache endpoint config
- IConfigScope cacheEpConfig = plugin.GetConfigForType<T>();
-
- //The endpoint to advertise to cache clients that allows cache connections
- return new UriBuilder(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, hostName, port, cacheEpConfig["path"].GetString()).Uri;
+ return new UriBuilder
+ {
+ Scheme = tls ? "https" : "http",
+ Host = host,
+ Port = port,
+ Path = path
+ }.Uri;
}
}
}
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
index 1ddf49b..a566390 100644
--- a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
+++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
@@ -33,7 +33,7 @@ using VNLib.Utils.Memory.Diagnostics;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Extensions.Loading.Routing;
using VNLib.Data.Caching.ObjectCache.Server.Endpoints;
-using VNLib.Data.Caching.ObjectCache.Server.Distribution;
+using VNLib.Data.Caching.ObjectCache.Server.Clustering;
namespace VNLib.Data.Caching.ObjectCache.Server
{
@@ -71,11 +71,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server
}
}
-
protected override void OnLoad()
{
try
{
+ //Get the node configuration first
+ NodeConfig config = this.GetOrCreateSingleton<NodeConfig>();
+
//Route well-known endpoint
this.Route<WellKnownEndpoint>();
@@ -86,7 +88,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
_ = this.GetOrCreateSingleton<CacheNodeReplicationMaanger>();
//Setup discovery endpoint
- if(this.HasConfigForType<PeerDiscoveryEndpoint>())
+ if(!string.IsNullOrWhiteSpace(config.DiscoveryPath))
{
this.Route<PeerDiscoveryEndpoint>();
}