aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs')
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs130
1 files changed, 36 insertions, 94 deletions
diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
index 6475f9c..b9a220d 100644
--- a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -24,14 +24,11 @@
using System;
using System.Linq;
-using System.Net.Http;
using System.Threading;
-using System.Net.Sockets;
using System.Threading.Tasks;
using System.Collections.Generic;
using VNLib.Utils.Logging;
-using VNLib.Plugins;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.Extensions;
using VNLib.Data.Caching.Extensions.Clustering;
@@ -43,54 +40,19 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
* This class is responsible for resolving and discovering peer nodes in the cluster network.
*/
- internal sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter
+ internal sealed class PeerDiscoveryManager(
+ CacheNodeConfiguration config,
+ ServerClusterConfig clusterConf,
+ CachePeerMonitor Monitor,
+ ILogProvider Log,
+ bool IsDebug,
+ bool HasWellKnown
+ )
+ : IAsyncBackgroundWork, ICachePeerAdapter
{
- private const string LOG_SCOPE_NAME = "DISC";
- /*
- * The initial discovery delay. This allows for the server to initialize before
- * starting the discovery process. This will probably be a shorter delay
- * than a usual discovery interval.
- */
- private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(15);
- private static readonly TimeSpan WhenWellKnownResolveFailed = TimeSpan.FromSeconds(20);
-
-
- private readonly List<CacheNodeAdvertisment> _connectedPeers;
- private readonly NodeConfig Config;
- private readonly CachePeerMonitor Monitor;
- private readonly ILogProvider Log;
- private readonly bool IsDebug;
- private readonly bool HasWellKnown;
-
- public PeerDiscoveryManager(PluginBase plugin)
- {
- //Get config
- Config = plugin.GetOrCreateSingleton<NodeConfig>();
-
- //Get the known peers array from config, its allowed to be null for master nodes
- IConfigScope? config = plugin.TryGetConfig("known_peers");
- string[] kownPeers = config?.Deserialze<string[]>() ?? Array.Empty<string>();
-
- //Add known peers to the monitor
- Config.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s)));
-
- HasWellKnown = kownPeers.Length > 0;
-
- //Get the peer monitor
- Monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
-
- _connectedPeers = new();
-
- //Create scoped logger
- Log = plugin.Log.CreateScope(LOG_SCOPE_NAME);
-
- Log.Information("Inital peer nodes: {nodes}", kownPeers);
- //Setup discovery error handler
- Config.Config.WithErrorHandler(new ErrorHandler(Log));
-
- IsDebug = plugin.IsDebug();
- }
+ private readonly List<CacheNodeAdvertisment> _connectedPeers = [];
+ private readonly VNCacheClusterManager clusterMan = new(config);
async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
{
@@ -103,12 +65,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//Start the change listener
Task watcher = WatchForPeersAsync(exitToken);
- Log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", InitialDelay);
+ Log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", CacheConstants.Delays.InitialDiscovery);
try
- {
- //Wait for the initial delay
- await Task.Delay(InitialDelay, exitToken);
+ {
+ await Task.Delay(CacheConstants.Delays.InitialDiscovery, exitToken);
Log.Debug("Begining discovery loop");
@@ -123,26 +84,32 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
Log.Debug("Begining node discovery");
}
- //Resolve all known peers
- CacheNodeAdvertisment[] wellKnown = await Config.Config.ResolveWellKnownAsync(exitToken);
- wellKnownFailed = wellKnown.Length == 0;
+ /*
+ * On every loop we will need to resolve well-known servers incase they go down
+ * or change. There probably should be some more advanced logic and caching here.
+ *
+ * Node may not have any well-known nodes, so we need to check for that.
+ */
+ CacheNodeAdvertisment[] wellKnown = HasWellKnown ?
+ await clusterMan.ResolveWellKnownAsync(exitToken) :
+ Array.Empty<CacheNodeAdvertisment>();
//Use the monitor to get the initial peers
IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds();
- //Combine well-known with new connected peers
+ //Combine well-known peers that are currently connected to this server
CacheNodeAdvertisment[] allAds = ads.Union(wellKnown).ToArray();
if (allAds.Length > 0)
{
- //Discover all known nodes
- await Config.Config.DiscoverNodesAsync(allAds, exitToken);
+ //Build the discovery map from all the known nodes to find all known nodes in the entire cluster
+ await clusterMan.DiscoverNodesAsync(allAds, exitToken);
}
//Log the discovered nodes if verbose logging is enabled
if (IsDebug)
{
- CacheNodeAdvertisment[] found = Config.Config.NodeCollection.GetAllNodes();
+ CacheNodeAdvertisment[] found = clusterMan.DiscoveredNodes.GetAllNodes();
Log.Debug("Discovered {count} nodes\n\t{nodes}", found.Length, found.Select(static s => s.NodeId));
}
@@ -168,16 +135,16 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
if (IsDebug)
{
- Log.Debug("Well known node discovery failed, waiting for {idel} before retrying", WhenWellKnownResolveFailed);
+ Log.Debug("Well known node discovery failed, waiting for {idel} before retrying", CacheConstants.Delays.WellKnownResolveFailed);
}
//Wait for shorter duration
- await Task.Delay(WhenWellKnownResolveFailed, exitToken);
+ await Task.Delay(CacheConstants.Delays.WellKnownResolveFailed, exitToken);
}
else
{
//Delay the next discovery
- await Task.Delay(Config.DiscoveryInterval, exitToken);
+ await Task.Delay(clusterConf.DiscoveryInterval, exitToken);
}
}
}
@@ -188,7 +155,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
}
finally
{
-
+ Monitor.Dispose();
}
//Wait for the watcher to exit
@@ -197,10 +164,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
private IEnumerable<CacheNodeAdvertisment> GetMonitorAds()
{
+ string selfId = (clusterMan.Config as CacheNodeConfiguration)!.NodeId;
return Monitor.GetAllPeers()
.Where(static p => p.Advertisment != null)
//Without us
- .Where(n => n.NodeId != Config.Config.NodeId)
+ .Where(n => !string.Equals(n.NodeId, selfId, StringComparison.OrdinalIgnoreCase))
.Select(static p => p.Advertisment!);
}
@@ -222,7 +190,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//Use the monitor to get the initial peers
IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds();
- ((NodeDiscoveryCollection)Config.Config.NodeCollection).AddManualNodes(ads);
+ clusterMan.AddManualNodes(ads);
}
}
catch (OperationCanceledException)
@@ -239,7 +207,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
lock (_connectedPeers)
{
//Get all discovered peers
- CacheNodeAdvertisment[] peers = Config.Config.NodeCollection.GetAllNodes();
+ CacheNodeAdvertisment[] peers = clusterMan.DiscoveredNodes.GetAllNodes();
//Get the difference between the discovered peers and the connected peers
return peers.Except(_connectedPeers).ToArray();
@@ -265,31 +233,5 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
_connectedPeers.Remove(peer);
}
}
-
-
- private sealed record class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
- {
- public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
- {
-
- if (ex is HttpRequestException hre)
- {
- if (hre.InnerException is SocketException se)
- {
- //traisnport failed
- Logger.Warn("Failed to connect to server {serv} because {err}", errorNode, se.Message);
- }
- else
- {
- Logger.Error("Failed to connect to node {n}\n{err}", errorNode, hre);
- }
- }
- else
- {
- Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex);
- }
-
- }
- }
}
}