diff options
Diffstat (limited to 'plugins/ObjectCacheServer/src/Clustering')
-rw-r--r-- | plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs | 47 | ||||
-rw-r--r-- | plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs | 36 |
2 files changed, 44 insertions, 39 deletions
diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs index 0a1bb4d..92f0352 100644 --- a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs +++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs @@ -54,12 +54,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork { - private const string LOG_SCOPE_NAME = "REPL"; - private const string FBM_LOG_SCOPE_NAME = "REPL-CLNT"; - - private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10); - private const int MAX_MESSAGE_SIZE = 12 * 1024; - private readonly PluginBase _plugin; private readonly ILogProvider _log; private readonly FBMClientFactory _clientFactory; @@ -76,20 +70,20 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering //Init fbm config with fixed message size FBMClientConfig clientConfig = FBMDataCacheExtensions.GetDefaultConfig( _sysState.SharedCacheHeap, - MAX_MESSAGE_SIZE, - debugLog: plugin.IsDebug() ? plugin.Log.CreateScope(FBM_LOG_SCOPE_NAME) : null + CacheConstants.MaxSyncMessageSize, + debugLog: plugin.IsDebug() ? plugin.Log.CreateScope(CacheConstants.LogScopes.ReplicationFbmDebug) : null ); //Init ws fallback factory and client factory _clientFactory = new( ref clientConfig, new FBMFallbackClientWsFactory(), - (int)_sysState.Configuration.MaxPeerConnections + (int)_sysState.ClusterConfig.MaxPeerConnections ); _plugin = plugin; _isDebug = plugin.IsDebug(); - _log = plugin.Log.CreateScope(LOG_SCOPE_NAME); + _log = plugin.Log.CreateScope(CacheConstants.LogScopes.RepliactionManager); } public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) @@ -109,7 +103,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering } //Make sure we don't exceed the max connections - if(_openConnections >= _sysState.Configuration.MaxPeerConnections) + if(_openConnections >= _sysState.ClusterConfig.MaxPeerConnections) { if (_isDebug) { @@ -146,14 +140,23 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering _log.Information("Node replication worker exited"); } + /* + * This method is called when a new peer has connected (or discovered) to establish a + * replication connection. + */ private async Task OnNewPeerDoWorkAsync(CacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken) { ArgumentNullException.ThrowIfNull(newPeer); - - //Setup client + FBMClient client = _clientFactory.CreateClient(); - //Add peer to monitor + /* + * Notify discovery that we will be listening to this peer + * + * This exists so when a new discovery happens, the work loop will produce + * the difference of new peers to existing peers, and we can connect to them. + * Avoiding infinite connections to the same peer. + */ _sysState.PeerDiscovery.OnPeerListenerAttached(newPeer); Interlocked.Increment(ref _openConnections); @@ -163,7 +166,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId); //Connect to the server - await client.ConnectToCacheAsync(newPeer, _sysState.Configuration.Config, exitToken); + await client.ConnectToCacheAsync(newPeer, _sysState.NodeConfig, exitToken); log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId); @@ -220,7 +223,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering } catch (Exception ex) { - log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex); + //Avoid call stacks unless debug or higher logging levels + if (log.IsEnabled(LogLevel.Debug)) + { + log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex); + } + else + { + log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex.Message); + } } finally { @@ -228,7 +239,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering client.Dispose(); - //Notify monitor of disconnect + //Notify monitor of disconnect to make it available again later _sysState.PeerDiscovery.OnPeerListenerDetatched(newPeer); } } @@ -289,7 +300,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId); //Make request - using FBMResponse response = await client.SendAsync(modRequest, GetItemTimeout, cancellation); + using FBMResponse response = await client.SendAsync(modRequest, CacheConstants.Delays.CacheSyncGetItemTimeout, cancellation); response.ThrowIfNotSet(); diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs index f22e1dd..b8ee9c8 100644 --- a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs +++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs @@ -40,21 +40,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering * This class is responsible for resolving and discovering peer nodes in the cluster network. */ - internal sealed class PeerDiscoveryManager(NodeConfig config, ILogProvider Log, bool IsDebug, bool HasWellKnown) : IAsyncBackgroundWork, ICachePeerAdapter + internal sealed class PeerDiscoveryManager(CacheNodeConfiguration config, ServerClusterConfig clusterConf, ILogProvider Log, bool IsDebug, bool HasWellKnown) + : IAsyncBackgroundWork, ICachePeerAdapter { - internal const string LOG_SCOPE_NAME = "DISC"; - - /* - * The initial discovery delay. This allows for the server to initialize before - * starting the discovery process. This will probably be a shorter delay - * than a usual discovery interval. - */ - private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(15); - private static readonly TimeSpan WhenWellKnownResolveFailed = TimeSpan.FromSeconds(20); private readonly List<CacheNodeAdvertisment> _connectedPeers = []; private readonly CachePeerMonitor Monitor = new(); - private readonly VNCacheClusterManager clusterMan = new(config.Config); + private readonly VNCacheClusterManager clusterMan = new(config); async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) { @@ -67,12 +59,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering //Start the change listener Task watcher = WatchForPeersAsync(exitToken); - Log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", InitialDelay); + Log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", CacheConstants.Delays.InitialDiscovery); try - { - //Wait for the initial delay - await Task.Delay(InitialDelay, exitToken); + { + await Task.Delay(CacheConstants.Delays.InitialDiscovery, exitToken); Log.Debug("Begining discovery loop"); @@ -87,19 +78,22 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering Log.Debug("Begining node discovery"); } - //Resolve all known peers + /* + * On every loop we will need to resolve well-known servers incase they go down + * or change. There probably should be some more advanced logic and caching here. + */ CacheNodeAdvertisment[] wellKnown = await clusterMan.ResolveWellKnownAsync(exitToken); wellKnownFailed = wellKnown.Length == 0; //Use the monitor to get the initial peers IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds(); - //Combine well-known with new connected peers + //Combine well-known peers that are currently connected to this server CacheNodeAdvertisment[] allAds = ads.Union(wellKnown).ToArray(); if (allAds.Length > 0) { - //Discover all known nodes + //Build the discovery map from all the known nodes to find all known nodes in the entire cluster await clusterMan.DiscoverNodesAsync(allAds, exitToken); } @@ -132,16 +126,16 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering { if (IsDebug) { - Log.Debug("Well known node discovery failed, waiting for {idel} before retrying", WhenWellKnownResolveFailed); + Log.Debug("Well known node discovery failed, waiting for {idel} before retrying", CacheConstants.Delays.WellKnownResolveFailed); } //Wait for shorter duration - await Task.Delay(WhenWellKnownResolveFailed, exitToken); + await Task.Delay(CacheConstants.Delays.WellKnownResolveFailed, exitToken); } else { //Delay the next discovery - await Task.Delay(config.DiscoveryInterval, exitToken); + await Task.Delay(clusterConf.DiscoveryInterval, exitToken); } } } |