aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src/Clustering
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-07-13 13:20:25 -0400
committerLibravatar vnugent <public@vaughnnugent.com>2023-07-13 13:20:25 -0400
commit2f674e79d42e7d36225fa9ac7ecefbc5bc62d325 (patch)
treec58999489f5391bc044e7a9bb3e557afe2860415 /plugins/ObjectCacheServer/src/Clustering
parent1a8ab1457244d15b19ddcc94958f645f5ec2abc7 (diff)
Checkpoint, kind of working clustering
Diffstat (limited to 'plugins/ObjectCacheServer/src/Clustering')
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs290
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs98
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/ICachePeerAdapter.cs49
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/IPeerMonitor.cs54
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs268
5 files changed, 759 insertions, 0 deletions
diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
new file mode 100644
index 0000000..ffdd4f4
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
@@ -0,0 +1,290 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: CacheNodeReplicationMaanger.cs
+*
+* CacheNodeReplicationMaanger.cs is part of ObjectCacheServer which is part
+* of the larger VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+
+using VNLib.Plugins;
+using VNLib.Utils.Logging;
+using VNLib.Data.Caching.Extensions;
+using static VNLib.Data.Caching.Constants;
+using VNLib.Net.Messaging.FBM;
+using VNLib.Net.Messaging.FBM.Client;
+using VNLib.Plugins.Extensions.Loading;
+using VNLib.Data.Caching.Extensions.Clustering;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
+{
+
+ /*
+ * This class is responsible for replicating the cache with other nodes.
+ *
+ * It does this by connecting to other nodes and listening for change events.
+ * When a change event occurs, it takes action against the local cache store,
+ * to keep it consistent with the other nodes.
+ *
+ * Change events are only handled first-hand, meaning that events do not
+ * propagate to other nodes. This node must connect to each peer individually
+ * and listen for that peer's changes.
+ */
+
+ internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork
+ {
+ private const string LOG_SCOPE_NAME = "REPL";
+
+ //Timeout applied to each request that fetches a changed item from a peer
+ private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10);
+ //Fixed message size passed to the replication client configuration
+ private const int MAX_MESSAGE_SIZE = 12 * 1024;
+
+ private readonly PluginBase _plugin;
+ private readonly ILogProvider _log;
+ private readonly NodeConfig _nodeConfig;
+ private readonly ICacheStore _cacheStore;
+ private readonly ICachePeerAdapter _peerAdapter;
+ private readonly FBMClientConfig _replicationClientConfig;
+
+ private readonly bool _isDebug;
+
+ //Number of peer listeners currently running, updated with Interlocked operations
+ private int _openConnections;
+
+ /// <summary>
+ /// Initializes the replication manager from the plugin's shared singletons
+ /// and builds the client configuration used for peer connections.
+ /// </summary>
+ /// <param name="plugin">The plugin instance, must be an <see cref="ObjectCacheServerEntry"/></param>
+ /// <exception cref="ArgumentNullException"></exception>
+ /// <exception cref="InvalidCastException"></exception>
+ public CacheNodeReplicationMaanger(PluginBase plugin)
+ {
+     _ = plugin ?? throw new ArgumentNullException(nameof(plugin));
+
+     //Load the node config
+     _nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>();
+     _cacheStore = plugin.GetOrCreateSingleton<CacheStore>();
+     _peerAdapter = plugin.GetOrCreateSingleton<PeerDiscoveryManager>();
+
+     _plugin = plugin;
+     _isDebug = plugin.IsDebug();
+
+     /*
+      * A direct cast is used so that a plugin of the wrong type fails with a
+      * descriptive InvalidCastException instead of a NullReferenceException.
+      */
+     ObjectCacheServerEntry entry = (ObjectCacheServerEntry)plugin;
+
+     //Init fbm config with fixed message size
+     _replicationClientConfig = FBMDataCacheExtensions.GetDefaultConfig(
+         entry.CacheHeap,
+         MAX_MESSAGE_SIZE,
+         debugLog: _isDebug ? plugin.Log : null
+     );
+
+     _log = plugin.Log.CreateScope(LOG_SCOPE_NAME);
+ }
+
+ /// <summary>
+ /// Runs the replication loop: polls the peer adapter for peers that have no
+ /// listener yet and starts a background listener for each one, without
+ /// exceeding the configured maximum number of peer connections.
+ /// </summary>
+ /// <param name="pluginLog">The plugin log provider (not used, the scoped logger is used instead)</param>
+ /// <param name="exitToken">A token that signals the worker to exit</param>
+ /// <returns>A task that completes when the worker exits</returns>
+ public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
+ {
+     _log.Information("Initializing node replication worker");
+
+     try
+     {
+         while (true)
+         {
+             //Get all new peers
+             CacheNodeAdvertisment[] peers = _peerAdapter.GetNewPeers();
+
+             if (peers.Length == 0 && _isDebug)
+             {
+                 _log.Verbose("No new peers to connect to");
+             }
+
+             //Snapshot the counter once, listeners started below may not have incremented it yet
+             int open = Volatile.Read(ref _openConnections);
+
+             //Make sure we don't exceed the max connections
+             if (open >= _nodeConfig.MaxPeerConnections)
+             {
+                 if (_isDebug)
+                 {
+                     _log.Verbose("Max peer connections reached, waiting for a connection to close");
+                 }
+             }
+             else
+             {
+                 //Listeners started during this pass
+                 int started = 0;
+
+                 //Connect to each peer as a background task
+                 foreach (CacheNodeAdvertisment peer in peers)
+                 {
+                     /*
+                      * Only fill the remaining connection slots. Skipped peers never
+                      * get a listener attached, so the adapter should report them
+                      * again on a later pass.
+                      */
+                     if (open + started >= _nodeConfig.MaxPeerConnections)
+                     {
+                         break;
+                     }
+
+                     _ = _plugin.ObserveWork(() => OnNewPeerDoWorkAsync(peer, _log, exitToken));
+                     started++;
+                 }
+             }
+
+             //Wait before polling for new peers again
+             await Task.Delay(10000, exitToken);
+         }
+     }
+     catch (OperationCanceledException)
+     {
+         //Normal exit
+     }
+     catch (Exception ex)
+     {
+         //Include the exception so the cause of the failure is not lost
+         _log.Error(ex, "Node replication worker exited with an error");
+         throw;
+     }
+
+     _log.Information("Node replication worker exited");
+ }
+
+ /// <summary>
+ /// Opens a replication connection to a newly discovered peer and runs the
+ /// change listeners on it until they exit, the connection fails, or the
+ /// exit token is cancelled.
+ /// </summary>
+ /// <param name="newPeer">The advertisment of the peer to connect to</param>
+ /// <param name="log">The log provider connection events are written to</param>
+ /// <param name="exitToken">A token that signals the listeners to exit</param>
+ /// <returns>A task that completes once the connection has been cleaned up</returns>
+ /// <exception cref="ArgumentNullException"></exception>
+ private async Task OnNewPeerDoWorkAsync(CacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken)
+ {
+     if (newPeer is null)
+     {
+         throw new ArgumentNullException(nameof(newPeer));
+     }
+
+     FBMClient client = new(_replicationClientConfig);
+
+     //Tell the adapter a listener now exists for this peer
+     _peerAdapter.OnPeerListenerAttached(newPeer);
+
+     Interlocked.Increment(ref _openConnections);
+
+     try
+     {
+         log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId);
+
+         await client.ConnectToCacheAsync(newPeer, _nodeConfig.Config, exitToken);
+
+         log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId);
+
+         //One listener per logical processor, all sharing the same client
+         Task[] listeners = Enumerable.Range(0, Environment.ProcessorCount)
+             .Select(_ => Task.Run(() => ReplicationWorkerDoWorkAsync(client, log, exitToken), exitToken))
+             .ToArray();
+
+         //Wait for every listener to exit
+         await Task.WhenAll(listeners);
+
+         log.Debug("All cache worker tasks exited successfully, disconnecting from {server}", newPeer.NodeId);
+
+         //Graceful disconnect, a failure here is handled by the catch blocks below
+         await client.DisconnectAsync(CancellationToken.None);
+     }
+     catch (InvalidResponseException ie)
+     {
+         //Only report the bad packet when the plugin is not unloading
+         if (!exitToken.IsCancellationRequested)
+         {
+             log.Debug("Peer {p} responded with invalid response packet, disconnected. reason\n {reason}", newPeer.NodeId, ie);
+         }
+
+         await TryDisconnectAsync();
+     }
+     catch (OperationCanceledException)
+     {
+         //Plugin unloading
+         await TryDisconnectAsync();
+     }
+     catch (Exception ex)
+     {
+         log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex);
+     }
+     finally
+     {
+         Interlocked.Decrement(ref _openConnections);
+
+         client.Dispose();
+
+         //Notify the adapter that the listener is gone
+         _peerAdapter.OnPeerListenerDetatched(newPeer);
+     }
+
+     //Attempts a graceful disconnect, a failure is logged instead of thrown
+     async Task TryDisconnectAsync()
+     {
+         try
+         {
+             await client.DisconnectAsync(CancellationToken.None);
+         }
+         catch (Exception disconnectError)
+         {
+             log.Error(disconnectError);
+         }
+     }
+ }
+
+ /// <summary>
+ /// Worker callback that waits for change events on the peer connection and
+ /// applies each one to the local cache store.
+ /// </summary>
+ /// <param name="client">The connected client to listen on</param>
+ /// <param name="log">The log provider to write events to</param>
+ /// <param name="exitToken">A token that cancels waiting for changes</param>
+ /// <returns>A task that completes when the worker exits</returns>
+ private async Task ReplicationWorkerDoWorkAsync(FBMClient client, ILogProvider log, CancellationToken exitToken)
+ {
+     //A single request message is reused for every update
+     using FBMRequest request = new(client.Config);
+
+     for (;;)
+     {
+         //Wait for the next change event
+         WaitForChangeResult change = await client.WaitForChangeAsync(exitToken);
+
+         log.Debug("Object changed {typ} {obj}", change.Status, change.CurrentId);
+
+         string? status = change.Status;
+
+         if (string.Equals(status, ResponseCodes.NotFound, StringComparison.Ordinal))
+         {
+             log.Error("Server cache not properly configured, worker exiting");
+             return;
+         }
+
+         if (string.Equals(status, "deleted", StringComparison.Ordinal))
+         {
+             //Remove the object from the local store
+             await _cacheStore.DeleteItemAsync(change.CurrentId, CancellationToken.None);
+         }
+         else if (string.Equals(status, "modified", StringComparison.Ordinal))
+         {
+             //Fetch the changed record from the peer and store it locally
+             await UpdateRecordAsync(client, request, log, change.CurrentId, change.NewId, exitToken);
+         }
+
+         //Make the request message ready for reuse
+         request.Reset();
+     }
+ }
+
+ /// <summary>
+ /// Requests a changed object from the peer and, when the peer answers with
+ /// an okay status, writes the response body to the local cache store.
+ /// </summary>
+ /// <param name="client">The connected client used to send the request</param>
+ /// <param name="modRequest">The reusable request message to write the headers to</param>
+ /// <param name="log">The log provider to write results to</param>
+ /// <param name="objectId">The current id of the changed object</param>
+ /// <param name="newId">The new id of the object, may be empty</param>
+ /// <param name="cancellation">A token to cancel the operation</param>
+ /// <returns>A task that completes when the record has been processed</returns>
+ private async Task UpdateRecordAsync(FBMClient client, FBMRequest modRequest, ILogProvider log, string objectId, string newId, CancellationToken cancellation)
+ {
+     //Request the object under its new id when one was supplied, otherwise under its current id
+     string requestId = string.IsNullOrWhiteSpace(newId) ? objectId : newId;
+
+     //Set action as get
+     modRequest.WriteHeader(HeaderCommand.Action, Actions.Get);
+     //Set object-id header
+     modRequest.WriteHeader(ObjectId, requestId);
+
+     //Make request
+     using FBMResponse response = await client.SendAsync(modRequest, GetItemTimeout, cancellation);
+
+     response.ThrowIfNotSet();
+
+     //Read the status header from the response
+     string status = response.Headers.First(static s => s.Header == HeaderCommand.Status).Value.ToString();
+
+     if (!ResponseCodes.Okay.Equals(status, StringComparison.Ordinal))
+     {
+         log.Warn("Object {id} was missing on the remote server", objectId);
+         return;
+     }
+
+     //Update the record
+     await _cacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation);
+     log.Debug("Updated object {id}", objectId);
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs
new file mode 100644
index 0000000..c49a54b
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs
@@ -0,0 +1,98 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: CachePeerMonitor.cs
+*
+* CachePeerMonitor.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+
+using VNLib.Utils;
+using VNLib.Utils.Extensions;
+using VNLib.Plugins;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
+{
+
+ /// <summary>
+ /// Keeps the list of peers connected to this node and signals a waiter
+ /// when a peer with an advertisment connects
+ /// </summary>
+ internal sealed class CachePeerMonitor : VnDisposeable, IPeerMonitor
+ {
+     //Connected peers, guarded by locking the list itself
+     private readonly LinkedList<ICachePeer> _peers = new();
+     //Set when a peer with an advertisment connects
+     private readonly ManualResetEvent _newPeerTrigger = new(false);
+
+     //NOTE(review): the plugin argument is unused; presumably the signature is required by the singleton loader - confirm
+     public CachePeerMonitor(PluginBase plugin)
+     { }
+
+     /// <summary>
+     /// Waits for a new peer to connect to the server
+     /// </summary>
+     /// <returns>A task that completes when a new peer has connected</returns>
+     public async Task WaitForChangeAsync()
+     {
+         await _newPeerTrigger.WaitAsync();
+
+         //Clear the signal so the next call waits again
+         _newPeerTrigger.Reset();
+     }
+
+     ///<inheritdoc/>
+     public IEnumerable<ICachePeer> GetAllPeers()
+     {
+         //Return a snapshot so callers never enumerate the live list
+         lock (_peers)
+         {
+             return _peers.ToArray();
+         }
+     }
+
+     ///<inheritdoc/>
+     public void OnPeerConnected(ICachePeer peer)
+     {
+         //Track the peer so it shows up in GetAllPeers
+         lock (_peers)
+         {
+             _peers.AddLast(peer);
+         }
+
+         //Only peers that supplied an advertisment wake the waiter
+         if (peer.Advertisment != null)
+         {
+             _newPeerTrigger.Set();
+         }
+     }
+
+     ///<inheritdoc/>
+     public void OnPeerDisconnected(ICachePeer peer)
+     {
+         //Stop tracking the peer
+         lock (_peers)
+         {
+             _peers.Remove(peer);
+         }
+     }
+
+     ///<inheritdoc/>
+     protected override void Free()
+     {
+         _newPeerTrigger.Dispose();
+     }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Clustering/ICachePeerAdapter.cs b/plugins/ObjectCacheServer/src/Clustering/ICachePeerAdapter.cs
new file mode 100644
index 0000000..dd426f6
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Clustering/ICachePeerAdapter.cs
@@ -0,0 +1,49 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: ICachePeerAdapter.cs
+*
+* ICachePeerAdapter.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using VNLib.Data.Caching.Extensions.Clustering;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
+{
+ /// <summary>
+ /// Exposes discovered peers that still need a replication listener and
+ /// receives notifications when a listener is attached to or detached from a peer
+ /// </summary>
+ internal interface ICachePeerAdapter
+ {
+ /// <summary>
+ /// Gets the peers that have been discovered but not yet connected to
+ /// </summary>
+ /// <returns>An array of peers that have not been connected to yet</returns>
+ CacheNodeAdvertisment[] GetNewPeers();
+
+ /// <summary>
+ /// Called when a replication listener has been attached to a peer
+ /// </summary>
+ /// <param name="peer">The peer the listener was attached to</param>
+ void OnPeerListenerAttached(CacheNodeAdvertisment peer);
+
+ /// <summary>
+ /// Called when a replication listener has been detached from a peer
+ /// </summary>
+ /// <param name="peer">The peer the listener was detached from</param>
+ void OnPeerListenerDetatched(CacheNodeAdvertisment peer);
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Clustering/IPeerMonitor.cs b/plugins/ObjectCacheServer/src/Clustering/IPeerMonitor.cs
new file mode 100644
index 0000000..d8f358f
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Clustering/IPeerMonitor.cs
@@ -0,0 +1,54 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: IPeerMonitor.cs
+*
+* IPeerMonitor.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Collections.Generic;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
+{
+ /// <summary>
+ /// Represents a monitor that tracks the peer cache servers connected to
+ /// this node, so they can advertise their presence in the cluster
+ /// </summary>
+ internal interface IPeerMonitor
+ {
+ /// <summary>
+ /// Notifies the monitor that a remote peer has connected to the current node for
+ /// replication
+ /// </summary>
+ /// <param name="peer">The peer that connected</param>
+ void OnPeerConnected(ICachePeer peer);
+
+ /// <summary>
+ /// Notifies the monitor that a peer has disconnected from the current node
+ /// </summary>
+ /// <param name="peer">The peer that has disconnected</param>
+ void OnPeerDisconnected(ICachePeer peer);
+
+ /// <summary>
+ /// Gets an enumerable of all peers currently connected to this node
+ /// </summary>
+ /// <returns>The collection of all connected peers</returns>
+ IEnumerable<ICachePeer> GetAllPeers();
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
new file mode 100644
index 0000000..f132cab
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
@@ -0,0 +1,268 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: PeerDiscoveryManager.cs
+*
+* PeerDiscoveryManager.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Linq;
+using System.Net.Http;
+using System.Threading;
+using System.Net.Sockets;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+
+using VNLib.Utils.Logging;
+using VNLib.Data.Caching.Extensions;
+using VNLib.Plugins;
+using VNLib.Plugins.Extensions.Loading;
+using VNLib.Data.Caching.Extensions.Clustering;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
+{
+
+ /*
+ * This class is responsible for resolving and discovering peer nodes in the cluster network.
+ */
+
+ internal sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter
+ {
+ private const string LOG_SCOPE_NAME = "DISC";
+ //Time to wait after the worker starts before the first discovery pass
+ private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(15);
+
+ //Peers that have a replication listener attached, guarded by locking the list itself
+ private readonly List<CacheNodeAdvertisment> _connectedPeers;
+ private readonly NodeConfig _config;
+ private readonly CachePeerMonitor _monitor;
+ private readonly bool IsDebug;
+ private readonly ILogProvider _log;
+
+ /// <summary>
+ /// Initializes the discovery manager: loads the node config, registers the
+ /// configured known peers and installs the discovery error handler.
+ /// </summary>
+ /// <param name="plugin">The plugin instance to load configuration and singletons from</param>
+ public PeerDiscoveryManager(PluginBase plugin)
+ {
+     _config = plugin.GetOrCreateSingleton<NodeConfig>();
+
+     //The known peers array is optional, it's allowed to be missing for master nodes
+     IConfigScope? peersScope = plugin.TryGetConfig("known_peers");
+     string[] knownPeers = peersScope?.Deserialze<string[]>() ?? Array.Empty<string>();
+
+     //Hand the known peer addresses to the node configuration
+     IEnumerable<Uri> peerUris = knownPeers.Select(static address => new Uri(address));
+     _config.Config.WithInitialPeers(peerUris);
+
+     plugin.Log.Information("Inital peer nodes: {nodes}", knownPeers);
+
+     //The monitor reports peers that connect to this node
+     _monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
+
+     _connectedPeers = new List<CacheNodeAdvertisment>();
+
+     //All messages from this class go to a scoped logger
+     _log = plugin.Log.CreateScope(LOG_SCOPE_NAME);
+
+     //Discovery errors are reported through the scoped logger
+     ErrorHandler handler = new(_log);
+     _config.Config.WithErrorHandler(handler);
+
+     IsDebug = plugin.IsDebug();
+ }
+
+ /// <summary>
+ /// Runs the discovery loop: after an initial delay it repeatedly resolves the
+ /// well-known peers, combines them with the peers reported by the monitor and
+ /// discovers nodes from them, then waits for the configured discovery interval.
+ /// </summary>
+ /// <param name="pluginLog">The plugin log provider (not used, the scoped logger is used instead)</param>
+ /// <param name="exitToken">A token that signals the worker to exit</param>
+ /// <returns>A task that completes when the loop and the peer watcher have exited</returns>
+ async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
+ {
+     /*
+      * This loop uses the peer monitor to keep track of all connected peers, then gets
+      * all the advertising peers (including known peers) and resolves all nodes across
+      * the network.
+      */
+
+     //Start the change listener
+     Task watcher = WatchForPeersAsync(exitToken);
+
+     _log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", InitialDelay);
+
+     try
+     {
+         //Wait for the initial delay
+         await Task.Delay(InitialDelay, exitToken);
+
+         _log.Debug("Begining discovery loop");
+
+         /*
+          * To avoid connecting to ourself, we add ourselves to the connected list
+          * and it should never get removed. This is because the monitor will never
+          * report our own advertisment.
+          *
+          * The list is also accessed by GetNewPeers and the listener callbacks,
+          * which all take this lock, so the same lock is taken here.
+          */
+         lock (_connectedPeers)
+         {
+             _connectedPeers.Add(_config.Config.Advertisment);
+         }
+
+         while (true)
+         {
+             try
+             {
+                 if (IsDebug)
+                 {
+                     _log.Debug("Begining node discovery");
+                 }
+
+                 //Resolve all known peers
+                 CacheNodeAdvertisment[] wellKnown = await _config.Config.ResolveWellKnownAsync(exitToken);
+
+                 //Use the monitor to get the initial peers
+                 IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds();
+
+                 //Combine well-known with new connected peers
+                 CacheNodeAdvertisment[] allAds = ads.Union(wellKnown).ToArray();
+
+                 if (allAds.Length > 0)
+                 {
+                     //Discover all known nodes
+                     await _config.Config.DiscoverNodesAsync(allAds, exitToken);
+                 }
+
+                 //Log the discovered nodes if debug logging is enabled
+                 if (IsDebug)
+                 {
+                     CacheNodeAdvertisment[] found = _config.Config.NodeCollection.GetAllNodes();
+
+                     _log.Debug("Discovered {count} nodes\n\t{nodes}", found.Length, found.Select(static s => s.NodeId));
+                 }
+             }
+             catch (OperationCanceledException)
+             {
+                 //Let cancellation reach the outer handler so the loop exits
+                 throw;
+             }
+             catch (Exception ex)
+             {
+                 //A failed pass is logged and retried after the next interval
+                 _log.Error(ex, "Failed to discover new peer nodes");
+             }
+
+             //Delay the next discovery
+             await Task.Delay(_config.DiscoveryInterval, exitToken);
+         }
+     }
+     catch (OperationCanceledException)
+     {
+         //Normal exit
+         _log.Information("Node discovery worker exiting");
+     }
+
+     //Wait for the watcher to exit
+     await watcher.ConfigureAwait(false);
+ }
+
+ /// <summary>
+ /// Gets the advertisments of all peers reported by the monitor, skipping
+ /// peers without an advertisment and the current node
+ /// </summary>
+ /// <returns>A lazily evaluated sequence of peer advertisments</returns>
+ private IEnumerable<CacheNodeAdvertisment> GetMonitorAds()
+ {
+     return from peer in _monitor.GetAllPeers()
+            //Only peers that supplied an advertisment
+            where peer.Advertisment != null
+            //Without us
+            where peer.NodeId != _config.Config.NodeId
+            select peer.Advertisment!;
+ }
+
+ /// <summary>
+ /// Waits for the monitor to signal that peers have connected and adds the
+ /// monitor's advertisments to the node collection, until cancelled.
+ /// </summary>
+ /// <param name="cancellation">A token that stops the watcher</param>
+ /// <returns>A task that completes when the watcher has exited</returns>
+ private async Task WatchForPeersAsync(CancellationToken cancellation)
+ {
+     try
+     {
+         _log.Debug("Discovery worker waiting for new peers to connect");
+
+         for (;;)
+         {
+             //Block until the monitor reports a change or the token is cancelled
+             Task change = _monitor.WaitForChangeAsync();
+             await change.WaitAsync(cancellation);
+
+             _log.Verbose("New peers connected");
+
+             //Read the advertisments of the peers the monitor currently knows
+             IEnumerable<CacheNodeAdvertisment> connectedAds = GetMonitorAds();
+
+             NodeDiscoveryCollection nodes = (NodeDiscoveryCollection)_config.Config.NodeCollection;
+             nodes.AddManualNodes(connectedAds);
+         }
+     }
+     catch (OperationCanceledException)
+     {
+         //Normal exit
+         _log.Debug("Connected peer listener exited");
+     }
+ }
+
+
+ ///<inheritdoc/>
+ public CacheNodeAdvertisment[] GetNewPeers()
+ {
+     lock (_connectedPeers)
+     {
+         //All discovered nodes, minus the peers that already have a listener attached
+         return _config.Config.NodeCollection
+             .GetAllNodes()
+             .Except(_connectedPeers)
+             .ToArray();
+     }
+ }
+
+ ///<inheritdoc/>
+ public void OnPeerListenerAttached(CacheNodeAdvertisment peer)
+ {
+ //The list is shared with GetNewPeers and OnPeerListenerDetatched, which take the same lock
+ lock (_connectedPeers)
+ {
+ //Record the peer as connected so GetNewPeers stops reporting it
+ _connectedPeers.Add(peer);
+ }
+ }
+
+ ///<inheritdoc/>
+ public void OnPeerListenerDetatched(CacheNodeAdvertisment peer)
+ {
+ //Remove from connected peers so GetNewPeers can report the peer again
+ //The list is shared with GetNewPeers and OnPeerListenerAttached, which take the same lock
+ lock (_connectedPeers)
+ {
+ _connectedPeers.Remove(peer);
+ }
+ }
+
+
+ /// <summary>
+ /// Writes node discovery errors to the supplied logger
+ /// </summary>
+ /// <param name="Logger">The log provider errors are written to</param>
+ private sealed record class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
+ {
+     ///<inheritdoc/>
+     public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
+     {
+         switch (ex)
+         {
+             //Transport failed, only the socket error message is logged
+             case HttpRequestException { InnerException: SocketException se }:
+                 Logger.Warn("Failed to connect to server {serv} because {err}", errorNode, se.Message);
+                 break;
+
+             //Any other http failure is logged with the full exception
+             case HttpRequestException hre:
+                 Logger.Error("Failed to connect to node {n}\n{err}", errorNode, hre);
+                 break;
+
+             default:
+                 Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex);
+                 break;
+         }
+     }
+ }
+ }
+}