diff options
author | vnugent <public@vaughnnugent.com> | 2023-06-22 21:16:28 -0400 |
---|---|---|
committer | vnugent <public@vaughnnugent.com> | 2023-06-22 21:16:28 -0400 |
commit | 1a8ab1457244d15b19ddcc94958f645f5ec2abc7 (patch) | |
tree | 3994806e0737cf6f519a72cca8836c6e81eac7e2 /plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs | |
parent | dc0fc53fd3c3f6c32c8b0d063922c7018fa2c48f (diff) |
Save checkpoint
Diffstat (limited to 'plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs')
-rw-r--r-- | plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs | 91 |
1 files changed, 42 insertions, 49 deletions
diff --git a/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs index 54e4258..26ec565 100644 --- a/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs +++ b/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs @@ -33,14 +33,16 @@ using VNLib.Utils.Logging; using VNLib.Data.Caching.Extensions; using VNLib.Plugins.Extensions.Loading; + namespace VNLib.Data.Caching.ObjectCache.Server.Distribution { sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter { + private readonly List<ICacheNodeAdvertisment> _connectedPeers; private readonly NodeConfig _config; - private readonly IPeerMonitor _monitor; - private readonly INodeDiscoveryCollection _peers; + private readonly IPeerMonitor _monitor; + private readonly KnownPeerList _knownPeers; public PeerDiscoveryManager(PluginBase plugin) { @@ -50,14 +52,23 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution //Get the peer monitor _monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>(); - //Get the node collection - _peers = plugin.GetOrCreateSingleton<NodeDiscoveryCollection>(); + //Get the known peer list + _knownPeers = plugin.GetOrCreateSingleton<KnownPeerList>(); _connectedPeers = new(); + + //Setup discovery error handler + _config.Config.WithErrorHandler(new ErrorHandler(plugin.Log)); } async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) { + /* + * This loop uses the peer monitor to keep track of all connected peers, then gets + * all the advertising peers (including known peers) and resolves all nodes across + * the network. + */ + pluginLog.Information("Node discovery worker started"); try @@ -66,7 +77,19 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution { try { - await DiscoverAllNodesAsync(pluginLog, exitToken); + //Use the monitor to get the initial peers + IEnumerable<ICacheNodeAdvertisment> ads = _monitor.GetAllPeers() + .Where(static p => p.Advertisment != null) + .Select(static p => p.Advertisment!); + + //Add known peers to the initial list + ads = ads.Union(_knownPeers.GetPeers()); + + //Set initial peers + _config.Config.WithInitialPeers(ads); + + //Discover all nodes + await _config.Config.DiscoverNodesAsync(exitToken); } catch(OperationCanceledException) { @@ -91,54 +114,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution } } - - async Task DiscoverAllNodesAsync(ILogProvider log, CancellationToken cancellation) - { - //Use the monitor to get the initial peers - IEnumerable<ICachePeerAdvertisment> ads = _monitor.GetAllPeers() - .Where(static p => p.Advertisment != null) - .Select(static p => p.Advertisment!); - - //Init enumerator with initial peers - INodeDiscoveryEnumerator enumerator = _peers.BeginDiscovery(ads); - - do - { - //Load the initial peer - ICachePeerAdvertisment? peer = enumerator.GetNextPeer(); - - if (peer == null) - { - break; - } - - log.Verbose("Discovering peer nodes from {Peer}", peer.NodeId); - - //Discover nodes from this peer - ICachePeerAdvertisment[]? newNodes = await _config.Config.DiscoverClusterNodesAsync(peer, cancellation); - - //Add nodes to the enumerator - if (newNodes != null) - { - enumerator.OnPeerDiscoveryComplete(newNodes); - } - - } while (true); - - //Commit peer updates - _peers.CompleteDiscovery(enumerator); - } - - - private readonly List<ICachePeerAdvertisment> _connectedPeers; + ///<inheritdoc/> - public ICachePeerAdvertisment[] GetNewPeers() + public ICacheNodeAdvertisment[] GetNewPeers() { lock (_connectedPeers) { //Get all discovered peers - ICachePeerAdvertisment[] peers = _peers.GetAllNodes(); + ICacheNodeAdvertisment[] peers = _config.Config.NodeCollection.GetAllNodes(); //Get the difference between the discovered peers and the connected peers return peers.Except(_connectedPeers).ToArray(); @@ -146,7 +130,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution } ///<inheritdoc/> - public void OnPeerListenerAttached(ICachePeerAdvertisment peer) + public void OnPeerListenerAttached(ICacheNodeAdvertisment peer) { lock (_connectedPeers) { @@ -156,7 +140,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution } ///<inheritdoc/> - public void OnPeerListenerDetatched(ICachePeerAdvertisment peer) + public void OnPeerListenerDetatched(ICacheNodeAdvertisment peer) { //remove from connected peers lock (_connectedPeers) @@ -164,5 +148,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution _connectedPeers.Remove(peer); } } + + + private sealed record class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler + { + public void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex) + { + Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex); + } + } } } |