aboutsummaryrefslogtreecommitdiff
path: root/plugins/VNLib.Data.Caching.Providers.VNCache/src
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/VNLib.Data.Caching.Providers.VNCache/src')
-rw-r--r--plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs14
-rw-r--r--plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs88
-rw-r--r--plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs12
3 files changed, 76 insertions, 38 deletions
diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs
index c9cd746..e9dcbc5 100644
--- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs
+++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs
@@ -46,7 +46,7 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering
* it in the app domain.
*/
- public static IClusterNodeIndex CreateIndex(CacheClientConfiguration config)
+ public static IClusterNodeIndex CreateIndex(VNCacheClusterManager cluster)
{
/* TEMPORARY:
* Named semaphores are only supported on Windows, which allowed synchronized communication between
@@ -75,7 +75,7 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering
if (remoteIndex == null)
{
//Create a new index and store it in the app domain
- IClusterNodeIndex index = new LocalHandler(config);
+ IClusterNodeIndex index = new LocalHandler(cluster);
AppDomain.CurrentDomain.SetData(APP_DOMAIN_KEY, index);
return index;
}
@@ -92,7 +92,7 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering
}
else
{
- return new LocalHandler(config);
+ return new LocalHandler(cluster);
}
}
@@ -114,15 +114,15 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering
* Unless VNLib.Core supports a new way to safley share types across ALCs, this is my solution.
*/
- sealed class LocalHandler(CacheClientConfiguration Config) : IClusterNodeIndex, IIntervalScheduleable
+ sealed class LocalHandler(VNCacheClusterManager cluster) : IClusterNodeIndex, IIntervalScheduleable
{
private Task _currentUpdate = Task.CompletedTask;
///<inheritdoc/>
public CacheNodeAdvertisment? GetNextNode()
{
- //Get all nodes
- CacheNodeAdvertisment[] ads = Config.NodeCollection.GetAllNodes();
+ //Get all discovered nodes
+ CacheNodeAdvertisment[] ads = cluster.DiscoveredNodes.GetAllNodes();
//Just get a random node from the collection for now
return ads.Length > 0 ? ads.SelectRandom() : null;
}
@@ -134,7 +134,7 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering
public Task OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken)
{
//Run discovery operation and update the task
- _currentUpdate = Config.DiscoverNodesAsync(cancellationToken);
+ _currentUpdate = cluster.DiscoverNodesAsync(cancellationToken);
return Task.CompletedTask;
}
diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
index 73783dc..07fc9ee 100644
--- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
+++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
@@ -59,7 +59,7 @@ namespace VNLib.Data.Caching.Providers.VNCache
private readonly VnCacheClientConfig _config;
private readonly IClusterNodeIndex _index;
- private readonly FBMClientFactory _clientFactory;
+ private readonly VNCacheClusterClient _cluster;
private readonly TimeSpan _initNodeDelay;
private bool _isConnected;
@@ -83,9 +83,8 @@ namespace VNLib.Data.Caching.Providers.VNCache
{
ILogProvider scoped = plugin.Log.CreateScope(LOG_NAME);
- //Set authenticator and error handler
- _clientFactory.GetCacheConfiguration()
- .WithAuthenticator(new AuthManager(plugin))
+ //When in plugin context, we can use plugin local secrets and a log-based error handler
+ _cluster.Config.WithAuthenticator(new AuthManager(plugin))
.WithErrorHandler(new DiscoveryErrHAndler(scoped));
//Only the master index is schedulable
@@ -116,17 +115,20 @@ namespace VNLib.Data.Caching.Providers.VNCache
//Init the client with default settings
FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(BufferHeap, (int)config.MaxBlobSize, config.RequestTimeout, debugLog);
-
- FBMFallbackClientWsFactory wsFactory = new();
- _clientFactory = new(in conf, wsFactory);
-
- //Add the configuration to the client
- _clientFactory.GetCacheConfiguration()
+
+ FBMClientFactory clientFactory = new(
+ in conf,
+ new FBMFallbackClientWsFactory(),
+ 10
+ );
+
+ _cluster = (new CacheClientConfiguration())
.WithTls(config.UseTls)
- .WithInitialPeers(config.GetInitialNodeUris());
+ .WithInitialPeers(config.GetInitialNodeUris())
+ .ToClusterClient(clientFactory);
//Init index
- _index = ClusterNodeIndex.CreateIndex(_clientFactory.GetCacheConfiguration());
+ _index = ClusterNodeIndex.CreateIndex(_cluster);
}
/*
@@ -216,7 +218,7 @@ namespace VNLib.Data.Caching.Providers.VNCache
pluginLog.Debug("Connecting to {node}", node);
//Connect to the node and save new client
- _client = await _clientFactory.ConnectToCacheAsync(node, exitToken);
+ _client = await _cluster.ConnectToCacheAsync(node, exitToken);
if (pluginLog.IsEnabled(LogLevel.Debug))
{
@@ -256,6 +258,12 @@ namespace VNLib.Data.Caching.Providers.VNCache
pluginLog.Verbose("Stack trace: {re}", he);
await Task.Delay(1000, exitToken);
}
+ catch(HttpRequestException hre) when (hre.InnerException is SocketException se)
+ {
+ pluginLog.Warn("Failed to establish a TCP connection to server {server} {reason}", node.NodeId, se.Message);
+ pluginLog.Verbose("Stack trace: {re}", se);
+ await Task.Delay(1000, exitToken);
+ }
finally
{
_isConnected = false;
@@ -327,22 +335,11 @@ namespace VNLib.Data.Caching.Providers.VNCache
///<inheritdoc/>
public override object GetUnderlyingStore() => _client ?? throw new InvalidOperationException("The client is not currently connected");
- private sealed class AuthManager : ICacheAuthManager
+ private sealed class AuthManager(PluginBase plugin) : ICacheAuthManager
{
- private IAsyncLazy<ReadOnlyJsonWebKey> _sigKey;
- private IAsyncLazy<ReadOnlyJsonWebKey> _verKey;
-
- public AuthManager(PluginBase plugin)
- {
- //Lazy load keys
-
- //Get the signing key
- _sigKey = plugin.GetSecretAsync("client_private_key").ToLazy(static r => r.GetJsonWebKey());
-
- //Lazy load cache public key
- _verKey = plugin.GetSecretAsync("cache_public_key").ToLazy(static r => r.GetJsonWebKey());
- }
+ private IAsyncLazy<ReadOnlyJsonWebKey> _sigKey = plugin.GetSecretAsync("client_private_key").ToLazy(static r => r.GetJsonWebKey());
+ private IAsyncLazy<ReadOnlyJsonWebKey> _verKey = plugin.GetSecretAsync("cache_public_key").ToLazy(static r => r.GetJsonWebKey());
public async Task AwaitLazyKeyLoad()
{
@@ -413,9 +410,42 @@ namespace VNLib.Data.Caching.Providers.VNCache
private sealed record class DiscoveryErrHAndler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
{
- public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
+ public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
+ => OnDiscoveryError(errorNode, ex);
+
+ public void OnDiscoveryError(Uri errorAddress, Exception ex)
+ => OnDiscoveryError(ex, null, errorAddress);
+
+ public void OnDiscoveryError(Exception ex, CacheNodeAdvertisment? errorNode, Uri? address)
{
- Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", errorNode?.NodeId, ex);
+ string node = errorNode?.NodeId ?? address?.ToString() ?? "unknown";
+
+ if(ex is HttpRequestException he)
+ {
+ if(he.InnerException is SocketException se)
+ {
+ LogErrorException(se);
+ return;
+ }
+
+ LogErrorException(he);
+ return;
+ }
+
+ LogErrorException(ex);
+ return;
+
+ void LogErrorException(Exception ex)
+ {
+ if(Logger.IsEnabled(LogLevel.Debug))
+ {
+ Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", node, ex);
+ }
+ else
+ {
+ Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", node, ex.Message);
+ }
+ }
}
}
}
diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs
index 383c979..0d6cd34 100644
--- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs
+++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Data.Caching.Providers.VNCache
@@ -33,6 +33,8 @@ namespace VNLib.Data.Caching.Providers.VNCache
/// </summary>
public class VnCacheClientConfig : VNCacheConfig
{
+ const string DefaultWellKnownEndpoint = "/.well-known/vncache";
+
/// <summary>
/// The broker server address
/// </summary>
@@ -88,7 +90,13 @@ namespace VNLib.Data.Caching.Providers.VNCache
public Uri[] GetInitialNodeUris()
{
_ = InitialNodes ?? throw new InvalidOperationException("Initial nodes have not been set");
- return InitialNodes.Select(static x => new Uri(x, UriKind.Absolute)).ToArray();
+ return InitialNodes.Select(static x =>
+ {
+ //Append a default well known endpoint if the path is just a root
+ Uri ur = new (x, UriKind.Absolute);
+ return ur.LocalPath == "/" ? new Uri(ur, DefaultWellKnownEndpoint) : ur;
+ })
+ .ToArray();
}
///<inheritdoc/>