diff options
Diffstat (limited to 'plugins/ObjectCacheServer/src/Clustering')
-rw-r--r-- | plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs | 1 | ||||
-rw-r--r-- | plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs | 99 |
2 files changed, 64 insertions, 36 deletions
diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs index ffdd4f4..5a04737 100644 --- a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs +++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs @@ -36,6 +36,7 @@ using VNLib.Net.Messaging.FBM; using VNLib.Net.Messaging.FBM.Client; using VNLib.Plugins.Extensions.Loading; using VNLib.Data.Caching.Extensions.Clustering; +using VNLib.Data.Caching.ObjectCache.Server.Cache; namespace VNLib.Data.Caching.ObjectCache.Server.Clustering { diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs index f132cab..65cc009 100644 --- a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs +++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs @@ -31,9 +31,9 @@ using System.Threading.Tasks; using System.Collections.Generic; using VNLib.Utils.Logging; -using VNLib.Data.Caching.Extensions; using VNLib.Plugins; using VNLib.Plugins.Extensions.Loading; +using VNLib.Data.Caching.Extensions; using VNLib.Data.Caching.Extensions.Clustering; namespace VNLib.Data.Caching.ObjectCache.Server.Clustering @@ -46,38 +46,48 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering internal sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter { private const string LOG_SCOPE_NAME = "DISC"; + /* + * The initial discovery delay. This allows for the server to initialize before + * starting the discovery process. This will probably be a shorter delay + * than a usual discovery interval. + */ private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(15); + private static readonly TimeSpan WhenWellKnownResolveFailed = TimeSpan.FromSeconds(20); + private readonly List<CacheNodeAdvertisment> _connectedPeers; - private readonly NodeConfig _config; - private readonly CachePeerMonitor _monitor; + private readonly NodeConfig Config; + private readonly CachePeerMonitor Monitor; + private readonly ILogProvider Log; private readonly bool IsDebug; - private readonly ILogProvider _log; + private readonly bool HasWellKnown; public PeerDiscoveryManager(PluginBase plugin) { //Get config - _config = plugin.GetOrCreateSingleton<NodeConfig>(); + Config = plugin.GetOrCreateSingleton<NodeConfig>(); //Get the known peers array from config, its allowed to be null for master nodes IConfigScope? config = plugin.TryGetConfig("known_peers"); string[] kownPeers = config?.Deserialze<string[]>() ?? Array.Empty<string>(); //Add known peers to the monitor - _config.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s))); + Config.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s))); - plugin.Log.Information("Inital peer nodes: {nodes}", kownPeers); + HasWellKnown = kownPeers.Length > 0; //Get the peer monitor - _monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>(); + Monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>(); _connectedPeers = new(); //Create scoped logger - _log = plugin.Log.CreateScope(LOG_SCOPE_NAME); + Log = plugin.Log.CreateScope(LOG_SCOPE_NAME); + + Log.Information("Inital peer nodes: {nodes}", kownPeers); //Setup discovery error handler - _config.Config.WithErrorHandler(new ErrorHandler(_log)); + Config.Config.WithErrorHandler(new ErrorHandler(Log)); IsDebug = plugin.IsDebug(); } @@ -93,33 +103,29 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering //Start the change listener Task watcher = WatchForPeersAsync(exitToken); - _log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", InitialDelay); + Log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", InitialDelay); try { //Wait for the initial delay await Task.Delay(InitialDelay, exitToken); - _log.Debug("Begining discovery loop"); - - /* - * To avoid connecting to ourself, we add ourselves to the connected list - * and it should never get removed. This is because the monitor will never - * report our own advertisment. - */ - _connectedPeers.Add(_config.Config.Advertisment); + Log.Debug("Begining discovery loop"); while (true) { + bool wellKnownFailed = false; + try { if (IsDebug) { - _log.Debug("Begining node discovery"); + Log.Debug("Begining node discovery"); } //Resolve all known peers - CacheNodeAdvertisment[] wellKnown = await _config.Config.ResolveWellKnownAsync(exitToken); + CacheNodeAdvertisment[] wellKnown = await Config.Config.ResolveWellKnownAsync(exitToken); + wellKnownFailed = wellKnown.Length == 0; //Use the monitor to get the initial peers IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds(); @@ -130,15 +136,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering if (allAds.Length > 0) { //Discover all kown nodes - await _config.Config.DiscoverNodesAsync(allAds, exitToken); + await Config.Config.DiscoverNodesAsync(allAds, exitToken); } //Log the discovered nodes if verbose logging is enabled if (IsDebug) { - CacheNodeAdvertisment[] found = _config.Config.NodeCollection.GetAllNodes(); + CacheNodeAdvertisment[] found = Config.Config.NodeCollection.GetAllNodes(); - _log.Debug("Discovered {count} nodes\n\t{nodes}", found.Length, found.Select(static s => s.NodeId)); + Log.Debug("Discovered {count} nodes\n\t{nodes}", found.Length, found.Select(static s => s.NodeId)); } } catch(OperationCanceledException) @@ -147,17 +153,38 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering } catch (Exception ex) { - _log.Error(ex, "Failed to discover new peer nodes"); + Log.Error(ex, "Failed to discover new peer nodes"); } - //Delay the next discovery - await Task.Delay(_config.DiscoveryInterval, exitToken); + /* + * If we have well known nodes and the discovery failed, we wait for a shorter + * duration before retrying. This is to avoid spamming the network with requests + * if the well known nodes are down. But if we don't have any well known nodes + * we cannot continue. + * + * This only matters if we are exepcted to have well known nodes. + */ + if(HasWellKnown && wellKnownFailed) + { + if (IsDebug) + { + Log.Debug("Well known node discovery failed, waiting for {idel} before retrying", WhenWellKnownResolveFailed); + } + + //Wait for shorter duration + await Task.Delay(WhenWellKnownResolveFailed, exitToken); + } + else + { + //Delay the next discovery + await Task.Delay(Config.DiscoveryInterval, exitToken); + } } } catch (OperationCanceledException) { //Normal exit - _log.Information("Node discovery worker exiting"); + Log.Information("Node discovery worker exiting on plugin exit"); } finally { @@ -170,10 +197,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering private IEnumerable<CacheNodeAdvertisment> GetMonitorAds() { - return _monitor.GetAllPeers() + return Monitor.GetAllPeers() .Where(static p => p.Advertisment != null) //Without us - .Where(n => n.NodeId != _config.Config.NodeId) + .Where(n => n.NodeId != Config.Config.NodeId) .Select(static p => p.Advertisment!); } @@ -182,26 +209,26 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering { try { - _log.Debug("Discovery worker waiting for new peers to connect"); + Log.Debug("Discovery worker waiting for new peers to connect"); while (true) { //Wait for changes, then get new peers - await _monitor.WaitForChangeAsync().WaitAsync(cancellation); + await Monitor.WaitForChangeAsync().WaitAsync(cancellation); - _log.Verbose("New peers connected"); + Log.Verbose("New peers connected"); //Use the monitor to get the initial peers IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds(); - ((NodeDiscoveryCollection)_config.Config.NodeCollection).AddManualNodes(ads); + ((NodeDiscoveryCollection)Config.Config.NodeCollection).AddManualNodes(ads); } } catch (OperationCanceledException) { //Normal ext - _log.Debug("Connected peer listener exited"); + Log.Debug("Connected peer listener exited"); } } @@ -212,7 +239,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering lock (_connectedPeers) { //Get all discovered peers - CacheNodeAdvertisment[] peers = _config.Config.NodeCollection.GetAllNodes(); + CacheNodeAdvertisment[] peers = Config.Config.NodeCollection.GetAllNodes(); //Get the difference between the discovered peers and the connected peers return peers.Except(_connectedPeers).ToArray(); |