aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src/Clustering
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/ObjectCacheServer/src/Clustering')
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs47
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs36
2 files changed, 44 insertions, 39 deletions
diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
index 0a1bb4d..92f0352 100644
--- a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
@@ -54,12 +54,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork
{
- private const string LOG_SCOPE_NAME = "REPL";
- private const string FBM_LOG_SCOPE_NAME = "REPL-CLNT";
-
- private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10);
- private const int MAX_MESSAGE_SIZE = 12 * 1024;
-
private readonly PluginBase _plugin;
private readonly ILogProvider _log;
private readonly FBMClientFactory _clientFactory;
@@ -76,20 +70,20 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//Init fbm config with fixed message size
FBMClientConfig clientConfig = FBMDataCacheExtensions.GetDefaultConfig(
_sysState.SharedCacheHeap,
- MAX_MESSAGE_SIZE,
- debugLog: plugin.IsDebug() ? plugin.Log.CreateScope(FBM_LOG_SCOPE_NAME) : null
+ CacheConstants.MaxSyncMessageSize,
+ debugLog: plugin.IsDebug() ? plugin.Log.CreateScope(CacheConstants.LogScopes.ReplicationFbmDebug) : null
);
//Init ws fallback factory and client factory
_clientFactory = new(
ref clientConfig,
new FBMFallbackClientWsFactory(),
- (int)_sysState.Configuration.MaxPeerConnections
+ (int)_sysState.ClusterConfig.MaxPeerConnections
);
_plugin = plugin;
_isDebug = plugin.IsDebug();
- _log = plugin.Log.CreateScope(LOG_SCOPE_NAME);
+ _log = plugin.Log.CreateScope(CacheConstants.LogScopes.RepliactionManager);
}
public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
@@ -109,7 +103,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
}
//Make sure we don't exceed the max connections
- if(_openConnections >= _sysState.Configuration.MaxPeerConnections)
+ if(_openConnections >= _sysState.ClusterConfig.MaxPeerConnections)
{
if (_isDebug)
{
@@ -146,14 +140,23 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
_log.Information("Node replication worker exited");
}
+ /*
+ * This method is called when a new peer has connected (or has been discovered) to establish a
+ * replication connection to it.
+ */
private async Task OnNewPeerDoWorkAsync(CacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken)
{
ArgumentNullException.ThrowIfNull(newPeer);
-
- //Setup client
+
FBMClient client = _clientFactory.CreateClient();
- //Add peer to monitor
+ /*
+ * Notify discovery that we will be listening to this peer
+ *
+ * This exists so that when a new discovery happens, the work loop will produce
+ * the difference between new peers and existing peers, and we can connect to
+ * only those, avoiding an endless series of connections to the same peer.
+ */
_sysState.PeerDiscovery.OnPeerListenerAttached(newPeer);
Interlocked.Increment(ref _openConnections);
@@ -163,7 +166,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId);
//Connect to the server
- await client.ConnectToCacheAsync(newPeer, _sysState.Configuration.Config, exitToken);
+ await client.ConnectToCacheAsync(newPeer, _sysState.NodeConfig, exitToken);
log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId);
@@ -220,7 +223,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
}
catch (Exception ex)
{
- log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex);
+ //Avoid logging call stacks unless debug (or more verbose) logging is enabled
+ if (log.IsEnabled(LogLevel.Debug))
+ {
+ log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex);
+ }
+ else
+ {
+ log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex.Message);
+ }
}
finally
{
@@ -228,7 +239,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
client.Dispose();
- //Notify monitor of disconnect
+ //Notify monitor of disconnect so the peer is available to connect to again later
_sysState.PeerDiscovery.OnPeerListenerDetatched(newPeer);
}
}
@@ -289,7 +300,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId);
//Make request
- using FBMResponse response = await client.SendAsync(modRequest, GetItemTimeout, cancellation);
+ using FBMResponse response = await client.SendAsync(modRequest, CacheConstants.Delays.CacheSyncGetItemTimeout, cancellation);
response.ThrowIfNotSet();
diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
index f22e1dd..b8ee9c8 100644
--- a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
@@ -40,21 +40,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
* This class is responsible for resolving and discovering peer nodes in the cluster network.
*/
- internal sealed class PeerDiscoveryManager(NodeConfig config, ILogProvider Log, bool IsDebug, bool HasWellKnown) : IAsyncBackgroundWork, ICachePeerAdapter
+ internal sealed class PeerDiscoveryManager(CacheNodeConfiguration config, ServerClusterConfig clusterConf, ILogProvider Log, bool IsDebug, bool HasWellKnown)
+ : IAsyncBackgroundWork, ICachePeerAdapter
{
- internal const string LOG_SCOPE_NAME = "DISC";
-
- /*
- * The initial discovery delay. This allows for the server to initialize before
- * starting the discovery process. This will probably be a shorter delay
- * than a usual discovery interval.
- */
- private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(15);
- private static readonly TimeSpan WhenWellKnownResolveFailed = TimeSpan.FromSeconds(20);
private readonly List<CacheNodeAdvertisment> _connectedPeers = [];
private readonly CachePeerMonitor Monitor = new();
- private readonly VNCacheClusterManager clusterMan = new(config.Config);
+ private readonly VNCacheClusterManager clusterMan = new(config);
async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
{
@@ -67,12 +59,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//Start the change listener
Task watcher = WatchForPeersAsync(exitToken);
- Log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", InitialDelay);
+ Log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", CacheConstants.Delays.InitialDiscovery);
try
- {
- //Wait for the initial delay
- await Task.Delay(InitialDelay, exitToken);
+ {
+ await Task.Delay(CacheConstants.Delays.InitialDiscovery, exitToken);
Log.Debug("Begining discovery loop");
@@ -87,19 +78,22 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
Log.Debug("Begining node discovery");
}
- //Resolve all known peers
+ /*
+ * On every loop we will need to resolve well-known servers in case they go down
+ * or change. There should probably be some more advanced logic and caching here.
+ */
CacheNodeAdvertisment[] wellKnown = await clusterMan.ResolveWellKnownAsync(exitToken);
wellKnownFailed = wellKnown.Length == 0;
//Use the monitor to get the initial peers
IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds();
- //Combine well-known with new connected peers
+ //Combine well-known peers with the peers that are currently connected to this server
CacheNodeAdvertisment[] allAds = ads.Union(wellKnown).ToArray();
if (allAds.Length > 0)
{
- //Discover all known nodes
+ //Build the discovery map from all the known nodes to find every node in the entire cluster
await clusterMan.DiscoverNodesAsync(allAds, exitToken);
}
@@ -132,16 +126,16 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
if (IsDebug)
{
- Log.Debug("Well known node discovery failed, waiting for {idel} before retrying", WhenWellKnownResolveFailed);
+ Log.Debug("Well known node discovery failed, waiting for {idel} before retrying", CacheConstants.Delays.WellKnownResolveFailed);
}
//Wait for shorter duration
- await Task.Delay(WhenWellKnownResolveFailed, exitToken);
+ await Task.Delay(CacheConstants.Delays.WellKnownResolveFailed, exitToken);
}
else
{
//Delay the next discovery
- await Task.Delay(config.DiscoveryInterval, exitToken);
+ await Task.Delay(clusterConf.DiscoveryInterval, exitToken);
}
}
}