diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs | 12 | ||||
-rw-r--r-- | lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs (renamed from lib/VNLib.Data.Caching.Extensions/src/ClientCacheConfiguration.cs) | 76 | ||||
-rw-r--r-- | lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs | 82 | ||||
-rw-r--r-- | lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs | 84 | ||||
-rw-r--r-- | lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs | 431 | ||||
-rw-r--r-- | lib/VNLib.Data.Caching.Extensions/src/ICachePeerAdvertisment.cs | 50 | ||||
-rw-r--r-- | lib/VNLib.Data.Caching.Extensions/src/ListServerRequest.cs | 116 | ||||
-rw-r--r-- | lib/VNLib.Data.Caching/src/ClientExtensions.cs | 3 | ||||
-rw-r--r-- | lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs | 8 |
9 files changed, 654 insertions, 208 deletions
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs b/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs index d9c463b..2d02491 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs @@ -27,14 +27,22 @@ using System.Text.Json.Serialization; namespace VNLib.Data.Caching.Extensions { - public class ActiveServer + public class ActiveServer : ICachePeerAdvertisment { [JsonPropertyName("address")] public string? HostName { get; set; } - [JsonPropertyName("server_id")] + public string? ServerId { get; set; } [JsonPropertyName("ip_address")] public string? Ip { get; set; } + + public Uri ConnectEndpoint { get; } + + public Uri? DiscoveryEndpoint { get; } + + [JsonPropertyName("server_id")] + public string NodeId { get; } + ///<inheritdoc/> public override int GetHashCode() => ServerId!.GetHashCode(StringComparison.OrdinalIgnoreCase); ///<inheritdoc/> diff --git a/lib/VNLib.Data.Caching.Extensions/src/ClientCacheConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs index ef44a29..05e4928 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/ClientCacheConfiguration.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs @@ -31,23 +31,30 @@ using VNLib.Hashing.IdentityUtility; namespace VNLib.Data.Caching.Extensions { + public interface ICacheJwtManager + { + IReadOnlyDictionary<string, string?> GetJwtHeader(); + + void SignJwt(JsonWebToken jwt); + + bool VerifyCache(JsonWebToken jwt); + + bool VerifyBroker(JsonWebToken jwt); + } + /// <summary> /// A fluent api configuration object for configuring a <see cref="FBMClient"/> /// to connect to cache servers. /// </summary> - public sealed class ClientCacheConfiguration + public class CacheClientConfiguration : ICacheJwtManager, ICacheListServerRequest { - internal ReadOnlyJsonWebKey? SigningKey { get; private set; } - internal ReadOnlyJsonWebKey? VerificationKey { get; private set; } - internal ReadOnlyJsonWebKey? BrokerVerificationKey { get; private set; } + public ReadOnlyJsonWebKey? SigningKey { get; private set; } + public ReadOnlyJsonWebKey? VerificationKey { get; private set; } + public ReadOnlyJsonWebKey? BrokerVerificationKey { get; private set; } - internal string ServerChallenge { get; } = RandomHash.GetRandomBase32(24); - internal string? NodeId { get; set; } - internal Uri? BrokerAddress { get; set; } - internal bool UseTls { get; set; } - internal ActiveServer[]? CacheServers { get; set; } - - internal IReadOnlyDictionary<string, string?> JwtHeader => SigningKey!.JwtHeader; + public Uri? DiscoveryEndpoint { get; private set; } + public bool UseTls { get; private set; } + internal ICachePeerAdvertisment[]? CacheServers { get; set; } /// <summary> /// Imports the private key used to sign messages @@ -56,7 +63,7 @@ namespace VNLib.Data.Caching.Extensions /// <returns>Chainable fluent object</returns> /// <exception cref="ArgumentException"></exception> /// <exception cref="CryptographicException"></exception> - public ClientCacheConfiguration WithSigningCertificate(ReadOnlyJsonWebKey jwk) + public CacheClientConfiguration WithSigningKey(ReadOnlyJsonWebKey jwk) { SigningKey = jwk ?? throw new ArgumentNullException(nameof(jwk)); return this; @@ -69,13 +76,13 @@ namespace VNLib.Data.Caching.Extensions /// <returns>Chainable fluent object</returns> /// <exception cref="ArgumentException"></exception> /// <exception cref="CryptographicException"></exception> - public ClientCacheConfiguration WithVerificationKey(ReadOnlyJsonWebKey jwk) + public CacheClientConfiguration WithVerificationKey(ReadOnlyJsonWebKey jwk) { VerificationKey = jwk ?? throw new ArgumentNullException(nameof(jwk)); return this; } - public ClientCacheConfiguration WithBrokerVerificationKey(ReadOnlyJsonWebKey jwk) + public CacheClientConfiguration WithBrokerVerificationKey(ReadOnlyJsonWebKey jwk) { BrokerVerificationKey = jwk ?? throw new ArgumentNullException(nameof(jwk)); return this; @@ -86,50 +93,35 @@ namespace VNLib.Data.Caching.Extensions /// </summary> /// <param name="useTls">A value that indicates if connections should use TLS</param> /// <returns>Chainable fluent object</returns> - public ClientCacheConfiguration WithTls(bool useTls) + public CacheClientConfiguration WithTls(bool useTls) { UseTls = useTls; return this; } + /// <summary> /// Specifies the broker address to discover cache nodes from /// </summary> /// <param name="brokerAddress">The address of the server broker</param> /// <returns>Chainable fluent object</returns> /// <exception cref="ArgumentNullException"></exception> - public ClientCacheConfiguration WithBroker(Uri brokerAddress) + public CacheClientConfiguration WithBroker(Uri brokerAddress) { - this.BrokerAddress = brokerAddress ?? throw new ArgumentNullException(nameof(brokerAddress)); + DiscoveryEndpoint = brokerAddress ?? throw new ArgumentNullException(nameof(brokerAddress)); return this; } + + ///<inheritdoc/> + public void SignJwt(JsonWebToken jwt) => jwt.SignFromJwk(SigningKey!); - /// <summary> - /// Specifies the current server's cluster node id. If this - /// is a server connection attempting to listen for changes on the - /// remote server, this id must be set and unique - /// </summary> - /// <param name="nodeId">The cluster node id of the current server</param> - /// <returns>Chainable fluent object</returns> - /// <exception cref="ArgumentNullException"></exception> - public ClientCacheConfiguration WithNodeId(string nodeId) - { - this.NodeId = nodeId ?? throw new ArgumentNullException(nameof(nodeId)); - return this; - } + ///<inheritdoc/> + public bool VerifyCache(JsonWebToken jwt) => jwt.VerifyFromJwk(VerificationKey!); - internal void SignJwt(JsonWebToken jwt) - { - jwt.SignFromJwk(SigningKey); - } + ///<inheritdoc/> + public bool VerifyBroker(JsonWebToken jwt) => jwt.VerifyFromJwk(BrokerVerificationKey!); - internal bool VerifyCache(JsonWebToken jwt) - { - return jwt.VerifyFromJwk(VerificationKey); - } + ///<inheritdoc/> + public IReadOnlyDictionary<string, string?> GetJwtHeader() => SigningKey!.JwtHeader; - internal bool VerifyBroker(JsonWebToken jwt) - { - return jwt.VerifyFromJwk(BrokerVerificationKey); - } } } diff --git a/lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs b/lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs new file mode 100644 index 0000000..76d4ad8 --- /dev/null +++ b/lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs @@ -0,0 +1,82 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Extensions +* File: ListServerRequest.cs +* +* ListServerRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Collections.Generic; + +using VNLib.Utils; +using VNLib.Hashing.IdentityUtility; + +namespace VNLib.Data.Caching.Extensions +{ + public interface ICacheListServerRequest : ICacheJwtManager + { + Uri DiscoveryEndpoint { get; } + } + + /// <summary> + /// A request container for a ListServer request + /// </summary> + public sealed class CacheListServerRequest : ICacheListServerRequest + { + private readonly ICacheJwtManager _manager; + + + /// <summary> + /// The address of the broker server to connect to + /// </summary> + public Uri DiscoveryEndpoint { get; private set; } + + public CacheListServerRequest(ICacheJwtManager keyManager, Uri? brokerAddress = null) + { + _manager = keyManager; + DiscoveryEndpoint = brokerAddress!; + } + + + /// <summary> + /// Sets the broker address for the request + /// </summary> + /// <param name="brokerAddr">The broker server's address to connect to</param> + /// <returns>A fluent chainable value</returns> + /// <exception cref="ArgumentNullException"></exception> + public CacheListServerRequest WithDiscoveryEndpoint(Uri brokerAddr) + { + DiscoveryEndpoint = brokerAddr ?? throw new ArgumentNullException(nameof(brokerAddr)); + return this; + } + + /// <inheritdoc/> + public void SignJwt(JsonWebToken jwt) => _manager.SignJwt(jwt); + + /// <inheritdoc/> + public bool VerifyCache(JsonWebToken jwt) => _manager.VerifyCache(jwt); + + /// <inheritdoc/> + public bool VerifyBroker(JsonWebToken jwt) => _manager.VerifyBroker(jwt); + + /// <inheritdoc/> + public IReadOnlyDictionary<string, string?> GetJwtHeader() => _manager.GetJwtHeader(); + } +} diff --git a/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs new file mode 100644 index 0000000..21a99e1 --- /dev/null +++ b/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs @@ -0,0 +1,84 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Extensions +* File: CacheNodeConfiguration.cs +* +* CacheNodeConfiguration.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; + + +namespace VNLib.Data.Caching.Extensions +{ + + public class CacheNodeConfiguration: CacheClientConfiguration, ICachePeerAdvertisment + { + /// <summary> + /// The address for clients to connect to + /// </summary> + public Uri? ConnectEndpoint { get; private set; } + + /// <summary> + /// Whether or not to advertise ourself to peer nodes + /// </summary> + public bool BroadcastAdverisment { get; private set; } + + /// <summary> + /// Define the endpoint for clients to connect to to discover + /// other discovertable nodes + /// </summary> + public Uri? DiscoveryEndpoint { get; private set; } + + /// <summary> + /// Sets the full address of our cache endpoint for clients to connect to + /// </summary> + /// <param name="connectUri">The uri clients will attempt to connect to</param> + public CacheNodeConfiguration WithCacheEndpoint(Uri connectUri) + { + ConnectEndpoint = connectUri; + return this; + } + + public CacheNodeConfiguration EnableAdvertisment(bool enable, Uri? discoveryEndpoint) + { + BroadcastAdverisment = enable; + DiscoveryEndpoint = discoveryEndpoint; + return this; + } + + ///<inheritdoc/> + public string NodeId { get; private set; } = null!; + + /// <summary> + /// Specifies the current server's cluster node id. If this + /// is a server connection attempting to listen for changes on the + /// remote server, this id must be set and unique + /// </summary> + /// <param name="nodeId">The cluster node id of the current server</param> + /// <returns>Chainable fluent object</returns> + /// <exception cref="ArgumentNullException"></exception> + public CacheClientConfiguration WithNodeId(string nodeId) + { + NodeId = nodeId ?? throw new ArgumentNullException(nameof(nodeId)); + return this; + } + + } +} diff --git a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs index 8ee02f7..9efe16a 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs @@ -32,6 +32,7 @@ using System.Threading; using System.Threading.Tasks; using System.Collections.Generic; using System.Security.Cryptography; +using System.Text.Json.Serialization; using System.Runtime.CompilerServices; using RestSharp; @@ -45,12 +46,12 @@ using VNLib.Utils.Extensions; using VNLib.Net.Rest.Client; using VNLib.Net.Messaging.FBM; using VNLib.Net.Messaging.FBM.Client; - using ContentType = VNLib.Net.Http.ContentType; + namespace VNLib.Data.Caching.Extensions { - + /// <summary> /// Provides extension methods for FBM data caching using /// cache servers and brokers @@ -61,11 +62,22 @@ namespace VNLib.Data.Caching.Extensions /// The websocket sub-protocol to use when connecting to cache servers /// </summary> public const string CACHE_WS_SUB_PROCOL = "object-cache"; + /// <summary> /// The default cache message header size /// </summary> public const int MAX_FBM_MESSAGE_HEADER_SIZE = 1024; + /// <summary> + /// The client nonce signature http header name + /// </summary> + public const string X_UPGRADE_SIG_HEADER = "X-Cache-Upgrade-Sig"; + + /// <summary> + /// The advertisment header for cache node discovery + /// </summary> + public const string X_NODE_DISCOVERY_HEADER = "X-Cache-Node-Discovery"; + private static readonly RestClientPool ClientPool = new(2,new RestClientOptions() { MaxTimeout = 10 * 1000, @@ -75,7 +87,7 @@ namespace VNLib.Data.Caching.Extensions ThrowOnAnyError = true, }); - private static readonly ConditionalWeakTable<FBMClient, ClientCacheConfiguration> ClientCacheConfig = new(); + private static readonly ConditionalWeakTable<FBMClient, CacheClientConfiguration> ClientCacheConfig = new(); /// <summary> /// Gets a <see cref="FBMClientConfig"/> preconfigured object caching @@ -128,6 +140,59 @@ namespace VNLib.Data.Caching.Extensions } /// <summary> + /// Creats a new <see cref="CacheListServerRequest"/> from an existing <see cref="CacheClientConfiguration"/> + /// </summary> + /// <param name="conf">The prepared client configuration</param> + /// <returns>The new <see cref="CacheListServerRequest"/></returns> + public static CacheListServerRequest GetListMessage(this CacheClientConfiguration conf) + { + return new(conf, conf.DiscoveryEndpoint); + } + + /// <summary> + /// Discovers peer nodes from a given initial peer and returns a list of discovered nodes. If the config + /// is for a cache peer node, the current peer is removed from the list of discovered nodes. + /// </summary> + /// <param name="cacheConfig"></param> + /// <param name="initialPeer">The initial peer to discover nodes from</param> + /// <param name="cancellation">A token to cancel the discovery operation</param> + /// <returns>The collection of discovered nodes</returns> + /// <exception cref="ArgumentNullException"></exception> + public static async Task<ICachePeerAdvertisment[]?> DiscoverClusterNodesAsync( + this CacheClientConfiguration cacheConfig, + ICachePeerAdvertisment initialPeer, + CancellationToken cancellation + ) + { + _ = initialPeer?.DiscoveryEndpoint ?? throw new ArgumentException("Advertisment does not expose an advertisment endpoint"); + + //Create list request + CacheListServerRequest request = cacheConfig.GetListMessage(); + + //Override with the initial peer's discovery endpoint + request.WithDiscoveryEndpoint(initialPeer.DiscoveryEndpoint); + + //Get the list of servers + ICachePeerAdvertisment[]? servers = await ListServersAsync(request, cancellation); + + if (servers == null) + { + return null; + } + + if(cacheConfig is CacheNodeConfiguration cnc) + { + //Filter out the current node + return servers.Where(s => !cnc.NodeId.Equals(s.NodeId, StringComparison.OrdinalIgnoreCase)).ToArray(); + } + else + { + //Do not filter + return servers; + } + } + + /// <summary> /// Contacts the cache broker to get a list of active servers to connect to /// </summary> /// <param name="request">The request message used to connecto the broker server</param> @@ -135,27 +200,30 @@ namespace VNLib.Data.Caching.Extensions /// <returns>The list of active servers</returns> /// <exception cref="SecurityException"></exception> /// <exception cref="ArgumentNullException"></exception> - public static async Task<ActiveServer[]?> ListServersAsync(ListServerRequest request, CancellationToken cancellationToken = default) + public static async Task<ICachePeerAdvertisment[]?> ListServersAsync(ICacheListServerRequest request, CancellationToken cancellationToken = default) { _ = request ?? throw new ArgumentNullException(nameof(request)); string jwtBody; + //Build request jwt using (JsonWebToken requestJwt = new()) { - requestJwt.WriteHeader(request.JwtHeader); + requestJwt.WriteHeader(request.GetJwtHeader()); requestJwt.InitPayloadClaim() .AddClaim("iat", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()) .AddClaim("nonce", RandomHash.GetRandomBase32(16)) .CommitClaims(); + //sign the jwt request.SignJwt(requestJwt); + //Compile the jwt jwtBody = requestJwt.Compile(); } //New list request - RestRequest listRequest = new(request.BrokerAddress, Method.Post); + RestRequest listRequest = new(request.DiscoveryEndpoint, Method.Post); //Add the jwt as a string to the request body listRequest.AddStringBody(jwtBody, DataFormat.None); @@ -177,24 +245,49 @@ namespace VNLib.Data.Caching.Extensions data = response.RawBytes ?? throw new InvalidOperationException("No data returned from broker"); } + //Response is jwt using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data); //Verify the jwt - if (!request.VerifyJwt(responseJwt)) + if (!request.VerifyBroker(responseJwt)) { throw new SecurityException("Failed to verify the broker's challenge, cannot continue"); } using JsonDocument doc = responseJwt.GetPayload(); - return doc.RootElement.GetProperty("servers").Deserialize<ActiveServer[]>(); + return doc.RootElement.GetProperty("peers").Deserialize<Advertisment[]>(); + } + + /// <summary> + /// Registers the current node with the broker + /// </summary> + /// <returns>A task that completes when the regitration has been made successfully</returns> + /// <exception cref="ArgumentException"></exception> + public static async Task RegisterWithBrokerAsync(this CacheNodeConfiguration config, string authToken) + { + //Recover the certificate + ReadOnlyJsonWebKey cacheCert = config?.SigningKey ?? throw new ArgumentException(nameof(config.SigningKey)); + + //init broker request + using BrokerRegistrationRequest request = new(); + + request.WithBroker(config.DiscoveryEndpoint!) + .WithRegistrationAddress(config.ConnectEndpoint!.ToString()) + .WithNodeId(config.NodeId!) + .WithSigningKey(cacheCert, true) + .WithHeartbeatToken(authToken); + + + //Send the request + await RegisterWithBrokerAsync(request); } /// <summary> /// Registers the current server as active with the specified broker /// </summary> /// <param name="registration">The registration request</param> - public static async Task ResgisterWithBrokerAsync(BrokerRegistrationRequest registration) + public static async Task RegisterWithBrokerAsync(BrokerRegistrationRequest registration) { _ = registration ?? throw new ArgumentNullException(nameof(registration)); _ = registration.HeartbeatToken ?? throw new ArgumentException("Missing required heartbeat access token"); @@ -220,17 +313,20 @@ namespace VNLib.Data.Caching.Extensions //Compile and save requestData = jwt.Compile(); } + //Create reg request message RestRequest regRequest = new(registration.BrokerAddress); regRequest.AddStringBody(requestData, DataFormat.None); regRequest.AddHeader("Content-Type", "text/plain"); + //Rent client using ClientContract cc = ClientPool.Lease(); + //Exec the regitration request RestResponse response = await cc.Resource.ExecutePutAsync(regRequest); response.ThrowIfError(); - } - + } + /// <summary> /// Allows for configuration of an <see cref="FBMClient"/> @@ -238,7 +334,31 @@ namespace VNLib.Data.Caching.Extensions /// </summary> /// <param name="client"></param> /// <returns>A fluent api configuration builder for the current client</returns> - public static ClientCacheConfiguration GetCacheConfiguration(this FBMClient client) => ClientCacheConfig.GetOrCreateValue(client); + public static CacheClientConfiguration GetCacheConfiguration(this FBMClient client) => ClientCacheConfig.GetOrCreateValue(client); + + /// <summary> + /// Explicitly set the client cache configuration for the current client + /// </summary> + /// <param name="client"></param> + /// <param name="config">The cache node configuration</param> + /// <returns>The config instance</returns> + public static CacheClientConfiguration SetCacheConfiguration(this FBMClient client, CacheClientConfiguration config) + { + ClientCacheConfig.AddOrUpdate(client, config); + return config; + } + + /// <summary> + /// Explicitly set the cache node configuration for the current client + /// </summary> + /// <param name="client"></param> + /// <param name="nodeConfig">The cache node configuration</param> + /// <returns>The config instance</returns> + public static CacheNodeConfiguration SetCacheConfiguration(this FBMClient client, CacheNodeConfiguration nodeConfig) + { + ClientCacheConfig.AddOrUpdate(client, nodeConfig); + return nodeConfig; + } /// <summary> /// Discovers cache nodes in the broker configured for the current client. @@ -246,7 +366,10 @@ namespace VNLib.Data.Caching.Extensions /// <param name="client"></param> /// <param name="token">A token to cancel the discovery</param> /// <returns>A task the resolves the list of active servers on the broker server</returns> - public static Task<ActiveServer[]?> DiscoverCacheNodesAsync(this FBMClientWorkerBase client, CancellationToken token = default) => client.Client.DiscoverCacheNodesAsync(token); + public static Task<ICachePeerAdvertisment[]?> DiscoverCacheNodesAsync(this FBMClientWorkerBase client, CancellationToken token = default) + { + return client.Client.DiscoverCacheNodesAsync(token); + } /// <summary> /// Discovers cache nodes in the broker configured for the current client. @@ -254,13 +377,13 @@ namespace VNLib.Data.Caching.Extensions /// <param name="client"></param> /// <param name="token">A token to cancel the discovery </param> /// <returns>A task the resolves the list of active servers on the broker server</returns> - public static async Task<ActiveServer[]?> DiscoverCacheNodesAsync(this FBMClient client, CancellationToken token = default) + public static async Task<ICachePeerAdvertisment[]?> DiscoverCacheNodesAsync(this FBMClient client, CancellationToken token = default) { - ClientCacheConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); - //Request from config - using ListServerRequest req = ListServerRequest.FromConfig(conf); + //Get the stored client config + CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); + //List servers async - return conf.CacheServers = await ListServersAsync(req, token); + return conf.CacheServers = await ListServersAsync(conf, token); } /// <summary> @@ -274,16 +397,21 @@ namespace VNLib.Data.Caching.Extensions public static async Task WaitForExitAsync(this FBMClient client, CancellationToken token = default) { client.LogDebug("Waiting for cache client to exit"); + //Get task for cancellation Task cancellation = token.WaitHandle.WaitAsync(); + //Task for status handle Task run = client.ConnectionStatusHandle.WaitAsync(); + //Wait for cancellation or _ = await Task.WhenAny(cancellation, run); client.LogDebug("Disconnecting the cache client"); + //Normal try to disconnect the socket await client.DisconnectAsync(CancellationToken.None); + //Notify if cancelled token.ThrowIfCancellationRequested(); } @@ -300,14 +428,18 @@ namespace VNLib.Data.Caching.Extensions /// <exception cref="ArgumentNullException"></exception> /// <exception cref="SecurityException"></exception> /// <exception cref="ObjectDisposedException"></exception> - public static async Task<ActiveServer> ConnectToRandomCacheAsync(this FBMClient client, CancellationToken cancellation = default) + public static async Task<ICachePeerAdvertisment> ConnectToRandomCacheAsync(this FBMClient client, CancellationToken cancellation = default) { //Get stored config - ClientCacheConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); + CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); + //Select random - ActiveServer? randomServer = conf.CacheServers?.SelectRandom(); - _ = randomServer ?? throw new ArgumentException("No servers detected, cannot connect"); + ICachePeerAdvertisment? randomServer = conf.CacheServers?.SelectRandom() + ?? throw new ArgumentException("No servers detected, cannot connect"); + await ConnectToCacheAsync(client, randomServer, cancellation); + + //Return the random server we connected to return randomServer; } @@ -324,59 +456,97 @@ namespace VNLib.Data.Caching.Extensions /// <exception cref="ArgumentNullException"></exception> /// <exception cref="SecurityException"></exception> /// <exception cref="ObjectDisposedException"></exception> - public static Task ConnectToCacheAsync(this FBMClient client, ActiveServer server, CancellationToken token = default) + public static Task ConnectToCacheAsync(this FBMClient client, ICachePeerAdvertisment server, CancellationToken token = default) { _ = client ?? throw new ArgumentNullException(nameof(client)); _ = server ?? throw new ArgumentNullException(nameof(server)); //Get stored config - ClientCacheConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); + CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); //Connect to server (no server id because client not replication server) return ConnectToCacheAsync(client, conf, server, token); } - - private static async Task ConnectToCacheAsync(FBMClient client, ClientCacheConfiguration request, ActiveServer server, CancellationToken token = default) + /// <summary> + /// Connects to the specified server on the configured cache client + /// </summary> + /// <param name="client"></param> + /// <param name="server">The server to connect to</param> + /// <param name="token">A token to cancel the operation</param> + /// <param name="explicitConfig">Explicit cache configuration to use</param> + /// <returns>A task that resolves when the client is connected to the cache server</returns> + /// <exception cref="FBMException"></exception> + /// <exception cref="FBMServerNegiationException"></exception> + /// <exception cref="ArgumentException"></exception> + /// <exception cref="ArgumentNullException"></exception> + /// <exception cref="SecurityException"></exception> + /// <exception cref="ObjectDisposedException"></exception> + public static Task ConnectToCacheAsync(this FBMClient client, ICachePeerAdvertisment server, CacheClientConfiguration explicitConfig, CancellationToken token = default) { - //Construct server uri - Uri serverUri = new(server.HostName!); - - //build ws uri - UriBuilder uriBuilder = new(serverUri) + _ = client ?? throw new ArgumentNullException(nameof(client)); + _ = server ?? throw new ArgumentNullException(nameof(server)); + + //Connect to server (no server id because client not replication server) + return ConnectToCacheAsync(client, explicitConfig, server, token); + } + + + private static async Task ConnectToCacheAsync( + FBMClient client, + CacheClientConfiguration config, + ICachePeerAdvertisment server, + CancellationToken token = default + ) + { + //build ws uri from the connect endpoint + UriBuilder uriBuilder = new(server.ConnectEndpoint) { - Scheme = request.UseTls ? "wss://" : "ws://" + Scheme = config.UseTls ? "wss://" : "ws://" }; - + + string challenge = RandomHash.GetRandomBase32(24); + + //See if the supplied config is for a cache node + CacheNodeConfiguration? cnc = config as CacheNodeConfiguration; + string jwtMessage; //Init jwt for connecting to server using (JsonWebToken jwt = new()) { - jwt.WriteHeader(request.JwtHeader); + jwt.WriteHeader(config.GetJwtHeader()); //Init claim JwtPayload claim = jwt.InitPayloadClaim(); - claim.AddClaim("chl", request.ServerChallenge); + claim.AddClaim("chl", challenge); - if (!string.IsNullOrWhiteSpace(request.NodeId)) + if (!string.IsNullOrWhiteSpace(cnc?.NodeId)) { /* * The unique node id so the other nodes know to load the * proper event queue for the current server */ - claim.AddClaim("sub", request.NodeId); + claim.AddClaim("sub", cnc.NodeId); } claim.CommitClaims(); //Sign jwt - request.SignJwt(jwt); + config.SignJwt(jwt); //Compile to string jwtMessage = jwt.Compile(); } - RestRequest negotation = new(serverUri, Method.Get); + /* + * During a server negiation, the client makes an intial get request to the cache endpoint + * and passes some client negiation terms as a signed message to the server. The server then + * validates these values and returns a signed jwt with the server negiation terms. + * + * The response from the server is essentailly the 'access token' + */ + + RestRequest negotation = new(server.ConnectEndpoint, Method.Get); //Set the jwt auth header for negotiation negotation.AddHeader("Authorization", jwtMessage); negotation.AddHeader("Accept", HttpHelpers.GetContentTypeString(ContentType.Text)); @@ -406,13 +576,13 @@ namespace VNLib.Data.Caching.Extensions using (JsonWebToken jwt = JsonWebToken.Parse(authToken)) { //Verify the jwt - if (!request.VerifyCache(jwt)) + if (!config.VerifyCache(jwt)) { throw new SecurityException("Failed to verify the cache server's negotiation message, cannot continue"); } //Confirm the server's buffer configuration - ValidateServerNegotation(client, request.ServerChallenge, jwt); + ValidateServerNegotation(client, challenge, jwt); } client.LogDebug("Server negotiation validated, connecting to server"); @@ -420,6 +590,16 @@ namespace VNLib.Data.Caching.Extensions //The client authorization header is the exact response client.ClientSocket.Headers[HttpRequestHeader.Authorization] = authToken; + //Compute the signature of the upgrade token + client.ClientSocket.Headers[X_UPGRADE_SIG_HEADER] = GetBase64UpgradeSingature(authToken, config.SigningKey!); + + //Check to see if adversize self is enabled + if (cnc?.BroadcastAdverisment == true) + { + //Set advertisment header + client.ClientSocket.Headers[X_NODE_DISCOVERY_HEADER] = GetAdvertismentHeader(cnc); + } + //Connect async await client.ConnectAsync(uriBuilder.Uri, token); } @@ -475,16 +655,183 @@ namespace VNLib.Data.Caching.Extensions } } + /* + * Added layer to confirm that client that requested the negotation holds the private key + * compute a signature of the upgrade token and send it to the server to prove we hold the private key. + */ + + private static string GetBase64UpgradeSingature(string? token, ReadOnlyJsonWebKey key) + { + //try to get the ecdsa key first + using ECDsa? ec = key.GetECDsaPrivateKey(); + + if(ec != null) + { + //Compute hash of the token + byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256); + + //Sign the hash + byte[] sig = ec.SignHash(hash, DSASignatureFormat.IeeeP1363FixedFieldConcatenation); + + //Return the base64 string + return Convert.ToBase64String(sig); + } + + //Check rsa next + using RSA? rsa = key.GetRSAPrivateKey(); + if(rsa != null) + { + //Compute hash of the token + byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256); + + //Sign the hash + byte[] sig = rsa.SignHash(hash, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1); + + //Return the base64 string + return Convert.ToBase64String(sig); + } + + throw new CryptographicException("Cache JKW does not export a supported private key for upgrade challenges"); + } + + /// <summary> + /// Verifies the signed auth token against the given verification key + /// </summary> + /// <param name="signature">The base64 signature of the token</param> + /// <param name="token">The raw token to compute the hash of</param> + /// <param name="nodeConfig">The node configuration</param> + /// <returns>True if the singature matches, false otherwise</returns> + /// <exception cref="CryptographicException"></exception> + public static bool VerifyUpgradeToken(this CacheClientConfiguration nodeConfig, string signature, string token) + { + return VerifyUpgradeToken(signature, token, nodeConfig.VerificationKey); + } + + /// <summary> + /// Verifies the signed auth token against the given verification key + /// </summary> + /// <param name="signature">The base64 signature of the token</param> + /// <param name="token">The raw token to compute the hash of</param> + /// <param name="verifcationKey">The key used to verify the singature with</param> + /// <returns>True if the singature matches, false otherwise</returns> + /// <exception cref="ArgumentNullException"></exception> + /// <exception cref="CryptographicException"></exception> + public static bool VerifyUpgradeToken(string signature, string token, ReadOnlyJsonWebKey verifcationKey) + { + _ = verifcationKey ?? throw new ArgumentNullException(nameof(verifcationKey)); + + //get the hash of the token + byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256); + + //decode the signature + byte[] sig = Convert.FromBase64String(signature); + + //try to get the ecdsa key first + using ECDsa? ec = verifcationKey.GetECDsaPublicKey(); + if(ec != null) + { + //Verify the signature + return ec.VerifyHash(hash, sig, DSASignatureFormat.IeeeP1363FixedFieldConcatenation); + } + + //Check rsa next + using RSA? rsa = verifcationKey.GetRSAPublicKey(); + if(rsa != null) + { + //Verify the signature + return rsa.VerifyHash(hash, sig, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1); + } + + throw new CryptographicException("Cache JKW does not export a supported public key for upgrade challenges"); + } + + private static string GetAdvertismentHeader(CacheNodeConfiguration nodeConfiguration) + { + /* + * Create node advertisment message to publish to peer nodes + * + * these messages will allow other clients and peers to discover us + */ + + using JsonWebToken jwt = new(); + + //Get the jwt header + jwt.WriteHeader(nodeConfiguration.GetJwtHeader()); + + jwt.InitPayloadClaim() + .AddClaim("nonce", RandomHash.GetRandomBase32(16)) + .AddClaim("iat", DateTimeOffset.UtcNow.ToUnixTimeSeconds()) + .AddClaim("iss", nodeConfiguration.NodeId!) + .AddClaim("url", nodeConfiguration.ConnectEndpoint!.ToString()) + //Optional discovery endpoint + .AddClaim("dis", nodeConfiguration.DiscoveryEndpoint?.ToString() ?? string.Empty) + .CommitClaims(); + + //Sign message + nodeConfiguration.SignJwt(jwt); + + return jwt.Compile(); + } + + /// <summary> + /// Verifies the peer advertisment message + /// </summary> + /// <param name="config"></param> + /// <param name="message">The advertisment message to verify</param> + /// <returns>The advertisment message if successfully verified, or null otherwise</returns> + /// <exception cref="FormatException"></exception> + public static ICachePeerAdvertisment? VerifyPeerAdvertisment(this ICacheJwtManager config, string message) + { + using JsonWebToken jwt = JsonWebToken.Parse(message); + + //Verify the signature + if (!config.VerifyCache(jwt)) + { + return null; + } + + //Get the payload + return jwt.GetPayload<Advertisment>(); + } + + /// <summary> /// Selects a random server from a collection of active servers /// </summary> /// <param name="servers"></param> /// <returns>A server selected at random</returns> - public static ActiveServer SelectRandom(this ICollection<ActiveServer> servers) + public static ICachePeerAdvertisment SelectRandom(this ICollection<ICachePeerAdvertisment> servers) { //select random server int randServer = RandomNumberGenerator.GetInt32(0, servers.Count); return servers.ElementAt(randServer); } + + + private class Advertisment : ICachePeerAdvertisment + { + [JsonIgnore] + public Uri? ConnectEndpoint { get; set; } + + [JsonIgnore] + public Uri? DiscoveryEndpoint { get; set; } + + [JsonPropertyName("iss")] + public string NodeId { get; set; } + + [JsonPropertyName("url")] + public string? url + { + get => ConnectEndpoint?.ToString(); + set => ConnectEndpoint = value == null ? null : new Uri(value); + } + + [JsonPropertyName("dis")] + public string? dis + { + get => DiscoveryEndpoint?.ToString(); + set => DiscoveryEndpoint = value == null ? null : new Uri(value); + } + } } } diff --git a/lib/VNLib.Data.Caching.Extensions/src/ICachePeerAdvertisment.cs b/lib/VNLib.Data.Caching.Extensions/src/ICachePeerAdvertisment.cs new file mode 100644 index 0000000..acf883e --- /dev/null +++ b/lib/VNLib.Data.Caching.Extensions/src/ICachePeerAdvertisment.cs @@ -0,0 +1,50 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Extensions +* File: ICachePeerAdvertisment.cs +* +* ICachePeerAdvertisment.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; + + +namespace VNLib.Data.Caching.Extensions +{ + /// <summary> + /// Represents a node that can be advertised to clients + /// </summary> + public interface ICachePeerAdvertisment + { + /// <summary> + /// The endpoint for clients to connect to to access the cache + /// </summary> + Uri ConnectEndpoint { get; } + + /// <summary> + /// Gets the address for clients to connect to to discover other discovertable nodes + /// </summary> + Uri? DiscoveryEndpoint { get; } + + /// <summary> + /// Gets the unique identifier for this node + /// </summary> + string NodeId { get; } + } +} diff --git a/lib/VNLib.Data.Caching.Extensions/src/ListServerRequest.cs b/lib/VNLib.Data.Caching.Extensions/src/ListServerRequest.cs deleted file mode 100644 index fd25925..0000000 --- a/lib/VNLib.Data.Caching.Extensions/src/ListServerRequest.cs +++ /dev/null @@ -1,116 +0,0 @@ -/* -* Copyright (c) 2023 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Data.Caching.Extensions -* File: ListServerRequest.cs -* -* ListServerRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Collections.Generic; - -using VNLib.Utils; -using VNLib.Hashing.IdentityUtility; - -namespace VNLib.Data.Caching.Extensions -{ - /// <summary> - /// A request container for a ListServer request - /// </summary> - public sealed class ListServerRequest : VnDisposeable - { - private readonly bool _ownsKeys; - - private ReadOnlyJsonWebKey? VerificationKey; - private ReadOnlyJsonWebKey? SigningAlg; - - /// <summary> - /// The address of the broker server to connect to - /// </summary> - public Uri BrokerAddress { get; } - - public ListServerRequest(Uri brokerAddress) - { - BrokerAddress = brokerAddress; - _ownsKeys = true; - } - - private ListServerRequest(ClientCacheConfiguration conf) - { - //Broker verification key is required - VerificationKey = conf.BrokerVerificationKey; - SigningAlg = conf.SigningKey; - BrokerAddress = conf.BrokerAddress ?? throw new ArgumentException("Broker address must be specified"); - _ownsKeys = false; - } - - internal static ListServerRequest FromConfig(ClientCacheConfiguration conf) => new (conf); - - /// <summary> - /// Sets the public key used to verify the signature of the response. - /// </summary> - /// <param name="jwk">The key used to verify messages </param> - public ListServerRequest WithVerificationKey(ReadOnlyJsonWebKey jwk) - { - VerificationKey = jwk ?? throw new ArgumentNullException(nameof(jwk)); - return this; - } - /// <summary> - /// Sets the private key used to sign the request. - /// </summary> - /// <param name="jwk">The <see cref="ReadOnlyJsonWebKey"/> containing the private key used to sign the message</param> - /// <exception cref="ArgumentNullException"></exception> - public ListServerRequest WithSigningKey(ReadOnlyJsonWebKey jwk) - { - SigningAlg = jwk ?? throw new ArgumentNullException(nameof(jwk)); - return this; - } - - /// <summary> - /// Signs the <see cref="JsonWebToken"/> using the private key. - /// </summary> - /// <param name="jwt">The message to sign</param> - internal void SignJwt(JsonWebToken jwt) - { - jwt.SignFromJwk(SigningAlg); - } - - /// <summary> - /// Verifies the signature of the <see cref="JsonWebToken"/> - /// </summary> - /// <param name="jwt"></param> - /// <returns>A value that indicates if the signature is verified</returns> - internal bool VerifyJwt(JsonWebToken jwt) - { - return jwt.VerifyFromJwk(VerificationKey); - } - - internal IReadOnlyDictionary<string, string?> JwtHeader => SigningAlg!.JwtHeader; - - ///<inheritdoc/> - protected override void Free() - { - if (_ownsKeys) - { - VerificationKey?.Dispose(); - SigningAlg?.Dispose(); - } - } - } -} diff --git a/lib/VNLib.Data.Caching/src/ClientExtensions.cs b/lib/VNLib.Data.Caching/src/ClientExtensions.cs index 2ac8b40..eced7fe 100644 --- a/lib/VNLib.Data.Caching/src/ClientExtensions.cs +++ b/lib/VNLib.Data.Caching/src/ClientExtensions.cs @@ -28,7 +28,6 @@ using System.Buffers; using System.Text.Json; using System.Threading; using System.Threading.Tasks; -using System.Collections.Generic; using System.Runtime.CompilerServices; using VNLib.Utils.Logging; @@ -417,7 +416,7 @@ namespace VNLib.Data.Caching /// </summary> /// <param name="client"></param> /// <param name="cancellationToken">A token to cancel the deuque operation</param> - /// <returns>A <see cref="KeyValuePair{TKey, TValue}"/> that contains the modified object id and optionally its new id</returns> + /// <returns>A <see cref="WaitForChangeResult"/> that contains information about the modified element</returns> public static async Task<WaitForChangeResult> WaitForChangeAsync(this FBMClient client, CancellationToken cancellationToken = default) { //Rent a new request diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs index ae6ca59..9f5ccfe 100644 --- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs @@ -112,7 +112,7 @@ namespace VNLib.Plugins.Extensions.VNCache //Connection authentication methods Client.GetCacheConfiguration() .WithVerificationKey(cachePub) - .WithSigningCertificate(clientPriv) + .WithSigningKey(clientPriv) .WithBrokerVerificationKey(brokerPub); } @@ -127,7 +127,7 @@ namespace VNLib.Plugins.Extensions.VNCache while (true) { //Load the server list - ActiveServer[]? servers; + ICachePeerAdvertisment[]? servers; while (true) { try @@ -163,8 +163,8 @@ namespace VNLib.Plugins.Extensions.VNCache pluginLog.Debug("Connecting to random cache server"); //Connect to a random server - ActiveServer selected = await Client.ConnectToRandomCacheAsync(exitToken); - pluginLog.Debug("Connected to cache server {s}", selected.ServerId); + ICachePeerAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken); + pluginLog.Debug("Connected to cache server {s}", selected.NodeId); //Set connection status flag IsConnected = true; |