aboutsummaryrefslogtreecommitdiff
path: root/lib/VNLib.Plugins.Extensions.VNCache/src
diff options
context:
space:
mode:
Diffstat (limited to 'lib/VNLib.Plugins.Extensions.VNCache/src')
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/RemoteCacheOperator.cs11
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs56
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs59
3 files changed, 54 insertions, 72 deletions
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/RemoteCacheOperator.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/RemoteCacheOperator.cs
index 1f0742d..f40f746 100644
--- a/lib/VNLib.Plugins.Extensions.VNCache/src/RemoteCacheOperator.cs
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/RemoteCacheOperator.cs
@@ -26,9 +26,9 @@ using System;
using System.Threading;
using System.Threading.Tasks;
+using VNLib.Utils.Logging;
using VNLib.Data.Caching;
using VNLib.Plugins.Extensions.Loading;
-using VNLib.Utils.Logging;
namespace VNLib.Plugins.Extensions.VNCache
{
@@ -41,7 +41,7 @@ namespace VNLib.Plugins.Extensions.VNCache
/// The background work method must be sheduled for the cache client to be
/// connected to the backing store
/// </remarks>
- public sealed class RemoteCacheOperator : IAsyncBackgroundWork, IAsyncConfigurable
+ public sealed class RemoteCacheOperator : IAsyncBackgroundWork
{
private readonly VnCacheClient _client;
private CancellationTokenSource? _tokenSource;
@@ -78,11 +78,6 @@ namespace VNLib.Plugins.Extensions.VNCache
/// Cancels the background cache client listener
/// </summary>
public void CancelListener() => _tokenSource?.Cancel();
-
- ///<inheritdoc/>
- public Task ConfigureServiceAsync(PluginBase plugin)
- {
- return _client.ConfigureServiceAsync(plugin);
- }
+
}
} \ No newline at end of file
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
index f4f059b..aa3d88f 100644
--- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
@@ -40,7 +40,7 @@ using VNLib.Data.Caching.Extensions;
using VNLib.Data.Caching.ObjectCache;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Extensions.Loading;
-
+using VNLib.Data.Caching.Extensions.Clustering;
namespace VNLib.Plugins.Extensions.VNCache
{
@@ -56,7 +56,7 @@ namespace VNLib.Plugins.Extensions.VNCache
/// A base class that manages
/// </summary>
[ConfigurationName(VNCacheExtensions.CACHE_CONFIG_KEY)]
- internal class VnCacheClient : IGlobalCacheProvider, IAsyncBackgroundWork, IAsyncConfigurable
+ internal class VnCacheClient : IGlobalCacheProvider, IAsyncBackgroundWork
{
private readonly VnCacheClientConfig _config;
@@ -70,12 +70,17 @@ namespace VNLib.Plugins.Extensions.VNCache
/// </summary>
public bool IsConnected { get; private set; }
- public VnCacheClient(PluginBase pbase, IConfigScope config)
+ public VnCacheClient(PluginBase plugin, IConfigScope config)
:this(
- config.Deserialze<VnCacheClientConfig>(),
- pbase.IsDebug() ? pbase.Log : null
+ config.Deserialze<VnCacheClientConfig>(),
+ plugin.IsDebug() ? plugin.Log : null
)
- {}
+ {
+ //Set authenticator and error handler
+ Client.GetCacheConfiguration()
+ .WithAuthenticator(new AuthManager(plugin))
+ .WithErrorHandler(new DiscoveryErrHAndler(plugin.Log));
+ }
public VnCacheClient(VnCacheClientConfig config, ILogProvider? debugLog)
{
@@ -92,17 +97,7 @@ namespace VNLib.Plugins.Extensions.VNCache
//Add the configuration to the client
Client.GetCacheConfiguration()
.WithTls(config.UseTls)
- .WithInitialPeers(config.InitialNodes!);
- }
-
- public Task ConfigureServiceAsync(PluginBase plugin)
- {
- //Set authenticator
- Client.GetCacheConfiguration()
- .WithAuthenticator(new AuthManager(plugin))
- .WithErrorHandler(new DiscoveryErrHAndler(plugin.Log));
-
- return Task.CompletedTask;
+ .WithInitialPeers(config.GetInitialNodeUris());
}
/*
@@ -115,21 +110,25 @@ namespace VNLib.Plugins.Extensions.VNCache
{
while (true)
{
- //Load the server list
- ICacheNodeAdvertisment[]? servers;
+ //Discover nodes in the network
+
while (true)
{
try
{
- pluginLog.Debug("Discovering cluster nodes in broker");
+ pluginLog.Debug("Discovering cluster nodes in network");
//Get server list
- servers = await Client.DiscoverCacheNodesAsync(exitToken);
+ await Client.DiscoverAvailableNodesAsync(exitToken);
break;
}
catch (HttpRequestException re) when (re.InnerException is SocketException)
{
pluginLog.Warn("Broker server is unreachable");
}
+ catch(CacheDiscoveryFailureException ce)
+ {
+ pluginLog.Warn("Failed to discover cache servers, reason {r}", ce);
+ }
catch (Exception ex)
{
pluginLog.Warn("Failed to get server list from broker, reason {r}", ex.Message);
@@ -140,19 +139,12 @@ namespace VNLib.Plugins.Extensions.VNCache
await Task.Delay(randomMsDelay, exitToken);
}
- if (servers?.Length == 0)
- {
- pluginLog.Warn("No cluster nodes found, retrying");
- await Task.Delay(_config.RetryInterval, exitToken);
- continue;
- }
-
try
{
pluginLog.Debug("Connecting to random cache server");
//Connect to a random server
- ICacheNodeAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken);
+ CacheNodeAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken);
pluginLog.Debug("Connected to cache server {s}", selected.NodeId);
//Set connection status flag
@@ -304,13 +296,13 @@ namespace VNLib.Plugins.Extensions.VNCache
}
///<inheritdoc/>
- public bool VerifyJwt(JsonWebToken jwt)
+ public bool VerifyJwt(JsonWebToken jwt, bool isPeer)
{
return jwt.VerifyFromJwk(_verKey.Value);
}
///<inheritdoc/>
- public bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature)
+ public bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature, bool isPeer)
{
//try to get the rsa alg for the signing key
using RSA? rsa = _verKey.Value.GetRSAPublicKey();
@@ -332,7 +324,7 @@ namespace VNLib.Plugins.Extensions.VNCache
private sealed record class DiscoveryErrHAndler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
{
- public void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex)
+ public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
{
Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", errorNode.NodeId, ex);
}
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs
index 64d3e07..73fb70f 100644
--- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs
@@ -23,9 +23,9 @@
*/
using System;
+using System.Linq;
using System.Text.Json.Serialization;
-using VNLib.Data.Caching.Extensions;
using VNLib.Plugins.Extensions.Loading;
namespace VNLib.Plugins.Extensions.VNCache
@@ -40,7 +40,7 @@ namespace VNLib.Plugins.Extensions.VNCache
/// cache server. This value will be negotiated with the server
/// during a connection upgrade
/// </summary>
- [JsonPropertyName("max_message_size")]
+ [JsonPropertyName("max_object_size")]
public int? MaxMessageSize { get; set; }
/// <summary>
@@ -82,7 +82,18 @@ namespace VNLib.Plugins.Extensions.VNCache
/// The initial peers to connect to
/// </summary>
[JsonPropertyName("initial_nodes")]
- public InitialNode[]? InitialNodes { get; set; }
+ public string[]? InitialNodes { get; set; }
+
+ /// <summary>
+ /// Gets the initial nodes as a collection of URIs
+ /// </summary>
+ /// <returns>The nodes as a collection of URIs</returns>
+ /// <exception cref="InvalidOperationException"></exception>
+ public Uri[] GetInitialNodeUris()
+ {
+ _ = InitialNodes ?? throw new InvalidOperationException("Initial nodes have not been set");
+ return InitialNodes.Select(x => new Uri(x)).ToArray();
+ }
void IOnConfigValidation.Validate()
{
@@ -108,37 +119,21 @@ namespace VNLib.Plugins.Extensions.VNCache
throw new ArgumentException("You must specify at least one initial peer", "initial_peers");
}
- foreach (InitialNode peer in InitialNodes)
- {
- _ = peer.ConnectEndpoint ?? throw new ArgumentException("You must specify a connect endpoint for each initial node", "initial_nodes");
- _ = peer.NodeId ?? throw new ArgumentException("You must specify a node id for each initial node", "initial_nodes");
- }
- }
-
- public sealed record class InitialNode : ICacheNodeAdvertisment
- {
- [JsonIgnore]
- public Uri ConnectEndpoint { get; private set; }
-
- [JsonIgnore]
- public Uri? DiscoveryEndpoint { get; private set; }
-
- [JsonPropertyName("node_id")]
- public string? NodeId { get; set; }
-
- [JsonPropertyName("connect_endpoint")]
- public string? ConnectEndpointString
- {
- get => ConnectEndpoint.ToString();
- set => ConnectEndpoint = new Uri(value!);
- }
-
- [JsonPropertyName("discovery_endpoint")]
- public string? DiscoveryEndpointString
+ //Validate initial nodes
+ foreach (Uri peer in GetInitialNodeUris())
{
- get => DiscoveryEndpoint?.ToString();
- set => DiscoveryEndpoint = value == null ? null : new Uri(value);
+ if (!peer.IsAbsoluteUri)
+ {
+ throw new ArgumentException("You must specify an absolute URI for each initial node", "initial_nodes");
+ }
+
+ //Verify http connection
+ if(peer.Scheme != Uri.UriSchemeHttp || peer.Scheme != Uri.UriSchemeHttps)
+ {
+ throw new ArgumentException("You must specify an HTTP or HTTPS URI for each initial node", "initial_nodes");
+ }
}
}
+
}
} \ No newline at end of file