aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src/Clustering
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/ObjectCacheServer/src/Clustering')
-rw-r--r-- plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs 85
-rw-r--r-- plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs 11
-rw-r--r-- plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs 130
3 files changed, 89 insertions, 137 deletions
diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
index a240dde..92f0352 100644
--- a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -36,7 +36,6 @@ using VNLib.Net.Messaging.FBM;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.Extensions.Clustering;
-using VNLib.Data.Caching.ObjectCache.Server.Cache;
namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
@@ -55,43 +54,36 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork
{
- private const string LOG_SCOPE_NAME = "REPL";
-
- private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10);
- private const int MAX_MESSAGE_SIZE = 12 * 1024;
-
private readonly PluginBase _plugin;
private readonly ILogProvider _log;
- private readonly NodeConfig _nodeConfig;
- private readonly ICacheStore _cacheStore;
- private readonly ICachePeerAdapter _peerAdapter;
private readonly FBMClientFactory _clientFactory;
-
+ private readonly ObjectCacheSystemState _sysState;
+
private readonly bool _isDebug;
private int _openConnections;
public CacheNodeReplicationMaanger(PluginBase plugin)
{
- //Load the node config
- _nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>();
- _cacheStore = plugin.GetOrCreateSingleton<CacheStore>();
- _peerAdapter = plugin.GetOrCreateSingleton<PeerDiscoveryManager>();
+ _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>();
//Init fbm config with fixed message size
FBMClientConfig clientConfig = FBMDataCacheExtensions.GetDefaultConfig(
- (plugin as ObjectCacheServerEntry)!.ListenerHeap,
- MAX_MESSAGE_SIZE,
- debugLog: plugin.IsDebug() ? plugin.Log : null
+ _sysState.SharedCacheHeap,
+ CacheConstants.MaxSyncMessageSize,
+ debugLog: plugin.IsDebug() ? plugin.Log.CreateScope(CacheConstants.LogScopes.ReplicationFbmDebug) : null
);
//Init ws fallback factory and client factory
- FBMFallbackClientWsFactory wsFactory = new();
- _clientFactory = new(in clientConfig, wsFactory);
+ _clientFactory = new(
+ ref clientConfig,
+ new FBMFallbackClientWsFactory(),
+ (int)_sysState.ClusterConfig.MaxPeerConnections
+ );
_plugin = plugin;
_isDebug = plugin.IsDebug();
- _log = plugin.Log.CreateScope(LOG_SCOPE_NAME);
+ _log = plugin.Log.CreateScope(CacheConstants.LogScopes.RepliactionManager);
}
public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
@@ -103,7 +95,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
while (true)
{
//Get all new peers
- CacheNodeAdvertisment[] peers = _peerAdapter.GetNewPeers();
+ CacheNodeAdvertisment[] peers = _sysState.PeerDiscovery.GetNewPeers();
if (peers.Length == 0 && _isDebug)
{
@@ -111,7 +103,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
}
//Make sure we don't exceed the max connections
- if(_openConnections >= _nodeConfig.MaxPeerConnections)
+ if(_openConnections >= _sysState.ClusterConfig.MaxPeerConnections)
{
if (_isDebug)
{
@@ -148,15 +140,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
_log.Information("Node replication worker exited");
}
+ /*
+ * This method is called when a new peer has connected (or has been discovered) in
+ * order to establish a replication connection to it.
+ */
private async Task OnNewPeerDoWorkAsync(CacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken)
{
- _ = newPeer ?? throw new ArgumentNullException(nameof(newPeer));
-
- //Setup client
+ ArgumentNullException.ThrowIfNull(newPeer);
+
FBMClient client = _clientFactory.CreateClient();
- //Add peer to monitor
- _peerAdapter.OnPeerListenerAttached(newPeer);
+ /*
+ * Notify discovery that we will be listening to this peer.
+ *
+ * This exists so that when a new discovery happens, the work loop only produces
+ * the difference between discovered peers and already-attached peers, and we
+ * connect to just those. This avoids opening repeated connections to the same peer.
+ */
+ _sysState.PeerDiscovery.OnPeerListenerAttached(newPeer);
Interlocked.Increment(ref _openConnections);
@@ -165,12 +166,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId);
//Connect to the server
- await client.ConnectToCacheAsync(newPeer, _nodeConfig.Config, exitToken);
+ await client.ConnectToCacheAsync(newPeer, _sysState.NodeConfig, exitToken);
log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId);
//Start worker tasks
- List<Task> workerTasks = new();
+ List<Task> workerTasks = [];
for (int i = 0; i < Environment.ProcessorCount; i++)
{
@@ -187,6 +188,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//Disconnect client gracefully
await client.DisconnectAsync(CancellationToken.None);
}
+ catch(FBMServerNegiationException fbm)
+ {
+ log.Error("Failed to negotiate buffer configuration, check your cache memory configuration. Error:{err}", fbm.Message);
+ }
catch (InvalidResponseException ie)
{
//See if the plugin is unloading
@@ -218,7 +223,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
}
catch (Exception ex)
{
- log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex);
+ //Only log the full exception (with call stack) when debug logging is enabled, otherwise log just the message
+ if (log.IsEnabled(LogLevel.Debug))
+ {
+ log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex);
+ }
+ else
+ {
+ log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex.Message);
+ }
}
finally
{
@@ -226,8 +239,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
client.Dispose();
- //Notify monitor of disconnect
- _peerAdapter.OnPeerListenerDetatched(newPeer);
+ //Notify peer discovery of the disconnect so this peer can be connected to again later
+ _sysState.PeerDiscovery.OnPeerListenerDetatched(newPeer);
}
}
@@ -259,7 +272,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
return;
case "deleted":
//Delete the object from the store
- await _cacheStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None);
+ await _sysState.InternalStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None);
break;
case "modified":
//Reload the record from the store
@@ -287,7 +300,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId);
//Make request
- using FBMResponse response = await client.SendAsync(modRequest, GetItemTimeout, cancellation);
+ using FBMResponse response = await client.SendAsync(modRequest, CacheConstants.Delays.CacheSyncGetItemTimeout, cancellation);
response.ThrowIfNotSet();
@@ -297,7 +310,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal))
{
//Update the record
- await _cacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation);
+ await _sysState.InternalStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation);
log.Debug("Updated object {id}", objectId);
}
else
diff --git a/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs
index c49a54b..c3fbd8e 100644
--- a/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -29,7 +29,6 @@ using System.Collections.Generic;
using VNLib.Utils;
using VNLib.Utils.Extensions;
-using VNLib.Plugins;
namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
@@ -37,12 +36,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
internal sealed class CachePeerMonitor : VnDisposeable, IPeerMonitor
{
- private readonly LinkedList<ICachePeer> peers = new();
+ private readonly List<ICachePeer> peers = new();
private readonly ManualResetEvent newPeerTrigger = new (false);
- public CachePeerMonitor(PluginBase plugin)
- { }
-
/// <summary>
/// Waits for new peers to connect to the server
/// </summary>
@@ -70,7 +66,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//When a peer is connected we can add it to the list so the replication manager can see it
lock(peers)
{
- peers.AddLast(peer);
+ peers.Add(peer);
}
//Trigger monitor when change occurs
@@ -92,6 +88,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
protected override void Free()
{
+ peers.Clear();
newPeerTrigger.Dispose();
}
}
diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
index 6475f9c..b9a220d 100644
--- a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -24,14 +24,11 @@
using System;
using System.Linq;
-using System.Net.Http;
using System.Threading;
-using System.Net.Sockets;
using System.Threading.Tasks;
using System.Collections.Generic;
using VNLib.Utils.Logging;
-using VNLib.Plugins;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.Extensions;
using VNLib.Data.Caching.Extensions.Clustering;
@@ -43,54 +40,19 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
* This class is responsible for resolving and discovering peer nodes in the cluster network.
*/
- internal sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter
+ internal sealed class PeerDiscoveryManager(
+ CacheNodeConfiguration config,
+ ServerClusterConfig clusterConf,
+ CachePeerMonitor Monitor,
+ ILogProvider Log,
+ bool IsDebug,
+ bool HasWellKnown
+ )
+ : IAsyncBackgroundWork, ICachePeerAdapter
{
- private const string LOG_SCOPE_NAME = "DISC";
- /*
- * The initial discovery delay. This allows for the server to initialize before
- * starting the discovery process. This will probably be a shorter delay
- * than a usual discovery interval.
- */
- private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(15);
- private static readonly TimeSpan WhenWellKnownResolveFailed = TimeSpan.FromSeconds(20);
-
-
- private readonly List<CacheNodeAdvertisment> _connectedPeers;
- private readonly NodeConfig Config;
- private readonly CachePeerMonitor Monitor;
- private readonly ILogProvider Log;
- private readonly bool IsDebug;
- private readonly bool HasWellKnown;
-
- public PeerDiscoveryManager(PluginBase plugin)
- {
- //Get config
- Config = plugin.GetOrCreateSingleton<NodeConfig>();
-
- //Get the known peers array from config, its allowed to be null for master nodes
- IConfigScope? config = plugin.TryGetConfig("known_peers");
- string[] kownPeers = config?.Deserialze<string[]>() ?? Array.Empty<string>();
-
- //Add known peers to the monitor
- Config.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s)));
-
- HasWellKnown = kownPeers.Length > 0;
-
- //Get the peer monitor
- Monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
-
- _connectedPeers = new();
-
- //Create scoped logger
- Log = plugin.Log.CreateScope(LOG_SCOPE_NAME);
-
- Log.Information("Inital peer nodes: {nodes}", kownPeers);
- //Setup discovery error handler
- Config.Config.WithErrorHandler(new ErrorHandler(Log));
-
- IsDebug = plugin.IsDebug();
- }
+ private readonly List<CacheNodeAdvertisment> _connectedPeers = [];
+ private readonly VNCacheClusterManager clusterMan = new(config);
async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
{
@@ -103,12 +65,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//Start the change listener
Task watcher = WatchForPeersAsync(exitToken);
- Log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", InitialDelay);
+ Log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", CacheConstants.Delays.InitialDiscovery);
try
- {
- //Wait for the initial delay
- await Task.Delay(InitialDelay, exitToken);
+ {
+ await Task.Delay(CacheConstants.Delays.InitialDiscovery, exitToken);
Log.Debug("Begining discovery loop");
@@ -123,26 +84,32 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
Log.Debug("Begining node discovery");
}
- //Resolve all known peers
- CacheNodeAdvertisment[] wellKnown = await Config.Config.ResolveWellKnownAsync(exitToken);
- wellKnownFailed = wellKnown.Length == 0;
+ /*
+ * On every loop we need to resolve the well-known servers in case they go down
+ * or change. There should probably be some more advanced logic and caching here.
+ *
+ * This node may not have any well-known nodes, so we need to check for that.
+ */
+ CacheNodeAdvertisment[] wellKnown = HasWellKnown ?
+ await clusterMan.ResolveWellKnownAsync(exitToken) :
+ Array.Empty<CacheNodeAdvertisment>();
//Use the monitor to get the initial peers
IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds();
- //Combine well-known with new connected peers
+ //Combine the well-known peers with the peers that are currently connected to this server
CacheNodeAdvertisment[] allAds = ads.Union(wellKnown).ToArray();
if (allAds.Length > 0)
{
- //Discover all known nodes
- await Config.Config.DiscoverNodesAsync(allAds, exitToken);
+ //Build the discovery map from all the known nodes to find all known nodes in the entire cluster
+ await clusterMan.DiscoverNodesAsync(allAds, exitToken);
}
//Log the discovered nodes if verbose logging is enabled
if (IsDebug)
{
- CacheNodeAdvertisment[] found = Config.Config.NodeCollection.GetAllNodes();
+ CacheNodeAdvertisment[] found = clusterMan.DiscoveredNodes.GetAllNodes();
Log.Debug("Discovered {count} nodes\n\t{nodes}", found.Length, found.Select(static s => s.NodeId));
}
@@ -168,16 +135,16 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
if (IsDebug)
{
- Log.Debug("Well known node discovery failed, waiting for {idel} before retrying", WhenWellKnownResolveFailed);
+ Log.Debug("Well known node discovery failed, waiting for {idel} before retrying", CacheConstants.Delays.WellKnownResolveFailed);
}
//Wait for shorter duration
- await Task.Delay(WhenWellKnownResolveFailed, exitToken);
+ await Task.Delay(CacheConstants.Delays.WellKnownResolveFailed, exitToken);
}
else
{
//Delay the next discovery
- await Task.Delay(Config.DiscoveryInterval, exitToken);
+ await Task.Delay(clusterConf.DiscoveryInterval, exitToken);
}
}
}
@@ -188,7 +155,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
}
finally
{
-
+ Monitor.Dispose();
}
//Wait for the watcher to exit
@@ -197,10 +164,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
private IEnumerable<CacheNodeAdvertisment> GetMonitorAds()
{
+ string selfId = (clusterMan.Config as CacheNodeConfiguration)!.NodeId;
return Monitor.GetAllPeers()
.Where(static p => p.Advertisment != null)
//Without us
- .Where(n => n.NodeId != Config.Config.NodeId)
+ .Where(n => !string.Equals(n.NodeId, selfId, StringComparison.OrdinalIgnoreCase))
.Select(static p => p.Advertisment!);
}
@@ -222,7 +190,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//Use the monitor to get the initial peers
IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds();
- ((NodeDiscoveryCollection)Config.Config.NodeCollection).AddManualNodes(ads);
+ clusterMan.AddManualNodes(ads);
}
}
catch (OperationCanceledException)
@@ -239,7 +207,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
lock (_connectedPeers)
{
//Get all discovered peers
- CacheNodeAdvertisment[] peers = Config.Config.NodeCollection.GetAllNodes();
+ CacheNodeAdvertisment[] peers = clusterMan.DiscoveredNodes.GetAllNodes();
//Get the difference between the discovered peers and the connected peers
return peers.Except(_connectedPeers).ToArray();
@@ -265,31 +233,5 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
_connectedPeers.Remove(peer);
}
}
-
-
- private sealed record class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
- {
- public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
- {
-
- if (ex is HttpRequestException hre)
- {
- if (hre.InnerException is SocketException se)
- {
- //traisnport failed
- Logger.Warn("Failed to connect to server {serv} because {err}", errorNode, se.Message);
- }
- else
- {
- Logger.Error("Failed to connect to node {n}\n{err}", errorNode, hre);
- }
- }
- else
- {
- Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex);
- }
-
- }
- }
}
}