diff options
Diffstat (limited to 'plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs')
-rw-r--r-- | plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs | 93 |
1 files changed, 16 insertions, 77 deletions
diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs index 6475f9c..f22e1dd 100644 --- a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs +++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -24,14 +24,11 @@ using System; using System.Linq; -using System.Net.Http; using System.Threading; -using System.Net.Sockets; using System.Threading.Tasks; using System.Collections.Generic; using VNLib.Utils.Logging; -using VNLib.Plugins; using VNLib.Plugins.Extensions.Loading; using VNLib.Data.Caching.Extensions; using VNLib.Data.Caching.Extensions.Clustering; @@ -43,9 +40,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering * This class is responsible for resolving and discovering peer nodes in the cluster network. */ - internal sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter + internal sealed class PeerDiscoveryManager(NodeConfig config, ILogProvider Log, bool IsDebug, bool HasWellKnown) : IAsyncBackgroundWork, ICachePeerAdapter { - private const string LOG_SCOPE_NAME = "DISC"; + internal const string LOG_SCOPE_NAME = "DISC"; + /* * The initial discovery delay. This allows for the server to initialize before * starting the discovery process. This will probably be a shorter delay @@ -54,43 +52,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(15); private static readonly TimeSpan WhenWellKnownResolveFailed = TimeSpan.FromSeconds(20); - - private readonly List<CacheNodeAdvertisment> _connectedPeers; - private readonly NodeConfig Config; - private readonly CachePeerMonitor Monitor; - private readonly ILogProvider Log; - private readonly bool IsDebug; - private readonly bool HasWellKnown; - - public PeerDiscoveryManager(PluginBase plugin) - { - //Get config - Config = plugin.GetOrCreateSingleton<NodeConfig>(); - - //Get the known peers array from config, its allowed to be null for master nodes - IConfigScope? config = plugin.TryGetConfig("known_peers"); - string[] kownPeers = config?.Deserialze<string[]>() ?? Array.Empty<string>(); - - //Add known peers to the monitor - Config.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s))); - - HasWellKnown = kownPeers.Length > 0; - - //Get the peer monitor - Monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>(); - - _connectedPeers = new(); - - //Create scoped logger - Log = plugin.Log.CreateScope(LOG_SCOPE_NAME); - - Log.Information("Inital peer nodes: {nodes}", kownPeers); - - //Setup discovery error handler - Config.Config.WithErrorHandler(new ErrorHandler(Log)); - - IsDebug = plugin.IsDebug(); - } + private readonly List<CacheNodeAdvertisment> _connectedPeers = []; + private readonly CachePeerMonitor Monitor = new(); + private readonly VNCacheClusterManager clusterMan = new(config.Config); async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) { @@ -124,7 +88,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering } //Resolve all known peers - CacheNodeAdvertisment[] wellKnown = await Config.Config.ResolveWellKnownAsync(exitToken); + CacheNodeAdvertisment[] wellKnown = await clusterMan.ResolveWellKnownAsync(exitToken); wellKnownFailed = wellKnown.Length == 0; //Use the monitor to get the initial peers @@ -136,13 +100,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering if (allAds.Length > 0) { //Discover all known nodes - await Config.Config.DiscoverNodesAsync(allAds, exitToken); + await clusterMan.DiscoverNodesAsync(allAds, exitToken); } //Log the discovered nodes if verbose logging is enabled if (IsDebug) { - CacheNodeAdvertisment[] found = Config.Config.NodeCollection.GetAllNodes(); + CacheNodeAdvertisment[] found = clusterMan.DiscoveredNodes.GetAllNodes(); Log.Debug("Discovered {count} nodes\n\t{nodes}", found.Length, found.Select(static s => s.NodeId)); } @@ -177,7 +141,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering else { //Delay the next discovery - await Task.Delay(Config.DiscoveryInterval, exitToken); + await Task.Delay(config.DiscoveryInterval, exitToken); } } } @@ -188,7 +152,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering } finally { - + Monitor.Dispose(); } //Wait for the watcher to exit @@ -197,10 +161,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering private IEnumerable<CacheNodeAdvertisment> GetMonitorAds() { + string selfId = (clusterMan.Config as CacheNodeConfiguration)!.NodeId; return Monitor.GetAllPeers() .Where(static p => p.Advertisment != null) //Without us - .Where(n => n.NodeId != Config.Config.NodeId) + .Where(n => !string.Equals(n.NodeId, selfId, StringComparison.OrdinalIgnoreCase)) .Select(static p => p.Advertisment!); } @@ -222,7 +187,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering //Use the monitor to get the initial peers IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds(); - ((NodeDiscoveryCollection)Config.Config.NodeCollection).AddManualNodes(ads); + clusterMan.AddManualNodes(ads); } } catch (OperationCanceledException) @@ -239,7 +204,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering lock (_connectedPeers) { //Get all discovered peers - CacheNodeAdvertisment[] peers = Config.Config.NodeCollection.GetAllNodes(); + CacheNodeAdvertisment[] peers = clusterMan.DiscoveredNodes.GetAllNodes(); //Get the difference between the discovered peers and the connected peers return peers.Except(_connectedPeers).ToArray(); @@ -265,31 +230,5 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering _connectedPeers.Remove(peer); } } - - - private sealed record class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler - { - public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex) - { - - if (ex is HttpRequestException hre) - { - if (hre.InnerException is SocketException se) - { - //traisnport failed - Logger.Warn("Failed to connect to server {serv} because {err}", errorNode, se.Message); - } - else - { - Logger.Error("Failed to connect to node {n}\n{err}", errorNode, hre); - } - } - else - { - Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex); - } - - } - } } } |