aboutsummaryrefslogtreecommitdiff
path: root/lib/VNLib.Plugins.Extensions.VNCache
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-06-22 21:16:28 -0400
committerLibravatar vnugent <public@vaughnnugent.com>2023-06-22 21:16:28 -0400
commit1a8ab1457244d15b19ddcc94958f645f5ec2abc7 (patch)
tree3994806e0737cf6f519a72cca8836c6e81eac7e2 /lib/VNLib.Plugins.Extensions.VNCache
parentdc0fc53fd3c3f6c32c8b0d063922c7018fa2c48f (diff)
Save checkpoint
Diffstat (limited to 'lib/VNLib.Plugins.Extensions.VNCache')
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs132
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs48
2 files changed, 151 insertions, 29 deletions
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
index 9f5ccfe..f4f059b 100644
--- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
@@ -31,15 +31,17 @@ using System.Net.WebSockets;
using System.Collections.Generic;
using System.Security.Cryptography;
+using VNLib.Hashing;
+using VNLib.Hashing.IdentityUtility;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
-using VNLib.Hashing.IdentityUtility;
using VNLib.Data.Caching;
using VNLib.Data.Caching.Extensions;
using VNLib.Data.Caching.ObjectCache;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Extensions.Loading;
+
namespace VNLib.Plugins.Extensions.VNCache
{
public interface ICacheRefreshPolicy
@@ -49,6 +51,7 @@ namespace VNLib.Plugins.Extensions.VNCache
TimeSpan RefreshInterval { get; }
}
+
/// <summary>
/// A base class that manages
/// </summary>
@@ -81,8 +84,6 @@ namespace VNLib.Plugins.Extensions.VNCache
_config = config;
- Uri brokerUri = new(config.BrokerAddress!);
-
//Init the client with default settings
FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(MemoryUtil.Shared, config.MaxMessageSize!.Value, config.RequestTimeout, debugLog);
@@ -90,30 +91,18 @@ namespace VNLib.Plugins.Extensions.VNCache
//Add the configuration to the client
Client.GetCacheConfiguration()
- .WithBroker(brokerUri)
- .WithTls(brokerUri.Scheme == Uri.UriSchemeHttps);
+ .WithTls(config.UseTls)
+ .WithInitialPeers(config.InitialNodes!);
}
-
- public virtual async Task ConfigureServiceAsync(PluginBase plugin)
+ public Task ConfigureServiceAsync(PluginBase plugin)
{
- //Get keys async
- Task<ReadOnlyJsonWebKey?> clientPrivTask = plugin.TryGetSecretAsync("client_private_key").ToJsonWebKey();
- Task<ReadOnlyJsonWebKey?> brokerPubTask = plugin.TryGetSecretAsync("broker_public_key").ToJsonWebKey();
- Task<ReadOnlyJsonWebKey?> cachePubTask = plugin.TryGetSecretAsync("cache_public_key").ToJsonWebKey();
-
- //Wait for all tasks to complete
- _ = await Task.WhenAll(clientPrivTask, brokerPubTask, cachePubTask);
-
- ReadOnlyJsonWebKey clientPriv = await clientPrivTask ?? throw new KeyNotFoundException("Missing required secret client_private_key");
- ReadOnlyJsonWebKey brokerPub = await brokerPubTask ?? throw new KeyNotFoundException("Missing required secret broker_public_key");
- ReadOnlyJsonWebKey cachePub = await cachePubTask ?? throw new KeyNotFoundException("Missing required secret cache_public_key");
-
- //Connection authentication methods
+ //Set authenticator
Client.GetCacheConfiguration()
- .WithVerificationKey(cachePub)
- .WithSigningKey(clientPriv)
- .WithBrokerVerificationKey(brokerPub);
+ .WithAuthenticator(new AuthManager(plugin))
+ .WithErrorHandler(new DiscoveryErrHAndler(plugin.Log));
+
+ return Task.CompletedTask;
}
/*
@@ -127,7 +116,7 @@ namespace VNLib.Plugins.Extensions.VNCache
while (true)
{
//Load the server list
- ICachePeerAdvertisment[]? servers;
+ ICacheNodeAdvertisment[]? servers;
while (true)
{
try
@@ -163,7 +152,7 @@ namespace VNLib.Plugins.Extensions.VNCache
pluginLog.Debug("Connecting to random cache server");
//Connect to a random server
- ICachePeerAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken);
+ ICacheNodeAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken);
pluginLog.Debug("Connected to cache server {s}", selected.NodeId);
//Set connection status flag
@@ -255,5 +244,98 @@ namespace VNLib.Plugins.Extensions.VNCache
? throw new InvalidOperationException("The underlying client is not connected to a cache node")
: Client!.AddOrUpdateObjectAsync(key, newKey, value, serialzer, cancellation);
}
+
+
+ private sealed class AuthManager : ICacheAuthManager
+ {
+
+ private IAsyncLazy<ReadOnlyJsonWebKey> _sigKey;
+ private IAsyncLazy<ReadOnlyJsonWebKey> _verKey;
+
+ public AuthManager(PluginBase plugin)
+ {
+ //Lazy load keys
+
+ //Get the signing key
+ _sigKey = plugin.GetSecretAsync("client_private_key").ToLazy(static r => r.GetJsonWebKey());
+
+ //Lazy load cache public key
+ _verKey = plugin.GetSecretAsync("cache_public_key").ToLazy(static r => r.GetJsonWebKey());
+ }
+
+ public async Task AwaitLazyKeyLoad()
+ {
+ await _sigKey;
+ await _verKey;
+ }
+
+ ///<inheritdoc/>
+ public IReadOnlyDictionary<string, string?> GetJwtHeader()
+ {
+ //Get the signing key jwt header
+ return _sigKey.Value.JwtHeader;
+ }
+
+ ///<inheritdoc/>
+ public void SignJwt(JsonWebToken jwt)
+ {
+ //Sign the jwt with signing key
+ jwt.SignFromJwk(_sigKey.Value);
+ }
+
+ ///<inheritdoc/>
+ public byte[] SignMessageHash(byte[] hash, HashAlg alg)
+ {
+ //try to get the rsa alg for the signing key
+ using RSA? rsa = _sigKey.Value.GetRSAPublicKey();
+ if(rsa != null)
+ {
+ return rsa.SignHash(hash, alg.GetAlgName(), RSASignaturePadding.Pkcs1);
+ }
+
+ //try to get the ecdsa alg for the signing key
+ using ECDsa? ecdsa = _sigKey.Value.GetECDsaPublicKey();
+ if(ecdsa != null)
+ {
+ return ecdsa.SignHash(hash);
+ }
+
+ throw new NotSupportedException("The signing key is not a valid RSA or ECDSA key");
+ }
+
+ ///<inheritdoc/>
+ public bool VerifyJwt(JsonWebToken jwt)
+ {
+ return jwt.VerifyFromJwk(_verKey.Value);
+ }
+
+ ///<inheritdoc/>
+ public bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature)
+ {
+ //try to get the rsa alg for the signing key
+ using RSA? rsa = _verKey.Value.GetRSAPublicKey();
+ if (rsa != null)
+ {
+ return rsa.VerifyHash(hash, signature, alg.GetAlgName(), RSASignaturePadding.Pkcs1);
+ }
+
+ //try to get the ecdsa alg for the signing key
+ using ECDsa? ecdsa = _verKey.Value.GetECDsaPublicKey();
+ if (ecdsa != null)
+ {
+ return ecdsa.VerifyHash(hash, signature);
+ }
+
+ throw new NotSupportedException("The current key is not an RSA or ECDSA key and is not supported");
+ }
+ }
+
+ private sealed record class DiscoveryErrHAndler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
+ {
+ public void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex)
+ {
+ Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", errorNode.NodeId, ex);
+ }
+ }
}
} \ No newline at end of file
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs
index 1d888ec..64d3e07 100644
--- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs
@@ -25,6 +25,7 @@
using System;
using System.Text.Json.Serialization;
+using VNLib.Data.Caching.Extensions;
using VNLib.Plugins.Extensions.Loading;
namespace VNLib.Plugins.Extensions.VNCache
@@ -45,8 +46,8 @@ namespace VNLib.Plugins.Extensions.VNCache
/// <summary>
/// The broker server address
/// </summary>
- [JsonPropertyName("broker_address")]
- public string? BrokerAddress { get; set; }
+ [JsonPropertyName("use_tls")]
+ public bool UseTls { get; set; } = true;
/// <summary>
/// The time (in seconds) to randomly delay polling the broker server
@@ -77,6 +78,12 @@ namespace VNLib.Plugins.Extensions.VNCache
/// </summary>
internal TimeSpan RequestTimeout => TimeSpan.FromSeconds(RequestTimeoutSeconds!.Value);
+ /// <summary>
+ /// The initial peers to connect to
+ /// </summary>
+ [JsonPropertyName("initial_nodes")]
+ public InitialNode[]? InitialNodes { get; set; }
+
void IOnConfigValidation.Validate()
{
if (!MaxMessageSize.HasValue || MaxMessageSize.Value < 1)
@@ -95,9 +102,42 @@ namespace VNLib.Plugins.Extensions.VNCache
throw new ArgumentException("You must specify a positive integer FBM message timoeut", "request_timeout_sec");
}
- if(!Uri.TryCreate(BrokerAddress, UriKind.RelativeOrAbsolute, out _))
+ //Validate initial nodes
+ if (InitialNodes == null || InitialNodes.Length == 0)
+ {
+ throw new ArgumentException("You must specify at least one initial peer", "initial_peers");
+ }
+
+ foreach (InitialNode peer in InitialNodes)
+ {
+ _ = peer.ConnectEndpoint ?? throw new ArgumentException("You must specify a connect endpoint for each initial node", "initial_nodes");
+ _ = peer.NodeId ?? throw new ArgumentException("You must specify a node id for each initial node", "initial_nodes");
+ }
+ }
+
+ public sealed record class InitialNode : ICacheNodeAdvertisment
+ {
+ [JsonIgnore]
+ public Uri ConnectEndpoint { get; private set; }
+
+ [JsonIgnore]
+ public Uri? DiscoveryEndpoint { get; private set; }
+
+ [JsonPropertyName("node_id")]
+ public string? NodeId { get; set; }
+
+ [JsonPropertyName("connect_endpoint")]
+ public string? ConnectEndpointString
+ {
+ get => ConnectEndpoint.ToString();
+ set => ConnectEndpoint = new Uri(value!);
+ }
+
+ [JsonPropertyName("discovery_endpoint")]
+ public string? DiscoveryEndpointString
{
- throw new ArgumentException("You must specify a valid HTTP uri broker address", "broker_address");
+ get => DiscoveryEndpoint?.ToString();
+ set => DiscoveryEndpoint = value == null ? null : new Uri(value);
}
}
}