aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src/Distribution
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/ObjectCacheServer/src/Distribution')
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs246
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs55
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs49
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryCollection.cs58
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryEnumerator.cs45
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs53
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/NodeDiscoveryCollection.cs99
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs168
8 files changed, 773 insertions, 0 deletions
diff --git a/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs
new file mode 100644
index 0000000..b453dcc
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs
@@ -0,0 +1,246 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: ObjectCacheServerEntry.cs
+*
+* ObjectCacheServerEntry.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+
+using VNLib.Plugins;
+using VNLib.Utils.Logging;
+using VNLib.Data.Caching.Extensions;
+using static VNLib.Data.Caching.Constants;
+using VNLib.Net.Messaging.FBM;
+using VNLib.Net.Messaging.FBM.Client;
+using VNLib.Plugins.Extensions.Loading;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+{
+ internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork
+ {
+ private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10);
+
+ private readonly NodeConfig NodeConfig;
+ private readonly ICachePeerAdapter PeerAdapter;
+ private readonly ICacheStore CacheStore;
+ private readonly FBMClientConfig ClientConfig;
+ private readonly PluginBase Plugin;
+
+ private CacheNodeConfiguration CacheConfig => NodeConfig.Config;
+
+ public CacheNodeReplicationMaanger(PluginBase plugin)
+ {
+ //Load the node config
+ NodeConfig = plugin.GetOrCreateSingleton<NodeConfig>();
+
+ //Get peer adapter
+ PeerAdapter = plugin.GetOrCreateSingleton<PeerDiscoveryManager>();
+
+
+ }
+
+ public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
+ {
+ pluginLog.Information("[REPL] Initializing node replication worker");
+
+ try
+ {
+ while (true)
+ {
+ //Get all new peers
+ ICachePeerAdvertisment[] peers = PeerAdapter.GetNewPeers();
+
+ if (peers.Length == 0)
+ {
+ pluginLog.Verbose("[REPL] No new peers to connect to");
+ }
+
+ //Connect to each peer
+ foreach (ICachePeerAdvertisment peer in peers)
+ {
+ _ = Plugin.ObserveWork(() => OnNewPeerDoWorkAsync(peer, pluginLog, exitToken));
+ }
+
+ //Wait for a new peers
+ await Task.Delay(10000, exitToken);
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ //Normal exit
+ }
+ catch
+ {
+ pluginLog.Error("[REPL] Node replication worker exited with an error");
+ throw;
+ }
+ finally
+ {
+
+ }
+
+ pluginLog.Information("[REPL] Node replication worker exited");
+ }
+
+ private async Task OnNewPeerDoWorkAsync(ICachePeerAdvertisment newPeer, ILogProvider log, CancellationToken exitToken)
+ {
+ _ = newPeer ?? throw new ArgumentNullException(nameof(newPeer));
+
+ //Setup client
+ FBMClient client = new(ClientConfig);
+
+ try
+ {
+ log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId);
+
+ //Connect to the server
+ await client.ConnectToCacheAsync(newPeer, CacheConfig, exitToken);
+
+ log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId);
+
+ //Start worker tasks
+ List<Task> workerTasks = new();
+
+ for (int i = 0; i < Environment.ProcessorCount; i++)
+ {
+ Task workerTask = Task.Run(() => ReplicationWorkerDoWorkAsync(client, log, exitToken), exitToken);
+
+ workerTasks.Add(workerTask);
+ }
+
+ //Wait for sync workers to exit
+ await Task.WhenAll(workerTasks);
+
+ log.Debug("All cache worker tasks exited successfully, disconnecting from {server}", newPeer.NodeId);
+
+ //Disconnect client gracefully
+ await client.DisconnectAsync(CancellationToken.None);
+ }
+ catch (InvalidResponseException ie)
+ {
+ //See if the plugin is unloading
+ if (!exitToken.IsCancellationRequested)
+ {
+ log.Debug("Peer {p} responded with invalid response packet, disconnected. reason\n {reason}", newPeer.NodeId, ie);
+ }
+ //Disconnect client gracefully
+ try
+ {
+ await client.DisconnectAsync(CancellationToken.None);
+ }
+ catch (Exception ex)
+ {
+ log.Error(ex);
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ //Plugin unloading, Try to disconnect
+ try
+ {
+ await client.DisconnectAsync(CancellationToken.None);
+ }
+ catch (Exception ex)
+ {
+ log.Error(ex);
+ }
+ }
+ catch (Exception ex)
+ {
+ log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex);
+ }
+ finally
+ {
+ client.Dispose();
+
+ //Notify monitor of disconnect
+ PeerAdapter.OnPeerListenerDetatched(newPeer);
+ }
+ }
+
+ //Wroker task callback method
+ private async Task ReplicationWorkerDoWorkAsync(FBMClient client, ILogProvider log, CancellationToken exitToken)
+ {
+ //Listen for changes
+ while (true)
+ {
+ //Wait for changes
+ WaitForChangeResult changedObject = await client.WaitForChangeAsync(exitToken);
+
+ log.Debug("Object changed {typ} {obj}", changedObject.Status, changedObject.CurrentId);
+
+ switch (changedObject.Status)
+ {
+ case ResponseCodes.NotFound:
+ log.Warn("Server cache not properly configured, worker exiting");
+ return;
+ case "deleted":
+ //Delete the object from the store
+ await CacheStore.DeleteItemAsync(changedObject.CurrentId);
+ break;
+ case "modified":
+ //Reload the record from the store
+ await UpdateRecordAsync(client, log, changedObject.CurrentId, changedObject.NewId, exitToken);
+ break;
+ }
+ }
+ }
+
+ private async Task UpdateRecordAsync(FBMClient client, ILogProvider log, string objectId, string newId, CancellationToken cancellation)
+ {
+ //Get request message
+ FBMRequest modRequest = client.RentRequest();
+ try
+ {
+ //Set action as get/create
+ modRequest.WriteHeader(HeaderCommand.Action, Actions.Get);
+ //Set session-id header
+ modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId);
+
+ //Make request
+ using FBMResponse response = await client.SendAsync(modRequest, GetItemTimeout, cancellation);
+
+ response.ThrowIfNotSet();
+
+ //Check response code
+ string status = response.Headers.First(static s => s.Header == HeaderCommand.Status).Value.ToString();
+ if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal))
+ {
+ //Update the record
+ await CacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation);
+ log.Debug("Updated object {id}", objectId);
+ }
+ else
+ {
+ log.Warn("Object {id} was missing on the remote server", objectId);
+ }
+ }
+ finally
+ {
+ client.ReturnRequest(modRequest);
+ }
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs b/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs
new file mode 100644
index 0000000..82b280c
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs
@@ -0,0 +1,55 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: CachePeerMonitor.cs
+*
+* CachePeerMonitor.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Collections.Generic;
+
+using VNLib.Plugins;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+{
+
+ internal sealed class CachePeerMonitor : IPeerMonitor
+ {
+ public CachePeerMonitor(PluginBase plugin)
+ {
+
+ }
+
+ public IEnumerable<ICachePeer> GetAllPeers()
+ {
+ throw new NotImplementedException();
+ }
+
+ public void OnPeerConnected(ICachePeer peer)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void OnPeerDisconnected(ICachePeer peer)
+ {
+ throw new NotImplementedException();
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs b/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs
new file mode 100644
index 0000000..d029f10
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs
@@ -0,0 +1,49 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: ObjectCacheServerEntry.cs
+*
+* ObjectCacheServerEntry.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using VNLib.Data.Caching.Extensions;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+{
+ internal interface ICachePeerAdapter
+ {
+ /// <summary>
+ /// Gets the peers that have been discovered but not yet connected to
+ /// </summary>
+ /// <returns>A collection of peers that have not been connected to yet</returns>
+ ICachePeerAdvertisment[] GetNewPeers();
+
+ /// <summary>
+ /// Called when a peer has been connected to
+ /// </summary>
+ /// <param name="peer">The peer that has been connected</param>
+ void OnPeerListenerAttached(ICachePeerAdvertisment peer);
+
+ /// <summary>
+ /// Called when a peer has been disconnected from
+ /// </summary>
+ /// <param name="peer">The disconnected peer</param>
+ void OnPeerListenerDetatched(ICachePeerAdvertisment peer);
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryCollection.cs b/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryCollection.cs
new file mode 100644
index 0000000..d69da40
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryCollection.cs
@@ -0,0 +1,58 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: INodeDiscoveryCollection.cs
+*
+* INodeDiscoveryCollection.cs is part of ObjectCacheServer which is
+* part of the larger VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Collections.Generic;
+
+using VNLib.Data.Caching.Extensions;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+{
+ internal interface INodeDiscoveryCollection
+ {
+ /// <summary>
+ /// Begins a new discovery and gets an enumerator for the discovery process
+ /// </summary>
+ /// <returns>An enumerator that simplifies discovery of unique nodes</returns>
+ INodeDiscoveryEnumerator BeginDiscovery();
+
+ /// <summary>
+ /// Begins a new discovery and gets an enumerator for the discovery process
+ /// </summary>
+ /// <param name="initialPeers">An initial collection of peers to add to the enumeration</param>
+ /// <returns>An enumerator that simplifies discovery of unique nodes</returns>
+ INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICachePeerAdvertisment> initialPeers);
+
+ /// <summary>
+ /// Gets a snapshot of all discovered nodes in the current collection.
+ /// </summary>
+ /// <returns>The current collection of notes</returns>
+ ICachePeerAdvertisment[] GetAllNodes();
+
+ /// <summary>
+ /// Completes a discovery process and updates the collection with the results
+ /// </summary>
+ /// <param name="enumerator">The enumerator used to collect discovered nodes</param>
+ void CompleteDiscovery(INodeDiscoveryEnumerator enumerator);
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryEnumerator.cs b/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryEnumerator.cs
new file mode 100644
index 0000000..5cddf9c
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryEnumerator.cs
@@ -0,0 +1,45 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: INodeDiscoveryEnumerator.cs
+*
+* INodeDiscoveryEnumerator.cs is part of ObjectCacheServer which is part
+* of the larger VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Collections.Generic;
+
+using VNLib.Data.Caching.Extensions;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+{
+ internal interface INodeDiscoveryEnumerator
+ {
+ /// <summary>
+ /// Moves the enumerator to the next peer in the discovery process and returns the result
+ /// </summary>
+ /// <returns>The next peer advertisment in the enumeration</returns>
+ ICachePeerAdvertisment? GetNextPeer();
+
+ /// <summary>
+ /// Adds the specified peer to the collection of discovered peers
+ /// </summary>
+ /// <param name="discoveredPeers">The peer collection</param>
+ void OnPeerDiscoveryComplete(IEnumerable<ICachePeerAdvertisment> discoveredPeers);
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs b/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs
new file mode 100644
index 0000000..b4cb840
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs
@@ -0,0 +1,53 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: IPeerMonitor.cs
+*
+* IPeerMonitor.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Collections.Generic;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+{
+ /// <summary>
+ /// Represents a monitor for peer cache servers to advertise their presence
+ /// in the cluster
+ /// </summary>
+ internal interface IPeerMonitor
+ {
+ /// <summary>
+ /// Notifies the monitor that a peer has connected to the cluster
+ /// </summary>
+ /// <param name="peer">The peer that connected</param>
+ void OnPeerConnected(ICachePeer peer);
+
+ /// <summary>
+ /// Notifies the monitor that a peer has disconnected
+ /// </summary>
+ /// <param name="peer">The peer that has disconnected</param>
+ void OnPeerDisconnected(ICachePeer peer);
+
+ /// <summary>
+ /// Gets an enumerable of all peers currently active in the current peer
+ /// </summary>
+ /// <returns></returns>
+ IEnumerable<ICachePeer> GetAllPeers();
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Distribution/NodeDiscoveryCollection.cs b/plugins/ObjectCacheServer/src/Distribution/NodeDiscoveryCollection.cs
new file mode 100644
index 0000000..f773a2e
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Distribution/NodeDiscoveryCollection.cs
@@ -0,0 +1,99 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: PeerDiscoveryManager.cs
+*
+* PeerDiscoveryManager.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Linq;
+using System.Collections.Generic;
+
+using VNLib.Plugins;
+using VNLib.Data.Caching.Extensions;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+{
+ sealed class NodeDiscoveryCollection : INodeDiscoveryCollection
+ {
+ private LinkedList<ICachePeerAdvertisment> _peers;
+
+
+ public NodeDiscoveryCollection(PluginBase plugin)
+ {
+ _peers = new();
+ }
+
+ ///<inheritdoc/>
+ public INodeDiscoveryEnumerator BeginDiscovery()
+ {
+ return new NodeEnumerator(new());
+ }
+
+ ///<inheritdoc/>
+ public INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICachePeerAdvertisment> initialPeers)
+ {
+ //Init new enumerator with the initial peers
+ return new NodeEnumerator(new(initialPeers));
+ }
+
+ ///<inheritdoc/>
+ public void CompleteDiscovery(INodeDiscoveryEnumerator enumerator)
+ {
+ _ = enumerator ?? throw new ArgumentNullException(nameof(enumerator));
+
+ //Capture all nodes from the enumerator and store them as our current peers
+ _peers = (enumerator as NodeEnumerator)!.Peers;
+ }
+
+ ///<inheritdoc/>
+ public ICachePeerAdvertisment[] GetAllNodes()
+ {
+ //Capture all current peers
+ return _peers.ToArray();
+ }
+
+ private sealed record class NodeEnumerator(LinkedList<ICachePeerAdvertisment> Peers) : INodeDiscoveryEnumerator
+ {
+ //Keep track of the current node in the collection so we can move down the list
+ private LinkedListNode<ICachePeerAdvertisment>? _currentNode = Peers.First;
+
+ public ICachePeerAdvertisment? GetNextPeer()
+ {
+ //Move to the next peer in the collection
+ _currentNode = _currentNode?.Next;
+
+ return _currentNode?.Value;
+ }
+
+ public void OnPeerDiscoveryComplete(IEnumerable<ICachePeerAdvertisment> discoveredPeers)
+ {
+ //Get only the peers from the discovery that are not already in the collection
+ IEnumerable<ICachePeerAdvertisment> newPeers = discoveredPeers.Except(Peers);
+
+ //Add them to the end of the collection
+ foreach(ICachePeerAdvertisment ad in newPeers)
+ {
+ Peers.AddLast(ad);
+ }
+ }
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs
new file mode 100644
index 0000000..54e4258
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs
@@ -0,0 +1,168 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: PeerDiscoveryManager.cs
+*
+* PeerDiscoveryManager.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+
+using VNLib.Plugins;
+using VNLib.Utils.Logging;
+using VNLib.Data.Caching.Extensions;
+using VNLib.Plugins.Extensions.Loading;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+{
+
+ sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter
+ {
+ private readonly NodeConfig _config;
+ private readonly IPeerMonitor _monitor;
+ private readonly INodeDiscoveryCollection _peers;
+
+ public PeerDiscoveryManager(PluginBase plugin)
+ {
+ //Get config
+ _config = plugin.GetOrCreateSingleton<NodeConfig>();
+
+ //Get the peer monitor
+ _monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
+
+ //Get the node collection
+ _peers = plugin.GetOrCreateSingleton<NodeDiscoveryCollection>();
+
+ _connectedPeers = new();
+ }
+
+ async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
+ {
+ pluginLog.Information("Node discovery worker started");
+
+ try
+ {
+ while (true)
+ {
+ try
+ {
+ await DiscoverAllNodesAsync(pluginLog, exitToken);
+ }
+ catch(OperationCanceledException)
+ {
+ throw;
+ }
+ catch (Exception ex)
+ {
+ pluginLog.Error(ex, "Failed to discover new peer nodes");
+ }
+
+ //Delay the next discovery
+ await Task.Delay(_config.DiscoveryInterval, exitToken);
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ //Normal exit
+ pluginLog.Information("Node discovery worker exiting");
+ }
+ finally
+ {
+
+ }
+ }
+
+ async Task DiscoverAllNodesAsync(ILogProvider log, CancellationToken cancellation)
+ {
+ //Use the monitor to get the initial peers
+ IEnumerable<ICachePeerAdvertisment> ads = _monitor.GetAllPeers()
+ .Where(static p => p.Advertisment != null)
+ .Select(static p => p.Advertisment!);
+
+ //Init enumerator with initial peers
+ INodeDiscoveryEnumerator enumerator = _peers.BeginDiscovery(ads);
+
+ do
+ {
+ //Load the initial peer
+ ICachePeerAdvertisment? peer = enumerator.GetNextPeer();
+
+ if (peer == null)
+ {
+ break;
+ }
+
+ log.Verbose("Discovering peer nodes from {Peer}", peer.NodeId);
+
+ //Discover nodes from this peer
+ ICachePeerAdvertisment[]? newNodes = await _config.Config.DiscoverClusterNodesAsync(peer, cancellation);
+
+ //Add nodes to the enumerator
+ if (newNodes != null)
+ {
+ enumerator.OnPeerDiscoveryComplete(newNodes);
+ }
+
+ } while (true);
+
+ //Commit peer updates
+ _peers.CompleteDiscovery(enumerator);
+ }
+
+
+ private readonly List<ICachePeerAdvertisment> _connectedPeers;
+
+ ///<inheritdoc/>
+ public ICachePeerAdvertisment[] GetNewPeers()
+ {
+ lock (_connectedPeers)
+ {
+ //Get all discovered peers
+ ICachePeerAdvertisment[] peers = _peers.GetAllNodes();
+
+ //Get the difference between the discovered peers and the connected peers
+ return peers.Except(_connectedPeers).ToArray();
+ }
+ }
+
+ ///<inheritdoc/>
+ public void OnPeerListenerAttached(ICachePeerAdvertisment peer)
+ {
+ lock (_connectedPeers)
+ {
+ //Add to connected peers
+ _connectedPeers.Add(peer);
+ }
+ }
+
+ ///<inheritdoc/>
+ public void OnPeerListenerDetatched(ICachePeerAdvertisment peer)
+ {
+ //remove from connected peers
+ lock (_connectedPeers)
+ {
+ _connectedPeers.Remove(peer);
+ }
+ }
+ }
+}