aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/ObjectCacheServer/src')
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs17
-rw-r--r--plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs28
-rw-r--r--plugins/ObjectCacheServer/src/ServerClusterConfig.cs4
3 files changed, 36 insertions, 13 deletions
diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
index b8ee9c8..b9a220d 100644
--- a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
@@ -40,12 +40,18 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
* This class is responsible for resolving and discovering peer nodes in the cluster network.
*/
- internal sealed class PeerDiscoveryManager(CacheNodeConfiguration config, ServerClusterConfig clusterConf, ILogProvider Log, bool IsDebug, bool HasWellKnown)
+ internal sealed class PeerDiscoveryManager(
+ CacheNodeConfiguration config,
+ ServerClusterConfig clusterConf,
+ CachePeerMonitor Monitor,
+ ILogProvider Log,
+ bool IsDebug,
+ bool HasWellKnown
+ )
: IAsyncBackgroundWork, ICachePeerAdapter
{
private readonly List<CacheNodeAdvertisment> _connectedPeers = [];
- private readonly CachePeerMonitor Monitor = new();
private readonly VNCacheClusterManager clusterMan = new(config);
async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
@@ -81,9 +87,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
/*
* On every loop we will need to resolve well-known servers incase they go down
* or change. There probably should be some more advanced logic and caching here.
+ *
+ * Node may not have any well-known nodes, so we need to check for that.
*/
- CacheNodeAdvertisment[] wellKnown = await clusterMan.ResolveWellKnownAsync(exitToken);
- wellKnownFailed = wellKnown.Length == 0;
+ CacheNodeAdvertisment[] wellKnown = HasWellKnown ?
+ await clusterMan.ResolveWellKnownAsync(exitToken) :
+ Array.Empty<CacheNodeAdvertisment>();
//Use the monitor to get the initial peers
IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds();
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs
index 970e832..cd5bf1b 100644
--- a/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs
+++ b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs
@@ -26,6 +26,9 @@ using System;
using System.Linq;
using System.Net.Http;
using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Collections.Generic;
using VNLib.Utils.Logging;
using VNLib.Utils.Memory;
@@ -37,8 +40,6 @@ using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.Extensions.Clustering;
using VNLib.Data.Caching.ObjectCache.Server.Cache;
using VNLib.Data.Caching.ObjectCache.Server.Clustering;
-using System.Threading.Tasks;
-using System.Threading;
namespace VNLib.Data.Caching.ObjectCache.Server
{
@@ -116,6 +117,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server
LogMemConfiguration();
+ PeerEventQueue = new(plugin, ClusterConfig);
+
//If the plugin is in debug mode enable heap tracking
SharedCacheHeap = plugin.IsDebug() ?
new TrackedHeapWrapper(MemoryUtil.InitializeNewHeapForProcess(), true)
@@ -128,8 +131,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server
ConfigurePeerDiscovery();
ConfigureCacheListener();
-
- PeerEventQueue = new(plugin, ClusterConfig);
}
private void ConfigurePeerDiscovery()
@@ -140,14 +141,22 @@ namespace VNLib.Data.Caching.ObjectCache.Server
ILogProvider discLogger = plugin.Log.CreateScope(CacheConstants.LogScopes.PeerDiscovery);
- NodeConfig.WithInitialPeers(kownPeers.Select(static s => new Uri(s)))
+ //Allow just origin nodes to be used as known peers
+ IEnumerable<Uri> peerUris = kownPeers.Select(static p =>
+ {
+ Uri bUri = new(p, UriKind.Absolute);
+ return bUri.LocalPath == "/" ? new Uri(bUri, CacheConstants.DefaultWellKnownPath) : bUri;
+ });
+
+ NodeConfig.WithInitialPeers(peerUris)
.WithErrorHandler(new ErrorHandler(discLogger));
discLogger.Information("Inital peer nodes: {nodes}", kownPeers);
PeerDiscovery = new PeerDiscoveryManager(
NodeConfig,
- ClusterConfig,
+ ClusterConfig,
+ PeerMonitor,
discLogger,
plugin.IsDebug(),
kownPeers.Length > 0
@@ -176,10 +185,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server
_cacheMemManager = manager;
+ CacheListenerPubQueue queue = new(plugin, PeerEventQueue);
+
+ //Must register background worker to listen for changes
+ _ = plugin.ObserveWork(queue, 150);
+
//Endpoint only allows for a single reader
Listener = new(
plugin.LoadMemoryCacheSystem(config, manager, MemoryConfiguration),
- new CacheListenerPubQueue(plugin, PeerEventQueue),
+ queue,
plugin.Log.CreateScope(CacheConstants.LogScopes.BlobCacheListener),
new SharedHeapFBMMemoryManager(SharedCacheHeap)
);
diff --git a/plugins/ObjectCacheServer/src/ServerClusterConfig.cs b/plugins/ObjectCacheServer/src/ServerClusterConfig.cs
index 8f81ba6..8e098cd 100644
--- a/plugins/ObjectCacheServer/src/ServerClusterConfig.cs
+++ b/plugins/ObjectCacheServer/src/ServerClusterConfig.cs
@@ -45,7 +45,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
public int MaxQueueDepth { get; } = (int)config.GetRequiredProperty("max_queue_depth", p => p.GetUInt32());
- public string? DiscoveryPath { get; } = config.GetValueOrDefault(CacheConfigTemplate, p => p.GetString(), null);
+ public string? DiscoveryPath { get; } = config.GetValueOrDefault("discovery_path", p => p.GetString(), null);
public string ConnectPath { get; } = config.GetRequiredProperty("connect_path", p => p.GetString()!);
@@ -63,7 +63,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
/// The maxium number of concurrent client connections to allow
/// before rejecting new connections
/// </summary>
- public uint MaxConcurrentConnections { get; }
+ public uint MaxConcurrentConnections { get; } = config.GetValueOrDefault("max_concurrent_connections", p => p.GetUInt32(), 100u);
const string CacheConfigTemplate =
@"