aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src/Clustering
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2024-03-06 21:30:58 -0500
committerLibravatar vnugent <public@vaughnnugent.com>2024-03-06 21:30:58 -0500
commit4d8cfc10382105b0acbd94df93ad3d05ff91db54 (patch)
treed9795c60b2e2a4871eddff43311866784c1c054b /plugins/ObjectCacheServer/src/Clustering
parent016a96a80cce025a86c6cf26707738f6a2eb2658 (diff)
refactor: #2 Centralize server state, default discovery endpoints & more
Diffstat (limited to 'plugins/ObjectCacheServer/src/Clustering')
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs48
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs11
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs93
3 files changed, 45 insertions, 107 deletions
diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
index a240dde..0a1bb4d 100644
--- a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -36,7 +36,6 @@ using VNLib.Net.Messaging.FBM;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.Extensions.Clustering;
-using VNLib.Data.Caching.ObjectCache.Server.Cache;
namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
@@ -56,38 +55,37 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork
{
private const string LOG_SCOPE_NAME = "REPL";
+ private const string FBM_LOG_SCOPE_NAME = "REPL-CLNT";
private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10);
private const int MAX_MESSAGE_SIZE = 12 * 1024;
private readonly PluginBase _plugin;
private readonly ILogProvider _log;
- private readonly NodeConfig _nodeConfig;
- private readonly ICacheStore _cacheStore;
- private readonly ICachePeerAdapter _peerAdapter;
private readonly FBMClientFactory _clientFactory;
-
+ private readonly ObjectCacheSystemState _sysState;
+
private readonly bool _isDebug;
private int _openConnections;
public CacheNodeReplicationMaanger(PluginBase plugin)
{
- //Load the node config
- _nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>();
- _cacheStore = plugin.GetOrCreateSingleton<CacheStore>();
- _peerAdapter = plugin.GetOrCreateSingleton<PeerDiscoveryManager>();
+ _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>();
//Init fbm config with fixed message size
FBMClientConfig clientConfig = FBMDataCacheExtensions.GetDefaultConfig(
- (plugin as ObjectCacheServerEntry)!.ListenerHeap,
+ _sysState.SharedCacheHeap,
MAX_MESSAGE_SIZE,
- debugLog: plugin.IsDebug() ? plugin.Log : null
+ debugLog: plugin.IsDebug() ? plugin.Log.CreateScope(FBM_LOG_SCOPE_NAME) : null
);
//Init ws fallback factory and client factory
- FBMFallbackClientWsFactory wsFactory = new();
- _clientFactory = new(in clientConfig, wsFactory);
+ _clientFactory = new(
+ ref clientConfig,
+ new FBMFallbackClientWsFactory(),
+ (int)_sysState.Configuration.MaxPeerConnections
+ );
_plugin = plugin;
_isDebug = plugin.IsDebug();
@@ -103,7 +101,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
while (true)
{
//Get all new peers
- CacheNodeAdvertisment[] peers = _peerAdapter.GetNewPeers();
+ CacheNodeAdvertisment[] peers = _sysState.PeerDiscovery.GetNewPeers();
if (peers.Length == 0 && _isDebug)
{
@@ -111,7 +109,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
}
//Make sure we don't exceed the max connections
- if(_openConnections >= _nodeConfig.MaxPeerConnections)
+ if(_openConnections >= _sysState.Configuration.MaxPeerConnections)
{
if (_isDebug)
{
@@ -150,13 +148,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
private async Task OnNewPeerDoWorkAsync(CacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken)
{
- _ = newPeer ?? throw new ArgumentNullException(nameof(newPeer));
+ ArgumentNullException.ThrowIfNull(newPeer);
//Setup client
FBMClient client = _clientFactory.CreateClient();
//Add peer to monitor
- _peerAdapter.OnPeerListenerAttached(newPeer);
+ _sysState.PeerDiscovery.OnPeerListenerAttached(newPeer);
Interlocked.Increment(ref _openConnections);
@@ -165,12 +163,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId);
//Connect to the server
- await client.ConnectToCacheAsync(newPeer, _nodeConfig.Config, exitToken);
+ await client.ConnectToCacheAsync(newPeer, _sysState.Configuration.Config, exitToken);
log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId);
//Start worker tasks
- List<Task> workerTasks = new();
+ List<Task> workerTasks = [];
for (int i = 0; i < Environment.ProcessorCount; i++)
{
@@ -187,6 +185,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//Disconnect client gracefully
await client.DisconnectAsync(CancellationToken.None);
}
+ catch(FBMServerNegiationException fbm)
+ {
+ log.Error("Failed to negotiate buffer configuration, check your cache memory configuration. Error:{err}", fbm.Message);
+ }
catch (InvalidResponseException ie)
{
//See if the plugin is unloading
@@ -227,7 +229,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
client.Dispose();
//Notify monitor of disconnect
- _peerAdapter.OnPeerListenerDetatched(newPeer);
+ _sysState.PeerDiscovery.OnPeerListenerDetatched(newPeer);
}
}
@@ -259,7 +261,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
return;
case "deleted":
//Delete the object from the store
- await _cacheStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None);
+ await _sysState.InternalStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None);
break;
case "modified":
//Reload the record from the store
@@ -297,7 +299,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal))
{
//Update the record
- await _cacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation);
+ await _sysState.InternalStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation);
log.Debug("Updated object {id}", objectId);
}
else
diff --git a/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs
index c49a54b..c3fbd8e 100644
--- a/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -29,7 +29,6 @@ using System.Collections.Generic;
using VNLib.Utils;
using VNLib.Utils.Extensions;
-using VNLib.Plugins;
namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
@@ -37,12 +36,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
internal sealed class CachePeerMonitor : VnDisposeable, IPeerMonitor
{
- private readonly LinkedList<ICachePeer> peers = new();
+ private readonly List<ICachePeer> peers = new();
private readonly ManualResetEvent newPeerTrigger = new (false);
- public CachePeerMonitor(PluginBase plugin)
- { }
-
/// <summary>
/// Waits for new peers to connect to the server
/// </summary>
@@ -70,7 +66,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//When a peer is connected we can add it to the list so the replication manager can see it
lock(peers)
{
- peers.AddLast(peer);
+ peers.Add(peer);
}
//Trigger monitor when change occurs
@@ -92,6 +88,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
protected override void Free()
{
+ peers.Clear();
newPeerTrigger.Dispose();
}
}
diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
index 6475f9c..f22e1dd 100644
--- a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -24,14 +24,11 @@
using System;
using System.Linq;
-using System.Net.Http;
using System.Threading;
-using System.Net.Sockets;
using System.Threading.Tasks;
using System.Collections.Generic;
using VNLib.Utils.Logging;
-using VNLib.Plugins;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.Extensions;
using VNLib.Data.Caching.Extensions.Clustering;
@@ -43,9 +40,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
* This class is responsible for resolving and discovering peer nodes in the cluster network.
*/
- internal sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter
+ internal sealed class PeerDiscoveryManager(NodeConfig config, ILogProvider Log, bool IsDebug, bool HasWellKnown) : IAsyncBackgroundWork, ICachePeerAdapter
{
- private const string LOG_SCOPE_NAME = "DISC";
+ internal const string LOG_SCOPE_NAME = "DISC";
+
/*
* The initial discovery delay. This allows for the server to initialize before
* starting the discovery process. This will probably be a shorter delay
@@ -54,43 +52,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(15);
private static readonly TimeSpan WhenWellKnownResolveFailed = TimeSpan.FromSeconds(20);
-
- private readonly List<CacheNodeAdvertisment> _connectedPeers;
- private readonly NodeConfig Config;
- private readonly CachePeerMonitor Monitor;
- private readonly ILogProvider Log;
- private readonly bool IsDebug;
- private readonly bool HasWellKnown;
-
- public PeerDiscoveryManager(PluginBase plugin)
- {
- //Get config
- Config = plugin.GetOrCreateSingleton<NodeConfig>();
-
- //Get the known peers array from config, its allowed to be null for master nodes
- IConfigScope? config = plugin.TryGetConfig("known_peers");
- string[] kownPeers = config?.Deserialze<string[]>() ?? Array.Empty<string>();
-
- //Add known peers to the monitor
- Config.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s)));
-
- HasWellKnown = kownPeers.Length > 0;
-
- //Get the peer monitor
- Monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
-
- _connectedPeers = new();
-
- //Create scoped logger
- Log = plugin.Log.CreateScope(LOG_SCOPE_NAME);
-
- Log.Information("Inital peer nodes: {nodes}", kownPeers);
-
- //Setup discovery error handler
- Config.Config.WithErrorHandler(new ErrorHandler(Log));
-
- IsDebug = plugin.IsDebug();
- }
+ private readonly List<CacheNodeAdvertisment> _connectedPeers = [];
+ private readonly CachePeerMonitor Monitor = new();
+ private readonly VNCacheClusterManager clusterMan = new(config.Config);
async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
{
@@ -124,7 +88,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
}
//Resolve all known peers
- CacheNodeAdvertisment[] wellKnown = await Config.Config.ResolveWellKnownAsync(exitToken);
+ CacheNodeAdvertisment[] wellKnown = await clusterMan.ResolveWellKnownAsync(exitToken);
wellKnownFailed = wellKnown.Length == 0;
//Use the monitor to get the initial peers
@@ -136,13 +100,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
if (allAds.Length > 0)
{
//Discover all known nodes
- await Config.Config.DiscoverNodesAsync(allAds, exitToken);
+ await clusterMan.DiscoverNodesAsync(allAds, exitToken);
}
//Log the discovered nodes if verbose logging is enabled
if (IsDebug)
{
- CacheNodeAdvertisment[] found = Config.Config.NodeCollection.GetAllNodes();
+ CacheNodeAdvertisment[] found = clusterMan.DiscoveredNodes.GetAllNodes();
Log.Debug("Discovered {count} nodes\n\t{nodes}", found.Length, found.Select(static s => s.NodeId));
}
@@ -177,7 +141,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
else
{
//Delay the next discovery
- await Task.Delay(Config.DiscoveryInterval, exitToken);
+ await Task.Delay(config.DiscoveryInterval, exitToken);
}
}
}
@@ -188,7 +152,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
}
finally
{
-
+ Monitor.Dispose();
}
//Wait for the watcher to exit
@@ -197,10 +161,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
private IEnumerable<CacheNodeAdvertisment> GetMonitorAds()
{
+ string selfId = (clusterMan.Config as CacheNodeConfiguration)!.NodeId;
return Monitor.GetAllPeers()
.Where(static p => p.Advertisment != null)
//Without us
- .Where(n => n.NodeId != Config.Config.NodeId)
+ .Where(n => !string.Equals(n.NodeId, selfId, StringComparison.OrdinalIgnoreCase))
.Select(static p => p.Advertisment!);
}
@@ -222,7 +187,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//Use the monitor to get the initial peers
IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds();
- ((NodeDiscoveryCollection)Config.Config.NodeCollection).AddManualNodes(ads);
+ clusterMan.AddManualNodes(ads);
}
}
catch (OperationCanceledException)
@@ -239,7 +204,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
lock (_connectedPeers)
{
//Get all discovered peers
- CacheNodeAdvertisment[] peers = Config.Config.NodeCollection.GetAllNodes();
+ CacheNodeAdvertisment[] peers = clusterMan.DiscoveredNodes.GetAllNodes();
//Get the difference between the discovered peers and the connected peers
return peers.Except(_connectedPeers).ToArray();
@@ -265,31 +230,5 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
_connectedPeers.Remove(peer);
}
}
-
-
- private sealed record class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
- {
- public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
- {
-
- if (ex is HttpRequestException hre)
- {
- if (hre.InnerException is SocketException se)
- {
- //traisnport failed
- Logger.Warn("Failed to connect to server {serv} because {err}", errorNode, se.Message);
- }
- else
- {
- Logger.Error("Failed to connect to node {n}\n{err}", errorNode, hre);
- }
- }
- else
- {
- Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex);
- }
-
- }
- }
}
}