aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-07-15 13:06:00 -0400
committerLibravatar vnugent <public@vaughnnugent.com>2023-07-15 13:06:00 -0400
commit8b4fb26473256da5eaa89f3e9d2ac5d44f1e9b88 (patch)
tree6ff979b6110b9e6c61ff9f22bb0dbdd2094e08cf /plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
parent2f674e79d42e7d36225fa9ac7ecefbc5bc62d325 (diff)
Latest working draft
Diffstat (limited to 'plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs')
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs99
1 files changed, 63 insertions, 36 deletions
diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
index f132cab..65cc009 100644
--- a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
@@ -31,9 +31,9 @@ using System.Threading.Tasks;
using System.Collections.Generic;
using VNLib.Utils.Logging;
-using VNLib.Data.Caching.Extensions;
using VNLib.Plugins;
using VNLib.Plugins.Extensions.Loading;
+using VNLib.Data.Caching.Extensions;
using VNLib.Data.Caching.Extensions.Clustering;
namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
@@ -46,38 +46,48 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
internal sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter
{
private const string LOG_SCOPE_NAME = "DISC";
+ /*
+ * The initial discovery delay. This allows for the server to initialize before
+ * starting the discovery process. This will probably be a shorter delay
+ * than a usual discovery interval.
+ */
private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(15);
+ private static readonly TimeSpan WhenWellKnownResolveFailed = TimeSpan.FromSeconds(20);
+
private readonly List<CacheNodeAdvertisment> _connectedPeers;
- private readonly NodeConfig _config;
- private readonly CachePeerMonitor _monitor;
+ private readonly NodeConfig Config;
+ private readonly CachePeerMonitor Monitor;
+ private readonly ILogProvider Log;
private readonly bool IsDebug;
- private readonly ILogProvider _log;
+ private readonly bool HasWellKnown;
public PeerDiscoveryManager(PluginBase plugin)
{
//Get config
- _config = plugin.GetOrCreateSingleton<NodeConfig>();
+ Config = plugin.GetOrCreateSingleton<NodeConfig>();
//Get the known peers array from config, its allowed to be null for master nodes
IConfigScope? config = plugin.TryGetConfig("known_peers");
string[] kownPeers = config?.Deserialze<string[]>() ?? Array.Empty<string>();
//Add known peers to the monitor
- _config.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s)));
+ Config.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s)));
- plugin.Log.Information("Inital peer nodes: {nodes}", kownPeers);
+ HasWellKnown = kownPeers.Length > 0;
//Get the peer monitor
- _monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
+ Monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
_connectedPeers = new();
//Create scoped logger
- _log = plugin.Log.CreateScope(LOG_SCOPE_NAME);
+ Log = plugin.Log.CreateScope(LOG_SCOPE_NAME);
+
+ Log.Information("Inital peer nodes: {nodes}", kownPeers);
//Setup discovery error handler
- _config.Config.WithErrorHandler(new ErrorHandler(_log));
+ Config.Config.WithErrorHandler(new ErrorHandler(Log));
IsDebug = plugin.IsDebug();
}
@@ -93,33 +103,29 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//Start the change listener
Task watcher = WatchForPeersAsync(exitToken);
- _log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", InitialDelay);
+ Log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", InitialDelay);
try
{
//Wait for the initial delay
await Task.Delay(InitialDelay, exitToken);
- _log.Debug("Begining discovery loop");
-
- /*
- * To avoid connecting to ourself, we add ourselves to the connected list
- * and it should never get removed. This is because the monitor will never
- * report our own advertisment.
- */
- _connectedPeers.Add(_config.Config.Advertisment);
+ Log.Debug("Begining discovery loop");
while (true)
{
+ bool wellKnownFailed = false;
+
try
{
if (IsDebug)
{
- _log.Debug("Begining node discovery");
+ Log.Debug("Begining node discovery");
}
//Resolve all known peers
- CacheNodeAdvertisment[] wellKnown = await _config.Config.ResolveWellKnownAsync(exitToken);
+ CacheNodeAdvertisment[] wellKnown = await Config.Config.ResolveWellKnownAsync(exitToken);
+ wellKnownFailed = wellKnown.Length == 0;
//Use the monitor to get the initial peers
IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds();
@@ -130,15 +136,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
if (allAds.Length > 0)
{
//Discover all kown nodes
- await _config.Config.DiscoverNodesAsync(allAds, exitToken);
+ await Config.Config.DiscoverNodesAsync(allAds, exitToken);
}
//Log the discovered nodes if verbose logging is enabled
if (IsDebug)
{
- CacheNodeAdvertisment[] found = _config.Config.NodeCollection.GetAllNodes();
+ CacheNodeAdvertisment[] found = Config.Config.NodeCollection.GetAllNodes();
- _log.Debug("Discovered {count} nodes\n\t{nodes}", found.Length, found.Select(static s => s.NodeId));
+ Log.Debug("Discovered {count} nodes\n\t{nodes}", found.Length, found.Select(static s => s.NodeId));
}
}
catch(OperationCanceledException)
@@ -147,17 +153,38 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
}
catch (Exception ex)
{
- _log.Error(ex, "Failed to discover new peer nodes");
+ Log.Error(ex, "Failed to discover new peer nodes");
}
- //Delay the next discovery
- await Task.Delay(_config.DiscoveryInterval, exitToken);
+ /*
+ * If we have well known nodes and the discovery failed, we wait for a shorter
+ * duration before retrying. This is to avoid spamming the network with requests
+ * if the well known nodes are down. But if we don't have any well known nodes
+ * we cannot continue.
+ *
+ * This only matters if we are exepcted to have well known nodes.
+ */
+ if(HasWellKnown && wellKnownFailed)
+ {
+ if (IsDebug)
+ {
+ Log.Debug("Well known node discovery failed, waiting for {idel} before retrying", WhenWellKnownResolveFailed);
+ }
+
+ //Wait for shorter duration
+ await Task.Delay(WhenWellKnownResolveFailed, exitToken);
+ }
+ else
+ {
+ //Delay the next discovery
+ await Task.Delay(Config.DiscoveryInterval, exitToken);
+ }
}
}
catch (OperationCanceledException)
{
//Normal exit
- _log.Information("Node discovery worker exiting");
+ Log.Information("Node discovery worker exiting on plugin exit");
}
finally
{
@@ -170,10 +197,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
private IEnumerable<CacheNodeAdvertisment> GetMonitorAds()
{
- return _monitor.GetAllPeers()
+ return Monitor.GetAllPeers()
.Where(static p => p.Advertisment != null)
//Without us
- .Where(n => n.NodeId != _config.Config.NodeId)
+ .Where(n => n.NodeId != Config.Config.NodeId)
.Select(static p => p.Advertisment!);
}
@@ -182,26 +209,26 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
try
{
- _log.Debug("Discovery worker waiting for new peers to connect");
+ Log.Debug("Discovery worker waiting for new peers to connect");
while (true)
{
//Wait for changes, then get new peers
- await _monitor.WaitForChangeAsync().WaitAsync(cancellation);
+ await Monitor.WaitForChangeAsync().WaitAsync(cancellation);
- _log.Verbose("New peers connected");
+ Log.Verbose("New peers connected");
//Use the monitor to get the initial peers
IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds();
- ((NodeDiscoveryCollection)_config.Config.NodeCollection).AddManualNodes(ads);
+ ((NodeDiscoveryCollection)Config.Config.NodeCollection).AddManualNodes(ads);
}
}
catch (OperationCanceledException)
{
//Normal ext
- _log.Debug("Connected peer listener exited");
+ Log.Debug("Connected peer listener exited");
}
}
@@ -212,7 +239,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
lock (_connectedPeers)
{
//Get all discovered peers
- CacheNodeAdvertisment[] peers = _config.Config.NodeCollection.GetAllNodes();
+ CacheNodeAdvertisment[] peers = Config.Config.NodeCollection.GetAllNodes();
//Get the difference between the discovered peers and the connected peers
return peers.Except(_connectedPeers).ToArray();