diff options
Diffstat (limited to 'lib/VNLib.Plugins.Extensions.VNCache/src')
3 files changed, 54 insertions, 72 deletions
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/RemoteCacheOperator.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/RemoteCacheOperator.cs index 1f0742d..f40f746 100644 --- a/lib/VNLib.Plugins.Extensions.VNCache/src/RemoteCacheOperator.cs +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/RemoteCacheOperator.cs @@ -26,9 +26,9 @@ using System; using System.Threading; using System.Threading.Tasks; +using VNLib.Utils.Logging; using VNLib.Data.Caching; using VNLib.Plugins.Extensions.Loading; -using VNLib.Utils.Logging; namespace VNLib.Plugins.Extensions.VNCache { @@ -41,7 +41,7 @@ namespace VNLib.Plugins.Extensions.VNCache /// The background work method must be sheduled for the cache client to be /// connected to the backing store /// </remarks> - public sealed class RemoteCacheOperator : IAsyncBackgroundWork, IAsyncConfigurable + public sealed class RemoteCacheOperator : IAsyncBackgroundWork { private readonly VnCacheClient _client; private CancellationTokenSource? _tokenSource; @@ -78,11 +78,6 @@ namespace VNLib.Plugins.Extensions.VNCache /// Cancels the background cache client listener /// </summary> public void CancelListener() => _tokenSource?.Cancel(); - - ///<inheritdoc/> - public Task ConfigureServiceAsync(PluginBase plugin) - { - return _client.ConfigureServiceAsync(plugin); - } + } }
\ No newline at end of file diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs index f4f059b..aa3d88f 100644 --- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs @@ -40,7 +40,7 @@ using VNLib.Data.Caching.Extensions; using VNLib.Data.Caching.ObjectCache; using VNLib.Net.Messaging.FBM.Client; using VNLib.Plugins.Extensions.Loading; - +using VNLib.Data.Caching.Extensions.Clustering; namespace VNLib.Plugins.Extensions.VNCache { @@ -56,7 +56,7 @@ namespace VNLib.Plugins.Extensions.VNCache /// A base class that manages /// </summary> [ConfigurationName(VNCacheExtensions.CACHE_CONFIG_KEY)] - internal class VnCacheClient : IGlobalCacheProvider, IAsyncBackgroundWork, IAsyncConfigurable + internal class VnCacheClient : IGlobalCacheProvider, IAsyncBackgroundWork { private readonly VnCacheClientConfig _config; @@ -70,12 +70,17 @@ namespace VNLib.Plugins.Extensions.VNCache /// </summary> public bool IsConnected { get; private set; } - public VnCacheClient(PluginBase pbase, IConfigScope config) + public VnCacheClient(PluginBase plugin, IConfigScope config) :this( - config.Deserialze<VnCacheClientConfig>(), - pbase.IsDebug() ? pbase.Log : null + config.Deserialze<VnCacheClientConfig>(), + plugin.IsDebug() ? plugin.Log : null ) - {} + { + //Set authenticator and error handler + Client.GetCacheConfiguration() + .WithAuthenticator(new AuthManager(plugin)) + .WithErrorHandler(new DiscoveryErrHAndler(plugin.Log)); + } public VnCacheClient(VnCacheClientConfig config, ILogProvider? debugLog) { @@ -92,17 +97,7 @@ namespace VNLib.Plugins.Extensions.VNCache //Add the configuration to the client Client.GetCacheConfiguration() .WithTls(config.UseTls) - .WithInitialPeers(config.InitialNodes!); - } - - public Task ConfigureServiceAsync(PluginBase plugin) - { - //Set authenticator - Client.GetCacheConfiguration() - .WithAuthenticator(new AuthManager(plugin)) - .WithErrorHandler(new DiscoveryErrHAndler(plugin.Log)); - - return Task.CompletedTask; + .WithInitialPeers(config.GetInitialNodeUris()); } /* @@ -115,21 +110,25 @@ namespace VNLib.Plugins.Extensions.VNCache { while (true) { - //Load the server list - ICacheNodeAdvertisment[]? servers; + //Discover nodes in the network + while (true) { try { - pluginLog.Debug("Discovering cluster nodes in broker"); + pluginLog.Debug("Discovering cluster nodes in network"); //Get server list - servers = await Client.DiscoverCacheNodesAsync(exitToken); + await Client.DiscoverAvailableNodesAsync(exitToken); break; } catch (HttpRequestException re) when (re.InnerException is SocketException) { pluginLog.Warn("Broker server is unreachable"); } + catch(CacheDiscoveryFailureException ce) + { + pluginLog.Warn("Failed to discover cache servers, reason {r}", ce); + } catch (Exception ex) { pluginLog.Warn("Failed to get server list from broker, reason {r}", ex.Message); @@ -140,19 +139,12 @@ namespace VNLib.Plugins.Extensions.VNCache await Task.Delay(randomMsDelay, exitToken); } - if (servers?.Length == 0) - { - pluginLog.Warn("No cluster nodes found, retrying"); - await Task.Delay(_config.RetryInterval, exitToken); - continue; - } - try { pluginLog.Debug("Connecting to random cache server"); //Connect to a random server - ICacheNodeAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken); + CacheNodeAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken); pluginLog.Debug("Connected to cache server {s}", selected.NodeId); //Set connection status flag @@ -304,13 +296,13 @@ namespace VNLib.Plugins.Extensions.VNCache } ///<inheritdoc/> - public bool VerifyJwt(JsonWebToken jwt) + public bool VerifyJwt(JsonWebToken jwt, bool isPeer) { return jwt.VerifyFromJwk(_verKey.Value); } ///<inheritdoc/> - public bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature) + public bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature, bool isPeer) { //try to get the rsa alg for the signing key using RSA? rsa = _verKey.Value.GetRSAPublicKey(); @@ -332,7 +324,7 @@ namespace VNLib.Plugins.Extensions.VNCache private sealed record class DiscoveryErrHAndler(ILogProvider Logger) : ICacheDiscoveryErrorHandler { - public void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex) + public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex) { Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", errorNode.NodeId, ex); } diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs index 64d3e07..73fb70f 100644 --- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs @@ -23,9 +23,9 @@ */ using System; +using System.Linq; using System.Text.Json.Serialization; -using VNLib.Data.Caching.Extensions; using VNLib.Plugins.Extensions.Loading; namespace VNLib.Plugins.Extensions.VNCache @@ -40,7 +40,7 @@ namespace VNLib.Plugins.Extensions.VNCache /// cache server. This value will be negotiated with the server /// during a connection upgrade /// </summary> - [JsonPropertyName("max_message_size")] + [JsonPropertyName("max_object_size")] public int? MaxMessageSize { get; set; } /// <summary> @@ -82,7 +82,18 @@ namespace VNLib.Plugins.Extensions.VNCache /// The initial peers to connect to /// </summary> [JsonPropertyName("initial_nodes")] - public InitialNode[]? InitialNodes { get; set; } + public string[]? InitialNodes { get; set; } + + /// <summary> + /// Gets the initial nodes as a collection of URIs + /// </summary> + /// <returns>The nodes as a collection of URIs</returns> + /// <exception cref="InvalidOperationException"></exception> + public Uri[] GetInitialNodeUris() + { + _ = InitialNodes ?? throw new InvalidOperationException("Initial nodes have not been set"); + return InitialNodes.Select(x => new Uri(x)).ToArray(); + } void IOnConfigValidation.Validate() { @@ -108,37 +119,21 @@ namespace VNLib.Plugins.Extensions.VNCache throw new ArgumentException("You must specify at least one initial peer", "initial_peers"); } - foreach (InitialNode peer in InitialNodes) - { - _ = peer.ConnectEndpoint ?? throw new ArgumentException("You must specify a connect endpoint for each initial node", "initial_nodes"); - _ = peer.NodeId ?? throw new ArgumentException("You must specify a node id for each initial node", "initial_nodes"); - } - } - - public sealed record class InitialNode : ICacheNodeAdvertisment - { - [JsonIgnore] - public Uri ConnectEndpoint { get; private set; } - - [JsonIgnore] - public Uri? DiscoveryEndpoint { get; private set; } - - [JsonPropertyName("node_id")] - public string? NodeId { get; set; } - - [JsonPropertyName("connect_endpoint")] - public string? ConnectEndpointString - { - get => ConnectEndpoint.ToString(); - set => ConnectEndpoint = new Uri(value!); - } - - [JsonPropertyName("discovery_endpoint")] - public string? DiscoveryEndpointString + //Validate initial nodes + foreach (Uri peer in GetInitialNodeUris()) { - get => DiscoveryEndpoint?.ToString(); - set => DiscoveryEndpoint = value == null ? null : new Uri(value); + if (!peer.IsAbsoluteUri) + { + throw new ArgumentException("You must specify an absolute URI for each initial node", "initial_nodes"); + } + + //Verify http connection + if(peer.Scheme != Uri.UriSchemeHttp || peer.Scheme != Uri.UriSchemeHttps) + { + throw new ArgumentException("You must specify an HTTP or HTTPS URI for each initial node", "initial_nodes"); + } } } + } }
\ No newline at end of file |