diff options
Diffstat (limited to 'plugins/ObjectCacheServer/src')
3 files changed, 36 insertions, 13 deletions
diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs index b8ee9c8..b9a220d 100644 --- a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs +++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs @@ -40,12 +40,18 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering * This class is responsible for resolving and discovering peer nodes in the cluster network. */ - internal sealed class PeerDiscoveryManager(CacheNodeConfiguration config, ServerClusterConfig clusterConf, ILogProvider Log, bool IsDebug, bool HasWellKnown) + internal sealed class PeerDiscoveryManager( + CacheNodeConfiguration config, + ServerClusterConfig clusterConf, + CachePeerMonitor Monitor, + ILogProvider Log, + bool IsDebug, + bool HasWellKnown + ) : IAsyncBackgroundWork, ICachePeerAdapter { private readonly List<CacheNodeAdvertisment> _connectedPeers = []; - private readonly CachePeerMonitor Monitor = new(); private readonly VNCacheClusterManager clusterMan = new(config); async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) @@ -81,9 +87,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering /* * On every loop we will need to resolve well-known servers incase they go down * or change. There probably should be some more advanced logic and caching here. + * + * Node may not have any well-known nodes, so we need to check for that. */ - CacheNodeAdvertisment[] wellKnown = await clusterMan.ResolveWellKnownAsync(exitToken); - wellKnownFailed = wellKnown.Length == 0; + CacheNodeAdvertisment[] wellKnown = HasWellKnown ? + await clusterMan.ResolveWellKnownAsync(exitToken) : + Array.Empty<CacheNodeAdvertisment>(); //Use the monitor to get the initial peers IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds(); diff --git a/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs index 970e832..cd5bf1b 100644 --- a/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs +++ b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs @@ -26,6 +26,9 @@ using System; using System.Linq; using System.Net.Http; using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; +using System.Collections.Generic; using VNLib.Utils.Logging; using VNLib.Utils.Memory; @@ -37,8 +40,6 @@ using VNLib.Plugins.Extensions.Loading; using VNLib.Data.Caching.Extensions.Clustering; using VNLib.Data.Caching.ObjectCache.Server.Cache; using VNLib.Data.Caching.ObjectCache.Server.Clustering; -using System.Threading.Tasks; -using System.Threading; namespace VNLib.Data.Caching.ObjectCache.Server { @@ -116,6 +117,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server LogMemConfiguration(); + PeerEventQueue = new(plugin, ClusterConfig); + //If the plugin is in debug mode enable heap tracking SharedCacheHeap = plugin.IsDebug() ? new TrackedHeapWrapper(MemoryUtil.InitializeNewHeapForProcess(), true) @@ -128,8 +131,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server ConfigurePeerDiscovery(); ConfigureCacheListener(); - - PeerEventQueue = new(plugin, ClusterConfig); } private void ConfigurePeerDiscovery() @@ -140,14 +141,22 @@ namespace VNLib.Data.Caching.ObjectCache.Server ILogProvider discLogger = plugin.Log.CreateScope(CacheConstants.LogScopes.PeerDiscovery); - NodeConfig.WithInitialPeers(kownPeers.Select(static s => new Uri(s))) + //Allow just origin nodes to be used as known peers + IEnumerable<Uri> peerUris = kownPeers.Select(static p => + { + Uri bUri = new(p, UriKind.Absolute); + return bUri.LocalPath == "/" ? new Uri(bUri, CacheConstants.DefaultWellKnownPath) : bUri; + }); + + NodeConfig.WithInitialPeers(peerUris) .WithErrorHandler(new ErrorHandler(discLogger)); discLogger.Information("Inital peer nodes: {nodes}", kownPeers); PeerDiscovery = new PeerDiscoveryManager( NodeConfig, - ClusterConfig, + ClusterConfig, + PeerMonitor, discLogger, plugin.IsDebug(), kownPeers.Length > 0 @@ -176,10 +185,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server _cacheMemManager = manager; + CacheListenerPubQueue queue = new(plugin, PeerEventQueue); + + //Must register background worker to listen for changes + _ = plugin.ObserveWork(queue, 150); + //Endpoint only allows for a single reader Listener = new( plugin.LoadMemoryCacheSystem(config, manager, MemoryConfiguration), - new CacheListenerPubQueue(plugin, PeerEventQueue), + queue, plugin.Log.CreateScope(CacheConstants.LogScopes.BlobCacheListener), new SharedHeapFBMMemoryManager(SharedCacheHeap) ); diff --git a/plugins/ObjectCacheServer/src/ServerClusterConfig.cs b/plugins/ObjectCacheServer/src/ServerClusterConfig.cs index 8f81ba6..8e098cd 100644 --- a/plugins/ObjectCacheServer/src/ServerClusterConfig.cs +++ b/plugins/ObjectCacheServer/src/ServerClusterConfig.cs @@ -45,7 +45,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server public int MaxQueueDepth { get; } = (int)config.GetRequiredProperty("max_queue_depth", p => p.GetUInt32()); - public string? DiscoveryPath { get; } = config.GetValueOrDefault(CacheConfigTemplate, p => p.GetString(), null); + public string? DiscoveryPath { get; } = config.GetValueOrDefault("discovery_path", p => p.GetString(), null); public string ConnectPath { get; } = config.GetRequiredProperty("connect_path", p => p.GetString()!); @@ -63,7 +63,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server /// The maxium number of concurrent client connections to allow /// before rejecting new connections /// </summary> - public uint MaxConcurrentConnections { get; } + public uint MaxConcurrentConnections { get; } = config.GetValueOrDefault("max_concurrent_connections", p => p.GetUInt32(), 100u); const string CacheConfigTemplate = @" |