aboutsummaryrefslogtreecommitdiff
path: root/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-07-13 13:20:25 -0400
committerLibravatar vnugent <public@vaughnnugent.com>2023-07-13 13:20:25 -0400
commit2f674e79d42e7d36225fa9ac7ecefbc5bc62d325 (patch)
treec58999489f5391bc044e7a9bb3e557afe2860415 /lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
parent1a8ab1457244d15b19ddcc94958f645f5ec2abc7 (diff)
Checkpoint, kind of working clustering
Diffstat (limited to 'lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs')
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs56
1 files changed, 24 insertions, 32 deletions
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
index f4f059b..aa3d88f 100644
--- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
@@ -40,7 +40,7 @@ using VNLib.Data.Caching.Extensions;
using VNLib.Data.Caching.ObjectCache;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Extensions.Loading;
-
+using VNLib.Data.Caching.Extensions.Clustering;
namespace VNLib.Plugins.Extensions.VNCache
{
@@ -56,7 +56,7 @@ namespace VNLib.Plugins.Extensions.VNCache
/// A base class that manages
/// </summary>
[ConfigurationName(VNCacheExtensions.CACHE_CONFIG_KEY)]
- internal class VnCacheClient : IGlobalCacheProvider, IAsyncBackgroundWork, IAsyncConfigurable
+ internal class VnCacheClient : IGlobalCacheProvider, IAsyncBackgroundWork
{
private readonly VnCacheClientConfig _config;
@@ -70,12 +70,17 @@ namespace VNLib.Plugins.Extensions.VNCache
/// </summary>
public bool IsConnected { get; private set; }
- public VnCacheClient(PluginBase pbase, IConfigScope config)
+ public VnCacheClient(PluginBase plugin, IConfigScope config)
:this(
- config.Deserialze<VnCacheClientConfig>(),
- pbase.IsDebug() ? pbase.Log : null
+ config.Deserialze<VnCacheClientConfig>(),
+ plugin.IsDebug() ? plugin.Log : null
)
- {}
+ {
+ //Set authenticator and error handler
+ Client.GetCacheConfiguration()
+ .WithAuthenticator(new AuthManager(plugin))
+ .WithErrorHandler(new DiscoveryErrHAndler(plugin.Log));
+ }
public VnCacheClient(VnCacheClientConfig config, ILogProvider? debugLog)
{
@@ -92,17 +97,7 @@ namespace VNLib.Plugins.Extensions.VNCache
//Add the configuration to the client
Client.GetCacheConfiguration()
.WithTls(config.UseTls)
- .WithInitialPeers(config.InitialNodes!);
- }
-
- public Task ConfigureServiceAsync(PluginBase plugin)
- {
- //Set authenticator
- Client.GetCacheConfiguration()
- .WithAuthenticator(new AuthManager(plugin))
- .WithErrorHandler(new DiscoveryErrHAndler(plugin.Log));
-
- return Task.CompletedTask;
+ .WithInitialPeers(config.GetInitialNodeUris());
}
/*
@@ -115,21 +110,25 @@ namespace VNLib.Plugins.Extensions.VNCache
{
while (true)
{
- //Load the server list
- ICacheNodeAdvertisment[]? servers;
+ //Discover nodes in the network
+
while (true)
{
try
{
- pluginLog.Debug("Discovering cluster nodes in broker");
+ pluginLog.Debug("Discovering cluster nodes in network");
//Get server list
- servers = await Client.DiscoverCacheNodesAsync(exitToken);
+ await Client.DiscoverAvailableNodesAsync(exitToken);
break;
}
catch (HttpRequestException re) when (re.InnerException is SocketException)
{
pluginLog.Warn("Broker server is unreachable");
}
+ catch(CacheDiscoveryFailureException ce)
+ {
+ pluginLog.Warn("Failed to discover cache servers, reason {r}", ce);
+ }
catch (Exception ex)
{
pluginLog.Warn("Failed to get server list from broker, reason {r}", ex.Message);
@@ -140,19 +139,12 @@ namespace VNLib.Plugins.Extensions.VNCache
await Task.Delay(randomMsDelay, exitToken);
}
- if (servers?.Length == 0)
- {
- pluginLog.Warn("No cluster nodes found, retrying");
- await Task.Delay(_config.RetryInterval, exitToken);
- continue;
- }
-
try
{
pluginLog.Debug("Connecting to random cache server");
//Connect to a random server
- ICacheNodeAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken);
+ CacheNodeAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken);
pluginLog.Debug("Connected to cache server {s}", selected.NodeId);
//Set connection status flag
@@ -304,13 +296,13 @@ namespace VNLib.Plugins.Extensions.VNCache
}
///<inheritdoc/>
- public bool VerifyJwt(JsonWebToken jwt)
+ public bool VerifyJwt(JsonWebToken jwt, bool isPeer)
{
return jwt.VerifyFromJwk(_verKey.Value);
}
///<inheritdoc/>
- public bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature)
+ public bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature, bool isPeer)
{
//try to get the rsa alg for the signing key
using RSA? rsa = _verKey.Value.GetRSAPublicKey();
@@ -332,7 +324,7 @@ namespace VNLib.Plugins.Extensions.VNCache
private sealed record class DiscoveryErrHAndler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
{
- public void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex)
+ public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
{
Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", errorNode.NodeId, ex);
}