diff options
Diffstat (limited to 'plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs')
-rw-r--r-- | plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs | 28 |
1 files changed, 21 insertions, 7 deletions
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs index 970e832..cd5bf1b 100644 --- a/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs +++ b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs @@ -26,6 +26,9 @@ using System; using System.Linq; using System.Net.Http; using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; +using System.Collections.Generic; using VNLib.Utils.Logging; using VNLib.Utils.Memory; @@ -37,8 +40,6 @@ using VNLib.Plugins.Extensions.Loading; using VNLib.Data.Caching.Extensions.Clustering; using VNLib.Data.Caching.ObjectCache.Server.Cache; using VNLib.Data.Caching.ObjectCache.Server.Clustering; -using System.Threading.Tasks; -using System.Threading; namespace VNLib.Data.Caching.ObjectCache.Server { @@ -116,6 +117,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server LogMemConfiguration(); + PeerEventQueue = new(plugin, ClusterConfig); + //If the plugin is in debug mode enable heap tracking SharedCacheHeap = plugin.IsDebug() ? new TrackedHeapWrapper(MemoryUtil.InitializeNewHeapForProcess(), true) @@ -128,8 +131,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server ConfigurePeerDiscovery(); ConfigureCacheListener(); - - PeerEventQueue = new(plugin, ClusterConfig); } private void ConfigurePeerDiscovery() @@ -140,14 +141,22 @@ namespace VNLib.Data.Caching.ObjectCache.Server ILogProvider discLogger = plugin.Log.CreateScope(CacheConstants.LogScopes.PeerDiscovery); - NodeConfig.WithInitialPeers(kownPeers.Select(static s => new Uri(s))) + //Allow just origin nodes to be used as known peers + IEnumerable<Uri> peerUris = kownPeers.Select(static p => + { + Uri bUri = new(p, UriKind.Absolute); + return bUri.LocalPath == "/" ? new Uri(bUri, CacheConstants.DefaultWellKnownPath) : bUri; + }); + + NodeConfig.WithInitialPeers(peerUris) .WithErrorHandler(new ErrorHandler(discLogger)); discLogger.Information("Inital peer nodes: {nodes}", kownPeers); PeerDiscovery = new PeerDiscoveryManager( NodeConfig, - ClusterConfig, + ClusterConfig, + PeerMonitor, discLogger, plugin.IsDebug(), kownPeers.Length > 0 @@ -176,10 +185,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server _cacheMemManager = manager; + CacheListenerPubQueue queue = new(plugin, PeerEventQueue); + + //Must register background worker to listen for changes + _ = plugin.ObserveWork(queue, 150); + //Endpoint only allows for a single reader Listener = new( plugin.LoadMemoryCacheSystem(config, manager, MemoryConfiguration), - new CacheListenerPubQueue(plugin, PeerEventQueue), + queue, plugin.Log.CreateScope(CacheConstants.LogScopes.BlobCacheListener), new SharedHeapFBMMemoryManager(SharedCacheHeap) ); |