From e5bb0ee302e789cb96e7ecfe839cbbcc8e3fd5d7 Mon Sep 17 00:00:00 2001 From: vnugent Date: Sun, 10 Mar 2024 16:46:50 -0400 Subject: Squashed commit of the following: commit 2f7565976472f0f056db60520bf253a776112c10 Merge: 323ff67 6b87785 Author: vnugent Date: Sun Mar 10 16:45:23 2024 -0400 merge master commit 323ff67badfc46ad638d75f059d60d9425ccb2fa Author: vnugent Date: Sun Mar 10 15:50:07 2024 -0400 ci(server): Containerize and add vncache server packages commit 5d4192880654fd6e00e587814169415b42621327 Author: vnugent Date: Sat Mar 9 19:13:21 2024 -0500 chore: #2 Minor fixes and polish before release commit a4b3504bb891829074d1efde0433eae010862181 Author: vnugent Date: Sat Mar 9 16:30:44 2024 -0500 package updates commit 4d8cfc10382105b0acbd94df93ad3d05ff91db54 Author: vnugent Date: Wed Mar 6 21:30:58 2024 -0500 refactor: #2 Centralize server state, default discovery endpoints & more commit 016a96a80cce025a86c6cf26707738f6a2eb2658 Author: vnugent Date: Thu Feb 29 21:22:38 2024 -0500 feat: add future support for memory diagnostics, and some docs commit 456ead9bc8b0f61357bae93152ad0403c4940101 Author: vnugent Date: Tue Feb 13 14:46:35 2024 -0500 fix: #1 shared cluster index on linux & latest core updates commit a481d63f964a5d5204cac2e95141f37f9a28d573 Author: vnugent Date: Tue Jan 23 15:43:50 2024 -0500 cache extension api tweaks --- .../src/Clustering/CacheNodeReplicationMaanger.cs | 85 +++++++++++++--------- 1 file changed, 49 insertions(+), 36 deletions(-) (limited to 'plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs') diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs index a240dde..92f0352 100644 --- a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs +++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn 
Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -36,7 +36,6 @@ using VNLib.Net.Messaging.FBM; using VNLib.Net.Messaging.FBM.Client; using VNLib.Plugins.Extensions.Loading; using VNLib.Data.Caching.Extensions.Clustering; -using VNLib.Data.Caching.ObjectCache.Server.Cache; namespace VNLib.Data.Caching.ObjectCache.Server.Clustering { @@ -55,43 +54,36 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork { - private const string LOG_SCOPE_NAME = "REPL"; - - private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10); - private const int MAX_MESSAGE_SIZE = 12 * 1024; - private readonly PluginBase _plugin; private readonly ILogProvider _log; - private readonly NodeConfig _nodeConfig; - private readonly ICacheStore _cacheStore; - private readonly ICachePeerAdapter _peerAdapter; private readonly FBMClientFactory _clientFactory; - + private readonly ObjectCacheSystemState _sysState; + private readonly bool _isDebug; private int _openConnections; public CacheNodeReplicationMaanger(PluginBase plugin) { - //Load the node config - _nodeConfig = plugin.GetOrCreateSingleton(); - _cacheStore = plugin.GetOrCreateSingleton(); - _peerAdapter = plugin.GetOrCreateSingleton(); + _sysState = plugin.GetOrCreateSingleton(); //Init fbm config with fixed message size FBMClientConfig clientConfig = FBMDataCacheExtensions.GetDefaultConfig( - (plugin as ObjectCacheServerEntry)!.ListenerHeap, - MAX_MESSAGE_SIZE, - debugLog: plugin.IsDebug() ? plugin.Log : null + _sysState.SharedCacheHeap, + CacheConstants.MaxSyncMessageSize, + debugLog: plugin.IsDebug() ? 
plugin.Log.CreateScope(CacheConstants.LogScopes.ReplicationFbmDebug) : null ); //Init ws fallback factory and client factory - FBMFallbackClientWsFactory wsFactory = new(); - _clientFactory = new(in clientConfig, wsFactory); + _clientFactory = new( + ref clientConfig, + new FBMFallbackClientWsFactory(), + (int)_sysState.ClusterConfig.MaxPeerConnections + ); _plugin = plugin; _isDebug = plugin.IsDebug(); - _log = plugin.Log.CreateScope(LOG_SCOPE_NAME); + _log = plugin.Log.CreateScope(CacheConstants.LogScopes.RepliactionManager); } public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) @@ -103,7 +95,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering while (true) { //Get all new peers - CacheNodeAdvertisment[] peers = _peerAdapter.GetNewPeers(); + CacheNodeAdvertisment[] peers = _sysState.PeerDiscovery.GetNewPeers(); if (peers.Length == 0 && _isDebug) { @@ -111,7 +103,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering } //Make sure we don't exceed the max connections - if(_openConnections >= _nodeConfig.MaxPeerConnections) + if(_openConnections >= _sysState.ClusterConfig.MaxPeerConnections) { if (_isDebug) { @@ -148,15 +140,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering _log.Information("Node replication worker exited"); } + /* + * This method is called when a new peer has connected (or discovered) to establish a + * replication connection. + */ private async Task OnNewPeerDoWorkAsync(CacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken) { - _ = newPeer ?? 
throw new ArgumentNullException(nameof(newPeer)); - - //Setup client + ArgumentNullException.ThrowIfNull(newPeer); + FBMClient client = _clientFactory.CreateClient(); - //Add peer to monitor - _peerAdapter.OnPeerListenerAttached(newPeer); + /* + * Notify discovery that we will be listening to this peer + * + * This exists so when a new discovery happens, the work loop will produce + * the difference of new peers to existing peers, and we can connect to them. + * Avoiding infinite connections to the same peer. + */ + _sysState.PeerDiscovery.OnPeerListenerAttached(newPeer); Interlocked.Increment(ref _openConnections); @@ -165,12 +166,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId); //Connect to the server - await client.ConnectToCacheAsync(newPeer, _nodeConfig.Config, exitToken); + await client.ConnectToCacheAsync(newPeer, _sysState.NodeConfig, exitToken); log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId); //Start worker tasks - List workerTasks = new(); + List workerTasks = []; for (int i = 0; i < Environment.ProcessorCount; i++) { @@ -187,6 +188,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering //Disconnect client gracefully await client.DisconnectAsync(CancellationToken.None); } + catch(FBMServerNegiationException fbm) + { + log.Error("Failed to negotiate buffer configuration, check your cache memory configuration. 
Error:{err}", fbm.Message); + } catch (InvalidResponseException ie) { //See if the plugin is unloading @@ -218,7 +223,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering } catch (Exception ex) { - log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex); + //Avoid call stacks unless debug or higher logging levels + if (log.IsEnabled(LogLevel.Debug)) + { + log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex); + } + else + { + log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex.Message); + } } finally { @@ -226,8 +239,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering client.Dispose(); - //Notify monitor of disconnect - _peerAdapter.OnPeerListenerDetatched(newPeer); + //Notify monitor of disconnect to make it available again later + _sysState.PeerDiscovery.OnPeerListenerDetatched(newPeer); } } @@ -259,7 +272,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering return; case "deleted": //Delete the object from the store - await _cacheStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None); + await _sysState.InternalStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None); break; case "modified": //Reload the record from the store @@ -287,7 +300,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? 
objectId : newId); //Make request - using FBMResponse response = await client.SendAsync(modRequest, GetItemTimeout, cancellation); + using FBMResponse response = await client.SendAsync(modRequest, CacheConstants.Delays.CacheSyncGetItemTimeout, cancellation); response.ThrowIfNotSet(); @@ -297,7 +310,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal)) { //Update the record - await _cacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation); + await _sysState.InternalStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation); log.Debug("Updated object {id}", objectId); } else -- cgit