From 1a8ab1457244d15b19ddcc94958f645f5ec2abc7 Mon Sep 17 00:00:00 2001 From: vnugent Date: Thu, 22 Jun 2023 21:16:28 -0400 Subject: Save checkpoint --- .../src/VnCacheClient.cs | 132 +++++++++++++++++---- .../src/VnCacheClientConfig.cs | 48 +++++++- 2 files changed, 151 insertions(+), 29 deletions(-) (limited to 'lib/VNLib.Plugins.Extensions.VNCache') diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs index 9f5ccfe..f4f059b 100644 --- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs @@ -31,15 +31,17 @@ using System.Net.WebSockets; using System.Collections.Generic; using System.Security.Cryptography; +using VNLib.Hashing; +using VNLib.Hashing.IdentityUtility; using VNLib.Utils.Memory; using VNLib.Utils.Logging; -using VNLib.Hashing.IdentityUtility; using VNLib.Data.Caching; using VNLib.Data.Caching.Extensions; using VNLib.Data.Caching.ObjectCache; using VNLib.Net.Messaging.FBM.Client; using VNLib.Plugins.Extensions.Loading; + namespace VNLib.Plugins.Extensions.VNCache { public interface ICacheRefreshPolicy @@ -49,6 +51,7 @@ namespace VNLib.Plugins.Extensions.VNCache TimeSpan RefreshInterval { get; } } + /// /// A base class that manages /// @@ -81,8 +84,6 @@ namespace VNLib.Plugins.Extensions.VNCache _config = config; - Uri brokerUri = new(config.BrokerAddress!); - //Init the client with default settings FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(MemoryUtil.Shared, config.MaxMessageSize!.Value, config.RequestTimeout, debugLog); @@ -90,30 +91,18 @@ namespace VNLib.Plugins.Extensions.VNCache //Add the configuration to the client Client.GetCacheConfiguration() - .WithBroker(brokerUri) - .WithTls(brokerUri.Scheme == Uri.UriSchemeHttps); + .WithTls(config.UseTls) + .WithInitialPeers(config.InitialNodes!); } - - public virtual async Task ConfigureServiceAsync(PluginBase plugin) + public Task ConfigureServiceAsync(PluginBase plugin) { - //Get keys async - Task clientPrivTask = plugin.TryGetSecretAsync("client_private_key").ToJsonWebKey(); - Task brokerPubTask = plugin.TryGetSecretAsync("broker_public_key").ToJsonWebKey(); - Task cachePubTask = plugin.TryGetSecretAsync("cache_public_key").ToJsonWebKey(); - - //Wait for all tasks to complete - _ = await Task.WhenAll(clientPrivTask, brokerPubTask, cachePubTask); - - ReadOnlyJsonWebKey clientPriv = await clientPrivTask ?? throw new KeyNotFoundException("Missing required secret client_private_key"); - ReadOnlyJsonWebKey brokerPub = await brokerPubTask ?? throw new KeyNotFoundException("Missing required secret broker_public_key"); - ReadOnlyJsonWebKey cachePub = await cachePubTask ?? throw new KeyNotFoundException("Missing required secret cache_public_key"); - - //Connection authentication methods + //Set authenticator Client.GetCacheConfiguration() - .WithVerificationKey(cachePub) - .WithSigningKey(clientPriv) - .WithBrokerVerificationKey(brokerPub); + .WithAuthenticator(new AuthManager(plugin)) + .WithErrorHandler(new DiscoveryErrHAndler(plugin.Log)); + + return Task.CompletedTask; } /* @@ -127,7 +116,7 @@ namespace VNLib.Plugins.Extensions.VNCache while (true) { //Load the server list - ICachePeerAdvertisment[]? servers; + ICacheNodeAdvertisment[]? servers; while (true) { try @@ -163,7 +152,7 @@ namespace VNLib.Plugins.Extensions.VNCache pluginLog.Debug("Connecting to random cache server"); //Connect to a random server - ICachePeerAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken); + ICacheNodeAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken); pluginLog.Debug("Connected to cache server {s}", selected.NodeId); //Set connection status flag @@ -255,5 +244,98 @@ namespace VNLib.Plugins.Extensions.VNCache ? throw new InvalidOperationException("The underlying client is not connected to a cache node") : Client!.AddOrUpdateObjectAsync(key, newKey, value, serialzer, cancellation); } + + + private sealed class AuthManager : ICacheAuthManager + { + + private IAsyncLazy _sigKey; + private IAsyncLazy _verKey; + + public AuthManager(PluginBase plugin) + { + //Lazy load keys + + //Get the signing key + _sigKey = plugin.GetSecretAsync("client_private_key").ToLazy(static r => r.GetJsonWebKey()); + + //Lazy load cache public key + _verKey = plugin.GetSecretAsync("cache_public_key").ToLazy(static r => r.GetJsonWebKey()); + } + + public async Task AwaitLazyKeyLoad() + { + await _sigKey; + await _verKey; + } + + /// + public IReadOnlyDictionary GetJwtHeader() + { + //Get the signing key jwt header + return _sigKey.Value.JwtHeader; + } + + /// + public void SignJwt(JsonWebToken jwt) + { + //Sign the jwt with signing key + jwt.SignFromJwk(_sigKey.Value); + } + + /// + public byte[] SignMessageHash(byte[] hash, HashAlg alg) + { + //try to get the rsa alg for the signing key + using RSA? rsa = _sigKey.Value.GetRSAPublicKey(); + if(rsa != null) + { + return rsa.SignHash(hash, alg.GetAlgName(), RSASignaturePadding.Pkcs1); + } + + //try to get the ecdsa alg for the signing key + using ECDsa? ecdsa = _sigKey.Value.GetECDsaPublicKey(); + if(ecdsa != null) + { + return ecdsa.SignHash(hash); + } + + throw new NotSupportedException("The signing key is not a valid RSA or ECDSA key"); + } + + /// + public bool VerifyJwt(JsonWebToken jwt) + { + return jwt.VerifyFromJwk(_verKey.Value); + } + + /// + public bool VerifyMessageHash(ReadOnlySpan hash, HashAlg alg, ReadOnlySpan signature) + { + //try to get the rsa alg for the signing key + using RSA? rsa = _verKey.Value.GetRSAPublicKey(); + if (rsa != null) + { + return rsa.VerifyHash(hash, signature, alg.GetAlgName(), RSASignaturePadding.Pkcs1); + } + + //try to get the ecdsa alg for the signing key + using ECDsa? ecdsa = _verKey.Value.GetECDsaPublicKey(); + if (ecdsa != null) + { + return ecdsa.VerifyHash(hash, signature); + } + + throw new NotSupportedException("The current key is not an RSA or ECDSA key and is not supported"); + } + } + + private sealed record class DiscoveryErrHAndler(ILogProvider Logger) : ICacheDiscoveryErrorHandler + { + public void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex) + { + Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", errorNode.NodeId, ex); + } + } } } \ No newline at end of file diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs index 1d888ec..64d3e07 100644 --- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs @@ -25,6 +25,7 @@ using System; using System.Text.Json.Serialization; +using VNLib.Data.Caching.Extensions; using VNLib.Plugins.Extensions.Loading; namespace VNLib.Plugins.Extensions.VNCache @@ -45,8 +46,8 @@ namespace VNLib.Plugins.Extensions.VNCache /// /// The broker server address /// - [JsonPropertyName("broker_address")] - public string? BrokerAddress { get; set; } + [JsonPropertyName("use_tls")] + public bool UseTls { get; set; } = true; /// /// The time (in seconds) to randomly delay polling the broker server @@ -77,6 +78,12 @@ namespace VNLib.Plugins.Extensions.VNCache /// internal TimeSpan RequestTimeout => TimeSpan.FromSeconds(RequestTimeoutSeconds!.Value); + /// + /// The initial peers to connect to + /// + [JsonPropertyName("initial_nodes")] + public InitialNode[]? InitialNodes { get; set; } + void IOnConfigValidation.Validate() { if (!MaxMessageSize.HasValue || MaxMessageSize.Value < 1) @@ -95,9 +102,42 @@ namespace VNLib.Plugins.Extensions.VNCache throw new ArgumentException("You must specify a positive integer FBM message timoeut", "request_timeout_sec"); } - if(!Uri.TryCreate(BrokerAddress, UriKind.RelativeOrAbsolute, out _)) + //Validate initial nodes + if (InitialNodes == null || InitialNodes.Length == 0) + { + throw new ArgumentException("You must specify at least one initial peer", "initial_peers"); + } + + foreach (InitialNode peer in InitialNodes) + { + _ = peer.ConnectEndpoint ?? throw new ArgumentException("You must specify a connect endpoint for each initial node", "initial_nodes"); + _ = peer.NodeId ?? throw new ArgumentException("You must specify a node id for each initial node", "initial_nodes"); + } + } + + public sealed record class InitialNode : ICacheNodeAdvertisment + { + [JsonIgnore] + public Uri ConnectEndpoint { get; private set; } + + [JsonIgnore] + public Uri? DiscoveryEndpoint { get; private set; } + + [JsonPropertyName("node_id")] + public string? NodeId { get; set; } + + [JsonPropertyName("connect_endpoint")] + public string? ConnectEndpointString + { + get => ConnectEndpoint.ToString(); + set => ConnectEndpoint = new Uri(value!); + } + + [JsonPropertyName("discovery_endpoint")] + public string? DiscoveryEndpointString { - throw new ArgumentException("You must specify a valid HTTP uri broker address", "broker_address"); + get => DiscoveryEndpoint?.ToString(); + set => DiscoveryEndpoint = value == null ? null : new Uri(value); } } } -- cgit