From 8b4fb26473256da5eaa89f3e9d2ac5d44f1e9b88 Mon Sep 17 00:00:00 2001 From: vnugent Date: Sat, 15 Jul 2023 13:06:00 -0400 Subject: Latest working draft --- .../src/VnCacheClient.cs | 94 ++++++++++++++-------- 1 file changed, 61 insertions(+), 33 deletions(-) (limited to 'lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs') diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs index aa3d88f..679c91d 100644 --- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs @@ -41,6 +41,8 @@ using VNLib.Data.Caching.ObjectCache; using VNLib.Net.Messaging.FBM.Client; using VNLib.Plugins.Extensions.Loading; using VNLib.Data.Caching.Extensions.Clustering; +using VNLib.Plugins.Extensions.Loading.Events; +using VNLib.Plugins.Extensions.VNCache.Clustering; namespace VNLib.Plugins.Extensions.VNCache { @@ -51,14 +53,18 @@ namespace VNLib.Plugins.Extensions.VNCache TimeSpan RefreshInterval { get; } } - /// /// A base class that manages /// [ConfigurationName(VNCacheExtensions.CACHE_CONFIG_KEY)] internal class VnCacheClient : IGlobalCacheProvider, IAsyncBackgroundWork { + private const string LOG_NAME = "CLIENT"; + private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(10); + private static readonly TimeSpan NoNodeDelay = TimeSpan.FromSeconds(10); + private readonly VnCacheClientConfig _config; + private readonly ClusterNodeIndex _index; /// /// The internal client @@ -76,10 +82,23 @@ namespace VNLib.Plugins.Extensions.VNCache plugin.IsDebug() ? plugin.Log : null ) { + ILogProvider scoped = plugin.Log.CreateScope(LOG_NAME); + //Set authenticator and error handler Client.GetCacheConfiguration() .WithAuthenticator(new AuthManager(plugin)) - .WithErrorHandler(new DiscoveryErrHAndler(plugin.Log)); + .WithErrorHandler(new DiscoveryErrHAndler(scoped)); + + //Schedule discovery interval + plugin.ScheduleInterval(_index, _config.DiscoveryInterval); + + //Run discovery after initial delay if interval is greater than initial delay + if(_config.DiscoveryInterval > InitialDelay) + { + //Run a manual initial load + scoped.Information("Running initial discovery in {delay}", InitialDelay); + _ = plugin.ObserveWork(() => _index.OnIntervalAsync(scoped, plugin.UnloadToken), (int)InitialDelay.TotalMilliseconds); + } } public VnCacheClient(VnCacheClientConfig config, ILogProvider? debugLog) @@ -98,6 +117,9 @@ namespace VNLib.Plugins.Extensions.VNCache Client.GetCacheConfiguration() .WithTls(config.UseTls) .WithInitialPeers(config.GetInitialNodeUris()); + + //Init index + _index = new ClusterNodeIndex(Client.GetCacheConfiguration()); } /* @@ -106,46 +128,50 @@ namespace VNLib.Plugins.Extensions.VNCache */ public virtual async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) { + //Scope log + pluginLog = pluginLog.CreateScope(LOG_NAME); + try { + //Initial delay + pluginLog.Debug("Worker started, waiting for startup delay"); + await Task.Delay((int)InitialDelay.TotalMilliseconds + 1000, exitToken); + while (true) { - //Discover nodes in the network - - while (true) + //Wait for a discovery to complete + await _index.WaitForDiscoveryAsync(exitToken); + + //Get the next node to connect to + CacheNodeAdvertisment? node = _index.GetNextNode(); + + if (node is null) { - try - { - pluginLog.Debug("Discovering cluster nodes in network"); - //Get server list - await Client.DiscoverAvailableNodesAsync(exitToken); - break; - } - catch (HttpRequestException re) when (re.InnerException is SocketException) - { - pluginLog.Warn("Broker server is unreachable"); - } - catch(CacheDiscoveryFailureException ce) - { - pluginLog.Warn("Failed to discover cache servers, reason {r}", ce); - } - catch (Exception ex) + pluginLog.Warn("No nodes available to connect to, trying again in {delay}", NoNodeDelay); + await Task.Delay(NoNodeDelay, exitToken); + + //Run another manual discovery if the interval is greater than the delay + if (_config.DiscoveryInterval > NoNodeDelay) { - pluginLog.Warn("Failed to get server list from broker, reason {r}", ex.Message); + pluginLog.Debug("Forcing a manual discovery"); + + //We dont need to await this because it is awaited at the top of the loop + _ = _index.OnIntervalAsync(pluginLog, exitToken); } - //Gen random ms delay - int randomMsDelay = RandomNumberGenerator.GetInt32(1000, 2000); - await Task.Delay(randomMsDelay, exitToken); + continue; } + //Ready to connect + try { - pluginLog.Debug("Connecting to random cache server"); + pluginLog.Debug("Connecting to {node}", node); - //Connect to a random server - CacheNodeAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken); - pluginLog.Debug("Connected to cache server {s}", selected.NodeId); + //Connect to the node + await Client.ConnectToCacheAsync(node, exitToken); + + pluginLog.Debug("Successfully connected to {s}", node); //Set connection status flag IsConnected = true; @@ -157,7 +183,7 @@ namespace VNLib.Plugins.Extensions.VNCache } catch (WebSocketException wse) { - pluginLog.Warn("Failed to connect to cache server {reason}", wse.Message); + pluginLog.Warn("Failed to connect to cache server {reason}", wse); continue; } catch (HttpRequestException he) when (he.InnerException is SocketException) @@ -170,6 +196,8 @@ namespace VNLib.Plugins.Extensions.VNCache { IsConnected = false; } + + //Loop again } } catch (OperationCanceledException) @@ -279,14 +307,14 @@ namespace VNLib.Plugins.Extensions.VNCache public byte[] SignMessageHash(byte[] hash, HashAlg alg) { //try to get the rsa alg for the signing key - using RSA? rsa = _sigKey.Value.GetRSAPublicKey(); + using RSA? rsa = _sigKey.Value.GetRSAPrivateKey(); if(rsa != null) { return rsa.SignHash(hash, alg.GetAlgName(), RSASignaturePadding.Pkcs1); } //try to get the ecdsa alg for the signing key - using ECDsa? ecdsa = _sigKey.Value.GetECDsaPublicKey(); + using ECDsa? ecdsa = _sigKey.Value.GetECDsaPrivateKey(); if(ecdsa != null) { return ecdsa.SignHash(hash); @@ -326,7 +354,7 @@ namespace VNLib.Plugins.Extensions.VNCache { public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex) { - Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", errorNode.NodeId, ex); + Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", errorNode?.NodeId, ex); } } } -- cgit