aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs')
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs85
1 files changed, 49 insertions, 36 deletions
diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
index a240dde..92f0352 100644
--- a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -36,7 +36,6 @@ using VNLib.Net.Messaging.FBM;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.Extensions.Clustering;
-using VNLib.Data.Caching.ObjectCache.Server.Cache;
namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
@@ -55,43 +54,36 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork
{
- private const string LOG_SCOPE_NAME = "REPL";
-
- private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10);
- private const int MAX_MESSAGE_SIZE = 12 * 1024;
-
private readonly PluginBase _plugin;
private readonly ILogProvider _log;
- private readonly NodeConfig _nodeConfig;
- private readonly ICacheStore _cacheStore;
- private readonly ICachePeerAdapter _peerAdapter;
private readonly FBMClientFactory _clientFactory;
-
+ private readonly ObjectCacheSystemState _sysState;
+
private readonly bool _isDebug;
private int _openConnections;
public CacheNodeReplicationMaanger(PluginBase plugin)
{
- //Load the node config
- _nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>();
- _cacheStore = plugin.GetOrCreateSingleton<CacheStore>();
- _peerAdapter = plugin.GetOrCreateSingleton<PeerDiscoveryManager>();
+ _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>();
//Init fbm config with fixed message size
FBMClientConfig clientConfig = FBMDataCacheExtensions.GetDefaultConfig(
- (plugin as ObjectCacheServerEntry)!.ListenerHeap,
- MAX_MESSAGE_SIZE,
- debugLog: plugin.IsDebug() ? plugin.Log : null
+ _sysState.SharedCacheHeap,
+ CacheConstants.MaxSyncMessageSize,
+ debugLog: plugin.IsDebug() ? plugin.Log.CreateScope(CacheConstants.LogScopes.ReplicationFbmDebug) : null
);
//Init ws fallback factory and client factory
- FBMFallbackClientWsFactory wsFactory = new();
- _clientFactory = new(in clientConfig, wsFactory);
+ _clientFactory = new(
+ ref clientConfig,
+ new FBMFallbackClientWsFactory(),
+ (int)_sysState.ClusterConfig.MaxPeerConnections
+ );
_plugin = plugin;
_isDebug = plugin.IsDebug();
- _log = plugin.Log.CreateScope(LOG_SCOPE_NAME);
+ _log = plugin.Log.CreateScope(CacheConstants.LogScopes.RepliactionManager);
}
public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
@@ -103,7 +95,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
while (true)
{
//Get all new peers
- CacheNodeAdvertisment[] peers = _peerAdapter.GetNewPeers();
+ CacheNodeAdvertisment[] peers = _sysState.PeerDiscovery.GetNewPeers();
if (peers.Length == 0 && _isDebug)
{
@@ -111,7 +103,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
}
//Make sure we don't exceed the max connections
- if(_openConnections >= _nodeConfig.MaxPeerConnections)
+ if(_openConnections >= _sysState.ClusterConfig.MaxPeerConnections)
{
if (_isDebug)
{
@@ -148,15 +140,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
_log.Information("Node replication worker exited");
}
+ /*
+ * This method is called when a new peer has connected (or discovered) to establish a
+ * replication connection.
+ */
private async Task OnNewPeerDoWorkAsync(CacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken)
{
- _ = newPeer ?? throw new ArgumentNullException(nameof(newPeer));
-
- //Setup client
+ ArgumentNullException.ThrowIfNull(newPeer);
+
FBMClient client = _clientFactory.CreateClient();
- //Add peer to monitor
- _peerAdapter.OnPeerListenerAttached(newPeer);
+ /*
+ * Notify discovery that we will be listening to this peer
+ *
+ * This exists so when a new discovery happens, the work loop will produce
+ * the difference of new peers to existing peers, and we can connect to them.
+ * Avoiding infinite connections to the same peer.
+ */
+ _sysState.PeerDiscovery.OnPeerListenerAttached(newPeer);
Interlocked.Increment(ref _openConnections);
@@ -165,12 +166,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId);
//Connect to the server
- await client.ConnectToCacheAsync(newPeer, _nodeConfig.Config, exitToken);
+ await client.ConnectToCacheAsync(newPeer, _sysState.NodeConfig, exitToken);
log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId);
//Start worker tasks
- List<Task> workerTasks = new();
+ List<Task> workerTasks = [];
for (int i = 0; i < Environment.ProcessorCount; i++)
{
@@ -187,6 +188,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//Disconnect client gracefully
await client.DisconnectAsync(CancellationToken.None);
}
+ catch(FBMServerNegiationException fbm)
+ {
+ log.Error("Failed to negotiate buffer configuration, check your cache memory configuration. Error:{err}", fbm.Message);
+ }
catch (InvalidResponseException ie)
{
//See if the plugin is unloading
@@ -218,7 +223,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
}
catch (Exception ex)
{
- log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex);
+ //Avoid call stacks unless debug or higher logging levels
+ if (log.IsEnabled(LogLevel.Debug))
+ {
+ log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex);
+ }
+ else
+ {
+ log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex.Message);
+ }
}
finally
{
@@ -226,8 +239,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
client.Dispose();
- //Notify monitor of disconnect
- _peerAdapter.OnPeerListenerDetatched(newPeer);
+ //Notify monitor of disconnect to make it available again later
+ _sysState.PeerDiscovery.OnPeerListenerDetatched(newPeer);
}
}
@@ -259,7 +272,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
return;
case "deleted":
//Delete the object from the store
- await _cacheStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None);
+ await _sysState.InternalStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None);
break;
case "modified":
//Reload the record from the store
@@ -287,7 +300,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId);
//Make request
- using FBMResponse response = await client.SendAsync(modRequest, GetItemTimeout, cancellation);
+ using FBMResponse response = await client.SendAsync(modRequest, CacheConstants.Delays.CacheSyncGetItemTimeout, cancellation);
response.ThrowIfNotSet();
@@ -297,7 +310,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal))
{
//Update the record
- await _cacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation);
+ await _sysState.InternalStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation);
log.Debug("Updated object {id}", objectId);
}
else