aboutsummaryrefslogtreecommitdiff
path: root/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-07-15 13:06:00 -0400
committerLibravatar vnugent <public@vaughnnugent.com>2023-07-15 13:06:00 -0400
commit8b4fb26473256da5eaa89f3e9d2ac5d44f1e9b88 (patch)
tree6ff979b6110b9e6c61ff9f22bb0dbdd2094e08cf /lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
parent2f674e79d42e7d36225fa9ac7ecefbc5bc62d325 (diff)
Latest working draft
Diffstat (limited to 'lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs')
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs94
1 files changed, 61 insertions, 33 deletions
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
index aa3d88f..679c91d 100644
--- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
@@ -41,6 +41,8 @@ using VNLib.Data.Caching.ObjectCache;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.Extensions.Clustering;
+using VNLib.Plugins.Extensions.Loading.Events;
+using VNLib.Plugins.Extensions.VNCache.Clustering;
namespace VNLib.Plugins.Extensions.VNCache
{
@@ -51,14 +53,18 @@ namespace VNLib.Plugins.Extensions.VNCache
TimeSpan RefreshInterval { get; }
}
-
/// <summary>
/// A base class that manages
/// </summary>
[ConfigurationName(VNCacheExtensions.CACHE_CONFIG_KEY)]
internal class VnCacheClient : IGlobalCacheProvider, IAsyncBackgroundWork
{
+ private const string LOG_NAME = "CLIENT";
+ private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(10);
+ private static readonly TimeSpan NoNodeDelay = TimeSpan.FromSeconds(10);
+
private readonly VnCacheClientConfig _config;
+ private readonly ClusterNodeIndex _index;
/// <summary>
/// The internal client
@@ -76,10 +82,23 @@ namespace VNLib.Plugins.Extensions.VNCache
plugin.IsDebug() ? plugin.Log : null
)
{
+ ILogProvider scoped = plugin.Log.CreateScope(LOG_NAME);
+
//Set authenticator and error handler
Client.GetCacheConfiguration()
.WithAuthenticator(new AuthManager(plugin))
- .WithErrorHandler(new DiscoveryErrHAndler(plugin.Log));
+ .WithErrorHandler(new DiscoveryErrHAndler(scoped));
+
+ //Schedule discovery interval
+ plugin.ScheduleInterval(_index, _config.DiscoveryInterval);
+
+ //Run discovery after initial delay if interval is greater than initial delay
+ if(_config.DiscoveryInterval > InitialDelay)
+ {
+ //Run a manual initial load
+ scoped.Information("Running initial discovery in {delay}", InitialDelay);
+ _ = plugin.ObserveWork(() => _index.OnIntervalAsync(scoped, plugin.UnloadToken), (int)InitialDelay.TotalMilliseconds);
+ }
}
public VnCacheClient(VnCacheClientConfig config, ILogProvider? debugLog)
@@ -98,6 +117,9 @@ namespace VNLib.Plugins.Extensions.VNCache
Client.GetCacheConfiguration()
.WithTls(config.UseTls)
.WithInitialPeers(config.GetInitialNodeUris());
+
+ //Init index
+ _index = new ClusterNodeIndex(Client.GetCacheConfiguration());
}
/*
@@ -106,46 +128,50 @@ namespace VNLib.Plugins.Extensions.VNCache
*/
public virtual async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
{
+ //Scope log
+ pluginLog = pluginLog.CreateScope(LOG_NAME);
+
try
{
+ //Initial delay
+ pluginLog.Debug("Worker started, waiting for startup delay");
+ await Task.Delay((int)InitialDelay.TotalMilliseconds + 1000, exitToken);
+
while (true)
{
- //Discover nodes in the network
-
- while (true)
+ //Wait for a discovery to complete
+ await _index.WaitForDiscoveryAsync(exitToken);
+
+ //Get the next node to connect to
+ CacheNodeAdvertisment? node = _index.GetNextNode();
+
+ if (node is null)
{
- try
- {
- pluginLog.Debug("Discovering cluster nodes in network");
- //Get server list
- await Client.DiscoverAvailableNodesAsync(exitToken);
- break;
- }
- catch (HttpRequestException re) when (re.InnerException is SocketException)
- {
- pluginLog.Warn("Broker server is unreachable");
- }
- catch(CacheDiscoveryFailureException ce)
- {
- pluginLog.Warn("Failed to discover cache servers, reason {r}", ce);
- }
- catch (Exception ex)
+ pluginLog.Warn("No nodes available to connect to, trying again in {delay}", NoNodeDelay);
+ await Task.Delay(NoNodeDelay, exitToken);
+
+ //Run another manual discovery if the interval is greater than the delay
+ if (_config.DiscoveryInterval > NoNodeDelay)
{
- pluginLog.Warn("Failed to get server list from broker, reason {r}", ex.Message);
+ pluginLog.Debug("Forcing a manual discovery");
+
+ //We dont need to await this because it is awaited at the top of the loop
+ _ = _index.OnIntervalAsync(pluginLog, exitToken);
}
- //Gen random ms delay
- int randomMsDelay = RandomNumberGenerator.GetInt32(1000, 2000);
- await Task.Delay(randomMsDelay, exitToken);
+ continue;
}
+ //Ready to connect
+
try
{
- pluginLog.Debug("Connecting to random cache server");
+ pluginLog.Debug("Connecting to {node}", node);
- //Connect to a random server
- CacheNodeAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken);
- pluginLog.Debug("Connected to cache server {s}", selected.NodeId);
+ //Connect to the node
+ await Client.ConnectToCacheAsync(node, exitToken);
+
+ pluginLog.Debug("Successfully connected to {s}", node);
//Set connection status flag
IsConnected = true;
@@ -157,7 +183,7 @@ namespace VNLib.Plugins.Extensions.VNCache
}
catch (WebSocketException wse)
{
- pluginLog.Warn("Failed to connect to cache server {reason}", wse.Message);
+ pluginLog.Warn("Failed to connect to cache server {reason}", wse);
continue;
}
catch (HttpRequestException he) when (he.InnerException is SocketException)
@@ -170,6 +196,8 @@ namespace VNLib.Plugins.Extensions.VNCache
{
IsConnected = false;
}
+
+ //Loop again
}
}
catch (OperationCanceledException)
@@ -279,14 +307,14 @@ namespace VNLib.Plugins.Extensions.VNCache
public byte[] SignMessageHash(byte[] hash, HashAlg alg)
{
//try to get the rsa alg for the signing key
- using RSA? rsa = _sigKey.Value.GetRSAPublicKey();
+ using RSA? rsa = _sigKey.Value.GetRSAPrivateKey();
if(rsa != null)
{
return rsa.SignHash(hash, alg.GetAlgName(), RSASignaturePadding.Pkcs1);
}
//try to get the ecdsa alg for the signing key
- using ECDsa? ecdsa = _sigKey.Value.GetECDsaPublicKey();
+ using ECDsa? ecdsa = _sigKey.Value.GetECDsaPrivateKey();
if(ecdsa != null)
{
return ecdsa.SignHash(hash);
@@ -326,7 +354,7 @@ namespace VNLib.Plugins.Extensions.VNCache
{
public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
{
- Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", errorNode.NodeId, ex);
+ Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", errorNode?.NodeId, ex);
}
}
}