aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2024-03-06 21:30:58 -0500
committerLibravatar vnugent <public@vaughnnugent.com>2024-03-06 21:30:58 -0500
commit4d8cfc10382105b0acbd94df93ad3d05ff91db54 (patch)
treed9795c60b2e2a4871eddff43311866784c1c054b /plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
parent016a96a80cce025a86c6cf26707738f6a2eb2658 (diff)
refactor: #2 Centralize server state, default discovery endpoints & more
Diffstat (limited to 'plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs')
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs48
1 files changed, 25 insertions, 23 deletions
diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
index a240dde..0a1bb4d 100644
--- a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -36,7 +36,6 @@ using VNLib.Net.Messaging.FBM;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.Extensions.Clustering;
-using VNLib.Data.Caching.ObjectCache.Server.Cache;
namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
@@ -56,38 +55,37 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork
{
private const string LOG_SCOPE_NAME = "REPL";
+ private const string FBM_LOG_SCOPE_NAME = "REPL-CLNT";
private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10);
private const int MAX_MESSAGE_SIZE = 12 * 1024;
private readonly PluginBase _plugin;
private readonly ILogProvider _log;
- private readonly NodeConfig _nodeConfig;
- private readonly ICacheStore _cacheStore;
- private readonly ICachePeerAdapter _peerAdapter;
private readonly FBMClientFactory _clientFactory;
-
+ private readonly ObjectCacheSystemState _sysState;
+
private readonly bool _isDebug;
private int _openConnections;
public CacheNodeReplicationMaanger(PluginBase plugin)
{
- //Load the node config
- _nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>();
- _cacheStore = plugin.GetOrCreateSingleton<CacheStore>();
- _peerAdapter = plugin.GetOrCreateSingleton<PeerDiscoveryManager>();
+ _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>();
//Init fbm config with fixed message size
FBMClientConfig clientConfig = FBMDataCacheExtensions.GetDefaultConfig(
- (plugin as ObjectCacheServerEntry)!.ListenerHeap,
+ _sysState.SharedCacheHeap,
MAX_MESSAGE_SIZE,
- debugLog: plugin.IsDebug() ? plugin.Log : null
+ debugLog: plugin.IsDebug() ? plugin.Log.CreateScope(FBM_LOG_SCOPE_NAME) : null
);
//Init ws fallback factory and client factory
- FBMFallbackClientWsFactory wsFactory = new();
- _clientFactory = new(in clientConfig, wsFactory);
+ _clientFactory = new(
+ ref clientConfig,
+ new FBMFallbackClientWsFactory(),
+ (int)_sysState.Configuration.MaxPeerConnections
+ );
_plugin = plugin;
_isDebug = plugin.IsDebug();
@@ -103,7 +101,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
while (true)
{
//Get all new peers
- CacheNodeAdvertisment[] peers = _peerAdapter.GetNewPeers();
+ CacheNodeAdvertisment[] peers = _sysState.PeerDiscovery.GetNewPeers();
if (peers.Length == 0 && _isDebug)
{
@@ -111,7 +109,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
}
//Make sure we don't exceed the max connections
- if(_openConnections >= _nodeConfig.MaxPeerConnections)
+ if(_openConnections >= _sysState.Configuration.MaxPeerConnections)
{
if (_isDebug)
{
@@ -150,13 +148,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
private async Task OnNewPeerDoWorkAsync(CacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken)
{
- _ = newPeer ?? throw new ArgumentNullException(nameof(newPeer));
+ ArgumentNullException.ThrowIfNull(newPeer);
//Setup client
FBMClient client = _clientFactory.CreateClient();
//Add peer to monitor
- _peerAdapter.OnPeerListenerAttached(newPeer);
+ _sysState.PeerDiscovery.OnPeerListenerAttached(newPeer);
Interlocked.Increment(ref _openConnections);
@@ -165,12 +163,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId);
//Connect to the server
- await client.ConnectToCacheAsync(newPeer, _nodeConfig.Config, exitToken);
+ await client.ConnectToCacheAsync(newPeer, _sysState.Configuration.Config, exitToken);
log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId);
//Start worker tasks
- List<Task> workerTasks = new();
+ List<Task> workerTasks = [];
for (int i = 0; i < Environment.ProcessorCount; i++)
{
@@ -187,6 +185,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//Disconnect client gracefully
await client.DisconnectAsync(CancellationToken.None);
}
+ catch(FBMServerNegiationException fbm)
+ {
+ log.Error("Failed to negotiate buffer configuration, check your cache memory configuration. Error:{err}", fbm.Message);
+ }
catch (InvalidResponseException ie)
{
//See if the plugin is unloading
@@ -227,7 +229,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
client.Dispose();
//Notify monitor of disconnect
- _peerAdapter.OnPeerListenerDetatched(newPeer);
+ _sysState.PeerDiscovery.OnPeerListenerDetatched(newPeer);
}
}
@@ -259,7 +261,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
return;
case "deleted":
//Delete the object from the store
- await _cacheStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None);
+ await _sysState.InternalStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None);
break;
case "modified":
//Reload the record from the store
@@ -297,7 +299,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal))
{
//Update the record
- await _cacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation);
+ await _sysState.InternalStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation);
log.Debug("Updated object {id}", objectId);
}
else