aboutsummaryrefslogtreecommitdiff
path: root/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs')
-rw-r--r--plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs88
1 files changed, 59 insertions, 29 deletions
diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
index 73783dc..07fc9ee 100644
--- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
+++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
@@ -59,7 +59,7 @@ namespace VNLib.Data.Caching.Providers.VNCache
private readonly VnCacheClientConfig _config;
private readonly IClusterNodeIndex _index;
- private readonly FBMClientFactory _clientFactory;
+ private readonly VNCacheClusterClient _cluster;
private readonly TimeSpan _initNodeDelay;
private bool _isConnected;
@@ -83,9 +83,8 @@ namespace VNLib.Data.Caching.Providers.VNCache
{
ILogProvider scoped = plugin.Log.CreateScope(LOG_NAME);
- //Set authenticator and error handler
- _clientFactory.GetCacheConfiguration()
- .WithAuthenticator(new AuthManager(plugin))
+ //When in plugin context, we can use plugin local secrets and a log-based error handler
+ _cluster.Config.WithAuthenticator(new AuthManager(plugin))
.WithErrorHandler(new DiscoveryErrHAndler(scoped));
//Only the master index is schedulable
@@ -116,17 +115,20 @@ namespace VNLib.Data.Caching.Providers.VNCache
//Init the client with default settings
FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(BufferHeap, (int)config.MaxBlobSize, config.RequestTimeout, debugLog);
-
- FBMFallbackClientWsFactory wsFactory = new();
- _clientFactory = new(in conf, wsFactory);
-
- //Add the configuration to the client
- _clientFactory.GetCacheConfiguration()
+
+ FBMClientFactory clientFactory = new(
+ in conf,
+ new FBMFallbackClientWsFactory(),
+ 10
+ );
+
+ _cluster = (new CacheClientConfiguration())
.WithTls(config.UseTls)
- .WithInitialPeers(config.GetInitialNodeUris());
+ .WithInitialPeers(config.GetInitialNodeUris())
+ .ToClusterClient(clientFactory);
//Init index
- _index = ClusterNodeIndex.CreateIndex(_clientFactory.GetCacheConfiguration());
+ _index = ClusterNodeIndex.CreateIndex(_cluster);
}
/*
@@ -216,7 +218,7 @@ namespace VNLib.Data.Caching.Providers.VNCache
pluginLog.Debug("Connecting to {node}", node);
//Connect to the node and save new client
- _client = await _clientFactory.ConnectToCacheAsync(node, exitToken);
+ _client = await _cluster.ConnectToCacheAsync(node, exitToken);
if (pluginLog.IsEnabled(LogLevel.Debug))
{
@@ -256,6 +258,12 @@ namespace VNLib.Data.Caching.Providers.VNCache
pluginLog.Verbose("Stack trace: {re}", he);
await Task.Delay(1000, exitToken);
}
+ catch(HttpRequestException hre) when (hre.InnerException is SocketException se)
+ {
+ pluginLog.Warn("Failed to establish a TCP connection to server {server} {reason}", node.NodeId, se.Message);
+ pluginLog.Verbose("Stack trace: {re}", se);
+ await Task.Delay(1000, exitToken);
+ }
finally
{
_isConnected = false;
@@ -327,22 +335,11 @@ namespace VNLib.Data.Caching.Providers.VNCache
///<inheritdoc/>
public override object GetUnderlyingStore() => _client ?? throw new InvalidOperationException("The client is not currently connected");
- private sealed class AuthManager : ICacheAuthManager
+ private sealed class AuthManager(PluginBase plugin) : ICacheAuthManager
{
- private IAsyncLazy<ReadOnlyJsonWebKey> _sigKey;
- private IAsyncLazy<ReadOnlyJsonWebKey> _verKey;
-
- public AuthManager(PluginBase plugin)
- {
- //Lazy load keys
-
- //Get the signing key
- _sigKey = plugin.GetSecretAsync("client_private_key").ToLazy(static r => r.GetJsonWebKey());
-
- //Lazy load cache public key
- _verKey = plugin.GetSecretAsync("cache_public_key").ToLazy(static r => r.GetJsonWebKey());
- }
+ private IAsyncLazy<ReadOnlyJsonWebKey> _sigKey = plugin.GetSecretAsync("client_private_key").ToLazy(static r => r.GetJsonWebKey());
+ private IAsyncLazy<ReadOnlyJsonWebKey> _verKey = plugin.GetSecretAsync("cache_public_key").ToLazy(static r => r.GetJsonWebKey());
public async Task AwaitLazyKeyLoad()
{
@@ -413,9 +410,42 @@ namespace VNLib.Data.Caching.Providers.VNCache
private sealed record class DiscoveryErrHAndler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
{
- public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
+ public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
+ => OnDiscoveryError(errorNode, ex);
+
+ public void OnDiscoveryError(Uri errorAddress, Exception ex)
+ => OnDiscoveryError(ex, null, errorAddress);
+
+ public void OnDiscoveryError(Exception ex, CacheNodeAdvertisment? errorNode, Uri? address)
{
- Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", errorNode?.NodeId, ex);
+ string node = errorNode?.NodeId ?? address?.ToString() ?? "unknown";
+
+ if(ex is HttpRequestException he)
+ {
+ if(he.InnerException is SocketException se)
+ {
+ LogErrorException(se);
+ return;
+ }
+
+ LogErrorException(he);
+ return;
+ }
+
+ LogErrorException(ex);
+ return;
+
+ void LogErrorException(Exception ex)
+ {
+ if(Logger.IsEnabled(LogLevel.Debug))
+ {
+ Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", node, ex);
+ }
+ else
+ {
+ Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", node, ex.Message);
+ }
+ }
}
}
}