aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-06-22 21:16:28 -0400
committerLibravatar vnugent <public@vaughnnugent.com>2023-06-22 21:16:28 -0400
commit1a8ab1457244d15b19ddcc94958f645f5ec2abc7 (patch)
tree3994806e0737cf6f519a72cca8836c6e81eac7e2 /plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs
parentdc0fc53fd3c3f6c32c8b0d063922c7018fa2c48f (diff)
Save checkpoint
Diffstat (limited to 'plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs')
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs91
1 files changed, 42 insertions, 49 deletions
diff --git a/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs
index 54e4258..26ec565 100644
--- a/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs
+++ b/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs
@@ -33,14 +33,16 @@ using VNLib.Utils.Logging;
using VNLib.Data.Caching.Extensions;
using VNLib.Plugins.Extensions.Loading;
+
namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
{
sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter
{
+ private readonly List<ICacheNodeAdvertisment> _connectedPeers;
private readonly NodeConfig _config;
- private readonly IPeerMonitor _monitor;
- private readonly INodeDiscoveryCollection _peers;
+ private readonly IPeerMonitor _monitor;
+ private readonly KnownPeerList _knownPeers;
public PeerDiscoveryManager(PluginBase plugin)
{
@@ -50,14 +52,23 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
//Get the peer monitor
_monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
- //Get the node collection
- _peers = plugin.GetOrCreateSingleton<NodeDiscoveryCollection>();
+ //Get the known peer list
+ _knownPeers = plugin.GetOrCreateSingleton<KnownPeerList>();
_connectedPeers = new();
+
+ //Setup discovery error handler
+ _config.Config.WithErrorHandler(new ErrorHandler(plugin.Log));
}
async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
{
+ /*
+ * This loop uses the peer monitor to keep track of all connected peers, then gets
+ * all the advertising peers (including known peers) and resolves all nodes across
+ * the network.
+ */
+
pluginLog.Information("Node discovery worker started");
try
@@ -66,7 +77,19 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
{
try
{
- await DiscoverAllNodesAsync(pluginLog, exitToken);
+ //Use the monitor to get the initial peers
+ IEnumerable<ICacheNodeAdvertisment> ads = _monitor.GetAllPeers()
+ .Where(static p => p.Advertisment != null)
+ .Select(static p => p.Advertisment!);
+
+ //Add known peers to the initial list
+ ads = ads.Union(_knownPeers.GetPeers());
+
+ //Set initial peers
+ _config.Config.WithInitialPeers(ads);
+
+ //Discover all nodes
+ await _config.Config.DiscoverNodesAsync(exitToken);
}
catch(OperationCanceledException)
{
@@ -91,54 +114,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
}
}
-
- async Task DiscoverAllNodesAsync(ILogProvider log, CancellationToken cancellation)
- {
- //Use the monitor to get the initial peers
- IEnumerable<ICachePeerAdvertisment> ads = _monitor.GetAllPeers()
- .Where(static p => p.Advertisment != null)
- .Select(static p => p.Advertisment!);
-
- //Init enumerator with initial peers
- INodeDiscoveryEnumerator enumerator = _peers.BeginDiscovery(ads);
-
- do
- {
- //Load the initial peer
- ICachePeerAdvertisment? peer = enumerator.GetNextPeer();
-
- if (peer == null)
- {
- break;
- }
-
- log.Verbose("Discovering peer nodes from {Peer}", peer.NodeId);
-
- //Discover nodes from this peer
- ICachePeerAdvertisment[]? newNodes = await _config.Config.DiscoverClusterNodesAsync(peer, cancellation);
-
- //Add nodes to the enumerator
- if (newNodes != null)
- {
- enumerator.OnPeerDiscoveryComplete(newNodes);
- }
-
- } while (true);
-
- //Commit peer updates
- _peers.CompleteDiscovery(enumerator);
- }
-
-
- private readonly List<ICachePeerAdvertisment> _connectedPeers;
+
///<inheritdoc/>
- public ICachePeerAdvertisment[] GetNewPeers()
+ public ICacheNodeAdvertisment[] GetNewPeers()
{
lock (_connectedPeers)
{
//Get all discovered peers
- ICachePeerAdvertisment[] peers = _peers.GetAllNodes();
+ ICacheNodeAdvertisment[] peers = _config.Config.NodeCollection.GetAllNodes();
//Get the difference between the discovered peers and the connected peers
return peers.Except(_connectedPeers).ToArray();
@@ -146,7 +130,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
}
///<inheritdoc/>
- public void OnPeerListenerAttached(ICachePeerAdvertisment peer)
+ public void OnPeerListenerAttached(ICacheNodeAdvertisment peer)
{
lock (_connectedPeers)
{
@@ -156,7 +140,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
}
///<inheritdoc/>
- public void OnPeerListenerDetatched(ICachePeerAdvertisment peer)
+ public void OnPeerListenerDetatched(ICacheNodeAdvertisment peer)
{
//remove from connected peers
lock (_connectedPeers)
@@ -164,5 +148,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
_connectedPeers.Remove(peer);
}
}
+
+
+ private sealed record class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
+ {
+ public void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex)
+ {
+ Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex);
+ }
+ }
}
}