aboutsummaryrefslogtreecommitdiff
path: root/lib/VNLib.Data.Caching.Extensions/src
diff options
context:
space:
mode:
Diffstat (limited to 'lib/VNLib.Data.Caching.Extensions/src')
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs12
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs (renamed from lib/VNLib.Data.Caching.Extensions/src/ClientCacheConfiguration.cs)76
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs82
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs84
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs431
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ICachePeerAdvertisment.cs50
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ListServerRequest.cs116
7 files changed, 649 insertions, 202 deletions
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs b/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs
index d9c463b..2d02491 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs
@@ -27,14 +27,22 @@ using System.Text.Json.Serialization;
namespace VNLib.Data.Caching.Extensions
{
- public class ActiveServer
+ public class ActiveServer : ICachePeerAdvertisment
{
[JsonPropertyName("address")]
public string? HostName { get; set; }
- [JsonPropertyName("server_id")]
+
public string? ServerId { get; set; }
[JsonPropertyName("ip_address")]
public string? Ip { get; set; }
+
+ public Uri ConnectEndpoint { get; }
+
+ public Uri? DiscoveryEndpoint { get; }
+
+ [JsonPropertyName("server_id")]
+ public string NodeId { get; }
+
///<inheritdoc/>
public override int GetHashCode() => ServerId!.GetHashCode(StringComparison.OrdinalIgnoreCase);
///<inheritdoc/>
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ClientCacheConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs
index ef44a29..05e4928 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/ClientCacheConfiguration.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs
@@ -31,23 +31,30 @@ using VNLib.Hashing.IdentityUtility;
namespace VNLib.Data.Caching.Extensions
{
+ public interface ICacheJwtManager
+ {
+ IReadOnlyDictionary<string, string?> GetJwtHeader();
+
+ void SignJwt(JsonWebToken jwt);
+
+ bool VerifyCache(JsonWebToken jwt);
+
+ bool VerifyBroker(JsonWebToken jwt);
+ }
+
/// <summary>
/// A fluent api configuration object for configuring a <see cref="FBMClient"/>
/// to connect to cache servers.
/// </summary>
- public sealed class ClientCacheConfiguration
+ public class CacheClientConfiguration : ICacheJwtManager, ICacheListServerRequest
{
- internal ReadOnlyJsonWebKey? SigningKey { get; private set; }
- internal ReadOnlyJsonWebKey? VerificationKey { get; private set; }
- internal ReadOnlyJsonWebKey? BrokerVerificationKey { get; private set; }
+ public ReadOnlyJsonWebKey? SigningKey { get; private set; }
+ public ReadOnlyJsonWebKey? VerificationKey { get; private set; }
+ public ReadOnlyJsonWebKey? BrokerVerificationKey { get; private set; }
- internal string ServerChallenge { get; } = RandomHash.GetRandomBase32(24);
- internal string? NodeId { get; set; }
- internal Uri? BrokerAddress { get; set; }
- internal bool UseTls { get; set; }
- internal ActiveServer[]? CacheServers { get; set; }
-
- internal IReadOnlyDictionary<string, string?> JwtHeader => SigningKey!.JwtHeader;
+ public Uri? DiscoveryEndpoint { get; private set; }
+ public bool UseTls { get; private set; }
+ internal ICachePeerAdvertisment[]? CacheServers { get; set; }
/// <summary>
/// Imports the private key used to sign messages
@@ -56,7 +63,7 @@ namespace VNLib.Data.Caching.Extensions
/// <returns>Chainable fluent object</returns>
/// <exception cref="ArgumentException"></exception>
/// <exception cref="CryptographicException"></exception>
- public ClientCacheConfiguration WithSigningCertificate(ReadOnlyJsonWebKey jwk)
+ public CacheClientConfiguration WithSigningKey(ReadOnlyJsonWebKey jwk)
{
SigningKey = jwk ?? throw new ArgumentNullException(nameof(jwk));
return this;
@@ -69,13 +76,13 @@ namespace VNLib.Data.Caching.Extensions
/// <returns>Chainable fluent object</returns>
/// <exception cref="ArgumentException"></exception>
/// <exception cref="CryptographicException"></exception>
- public ClientCacheConfiguration WithVerificationKey(ReadOnlyJsonWebKey jwk)
+ public CacheClientConfiguration WithVerificationKey(ReadOnlyJsonWebKey jwk)
{
VerificationKey = jwk ?? throw new ArgumentNullException(nameof(jwk));
return this;
}
- public ClientCacheConfiguration WithBrokerVerificationKey(ReadOnlyJsonWebKey jwk)
+ public CacheClientConfiguration WithBrokerVerificationKey(ReadOnlyJsonWebKey jwk)
{
BrokerVerificationKey = jwk ?? throw new ArgumentNullException(nameof(jwk));
return this;
@@ -86,50 +93,35 @@ namespace VNLib.Data.Caching.Extensions
/// </summary>
/// <param name="useTls">A value that indicates if connections should use TLS</param>
/// <returns>Chainable fluent object</returns>
- public ClientCacheConfiguration WithTls(bool useTls)
+ public CacheClientConfiguration WithTls(bool useTls)
{
UseTls = useTls;
return this;
}
+
/// <summary>
/// Specifies the broker address to discover cache nodes from
/// </summary>
/// <param name="brokerAddress">The address of the server broker</param>
/// <returns>Chainable fluent object</returns>
/// <exception cref="ArgumentNullException"></exception>
- public ClientCacheConfiguration WithBroker(Uri brokerAddress)
+ public CacheClientConfiguration WithBroker(Uri brokerAddress)
{
- this.BrokerAddress = brokerAddress ?? throw new ArgumentNullException(nameof(brokerAddress));
+ DiscoveryEndpoint = brokerAddress ?? throw new ArgumentNullException(nameof(brokerAddress));
return this;
}
+
+ ///<inheritdoc/>
+ public void SignJwt(JsonWebToken jwt) => jwt.SignFromJwk(SigningKey!);
- /// <summary>
- /// Specifies the current server's cluster node id. If this
- /// is a server connection attempting to listen for changes on the
- /// remote server, this id must be set and unique
- /// </summary>
- /// <param name="nodeId">The cluster node id of the current server</param>
- /// <returns>Chainable fluent object</returns>
- /// <exception cref="ArgumentNullException"></exception>
- public ClientCacheConfiguration WithNodeId(string nodeId)
- {
- this.NodeId = nodeId ?? throw new ArgumentNullException(nameof(nodeId));
- return this;
- }
+ ///<inheritdoc/>
+ public bool VerifyCache(JsonWebToken jwt) => jwt.VerifyFromJwk(VerificationKey!);
- internal void SignJwt(JsonWebToken jwt)
- {
- jwt.SignFromJwk(SigningKey);
- }
+ ///<inheritdoc/>
+ public bool VerifyBroker(JsonWebToken jwt) => jwt.VerifyFromJwk(BrokerVerificationKey!);
- internal bool VerifyCache(JsonWebToken jwt)
- {
- return jwt.VerifyFromJwk(VerificationKey);
- }
+ ///<inheritdoc/>
+ public IReadOnlyDictionary<string, string?> GetJwtHeader() => SigningKey!.JwtHeader;
- internal bool VerifyBroker(JsonWebToken jwt)
- {
- return jwt.VerifyFromJwk(BrokerVerificationKey);
- }
}
}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs b/lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs
new file mode 100644
index 0000000..76d4ad8
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs
@@ -0,0 +1,82 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: ListServerRequest.cs
+*
+* ListServerRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Collections.Generic;
+
+using VNLib.Utils;
+using VNLib.Hashing.IdentityUtility;
+
+namespace VNLib.Data.Caching.Extensions
+{
+ public interface ICacheListServerRequest : ICacheJwtManager
+ {
+ Uri DiscoveryEndpoint { get; }
+ }
+
+ /// <summary>
+ /// A request container for a ListServer request
+ /// </summary>
+ public sealed class CacheListServerRequest : ICacheListServerRequest
+ {
+ private readonly ICacheJwtManager _manager;
+
+
+ /// <summary>
+ /// The address of the broker server to connect to
+ /// </summary>
+ public Uri DiscoveryEndpoint { get; private set; }
+
+ public CacheListServerRequest(ICacheJwtManager keyManager, Uri? brokerAddress = null)
+ {
+ _manager = keyManager;
+ DiscoveryEndpoint = brokerAddress!;
+ }
+
+
+ /// <summary>
+ /// Sets the broker address for the request
+ /// </summary>
+ /// <param name="brokerAddr">The broker server's address to connect to</param>
+ /// <returns>A fluent chainable value</returns>
+ /// <exception cref="ArgumentNullException"></exception>
+ public CacheListServerRequest WithDiscoveryEndpoint(Uri brokerAddr)
+ {
+ DiscoveryEndpoint = brokerAddr ?? throw new ArgumentNullException(nameof(brokerAddr));
+ return this;
+ }
+
+ /// <inheritdoc/>
+ public void SignJwt(JsonWebToken jwt) => _manager.SignJwt(jwt);
+
+ /// <inheritdoc/>
+ public bool VerifyCache(JsonWebToken jwt) => _manager.VerifyCache(jwt);
+
+ /// <inheritdoc/>
+ public bool VerifyBroker(JsonWebToken jwt) => _manager.VerifyBroker(jwt);
+
+ /// <inheritdoc/>
+ public IReadOnlyDictionary<string, string?> GetJwtHeader() => _manager.GetJwtHeader();
+ }
+}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs
new file mode 100644
index 0000000..21a99e1
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs
@@ -0,0 +1,84 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: CacheNodeConfiguration.cs
+*
+* CacheNodeConfiguration.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+
+
+namespace VNLib.Data.Caching.Extensions
+{
+
+ public class CacheNodeConfiguration: CacheClientConfiguration, ICachePeerAdvertisment
+ {
+ /// <summary>
+ /// The address for clients to connect to
+ /// </summary>
+ public Uri? ConnectEndpoint { get; private set; }
+
+ /// <summary>
+ /// Whether or not to advertise ourself to peer nodes
+ /// </summary>
+ public bool BroadcastAdverisment { get; private set; }
+
+ /// <summary>
+ /// Define the endpoint for clients to connect to to discover
+ /// other discovertable nodes
+ /// </summary>
+ public Uri? DiscoveryEndpoint { get; private set; }
+
+ /// <summary>
+ /// Sets the full address of our cache endpoint for clients to connect to
+ /// </summary>
+ /// <param name="connectUri">The uri clients will attempt to connect to</param>
+ public CacheNodeConfiguration WithCacheEndpoint(Uri connectUri)
+ {
+ ConnectEndpoint = connectUri;
+ return this;
+ }
+
+ public CacheNodeConfiguration EnableAdvertisment(bool enable, Uri? discoveryEndpoint)
+ {
+ BroadcastAdverisment = enable;
+ DiscoveryEndpoint = discoveryEndpoint;
+ return this;
+ }
+
+ ///<inheritdoc/>
+ public string NodeId { get; private set; } = null!;
+
+ /// <summary>
+ /// Specifies the current server's cluster node id. If this
+ /// is a server connection attempting to listen for changes on the
+ /// remote server, this id must be set and unique
+ /// </summary>
+ /// <param name="nodeId">The cluster node id of the current server</param>
+ /// <returns>Chainable fluent object</returns>
+ /// <exception cref="ArgumentNullException"></exception>
+ public CacheClientConfiguration WithNodeId(string nodeId)
+ {
+ NodeId = nodeId ?? throw new ArgumentNullException(nameof(nodeId));
+ return this;
+ }
+
+ }
+}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
index 8ee02f7..9efe16a 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
@@ -32,6 +32,7 @@ using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Security.Cryptography;
+using System.Text.Json.Serialization;
using System.Runtime.CompilerServices;
using RestSharp;
@@ -45,12 +46,12 @@ using VNLib.Utils.Extensions;
using VNLib.Net.Rest.Client;
using VNLib.Net.Messaging.FBM;
using VNLib.Net.Messaging.FBM.Client;
-
using ContentType = VNLib.Net.Http.ContentType;
+
namespace VNLib.Data.Caching.Extensions
{
-
+
/// <summary>
/// Provides extension methods for FBM data caching using
/// cache servers and brokers
@@ -61,11 +62,22 @@ namespace VNLib.Data.Caching.Extensions
/// The websocket sub-protocol to use when connecting to cache servers
/// </summary>
public const string CACHE_WS_SUB_PROCOL = "object-cache";
+
/// <summary>
/// The default cache message header size
/// </summary>
public const int MAX_FBM_MESSAGE_HEADER_SIZE = 1024;
+ /// <summary>
+ /// The client nonce signature http header name
+ /// </summary>
+ public const string X_UPGRADE_SIG_HEADER = "X-Cache-Upgrade-Sig";
+
+ /// <summary>
+ /// The advertisment header for cache node discovery
+ /// </summary>
+ public const string X_NODE_DISCOVERY_HEADER = "X-Cache-Node-Discovery";
+
private static readonly RestClientPool ClientPool = new(2,new RestClientOptions()
{
MaxTimeout = 10 * 1000,
@@ -75,7 +87,7 @@ namespace VNLib.Data.Caching.Extensions
ThrowOnAnyError = true,
});
- private static readonly ConditionalWeakTable<FBMClient, ClientCacheConfiguration> ClientCacheConfig = new();
+ private static readonly ConditionalWeakTable<FBMClient, CacheClientConfiguration> ClientCacheConfig = new();
/// <summary>
/// Gets a <see cref="FBMClientConfig"/> preconfigured object caching
@@ -128,6 +140,59 @@ namespace VNLib.Data.Caching.Extensions
}
/// <summary>
+ /// Creats a new <see cref="CacheListServerRequest"/> from an existing <see cref="CacheClientConfiguration"/>
+ /// </summary>
+ /// <param name="conf">The prepared client configuration</param>
+ /// <returns>The new <see cref="CacheListServerRequest"/></returns>
+ public static CacheListServerRequest GetListMessage(this CacheClientConfiguration conf)
+ {
+ return new(conf, conf.DiscoveryEndpoint);
+ }
+
+ /// <summary>
+ /// Discovers peer nodes from a given initial peer and returns a list of discovered nodes. If the config
+ /// is for a cache peer node, the current peer is removed from the list of discovered nodes.
+ /// </summary>
+ /// <param name="cacheConfig"></param>
+ /// <param name="initialPeer">The initial peer to discover nodes from</param>
+ /// <param name="cancellation">A token to cancel the discovery operation</param>
+ /// <returns>The collection of discovered nodes</returns>
+ /// <exception cref="ArgumentNullException"></exception>
+ public static async Task<ICachePeerAdvertisment[]?> DiscoverClusterNodesAsync(
+ this CacheClientConfiguration cacheConfig,
+ ICachePeerAdvertisment initialPeer,
+ CancellationToken cancellation
+ )
+ {
+ _ = initialPeer?.DiscoveryEndpoint ?? throw new ArgumentException("Advertisment does not expose an advertisment endpoint");
+
+ //Create list request
+ CacheListServerRequest request = cacheConfig.GetListMessage();
+
+ //Override with the initial peer's discovery endpoint
+ request.WithDiscoveryEndpoint(initialPeer.DiscoveryEndpoint);
+
+ //Get the list of servers
+ ICachePeerAdvertisment[]? servers = await ListServersAsync(request, cancellation);
+
+ if (servers == null)
+ {
+ return null;
+ }
+
+ if(cacheConfig is CacheNodeConfiguration cnc)
+ {
+ //Filter out the current node
+ return servers.Where(s => !cnc.NodeId.Equals(s.NodeId, StringComparison.OrdinalIgnoreCase)).ToArray();
+ }
+ else
+ {
+ //Do not filter
+ return servers;
+ }
+ }
+
+ /// <summary>
/// Contacts the cache broker to get a list of active servers to connect to
/// </summary>
/// <param name="request">The request message used to connecto the broker server</param>
@@ -135,27 +200,30 @@ namespace VNLib.Data.Caching.Extensions
/// <returns>The list of active servers</returns>
/// <exception cref="SecurityException"></exception>
/// <exception cref="ArgumentNullException"></exception>
- public static async Task<ActiveServer[]?> ListServersAsync(ListServerRequest request, CancellationToken cancellationToken = default)
+ public static async Task<ICachePeerAdvertisment[]?> ListServersAsync(ICacheListServerRequest request, CancellationToken cancellationToken = default)
{
_ = request ?? throw new ArgumentNullException(nameof(request));
string jwtBody;
+
//Build request jwt
using (JsonWebToken requestJwt = new())
{
- requestJwt.WriteHeader(request.JwtHeader);
+ requestJwt.WriteHeader(request.GetJwtHeader());
requestJwt.InitPayloadClaim()
.AddClaim("iat", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds())
.AddClaim("nonce", RandomHash.GetRandomBase32(16))
.CommitClaims();
+
//sign the jwt
request.SignJwt(requestJwt);
+
//Compile the jwt
jwtBody = requestJwt.Compile();
}
//New list request
- RestRequest listRequest = new(request.BrokerAddress, Method.Post);
+ RestRequest listRequest = new(request.DiscoveryEndpoint, Method.Post);
//Add the jwt as a string to the request body
listRequest.AddStringBody(jwtBody, DataFormat.None);
@@ -177,24 +245,49 @@ namespace VNLib.Data.Caching.Extensions
data = response.RawBytes ?? throw new InvalidOperationException("No data returned from broker");
}
+
//Response is jwt
using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data);
//Verify the jwt
- if (!request.VerifyJwt(responseJwt))
+ if (!request.VerifyBroker(responseJwt))
{
throw new SecurityException("Failed to verify the broker's challenge, cannot continue");
}
using JsonDocument doc = responseJwt.GetPayload();
- return doc.RootElement.GetProperty("servers").Deserialize<ActiveServer[]>();
+ return doc.RootElement.GetProperty("peers").Deserialize<Advertisment[]>();
+ }
+
+ /// <summary>
+ /// Registers the current node with the broker
+ /// </summary>
+ /// <returns>A task that completes when the regitration has been made successfully</returns>
+ /// <exception cref="ArgumentException"></exception>
+ public static async Task RegisterWithBrokerAsync(this CacheNodeConfiguration config, string authToken)
+ {
+ //Recover the certificate
+ ReadOnlyJsonWebKey cacheCert = config?.SigningKey ?? throw new ArgumentException(nameof(config.SigningKey));
+
+ //init broker request
+ using BrokerRegistrationRequest request = new();
+
+ request.WithBroker(config.DiscoveryEndpoint!)
+ .WithRegistrationAddress(config.ConnectEndpoint!.ToString())
+ .WithNodeId(config.NodeId!)
+ .WithSigningKey(cacheCert, true)
+ .WithHeartbeatToken(authToken);
+
+
+ //Send the request
+ await RegisterWithBrokerAsync(request);
}
/// <summary>
/// Registers the current server as active with the specified broker
/// </summary>
/// <param name="registration">The registration request</param>
- public static async Task ResgisterWithBrokerAsync(BrokerRegistrationRequest registration)
+ public static async Task RegisterWithBrokerAsync(BrokerRegistrationRequest registration)
{
_ = registration ?? throw new ArgumentNullException(nameof(registration));
_ = registration.HeartbeatToken ?? throw new ArgumentException("Missing required heartbeat access token");
@@ -220,17 +313,20 @@ namespace VNLib.Data.Caching.Extensions
//Compile and save
requestData = jwt.Compile();
}
+
//Create reg request message
RestRequest regRequest = new(registration.BrokerAddress);
regRequest.AddStringBody(requestData, DataFormat.None);
regRequest.AddHeader("Content-Type", "text/plain");
+
//Rent client
using ClientContract cc = ClientPool.Lease();
+
//Exec the regitration request
RestResponse response = await cc.Resource.ExecutePutAsync(regRequest);
response.ThrowIfError();
- }
-
+ }
+
/// <summary>
/// Allows for configuration of an <see cref="FBMClient"/>
@@ -238,7 +334,31 @@ namespace VNLib.Data.Caching.Extensions
/// </summary>
/// <param name="client"></param>
/// <returns>A fluent api configuration builder for the current client</returns>
- public static ClientCacheConfiguration GetCacheConfiguration(this FBMClient client) => ClientCacheConfig.GetOrCreateValue(client);
+ public static CacheClientConfiguration GetCacheConfiguration(this FBMClient client) => ClientCacheConfig.GetOrCreateValue(client);
+
+ /// <summary>
+ /// Explicitly set the client cache configuration for the current client
+ /// </summary>
+ /// <param name="client"></param>
+ /// <param name="config">The cache node configuration</param>
+ /// <returns>The config instance</returns>
+ public static CacheClientConfiguration SetCacheConfiguration(this FBMClient client, CacheClientConfiguration config)
+ {
+ ClientCacheConfig.AddOrUpdate(client, config);
+ return config;
+ }
+
+ /// <summary>
+ /// Explicitly set the cache node configuration for the current client
+ /// </summary>
+ /// <param name="client"></param>
+ /// <param name="nodeConfig">The cache node configuration</param>
+ /// <returns>The config instance</returns>
+ public static CacheNodeConfiguration SetCacheConfiguration(this FBMClient client, CacheNodeConfiguration nodeConfig)
+ {
+ ClientCacheConfig.AddOrUpdate(client, nodeConfig);
+ return nodeConfig;
+ }
/// <summary>
/// Discovers cache nodes in the broker configured for the current client.
@@ -246,7 +366,10 @@ namespace VNLib.Data.Caching.Extensions
/// <param name="client"></param>
/// <param name="token">A token to cancel the discovery</param>
/// <returns>A task the resolves the list of active servers on the broker server</returns>
- public static Task<ActiveServer[]?> DiscoverCacheNodesAsync(this FBMClientWorkerBase client, CancellationToken token = default) => client.Client.DiscoverCacheNodesAsync(token);
+ public static Task<ICachePeerAdvertisment[]?> DiscoverCacheNodesAsync(this FBMClientWorkerBase client, CancellationToken token = default)
+ {
+ return client.Client.DiscoverCacheNodesAsync(token);
+ }
/// <summary>
/// Discovers cache nodes in the broker configured for the current client.
@@ -254,13 +377,13 @@ namespace VNLib.Data.Caching.Extensions
/// <param name="client"></param>
/// <param name="token">A token to cancel the discovery </param>
/// <returns>A task the resolves the list of active servers on the broker server</returns>
- public static async Task<ActiveServer[]?> DiscoverCacheNodesAsync(this FBMClient client, CancellationToken token = default)
+ public static async Task<ICachePeerAdvertisment[]?> DiscoverCacheNodesAsync(this FBMClient client, CancellationToken token = default)
{
- ClientCacheConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
- //Request from config
- using ListServerRequest req = ListServerRequest.FromConfig(conf);
+ //Get the stored client config
+ CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
+
//List servers async
- return conf.CacheServers = await ListServersAsync(req, token);
+ return conf.CacheServers = await ListServersAsync(conf, token);
}
/// <summary>
@@ -274,16 +397,21 @@ namespace VNLib.Data.Caching.Extensions
public static async Task WaitForExitAsync(this FBMClient client, CancellationToken token = default)
{
client.LogDebug("Waiting for cache client to exit");
+
//Get task for cancellation
Task cancellation = token.WaitHandle.WaitAsync();
+
//Task for status handle
Task run = client.ConnectionStatusHandle.WaitAsync();
+
//Wait for cancellation or
_ = await Task.WhenAny(cancellation, run);
client.LogDebug("Disconnecting the cache client");
+
//Normal try to disconnect the socket
await client.DisconnectAsync(CancellationToken.None);
+
//Notify if cancelled
token.ThrowIfCancellationRequested();
}
@@ -300,14 +428,18 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="SecurityException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
- public static async Task<ActiveServer> ConnectToRandomCacheAsync(this FBMClient client, CancellationToken cancellation = default)
+ public static async Task<ICachePeerAdvertisment> ConnectToRandomCacheAsync(this FBMClient client, CancellationToken cancellation = default)
{
//Get stored config
- ClientCacheConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
+ CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
+
//Select random
- ActiveServer? randomServer = conf.CacheServers?.SelectRandom();
- _ = randomServer ?? throw new ArgumentException("No servers detected, cannot connect");
+ ICachePeerAdvertisment? randomServer = conf.CacheServers?.SelectRandom()
+ ?? throw new ArgumentException("No servers detected, cannot connect");
+
await ConnectToCacheAsync(client, randomServer, cancellation);
+
+ //Return the random server we connected to
return randomServer;
}
@@ -324,59 +456,97 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="SecurityException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
- public static Task ConnectToCacheAsync(this FBMClient client, ActiveServer server, CancellationToken token = default)
+ public static Task ConnectToCacheAsync(this FBMClient client, ICachePeerAdvertisment server, CancellationToken token = default)
{
_ = client ?? throw new ArgumentNullException(nameof(client));
_ = server ?? throw new ArgumentNullException(nameof(server));
//Get stored config
- ClientCacheConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
+ CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
//Connect to server (no server id because client not replication server)
return ConnectToCacheAsync(client, conf, server, token);
}
-
- private static async Task ConnectToCacheAsync(FBMClient client, ClientCacheConfiguration request, ActiveServer server, CancellationToken token = default)
+ /// <summary>
+ /// Connects to the specified server on the configured cache client
+ /// </summary>
+ /// <param name="client"></param>
+ /// <param name="server">The server to connect to</param>
+ /// <param name="token">A token to cancel the operation</param>
+ /// <param name="explicitConfig">Explicit cache configuration to use</param>
+ /// <returns>A task that resolves when the client is connected to the cache server</returns>
+ /// <exception cref="FBMException"></exception>
+ /// <exception cref="FBMServerNegiationException"></exception>
+ /// <exception cref="ArgumentException"></exception>
+ /// <exception cref="ArgumentNullException"></exception>
+ /// <exception cref="SecurityException"></exception>
+ /// <exception cref="ObjectDisposedException"></exception>
+ public static Task ConnectToCacheAsync(this FBMClient client, ICachePeerAdvertisment server, CacheClientConfiguration explicitConfig, CancellationToken token = default)
{
- //Construct server uri
- Uri serverUri = new(server.HostName!);
-
- //build ws uri
- UriBuilder uriBuilder = new(serverUri)
+ _ = client ?? throw new ArgumentNullException(nameof(client));
+ _ = server ?? throw new ArgumentNullException(nameof(server));
+
+ //Connect to server (no server id because client not replication server)
+ return ConnectToCacheAsync(client, explicitConfig, server, token);
+ }
+
+
+ private static async Task ConnectToCacheAsync(
+ FBMClient client,
+ CacheClientConfiguration config,
+ ICachePeerAdvertisment server,
+ CancellationToken token = default
+ )
+ {
+ //build ws uri from the connect endpoint
+ UriBuilder uriBuilder = new(server.ConnectEndpoint)
{
- Scheme = request.UseTls ? "wss://" : "ws://"
+ Scheme = config.UseTls ? "wss://" : "ws://"
};
-
+
+ string challenge = RandomHash.GetRandomBase32(24);
+
+ //See if the supplied config is for a cache node
+ CacheNodeConfiguration? cnc = config as CacheNodeConfiguration;
+
string jwtMessage;
//Init jwt for connecting to server
using (JsonWebToken jwt = new())
{
- jwt.WriteHeader(request.JwtHeader);
+ jwt.WriteHeader(config.GetJwtHeader());
//Init claim
JwtPayload claim = jwt.InitPayloadClaim();
- claim.AddClaim("chl", request.ServerChallenge);
+ claim.AddClaim("chl", challenge);
- if (!string.IsNullOrWhiteSpace(request.NodeId))
+ if (!string.IsNullOrWhiteSpace(cnc?.NodeId))
{
/*
* The unique node id so the other nodes know to load the
* proper event queue for the current server
*/
- claim.AddClaim("sub", request.NodeId);
+ claim.AddClaim("sub", cnc.NodeId);
}
claim.CommitClaims();
//Sign jwt
- request.SignJwt(jwt);
+ config.SignJwt(jwt);
//Compile to string
jwtMessage = jwt.Compile();
}
- RestRequest negotation = new(serverUri, Method.Get);
+ /*
+ * During a server negiation, the client makes an intial get request to the cache endpoint
+ * and passes some client negiation terms as a signed message to the server. The server then
+ * validates these values and returns a signed jwt with the server negiation terms.
+ *
+ * The response from the server is essentailly the 'access token'
+ */
+
+ RestRequest negotation = new(server.ConnectEndpoint, Method.Get);
//Set the jwt auth header for negotiation
negotation.AddHeader("Authorization", jwtMessage);
negotation.AddHeader("Accept", HttpHelpers.GetContentTypeString(ContentType.Text));
@@ -406,13 +576,13 @@ namespace VNLib.Data.Caching.Extensions
using (JsonWebToken jwt = JsonWebToken.Parse(authToken))
{
//Verify the jwt
- if (!request.VerifyCache(jwt))
+ if (!config.VerifyCache(jwt))
{
throw new SecurityException("Failed to verify the cache server's negotiation message, cannot continue");
}
//Confirm the server's buffer configuration
- ValidateServerNegotation(client, request.ServerChallenge, jwt);
+ ValidateServerNegotation(client, challenge, jwt);
}
client.LogDebug("Server negotiation validated, connecting to server");
@@ -420,6 +590,16 @@ namespace VNLib.Data.Caching.Extensions
//The client authorization header is the exact response
client.ClientSocket.Headers[HttpRequestHeader.Authorization] = authToken;
+ //Compute the signature of the upgrade token
+ client.ClientSocket.Headers[X_UPGRADE_SIG_HEADER] = GetBase64UpgradeSingature(authToken, config.SigningKey!);
+
+ //Check to see if adversize self is enabled
+ if (cnc?.BroadcastAdverisment == true)
+ {
+ //Set advertisment header
+ client.ClientSocket.Headers[X_NODE_DISCOVERY_HEADER] = GetAdvertismentHeader(cnc);
+ }
+
//Connect async
await client.ConnectAsync(uriBuilder.Uri, token);
}
@@ -475,16 +655,183 @@ namespace VNLib.Data.Caching.Extensions
}
}
+ /*
+ * Added layer to confirm that client that requested the negotation holds the private key
+ * compute a signature of the upgrade token and send it to the server to prove we hold the private key.
+ */
+
+ private static string GetBase64UpgradeSingature(string? token, ReadOnlyJsonWebKey key)
+ {
+ //try to get the ecdsa key first
+ using ECDsa? ec = key.GetECDsaPrivateKey();
+
+ if(ec != null)
+ {
+ //Compute hash of the token
+ byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256);
+
+ //Sign the hash
+ byte[] sig = ec.SignHash(hash, DSASignatureFormat.IeeeP1363FixedFieldConcatenation);
+
+ //Return the base64 string
+ return Convert.ToBase64String(sig);
+ }
+
+ //Check rsa next
+ using RSA? rsa = key.GetRSAPrivateKey();
+ if(rsa != null)
+ {
+ //Compute hash of the token
+ byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256);
+
+ //Sign the hash
+ byte[] sig = rsa.SignHash(hash, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1);
+
+ //Return the base64 string
+ return Convert.ToBase64String(sig);
+ }
+
+ throw new CryptographicException("Cache JKW does not export a supported private key for upgrade challenges");
+ }
+
+ /// <summary>
+ /// Verifies the signed auth token against the given verification key
+ /// </summary>
+ /// <param name="signature">The base64 signature of the token</param>
+ /// <param name="token">The raw token to compute the hash of</param>
+ /// <param name="nodeConfig">The node configuration</param>
+ /// <returns>True if the singature matches, false otherwise</returns>
+ /// <exception cref="CryptographicException"></exception>
+ public static bool VerifyUpgradeToken(this CacheClientConfiguration nodeConfig, string signature, string token)
+ {
+ return VerifyUpgradeToken(signature, token, nodeConfig.VerificationKey);
+ }
+
+ /// <summary>
+ /// Verifies the signed auth token against the given verification key
+ /// </summary>
+ /// <param name="signature">The base64 signature of the token</param>
+ /// <param name="token">The raw token to compute the hash of</param>
+ /// <param name="verifcationKey">The key used to verify the singature with</param>
+ /// <returns>True if the singature matches, false otherwise</returns>
+ /// <exception cref="ArgumentNullException"></exception>
+ /// <exception cref="CryptographicException"></exception>
+ public static bool VerifyUpgradeToken(string signature, string token, ReadOnlyJsonWebKey verifcationKey)
+ {
+ _ = verifcationKey ?? throw new ArgumentNullException(nameof(verifcationKey));
+
+ //get the hash of the token
+ byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256);
+
+ //decode the signature
+ byte[] sig = Convert.FromBase64String(signature);
+
+ //try to get the ecdsa key first
+ using ECDsa? ec = verifcationKey.GetECDsaPublicKey();
+ if(ec != null)
+ {
+ //Verify the signature
+ return ec.VerifyHash(hash, sig, DSASignatureFormat.IeeeP1363FixedFieldConcatenation);
+ }
+
+ //Check rsa next
+ using RSA? rsa = verifcationKey.GetRSAPublicKey();
+ if(rsa != null)
+ {
+ //Verify the signature
+ return rsa.VerifyHash(hash, sig, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1);
+ }
+
+ throw new CryptographicException("Cache JKW does not export a supported public key for upgrade challenges");
+ }
+
+ private static string GetAdvertismentHeader(CacheNodeConfiguration nodeConfiguration)
+ {
+ /*
+ * Create node advertisment message to publish to peer nodes
+ *
+ * these messages will allow other clients and peers to discover us
+ */
+
+ using JsonWebToken jwt = new();
+
+ //Get the jwt header
+ jwt.WriteHeader(nodeConfiguration.GetJwtHeader());
+
+ jwt.InitPayloadClaim()
+ .AddClaim("nonce", RandomHash.GetRandomBase32(16))
+ .AddClaim("iat", DateTimeOffset.UtcNow.ToUnixTimeSeconds())
+ .AddClaim("iss", nodeConfiguration.NodeId!)
+ .AddClaim("url", nodeConfiguration.ConnectEndpoint!.ToString())
+ //Optional discovery endpoint
+ .AddClaim("dis", nodeConfiguration.DiscoveryEndpoint?.ToString() ?? string.Empty)
+ .CommitClaims();
+
+ //Sign message
+ nodeConfiguration.SignJwt(jwt);
+
+ return jwt.Compile();
+ }
+
+ /// <summary>
+ /// Verifies the peer advertisment message
+ /// </summary>
+ /// <param name="config"></param>
+ /// <param name="message">The advertisment message to verify</param>
+ /// <returns>The advertisment message if successfully verified, or null otherwise</returns>
+ /// <exception cref="FormatException"></exception>
+ public static ICachePeerAdvertisment? VerifyPeerAdvertisment(this ICacheJwtManager config, string message)
+ {
+ using JsonWebToken jwt = JsonWebToken.Parse(message);
+
+ //Verify the signature
+ if (!config.VerifyCache(jwt))
+ {
+ return null;
+ }
+
+ //Get the payload
+ return jwt.GetPayload<Advertisment>();
+ }
+
+
/// <summary>
/// Selects a random server from a collection of active servers
/// </summary>
/// <param name="servers"></param>
/// <returns>A server selected at random</returns>
- public static ActiveServer SelectRandom(this ICollection<ActiveServer> servers)
+ public static ICachePeerAdvertisment SelectRandom(this ICollection<ICachePeerAdvertisment> servers)
{
//select random server
int randServer = RandomNumberGenerator.GetInt32(0, servers.Count);
return servers.ElementAt(randServer);
}
+
+
+ private class Advertisment : ICachePeerAdvertisment
+ {
+ [JsonIgnore]
+ public Uri? ConnectEndpoint { get; set; }
+
+ [JsonIgnore]
+ public Uri? DiscoveryEndpoint { get; set; }
+
+ [JsonPropertyName("iss")]
+ public string NodeId { get; set; }
+
+ [JsonPropertyName("url")]
+ public string? url
+ {
+ get => ConnectEndpoint?.ToString();
+ set => ConnectEndpoint = value == null ? null : new Uri(value);
+ }
+
+ [JsonPropertyName("dis")]
+ public string? dis
+ {
+ get => DiscoveryEndpoint?.ToString();
+ set => DiscoveryEndpoint = value == null ? null : new Uri(value);
+ }
+ }
}
}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ICachePeerAdvertisment.cs b/lib/VNLib.Data.Caching.Extensions/src/ICachePeerAdvertisment.cs
new file mode 100644
index 0000000..acf883e
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/ICachePeerAdvertisment.cs
@@ -0,0 +1,50 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: ICachePeerAdvertisment.cs
+*
+* ICachePeerAdvertisment.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+
+
+namespace VNLib.Data.Caching.Extensions
+{
+ /// <summary>
+ /// Represents a node that can be advertised to clients
+ /// </summary>
+ public interface ICachePeerAdvertisment
+ {
+ /// <summary>
+ /// The endpoint for clients to connect to to access the cache
+ /// </summary>
+ Uri ConnectEndpoint { get; }
+
+ /// <summary>
+ /// Gets the address for clients to connect to to discover other discovertable nodes
+ /// </summary>
+ Uri? DiscoveryEndpoint { get; }
+
+ /// <summary>
+ /// Gets the unique identifier for this node
+ /// </summary>
+ string NodeId { get; }
+ }
+}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ListServerRequest.cs b/lib/VNLib.Data.Caching.Extensions/src/ListServerRequest.cs
deleted file mode 100644
index fd25925..0000000
--- a/lib/VNLib.Data.Caching.Extensions/src/ListServerRequest.cs
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
-* Copyright (c) 2023 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Data.Caching.Extensions
-* File: ListServerRequest.cs
-*
-* ListServerRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Collections.Generic;
-
-using VNLib.Utils;
-using VNLib.Hashing.IdentityUtility;
-
-namespace VNLib.Data.Caching.Extensions
-{
- /// <summary>
- /// A request container for a ListServer request
- /// </summary>
- public sealed class ListServerRequest : VnDisposeable
- {
- private readonly bool _ownsKeys;
-
- private ReadOnlyJsonWebKey? VerificationKey;
- private ReadOnlyJsonWebKey? SigningAlg;
-
- /// <summary>
- /// The address of the broker server to connect to
- /// </summary>
- public Uri BrokerAddress { get; }
-
- public ListServerRequest(Uri brokerAddress)
- {
- BrokerAddress = brokerAddress;
- _ownsKeys = true;
- }
-
- private ListServerRequest(ClientCacheConfiguration conf)
- {
- //Broker verification key is required
- VerificationKey = conf.BrokerVerificationKey;
- SigningAlg = conf.SigningKey;
- BrokerAddress = conf.BrokerAddress ?? throw new ArgumentException("Broker address must be specified");
- _ownsKeys = false;
- }
-
- internal static ListServerRequest FromConfig(ClientCacheConfiguration conf) => new (conf);
-
- /// <summary>
- /// Sets the public key used to verify the signature of the response.
- /// </summary>
- /// <param name="jwk">The key used to verify messages </param>
- public ListServerRequest WithVerificationKey(ReadOnlyJsonWebKey jwk)
- {
- VerificationKey = jwk ?? throw new ArgumentNullException(nameof(jwk));
- return this;
- }
- /// <summary>
- /// Sets the private key used to sign the request.
- /// </summary>
- /// <param name="jwk">The <see cref="ReadOnlyJsonWebKey"/> containing the private key used to sign the message</param>
- /// <exception cref="ArgumentNullException"></exception>
- public ListServerRequest WithSigningKey(ReadOnlyJsonWebKey jwk)
- {
- SigningAlg = jwk ?? throw new ArgumentNullException(nameof(jwk));
- return this;
- }
-
- /// <summary>
- /// Signs the <see cref="JsonWebToken"/> using the private key.
- /// </summary>
- /// <param name="jwt">The message to sign</param>
- internal void SignJwt(JsonWebToken jwt)
- {
- jwt.SignFromJwk(SigningAlg);
- }
-
- /// <summary>
- /// Verifies the signature of the <see cref="JsonWebToken"/>
- /// </summary>
- /// <param name="jwt"></param>
- /// <returns>A value that indicates if the signature is verified</returns>
- internal bool VerifyJwt(JsonWebToken jwt)
- {
- return jwt.VerifyFromJwk(VerificationKey);
- }
-
- internal IReadOnlyDictionary<string, string?> JwtHeader => SigningAlg!.JwtHeader;
-
- ///<inheritdoc/>
- protected override void Free()
- {
- if (_ownsKeys)
- {
- VerificationKey?.Dispose();
- SigningAlg?.Dispose();
- }
- }
- }
-}