diff options
Diffstat (limited to 'plugins/ObjectCacheServer/src/Distribution')
8 files changed, 773 insertions, 0 deletions
diff --git a/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs new file mode 100644 index 0000000..b453dcc --- /dev/null +++ b/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs @@ -0,0 +1,246 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: ObjectCacheServerEntry.cs +* +* ObjectCacheServerEntry.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using System.Collections.Generic; + +using VNLib.Plugins; +using VNLib.Utils.Logging; +using VNLib.Data.Caching.Extensions; +using static VNLib.Data.Caching.Constants; +using VNLib.Net.Messaging.FBM; +using VNLib.Net.Messaging.FBM.Client; +using VNLib.Plugins.Extensions.Loading; + +namespace VNLib.Data.Caching.ObjectCache.Server.Distribution +{ + internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork + { + private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10); + + private readonly NodeConfig NodeConfig; + private readonly ICachePeerAdapter PeerAdapter; + private readonly ICacheStore CacheStore; + private readonly FBMClientConfig ClientConfig; + private readonly PluginBase Plugin; + + private CacheNodeConfiguration CacheConfig => NodeConfig.Config; + + public CacheNodeReplicationMaanger(PluginBase plugin) + { + //Load the node config + NodeConfig = plugin.GetOrCreateSingleton<NodeConfig>(); + + //Get peer adapter + PeerAdapter = plugin.GetOrCreateSingleton<PeerDiscoveryManager>(); + + + } + + public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) + { + pluginLog.Information("[REPL] Initializing node replication worker"); + + try + { + while (true) + { + //Get all new peers + ICachePeerAdvertisment[] peers = PeerAdapter.GetNewPeers(); + + if (peers.Length == 0) + { + pluginLog.Verbose("[REPL] No new peers to connect to"); + } + + //Connect to each peer + foreach (ICachePeerAdvertisment peer in peers) + { + _ = Plugin.ObserveWork(() => OnNewPeerDoWorkAsync(peer, pluginLog, exitToken)); + } + + //Wait for a new peers + await Task.Delay(10000, exitToken); + } + } + catch (OperationCanceledException) + { + //Normal exit + } + catch + { + pluginLog.Error("[REPL] Node replication worker exited with an error"); + throw; + } + finally + { + + } + + pluginLog.Information("[REPL] Node replication worker exited"); + } + + private async Task OnNewPeerDoWorkAsync(ICachePeerAdvertisment newPeer, ILogProvider log, CancellationToken exitToken) + { + _ = newPeer ?? throw new ArgumentNullException(nameof(newPeer)); + + //Setup client + FBMClient client = new(ClientConfig); + + try + { + log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId); + + //Connect to the server + await client.ConnectToCacheAsync(newPeer, CacheConfig, exitToken); + + log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId); + + //Start worker tasks + List<Task> workerTasks = new(); + + for (int i = 0; i < Environment.ProcessorCount; i++) + { + Task workerTask = Task.Run(() => ReplicationWorkerDoWorkAsync(client, log, exitToken), exitToken); + + workerTasks.Add(workerTask); + } + + //Wait for sync workers to exit + await Task.WhenAll(workerTasks); + + log.Debug("All cache worker tasks exited successfully, disconnecting from {server}", newPeer.NodeId); + + //Disconnect client gracefully + await client.DisconnectAsync(CancellationToken.None); + } + catch (InvalidResponseException ie) + { + //See if the plugin is unloading + if (!exitToken.IsCancellationRequested) + { + log.Debug("Peer {p} responded with invalid response packet, disconnected. reason\n {reason}", newPeer.NodeId, ie); + } + //Disconnect client gracefully + try + { + await client.DisconnectAsync(CancellationToken.None); + } + catch (Exception ex) + { + log.Error(ex); + } + } + catch (OperationCanceledException) + { + //Plugin unloading, Try to disconnect + try + { + await client.DisconnectAsync(CancellationToken.None); + } + catch (Exception ex) + { + log.Error(ex); + } + } + catch (Exception ex) + { + log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex); + } + finally + { + client.Dispose(); + + //Notify monitor of disconnect + PeerAdapter.OnPeerListenerDetatched(newPeer); + } + } + + //Wroker task callback method + private async Task ReplicationWorkerDoWorkAsync(FBMClient client, ILogProvider log, CancellationToken exitToken) + { + //Listen for changes + while (true) + { + //Wait for changes + WaitForChangeResult changedObject = await client.WaitForChangeAsync(exitToken); + + log.Debug("Object changed {typ} {obj}", changedObject.Status, changedObject.CurrentId); + + switch (changedObject.Status) + { + case ResponseCodes.NotFound: + log.Warn("Server cache not properly configured, worker exiting"); + return; + case "deleted": + //Delete the object from the store + await CacheStore.DeleteItemAsync(changedObject.CurrentId); + break; + case "modified": + //Reload the record from the store + await UpdateRecordAsync(client, log, changedObject.CurrentId, changedObject.NewId, exitToken); + break; + } + } + } + + private async Task UpdateRecordAsync(FBMClient client, ILogProvider log, string objectId, string newId, CancellationToken cancellation) + { + //Get request message + FBMRequest modRequest = client.RentRequest(); + try + { + //Set action as get/create + modRequest.WriteHeader(HeaderCommand.Action, Actions.Get); + //Set session-id header + modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId); + + //Make request + using FBMResponse response = await client.SendAsync(modRequest, GetItemTimeout, cancellation); + + response.ThrowIfNotSet(); + + //Check response code + string status = response.Headers.First(static s => s.Header == HeaderCommand.Status).Value.ToString(); + if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal)) + { + //Update the record + await CacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation); + log.Debug("Updated object {id}", objectId); + } + else + { + log.Warn("Object {id} was missing on the remote server", objectId); + } + } + finally + { + client.ReturnRequest(modRequest); + } + } + } +} diff --git a/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs b/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs new file mode 100644 index 0000000..82b280c --- /dev/null +++ b/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs @@ -0,0 +1,55 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: CachePeerMonitor.cs +* +* CachePeerMonitor.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Collections.Generic; + +using VNLib.Plugins; + +namespace VNLib.Data.Caching.ObjectCache.Server.Distribution +{ + + internal sealed class CachePeerMonitor : IPeerMonitor + { + public CachePeerMonitor(PluginBase plugin) + { + + } + + public IEnumerable<ICachePeer> GetAllPeers() + { + throw new NotImplementedException(); + } + + public void OnPeerConnected(ICachePeer peer) + { + throw new NotImplementedException(); + } + + public void OnPeerDisconnected(ICachePeer peer) + { + throw new NotImplementedException(); + } + } +} diff --git a/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs b/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs new file mode 100644 index 0000000..d029f10 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs @@ -0,0 +1,49 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: ObjectCacheServerEntry.cs +* +* ObjectCacheServerEntry.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using VNLib.Data.Caching.Extensions; + +namespace VNLib.Data.Caching.ObjectCache.Server.Distribution +{ + internal interface ICachePeerAdapter + { + /// <summary> + /// Gets the peers that have been discovered but not yet connected to + /// </summary> + /// <returns>A collection of peers that have not been connected to yet</returns> + ICachePeerAdvertisment[] GetNewPeers(); + + /// <summary> + /// Called when a peer has been connected to + /// </summary> + /// <param name="peer">The peer that has been connected</param> + void OnPeerListenerAttached(ICachePeerAdvertisment peer); + + /// <summary> + /// Called when a peer has been disconnected from + /// </summary> + /// <param name="peer">The disconnected peer</param> + void OnPeerListenerDetatched(ICachePeerAdvertisment peer); + } +} diff --git a/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryCollection.cs b/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryCollection.cs new file mode 100644 index 0000000..d69da40 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryCollection.cs @@ -0,0 +1,58 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: INodeDiscoveryCollection.cs +* +* INodeDiscoveryCollection.cs is part of ObjectCacheServer which is +* part of the larger VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Collections.Generic; + +using VNLib.Data.Caching.Extensions; + +namespace VNLib.Data.Caching.ObjectCache.Server.Distribution +{ + internal interface INodeDiscoveryCollection + { + /// <summary> + /// Begins a new discovery and gets an enumerator for the discovery process + /// </summary> + /// <returns>An enumerator that simplifies discovery of unique nodes</returns> + INodeDiscoveryEnumerator BeginDiscovery(); + + /// <summary> + /// Begins a new discovery and gets an enumerator for the discovery process + /// </summary> + /// <param name="initialPeers">An initial collection of peers to add to the enumeration</param> + /// <returns>An enumerator that simplifies discovery of unique nodes</returns> + INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICachePeerAdvertisment> initialPeers); + + /// <summary> + /// Gets a snapshot of all discovered nodes in the current collection. + /// </summary> + /// <returns>The current collection of notes</returns> + ICachePeerAdvertisment[] GetAllNodes(); + + /// <summary> + /// Completes a discovery process and updates the collection with the results + /// </summary> + /// <param name="enumerator">The enumerator used to collect discovered nodes</param> + void CompleteDiscovery(INodeDiscoveryEnumerator enumerator); + } +} diff --git a/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryEnumerator.cs b/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryEnumerator.cs new file mode 100644 index 0000000..5cddf9c --- /dev/null +++ b/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryEnumerator.cs @@ -0,0 +1,45 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: INodeDiscoveryEnumerator.cs +* +* INodeDiscoveryEnumerator.cs is part of ObjectCacheServer which is part +* of the larger VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Collections.Generic; + +using VNLib.Data.Caching.Extensions; + +namespace VNLib.Data.Caching.ObjectCache.Server.Distribution +{ + internal interface INodeDiscoveryEnumerator + { + /// <summary> + /// Moves the enumerator to the next peer in the discovery process and returns the result + /// </summary> + /// <returns>The next peer advertisment in the enumeration</returns> + ICachePeerAdvertisment? GetNextPeer(); + + /// <summary> + /// Adds the specified peer to the collection of discovered peers + /// </summary> + /// <param name="discoveredPeers">The peer collection</param> + void OnPeerDiscoveryComplete(IEnumerable<ICachePeerAdvertisment> discoveredPeers); + } +} diff --git a/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs b/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs new file mode 100644 index 0000000..b4cb840 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs @@ -0,0 +1,53 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: IPeerMonitor.cs +* +* IPeerMonitor.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Collections.Generic; + +namespace VNLib.Data.Caching.ObjectCache.Server.Distribution +{ + /// <summary> + /// Represents a monitor for peer cache servers to advertise their presence + /// in the cluster + /// </summary> + internal interface IPeerMonitor + { + /// <summary> + /// Notifies the monitor that a peer has connected to the cluster + /// </summary> + /// <param name="peer">The peer that connected</param> + void OnPeerConnected(ICachePeer peer); + + /// <summary> + /// Notifies the monitor that a peer has disconnected + /// </summary> + /// <param name="peer">The peer that has disconnected</param> + void OnPeerDisconnected(ICachePeer peer); + + /// <summary> + /// Gets an enumerable of all peers currently active in the current peer + /// </summary> + /// <returns></returns> + IEnumerable<ICachePeer> GetAllPeers(); + } +} diff --git a/plugins/ObjectCacheServer/src/Distribution/NodeDiscoveryCollection.cs b/plugins/ObjectCacheServer/src/Distribution/NodeDiscoveryCollection.cs new file mode 100644 index 0000000..f773a2e --- /dev/null +++ b/plugins/ObjectCacheServer/src/Distribution/NodeDiscoveryCollection.cs @@ -0,0 +1,99 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: PeerDiscoveryManager.cs +* +* PeerDiscoveryManager.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Collections.Generic; + +using VNLib.Plugins; +using VNLib.Data.Caching.Extensions; + +namespace VNLib.Data.Caching.ObjectCache.Server.Distribution +{ + sealed class NodeDiscoveryCollection : INodeDiscoveryCollection + { + private LinkedList<ICachePeerAdvertisment> _peers; + + + public NodeDiscoveryCollection(PluginBase plugin) + { + _peers = new(); + } + + ///<inheritdoc/> + public INodeDiscoveryEnumerator BeginDiscovery() + { + return new NodeEnumerator(new()); + } + + ///<inheritdoc/> + public INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICachePeerAdvertisment> initialPeers) + { + //Init new enumerator with the initial peers + return new NodeEnumerator(new(initialPeers)); + } + + ///<inheritdoc/> + public void CompleteDiscovery(INodeDiscoveryEnumerator enumerator) + { + _ = enumerator ?? throw new ArgumentNullException(nameof(enumerator)); + + //Capture all nodes from the enumerator and store them as our current peers + _peers = (enumerator as NodeEnumerator)!.Peers; + } + + ///<inheritdoc/> + public ICachePeerAdvertisment[] GetAllNodes() + { + //Capture all current peers + return _peers.ToArray(); + } + + private sealed record class NodeEnumerator(LinkedList<ICachePeerAdvertisment> Peers) : INodeDiscoveryEnumerator + { + //Keep track of the current node in the collection so we can move down the list + private LinkedListNode<ICachePeerAdvertisment>? _currentNode = Peers.First; + + public ICachePeerAdvertisment? GetNextPeer() + { + //Move to the next peer in the collection + _currentNode = _currentNode?.Next; + + return _currentNode?.Value; + } + + public void OnPeerDiscoveryComplete(IEnumerable<ICachePeerAdvertisment> discoveredPeers) + { + //Get only the peers from the discovery that are not already in the collection + IEnumerable<ICachePeerAdvertisment> newPeers = discoveredPeers.Except(Peers); + + //Add them to the end of the collection + foreach(ICachePeerAdvertisment ad in newPeers) + { + Peers.AddLast(ad); + } + } + } + } +} diff --git a/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs new file mode 100644 index 0000000..54e4258 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs @@ -0,0 +1,168 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: PeerDiscoveryManager.cs +* +* PeerDiscoveryManager.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using System.Collections.Generic; + +using VNLib.Plugins; +using VNLib.Utils.Logging; +using VNLib.Data.Caching.Extensions; +using VNLib.Plugins.Extensions.Loading; + +namespace VNLib.Data.Caching.ObjectCache.Server.Distribution +{ + + sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter + { + private readonly NodeConfig _config; + private readonly IPeerMonitor _monitor; + private readonly INodeDiscoveryCollection _peers; + + public PeerDiscoveryManager(PluginBase plugin) + { + //Get config + _config = plugin.GetOrCreateSingleton<NodeConfig>(); + + //Get the peer monitor + _monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>(); + + //Get the node collection + _peers = plugin.GetOrCreateSingleton<NodeDiscoveryCollection>(); + + _connectedPeers = new(); + } + + async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) + { + pluginLog.Information("Node discovery worker started"); + + try + { + while (true) + { + try + { + await DiscoverAllNodesAsync(pluginLog, exitToken); + } + catch(OperationCanceledException) + { + throw; + } + catch (Exception ex) + { + pluginLog.Error(ex, "Failed to discover new peer nodes"); + } + + //Delay the next discovery + await Task.Delay(_config.DiscoveryInterval, exitToken); + } + } + catch (OperationCanceledException) + { + //Normal exit + pluginLog.Information("Node discovery worker exiting"); + } + finally + { + + } + } + + async Task DiscoverAllNodesAsync(ILogProvider log, CancellationToken cancellation) + { + //Use the monitor to get the initial peers + IEnumerable<ICachePeerAdvertisment> ads = _monitor.GetAllPeers() + .Where(static p => p.Advertisment != null) + .Select(static p => p.Advertisment!); + + //Init enumerator with initial peers + INodeDiscoveryEnumerator enumerator = _peers.BeginDiscovery(ads); + + do + { + //Load the initial peer + ICachePeerAdvertisment? peer = enumerator.GetNextPeer(); + + if (peer == null) + { + break; + } + + log.Verbose("Discovering peer nodes from {Peer}", peer.NodeId); + + //Discover nodes from this peer + ICachePeerAdvertisment[]? newNodes = await _config.Config.DiscoverClusterNodesAsync(peer, cancellation); + + //Add nodes to the enumerator + if (newNodes != null) + { + enumerator.OnPeerDiscoveryComplete(newNodes); + } + + } while (true); + + //Commit peer updates + _peers.CompleteDiscovery(enumerator); + } + + + private readonly List<ICachePeerAdvertisment> _connectedPeers; + + ///<inheritdoc/> + public ICachePeerAdvertisment[] GetNewPeers() + { + lock (_connectedPeers) + { + //Get all discovered peers + ICachePeerAdvertisment[] peers = _peers.GetAllNodes(); + + //Get the difference between the discovered peers and the connected peers + return peers.Except(_connectedPeers).ToArray(); + } + } + + ///<inheritdoc/> + public void OnPeerListenerAttached(ICachePeerAdvertisment peer) + { + lock (_connectedPeers) + { + //Add to connected peers + _connectedPeers.Add(peer); + } + } + + ///<inheritdoc/> + public void OnPeerListenerDetatched(ICachePeerAdvertisment peer) + { + //remove from connected peers + lock (_connectedPeers) + { + _connectedPeers.Remove(peer); + } + } + } +} |