From 1a8ab1457244d15b19ddcc94958f645f5ec2abc7 Mon Sep 17 00:00:00 2001 From: vnugent Date: Thu, 22 Jun 2023 21:16:28 -0400 Subject: Save checkpoint --- .../src/Distribution/PeerDiscoveryManager.cs | 91 ++++++++++------------ 1 file changed, 42 insertions(+), 49 deletions(-) (limited to 'plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs') diff --git a/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs index 54e4258..26ec565 100644 --- a/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs +++ b/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs @@ -33,14 +33,16 @@ using VNLib.Utils.Logging; using VNLib.Data.Caching.Extensions; using VNLib.Plugins.Extensions.Loading; + namespace VNLib.Data.Caching.ObjectCache.Server.Distribution { sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter { + private readonly List _connectedPeers; private readonly NodeConfig _config; - private readonly IPeerMonitor _monitor; - private readonly INodeDiscoveryCollection _peers; + private readonly IPeerMonitor _monitor; + private readonly KnownPeerList _knownPeers; public PeerDiscoveryManager(PluginBase plugin) { @@ -50,14 +52,23 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution //Get the peer monitor _monitor = plugin.GetOrCreateSingleton(); - //Get the node collection - _peers = plugin.GetOrCreateSingleton(); + //Get the known peer list + _knownPeers = plugin.GetOrCreateSingleton(); _connectedPeers = new(); + + //Setup discovery error handler + _config.Config.WithErrorHandler(new ErrorHandler(plugin.Log)); } async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) { + /* + * This loop uses the peer monitor to keep track of all connected peers, then gets + * all the advertising peers (including known peers) and resolves all nodes across + * the network. + */ + pluginLog.Information("Node discovery worker started"); try @@ -66,7 +77,19 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution { try { - await DiscoverAllNodesAsync(pluginLog, exitToken); + //Use the monitor to get the initial peers + IEnumerable ads = _monitor.GetAllPeers() + .Where(static p => p.Advertisment != null) + .Select(static p => p.Advertisment!); + + //Add known peers to the initial list + ads = ads.Union(_knownPeers.GetPeers()); + + //Set initial peers + _config.Config.WithInitialPeers(ads); + + //Discover all nodes + await _config.Config.DiscoverNodesAsync(exitToken); } catch(OperationCanceledException) { @@ -91,54 +114,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution } } - - async Task DiscoverAllNodesAsync(ILogProvider log, CancellationToken cancellation) - { - //Use the monitor to get the initial peers - IEnumerable ads = _monitor.GetAllPeers() - .Where(static p => p.Advertisment != null) - .Select(static p => p.Advertisment!); - - //Init enumerator with initial peers - INodeDiscoveryEnumerator enumerator = _peers.BeginDiscovery(ads); - - do - { - //Load the initial peer - ICachePeerAdvertisment? peer = enumerator.GetNextPeer(); - - if (peer == null) - { - break; - } - - log.Verbose("Discovering peer nodes from {Peer}", peer.NodeId); - - //Discover nodes from this peer - ICachePeerAdvertisment[]? newNodes = await _config.Config.DiscoverClusterNodesAsync(peer, cancellation); - - //Add nodes to the enumerator - if (newNodes != null) - { - enumerator.OnPeerDiscoveryComplete(newNodes); - } - - } while (true); - - //Commit peer updates - _peers.CompleteDiscovery(enumerator); - } - - - private readonly List _connectedPeers; + /// - public ICachePeerAdvertisment[] GetNewPeers() + public ICacheNodeAdvertisment[] GetNewPeers() { lock (_connectedPeers) { //Get all discovered peers - ICachePeerAdvertisment[] peers = _peers.GetAllNodes(); + ICacheNodeAdvertisment[] peers = _config.Config.NodeCollection.GetAllNodes(); //Get the difference between the discovered peers and the connected peers return peers.Except(_connectedPeers).ToArray(); @@ -146,7 +130,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution } /// - public void OnPeerListenerAttached(ICachePeerAdvertisment peer) + public void OnPeerListenerAttached(ICacheNodeAdvertisment peer) { lock (_connectedPeers) { @@ -156,7 +140,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution } /// - public void OnPeerListenerDetatched(ICachePeerAdvertisment peer) + public void OnPeerListenerDetatched(ICacheNodeAdvertisment peer) { //remove from connected peers lock (_connectedPeers) @@ -164,5 +148,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution _connectedPeers.Remove(peer); } } + + + private sealed record class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler + { + public void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex) + { + Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex); + } + } } } -- cgit