From 4d8cfc10382105b0acbd94df93ad3d05ff91db54 Mon Sep 17 00:00:00 2001 From: vnugent Date: Wed, 6 Mar 2024 21:30:58 -0500 Subject: refactor: #2 Centralize server state, default discovery endpoints & more --- .../src/Clustering/CacheNodeReplicationMaanger.cs | 48 +++++++++++----------- 1 file changed, 25 insertions(+), 23 deletions(-) (limited to 'plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs') diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs index a240dde..0a1bb4d 100644 --- a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs +++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -36,7 +36,6 @@ using VNLib.Net.Messaging.FBM; using VNLib.Net.Messaging.FBM.Client; using VNLib.Plugins.Extensions.Loading; using VNLib.Data.Caching.Extensions.Clustering; -using VNLib.Data.Caching.ObjectCache.Server.Cache; namespace VNLib.Data.Caching.ObjectCache.Server.Clustering { @@ -56,38 +55,37 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork { private const string LOG_SCOPE_NAME = "REPL"; + private const string FBM_LOG_SCOPE_NAME = "REPL-CLNT"; private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10); private const int MAX_MESSAGE_SIZE = 12 * 1024; private readonly PluginBase _plugin; private readonly ILogProvider _log; - private readonly NodeConfig _nodeConfig; - private readonly ICacheStore _cacheStore; - private readonly ICachePeerAdapter _peerAdapter; private readonly FBMClientFactory _clientFactory; - + private readonly ObjectCacheSystemState _sysState; + private readonly bool _isDebug; private int _openConnections; public CacheNodeReplicationMaanger(PluginBase plugin) { - //Load the node config - _nodeConfig = plugin.GetOrCreateSingleton(); - _cacheStore = plugin.GetOrCreateSingleton(); - _peerAdapter = plugin.GetOrCreateSingleton(); + _sysState = plugin.GetOrCreateSingleton(); //Init fbm config with fixed message size FBMClientConfig clientConfig = FBMDataCacheExtensions.GetDefaultConfig( - (plugin as ObjectCacheServerEntry)!.ListenerHeap, + _sysState.SharedCacheHeap, MAX_MESSAGE_SIZE, - debugLog: plugin.IsDebug() ? plugin.Log : null + debugLog: plugin.IsDebug() ? plugin.Log.CreateScope(FBM_LOG_SCOPE_NAME) : null ); //Init ws fallback factory and client factory - FBMFallbackClientWsFactory wsFactory = new(); - _clientFactory = new(in clientConfig, wsFactory); + _clientFactory = new( + ref clientConfig, + new FBMFallbackClientWsFactory(), + (int)_sysState.Configuration.MaxPeerConnections + ); _plugin = plugin; _isDebug = plugin.IsDebug(); @@ -103,7 +101,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering while (true) { //Get all new peers - CacheNodeAdvertisment[] peers = _peerAdapter.GetNewPeers(); + CacheNodeAdvertisment[] peers = _sysState.PeerDiscovery.GetNewPeers(); if (peers.Length == 0 && _isDebug) { @@ -111,7 +109,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering } //Make sure we don't exceed the max connections - if(_openConnections >= _nodeConfig.MaxPeerConnections) + if(_openConnections >= _sysState.Configuration.MaxPeerConnections) { if (_isDebug) { @@ -150,13 +148,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering private async Task OnNewPeerDoWorkAsync(CacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken) { - _ = newPeer ?? throw new ArgumentNullException(nameof(newPeer)); + ArgumentNullException.ThrowIfNull(newPeer); //Setup client FBMClient client = _clientFactory.CreateClient(); //Add peer to monitor - _peerAdapter.OnPeerListenerAttached(newPeer); + _sysState.PeerDiscovery.OnPeerListenerAttached(newPeer); Interlocked.Increment(ref _openConnections); @@ -165,12 +163,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId); //Connect to the server - await client.ConnectToCacheAsync(newPeer, _nodeConfig.Config, exitToken); + await client.ConnectToCacheAsync(newPeer, _sysState.Configuration.Config, exitToken); log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId); //Start worker tasks - List workerTasks = new(); + List workerTasks = []; for (int i = 0; i < Environment.ProcessorCount; i++) { @@ -187,6 +185,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering //Disconnect client gracefully await client.DisconnectAsync(CancellationToken.None); } + catch(FBMServerNegiationException fbm) + { + log.Error("Failed to negotiate buffer configuration, check your cache memory configuration. Error:{err}", fbm.Message); + } catch (InvalidResponseException ie) { //See if the plugin is unloading @@ -227,7 +229,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering client.Dispose(); //Notify monitor of disconnect - _peerAdapter.OnPeerListenerDetatched(newPeer); + _sysState.PeerDiscovery.OnPeerListenerDetatched(newPeer); } } @@ -259,7 +261,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering return; case "deleted": //Delete the object from the store - await _cacheStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None); + await _sysState.InternalStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None); break; case "modified": //Reload the record from the store @@ -297,7 +299,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal)) { //Update the record - await _cacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation); + await _sysState.InternalStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation); log.Debug("Updated object {id}", objectId); } else -- cgit