From 2f674e79d42e7d36225fa9ac7ecefbc5bc62d325 Mon Sep 17 00:00:00 2001 From: vnugent Date: Thu, 13 Jul 2023 13:20:25 -0400 Subject: Checkpoint, kind of working clustering --- .../src/VnCacheClient.cs | 56 ++++++++++------------ 1 file changed, 24 insertions(+), 32 deletions(-) (limited to 'lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs') diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs index f4f059b..aa3d88f 100644 --- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs @@ -40,7 +40,7 @@ using VNLib.Data.Caching.Extensions; using VNLib.Data.Caching.ObjectCache; using VNLib.Net.Messaging.FBM.Client; using VNLib.Plugins.Extensions.Loading; - +using VNLib.Data.Caching.Extensions.Clustering; namespace VNLib.Plugins.Extensions.VNCache { @@ -56,7 +56,7 @@ namespace VNLib.Plugins.Extensions.VNCache /// A base class that manages /// [ConfigurationName(VNCacheExtensions.CACHE_CONFIG_KEY)] - internal class VnCacheClient : IGlobalCacheProvider, IAsyncBackgroundWork, IAsyncConfigurable + internal class VnCacheClient : IGlobalCacheProvider, IAsyncBackgroundWork { private readonly VnCacheClientConfig _config; @@ -70,12 +70,17 @@ namespace VNLib.Plugins.Extensions.VNCache /// public bool IsConnected { get; private set; } - public VnCacheClient(PluginBase pbase, IConfigScope config) + public VnCacheClient(PluginBase plugin, IConfigScope config) :this( - config.Deserialze(), - pbase.IsDebug() ? pbase.Log : null + config.Deserialze(), + plugin.IsDebug() ? plugin.Log : null ) - {} + { + //Set authenticator and error handler + Client.GetCacheConfiguration() + .WithAuthenticator(new AuthManager(plugin)) + .WithErrorHandler(new DiscoveryErrHAndler(plugin.Log)); + } public VnCacheClient(VnCacheClientConfig config, ILogProvider? debugLog) { @@ -92,17 +97,7 @@ namespace VNLib.Plugins.Extensions.VNCache //Add the configuration to the client Client.GetCacheConfiguration() .WithTls(config.UseTls) - .WithInitialPeers(config.InitialNodes!); - } - - public Task ConfigureServiceAsync(PluginBase plugin) - { - //Set authenticator - Client.GetCacheConfiguration() - .WithAuthenticator(new AuthManager(plugin)) - .WithErrorHandler(new DiscoveryErrHAndler(plugin.Log)); - - return Task.CompletedTask; + .WithInitialPeers(config.GetInitialNodeUris()); } /* @@ -115,21 +110,25 @@ namespace VNLib.Plugins.Extensions.VNCache { while (true) { - //Load the server list - ICacheNodeAdvertisment[]? servers; + //Discover nodes in the network + while (true) { try { - pluginLog.Debug("Discovering cluster nodes in broker"); + pluginLog.Debug("Discovering cluster nodes in network"); //Get server list - servers = await Client.DiscoverCacheNodesAsync(exitToken); + await Client.DiscoverAvailableNodesAsync(exitToken); break; } catch (HttpRequestException re) when (re.InnerException is SocketException) { pluginLog.Warn("Broker server is unreachable"); } + catch(CacheDiscoveryFailureException ce) + { + pluginLog.Warn("Failed to discover cache servers, reason {r}", ce); + } catch (Exception ex) { pluginLog.Warn("Failed to get server list from broker, reason {r}", ex.Message); @@ -140,19 +139,12 @@ namespace VNLib.Plugins.Extensions.VNCache await Task.Delay(randomMsDelay, exitToken); } - if (servers?.Length == 0) - { - pluginLog.Warn("No cluster nodes found, retrying"); - await Task.Delay(_config.RetryInterval, exitToken); - continue; - } - try { pluginLog.Debug("Connecting to random cache server"); //Connect to a random server - ICacheNodeAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken); + CacheNodeAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken); pluginLog.Debug("Connected to cache server {s}", selected.NodeId); //Set connection status flag @@ -304,13 +296,13 @@ namespace VNLib.Plugins.Extensions.VNCache } /// - public bool VerifyJwt(JsonWebToken jwt) + public bool VerifyJwt(JsonWebToken jwt, bool isPeer) { return jwt.VerifyFromJwk(_verKey.Value); } /// - public bool VerifyMessageHash(ReadOnlySpan hash, HashAlg alg, ReadOnlySpan signature) + public bool VerifyMessageHash(ReadOnlySpan hash, HashAlg alg, ReadOnlySpan signature, bool isPeer) { //try to get the rsa alg for the signing key using RSA? rsa = _verKey.Value.GetRSAPublicKey(); @@ -332,7 +324,7 @@ namespace VNLib.Plugins.Extensions.VNCache private sealed record class DiscoveryErrHAndler(ILogProvider Logger) : ICacheDiscoveryErrorHandler { - public void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex) + public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex) { Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", errorNode.NodeId, ex); } -- cgit