From e5bb0ee302e789cb96e7ecfe839cbbcc8e3fd5d7 Mon Sep 17 00:00:00 2001 From: vnugent Date: Sun, 10 Mar 2024 16:46:50 -0400 Subject: Squashed commit of the following: commit 2f7565976472f0f056db60520bf253a776112c10 Merge: 323ff67 6b87785 Author: vnugent Date: Sun Mar 10 16:45:23 2024 -0400 merge master commit 323ff67badfc46ad638d75f059d60d9425ccb2fa Author: vnugent Date: Sun Mar 10 15:50:07 2024 -0400 ci(server): Conainerize and add vncache server packages commit 5d4192880654fd6e00e587814169415b42621327 Author: vnugent Date: Sat Mar 9 19:13:21 2024 -0500 chore: #2 Minor fixes and polish before release commit a4b3504bb891829074d1efde0433eae010862181 Author: vnugent Date: Sat Mar 9 16:30:44 2024 -0500 package updates commit 4d8cfc10382105b0acbd94df93ad3d05ff91db54 Author: vnugent Date: Wed Mar 6 21:30:58 2024 -0500 refactor: #2 Centralize server state, default discovery endpoints & more commit 016a96a80cce025a86c6cf26707738f6a2eb2658 Author: vnugent Date: Thu Feb 29 21:22:38 2024 -0500 feat: add future support for memory diagnostics, and some docs commit 456ead9bc8b0f61357bae93152ad0403c4940101 Author: vnugent Date: Tue Feb 13 14:46:35 2024 -0500 fix: #1 shared cluster index on linux & latested core updates commit a481d63f964a5d5204cac2e95141f37f9a28d573 Author: vnugent Date: Tue Jan 23 15:43:50 2024 -0500 cache extension api tweaks --- .../src/Clustering/CacheNodeReplicationMaanger.cs | 85 ++++++++------ .../src/Clustering/CachePeerMonitor.cs | 11 +- .../src/Clustering/PeerDiscoveryManager.cs | 130 ++++++--------------- 3 files changed, 89 insertions(+), 137 deletions(-) (limited to 'plugins/ObjectCacheServer/src/Clustering') diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs index a240dde..92f0352 100644 --- a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs +++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -36,7 +36,6 @@ using VNLib.Net.Messaging.FBM; using VNLib.Net.Messaging.FBM.Client; using VNLib.Plugins.Extensions.Loading; using VNLib.Data.Caching.Extensions.Clustering; -using VNLib.Data.Caching.ObjectCache.Server.Cache; namespace VNLib.Data.Caching.ObjectCache.Server.Clustering { @@ -55,43 +54,36 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork { - private const string LOG_SCOPE_NAME = "REPL"; - - private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10); - private const int MAX_MESSAGE_SIZE = 12 * 1024; - private readonly PluginBase _plugin; private readonly ILogProvider _log; - private readonly NodeConfig _nodeConfig; - private readonly ICacheStore _cacheStore; - private readonly ICachePeerAdapter _peerAdapter; private readonly FBMClientFactory _clientFactory; - + private readonly ObjectCacheSystemState _sysState; + private readonly bool _isDebug; private int _openConnections; public CacheNodeReplicationMaanger(PluginBase plugin) { - //Load the node config - _nodeConfig = plugin.GetOrCreateSingleton(); - _cacheStore = plugin.GetOrCreateSingleton(); - _peerAdapter = plugin.GetOrCreateSingleton(); + _sysState = plugin.GetOrCreateSingleton(); //Init fbm config with fixed message size FBMClientConfig clientConfig = FBMDataCacheExtensions.GetDefaultConfig( - (plugin as ObjectCacheServerEntry)!.ListenerHeap, - MAX_MESSAGE_SIZE, - debugLog: plugin.IsDebug() ? plugin.Log : null + _sysState.SharedCacheHeap, + CacheConstants.MaxSyncMessageSize, + debugLog: plugin.IsDebug() ? plugin.Log.CreateScope(CacheConstants.LogScopes.ReplicationFbmDebug) : null ); //Init ws fallback factory and client factory - FBMFallbackClientWsFactory wsFactory = new(); - _clientFactory = new(in clientConfig, wsFactory); + _clientFactory = new( + ref clientConfig, + new FBMFallbackClientWsFactory(), + (int)_sysState.ClusterConfig.MaxPeerConnections + ); _plugin = plugin; _isDebug = plugin.IsDebug(); - _log = plugin.Log.CreateScope(LOG_SCOPE_NAME); + _log = plugin.Log.CreateScope(CacheConstants.LogScopes.RepliactionManager); } public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) @@ -103,7 +95,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering while (true) { //Get all new peers - CacheNodeAdvertisment[] peers = _peerAdapter.GetNewPeers(); + CacheNodeAdvertisment[] peers = _sysState.PeerDiscovery.GetNewPeers(); if (peers.Length == 0 && _isDebug) { @@ -111,7 +103,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering } //Make sure we don't exceed the max connections - if(_openConnections >= _nodeConfig.MaxPeerConnections) + if(_openConnections >= _sysState.ClusterConfig.MaxPeerConnections) { if (_isDebug) { @@ -148,15 +140,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering _log.Information("Node replication worker exited"); } + /* + * This method is called when a new peer has connected (or discovered) to establish a + * replication connection. + */ private async Task OnNewPeerDoWorkAsync(CacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken) { - _ = newPeer ?? throw new ArgumentNullException(nameof(newPeer)); - - //Setup client + ArgumentNullException.ThrowIfNull(newPeer); + FBMClient client = _clientFactory.CreateClient(); - //Add peer to monitor - _peerAdapter.OnPeerListenerAttached(newPeer); + /* + * Notify discovery that we will be listening to this peer + * + * This exists so when a new discovery happens, the work loop will produce + * the difference of new peers to existing peers, and we can connect to them. + * Avoiding infinite connections to the same peer. + */ + _sysState.PeerDiscovery.OnPeerListenerAttached(newPeer); Interlocked.Increment(ref _openConnections); @@ -165,12 +166,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId); //Connect to the server - await client.ConnectToCacheAsync(newPeer, _nodeConfig.Config, exitToken); + await client.ConnectToCacheAsync(newPeer, _sysState.NodeConfig, exitToken); log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId); //Start worker tasks - List workerTasks = new(); + List workerTasks = []; for (int i = 0; i < Environment.ProcessorCount; i++) { @@ -187,6 +188,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering //Disconnect client gracefully await client.DisconnectAsync(CancellationToken.None); } + catch(FBMServerNegiationException fbm) + { + log.Error("Failed to negotiate buffer configuration, check your cache memory configuration. Error:{err}", fbm.Message); + } catch (InvalidResponseException ie) { //See if the plugin is unloading @@ -218,7 +223,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering } catch (Exception ex) { - log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex); + //Avoid call stacks unless debug or higher logging levels + if (log.IsEnabled(LogLevel.Debug)) + { + log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex); + } + else + { + log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex.Message); + } } finally { @@ -226,8 +239,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering client.Dispose(); - //Notify monitor of disconnect - _peerAdapter.OnPeerListenerDetatched(newPeer); + //Notify monitor of disconnect to make it available again later + _sysState.PeerDiscovery.OnPeerListenerDetatched(newPeer); } } @@ -259,7 +272,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering return; case "deleted": //Delete the object from the store - await _cacheStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None); + await _sysState.InternalStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None); break; case "modified": //Reload the record from the store @@ -287,7 +300,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId); //Make request - using FBMResponse response = await client.SendAsync(modRequest, GetItemTimeout, cancellation); + using FBMResponse response = await client.SendAsync(modRequest, CacheConstants.Delays.CacheSyncGetItemTimeout, cancellation); response.ThrowIfNotSet(); @@ -297,7 +310,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal)) { //Update the record - await _cacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation); + await _sysState.InternalStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation); log.Debug("Updated object {id}", objectId); } else diff --git a/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs index c49a54b..c3fbd8e 100644 --- a/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs +++ b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -29,7 +29,6 @@ using System.Collections.Generic; using VNLib.Utils; using VNLib.Utils.Extensions; -using VNLib.Plugins; namespace VNLib.Data.Caching.ObjectCache.Server.Clustering { @@ -37,12 +36,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering internal sealed class CachePeerMonitor : VnDisposeable, IPeerMonitor { - private readonly LinkedList peers = new(); + private readonly List peers = new(); private readonly ManualResetEvent newPeerTrigger = new (false); - public CachePeerMonitor(PluginBase plugin) - { } - /// /// Waits for new peers to connect to the server /// @@ -70,7 +66,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering //When a peer is connected we can add it to the list so the replication manager can see it lock(peers) { - peers.AddLast(peer); + peers.Add(peer); } //Trigger monitor when change occurs @@ -92,6 +88,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering protected override void Free() { + peers.Clear(); newPeerTrigger.Dispose(); } } diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs index 6475f9c..b9a220d 100644 --- a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs +++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -24,14 +24,11 @@ using System; using System.Linq; -using System.Net.Http; using System.Threading; -using System.Net.Sockets; using System.Threading.Tasks; using System.Collections.Generic; using VNLib.Utils.Logging; -using VNLib.Plugins; using VNLib.Plugins.Extensions.Loading; using VNLib.Data.Caching.Extensions; using VNLib.Data.Caching.Extensions.Clustering; @@ -43,54 +40,19 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering * This class is responsible for resolving and discovering peer nodes in the cluster network. */ - internal sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter + internal sealed class PeerDiscoveryManager( + CacheNodeConfiguration config, + ServerClusterConfig clusterConf, + CachePeerMonitor Monitor, + ILogProvider Log, + bool IsDebug, + bool HasWellKnown + ) + : IAsyncBackgroundWork, ICachePeerAdapter { - private const string LOG_SCOPE_NAME = "DISC"; - /* - * The initial discovery delay. This allows for the server to initialize before - * starting the discovery process. This will probably be a shorter delay - * than a usual discovery interval. - */ - private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(15); - private static readonly TimeSpan WhenWellKnownResolveFailed = TimeSpan.FromSeconds(20); - - - private readonly List _connectedPeers; - private readonly NodeConfig Config; - private readonly CachePeerMonitor Monitor; - private readonly ILogProvider Log; - private readonly bool IsDebug; - private readonly bool HasWellKnown; - - public PeerDiscoveryManager(PluginBase plugin) - { - //Get config - Config = plugin.GetOrCreateSingleton(); - - //Get the known peers array from config, its allowed to be null for master nodes - IConfigScope? config = plugin.TryGetConfig("known_peers"); - string[] kownPeers = config?.Deserialze() ?? Array.Empty(); - - //Add known peers to the monitor - Config.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s))); - - HasWellKnown = kownPeers.Length > 0; - - //Get the peer monitor - Monitor = plugin.GetOrCreateSingleton(); - - _connectedPeers = new(); - - //Create scoped logger - Log = plugin.Log.CreateScope(LOG_SCOPE_NAME); - - Log.Information("Inital peer nodes: {nodes}", kownPeers); - //Setup discovery error handler - Config.Config.WithErrorHandler(new ErrorHandler(Log)); - - IsDebug = plugin.IsDebug(); - } + private readonly List _connectedPeers = []; + private readonly VNCacheClusterManager clusterMan = new(config); async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) { @@ -103,12 +65,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering //Start the change listener Task watcher = WatchForPeersAsync(exitToken); - Log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", InitialDelay); + Log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", CacheConstants.Delays.InitialDiscovery); try - { - //Wait for the initial delay - await Task.Delay(InitialDelay, exitToken); + { + await Task.Delay(CacheConstants.Delays.InitialDiscovery, exitToken); Log.Debug("Begining discovery loop"); @@ -123,26 +84,32 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering Log.Debug("Begining node discovery"); } - //Resolve all known peers - CacheNodeAdvertisment[] wellKnown = await Config.Config.ResolveWellKnownAsync(exitToken); - wellKnownFailed = wellKnown.Length == 0; + /* + * On every loop we will need to resolve well-known servers incase they go down + * or change. There probably should be some more advanced logic and caching here. + * + * Node may not have any well-known nodes, so we need to check for that. + */ + CacheNodeAdvertisment[] wellKnown = HasWellKnown ? + await clusterMan.ResolveWellKnownAsync(exitToken) : + Array.Empty(); //Use the monitor to get the initial peers IEnumerable ads = GetMonitorAds(); - //Combine well-known with new connected peers + //Combine well-known peers that are currently connected to this server CacheNodeAdvertisment[] allAds = ads.Union(wellKnown).ToArray(); if (allAds.Length > 0) { - //Discover all known nodes - await Config.Config.DiscoverNodesAsync(allAds, exitToken); + //Build the discovery map from all the known nodes to find all known nodes in the entire cluster + await clusterMan.DiscoverNodesAsync(allAds, exitToken); } //Log the discovered nodes if verbose logging is enabled if (IsDebug) { - CacheNodeAdvertisment[] found = Config.Config.NodeCollection.GetAllNodes(); + CacheNodeAdvertisment[] found = clusterMan.DiscoveredNodes.GetAllNodes(); Log.Debug("Discovered {count} nodes\n\t{nodes}", found.Length, found.Select(static s => s.NodeId)); } @@ -168,16 +135,16 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering { if (IsDebug) { - Log.Debug("Well known node discovery failed, waiting for {idel} before retrying", WhenWellKnownResolveFailed); + Log.Debug("Well known node discovery failed, waiting for {idel} before retrying", CacheConstants.Delays.WellKnownResolveFailed); } //Wait for shorter duration - await Task.Delay(WhenWellKnownResolveFailed, exitToken); + await Task.Delay(CacheConstants.Delays.WellKnownResolveFailed, exitToken); } else { //Delay the next discovery - await Task.Delay(Config.DiscoveryInterval, exitToken); + await Task.Delay(clusterConf.DiscoveryInterval, exitToken); } } } @@ -188,7 +155,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering } finally { - + Monitor.Dispose(); } //Wait for the watcher to exit @@ -197,10 +164,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering private IEnumerable GetMonitorAds() { + string selfId = (clusterMan.Config as CacheNodeConfiguration)!.NodeId; return Monitor.GetAllPeers() .Where(static p => p.Advertisment != null) //Without us - .Where(n => n.NodeId != Config.Config.NodeId) + .Where(n => !string.Equals(n.NodeId, selfId, StringComparison.OrdinalIgnoreCase)) .Select(static p => p.Advertisment!); } @@ -222,7 +190,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering //Use the monitor to get the initial peers IEnumerable ads = GetMonitorAds(); - ((NodeDiscoveryCollection)Config.Config.NodeCollection).AddManualNodes(ads); + clusterMan.AddManualNodes(ads); } } catch (OperationCanceledException) @@ -239,7 +207,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering lock (_connectedPeers) { //Get all discovered peers - CacheNodeAdvertisment[] peers = Config.Config.NodeCollection.GetAllNodes(); + CacheNodeAdvertisment[] peers = clusterMan.DiscoveredNodes.GetAllNodes(); //Get the difference between the discovered peers and the connected peers return peers.Except(_connectedPeers).ToArray(); @@ -265,31 +233,5 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering _connectedPeers.Remove(peer); } } - - - private sealed record class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler - { - public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex) - { - - if (ex is HttpRequestException hre) - { - if (hre.InnerException is SocketException se) - { - //traisnport failed - Logger.Warn("Failed to connect to server {serv} because {err}", errorNode, se.Message); - } - else - { - Logger.Error("Failed to connect to node {n}\n{err}", errorNode, hre); - } - } - else - { - Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex); - } - - } - } } } -- cgit