diff options
Diffstat (limited to 'lib/VNLib.Data.Caching.Extensions')
9 files changed, 493 insertions, 376 deletions
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs index 99acfd5..6edb912 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: VNLib.Data.Caching.Extensions @@ -22,7 +22,9 @@ * along with this program. If not, see https://www.gnu.org/licenses/. */ +using System; using System.Net; +using System.Security; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -39,6 +41,22 @@ namespace VNLib.Data.Caching.Extensions.ApiModel /// </summary> internal sealed class CacheSiteAdapter : RestSiteAdapterBase { + /* + * Lazy to defer errors for debuggong + */ + private static readonly Lazy<CacheSiteAdapter> _lazy = new(() => ConfigureAdapter(2)); + + internal static CacheSiteAdapter Instance => _lazy.Value; + + private static CacheSiteAdapter ConfigureAdapter(int maxClients) + { + CacheSiteAdapter adapter = new(maxClients); + //Configure the site endpoints + adapter.BuildEndpoints(ServiceEndpoints.Definition); + return adapter; + } + + protected override RestClientPool Pool { get; } public CacheSiteAdapter(int maxClients) @@ -55,7 +73,23 @@ namespace VNLib.Data.Caching.Extensions.ApiModel } public override void OnResponse(RestResponse response) - { } + { + switch(response.StatusCode) + { + case HttpStatusCode.OK: + break; + case HttpStatusCode.Unauthorized: + throw new SecurityException("Unauthorized access to cache service"); + case HttpStatusCode.Forbidden: + throw new SecurityException("Forbidden access to cache service"); + case HttpStatusCode.NotFound: + throw new InvalidOperationException("Cache service not found"); + case HttpStatusCode.InternalServerError: + throw new InvalidOperationException("Cache service internal error"); + default: + throw new InvalidOperationException($"Cache service error: {response.StatusCode}"); + } + } public override Task WaitAsync(CancellationToken cancellation = default) { diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs index 5a15d33..263f41a 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: VNLib.Data.Caching.Extensions @@ -37,14 +37,9 @@ namespace VNLib.Data.Caching.Extensions.Clustering public class CacheClientConfiguration { /// <summary> - /// Stores available cache servers to be used for discovery, and connections - /// </summary> - public INodeDiscoveryCollection NodeCollection { get; } = new NodeDiscoveryCollection(); - - /// <summary> /// The authentication manager to use for signing and verifying messages to and from the cache servers /// </summary> - public ICacheAuthManager AuthManager { get; private set; } + public ICacheAuthManager AuthManager { get; private set; } = null!; /// <summary> /// The error handler to use for handling errors that occur during the discovery process @@ -89,7 +84,7 @@ namespace VNLib.Data.Caching.Extensions.Clustering public CacheClientConfiguration WithInitialPeers(IEnumerable<Uri> peers) { //Check null - _ = peers ?? throw new ArgumentNullException(nameof(peers)); + ArgumentNullException.ThrowIfNull(peers); //Store peer array WellKnownNodes = peers.ToArray(); diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs index 6b7ab48..c21ed05 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: VNLib.Data.Caching.Extensions @@ -23,6 +23,7 @@ */ using System; +using System.Runtime.CompilerServices; namespace VNLib.Data.Caching.Extensions.Clustering { @@ -85,8 +86,14 @@ namespace VNLib.Data.Caching.Extensions.Clustering return this; } + internal StrongBox<string> NodeIdRef { get; } = new(string.Empty); + ///<inheritdoc/> - public string NodeId { get; private set; } = null!; + public string NodeId + { + get => NodeIdRef.Value!; + private set => NodeIdRef.Value = value; + } /// <summary> /// Specifies the current server's cluster node id. If this @@ -99,10 +106,6 @@ namespace VNLib.Data.Caching.Extensions.Clustering public CacheNodeConfiguration WithNodeId(string nodeId) { NodeId = nodeId ?? throw new ArgumentNullException(nameof(nodeId)); - - //Update the node id in the node collection - (NodeCollection as NodeDiscoveryCollection)!.SetSelfId(nodeId); - return this; } diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/ICacheDiscoveryErrorHandler.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/ICacheDiscoveryErrorHandler.cs index 984ce3d..f27f1fb 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/Clustering/ICacheDiscoveryErrorHandler.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/ICacheDiscoveryErrorHandler.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: VNLib.Data.Caching.Extensions @@ -37,5 +37,12 @@ namespace VNLib.Data.Caching.Extensions.Clustering /// <param name="errorNode">The node that the error occured on</param> /// <param name="ex">The exception that caused the invocation</param> void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex); + + /// <summary> + /// Invoked when an error occurs during the discovery process + /// </summary> + /// <param name="errorAddress">The server address that failed to connect</param> + /// <param name="ex">The exception that caused the invocation</param> + void OnDiscoveryError(Uri errorAddress, Exception ex); } } diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs index 677088a..e57e69b 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: VNLib.Data.Caching.Extensions @@ -28,7 +28,9 @@ using System.Collections.Generic; namespace VNLib.Data.Caching.Extensions.Clustering { /// <summary> - /// A custom enumerator for the node discovery process + /// A custom enumerator for the node discovery process. Simplifies the recursive processes + /// of discovering nodes in a cluster to a simple enumeration process. It allows for real-time + /// updates to the collection of discovered nodes as a union operation. /// </summary> public interface INodeDiscoveryEnumerator : IEnumerator<CacheNodeAdvertisment> { diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs index b0e53e1..16b96a3 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs @@ -1,12 +1,12 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: VNLib.Data.Caching.Extensions * File: NodeDiscoveryCollection.cs * -* NodeDiscoveryCollection.cs is part of VNLib.Data.Caching.Extensions which is part of the larger -* VNLib collection of libraries and utilities. +* NodeDiscoveryCollection.cs is part of VNLib.Data.Caching.Extensions which is part +* of the larger VNLib collection of libraries and utilities. * * VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as @@ -26,24 +26,18 @@ using System; using System.Linq; using System.Collections; using System.Collections.Generic; +using System.Runtime.CompilerServices; + +using VNLib.Utils.Extensions; namespace VNLib.Data.Caching.Extensions.Clustering { /// <summary> /// Represents a collection of available cache nodes from a discovery process /// </summary> - public sealed class NodeDiscoveryCollection : INodeDiscoveryCollection + public sealed class NodeDiscoveryCollection(StrongBox<string?>? selfId) : INodeDiscoveryCollection { - private string? _selfId; - private LinkedList<CacheNodeAdvertisment> _peers; - - /// <summary> - /// Initializes a new empty <see cref="NodeDiscoveryCollection"/> - /// </summary> - public NodeDiscoveryCollection() - { - _peers = new(); - } + private LinkedList<CacheNodeAdvertisment> _peers = new(); /// <summary> /// Manually adds nodes to the collection that were not discovered through the discovery process @@ -62,39 +56,34 @@ namespace VNLib.Data.Caching.Extensions.Clustering } /// <summary> - /// Sets the id of the current node, so it can be excluded from discovery + /// Removes a vector of nodes from the internal collection /// </summary> - /// <param name="selfId">The id of the current node to exclude</param> - public void SetSelfId(string? selfId) => _selfId = selfId; + /// <param name="nodes">The vector containg nodes to remove from the collection</param> + public void RemoveManualNodes(IEnumerable<CacheNodeAdvertisment> nodes) => nodes.ForEach(n => _peers.Remove(n)); ///<inheritdoc/> - public INodeDiscoveryEnumerator BeginDiscovery() - { - return new NodeEnumerator(new(), _selfId); - } + public INodeDiscoveryEnumerator BeginDiscovery() => new NodeEnumerator(new(), selfId?.Value); ///<inheritdoc/> public INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<CacheNodeAdvertisment> initialPeers) { + ArgumentNullException.ThrowIfNull(initialPeers); + //Init new enumerator with the initial peers - return new NodeEnumerator(new(initialPeers), _selfId); + return new NodeEnumerator(new(initialPeers), selfId?.Value); } ///<inheritdoc/> public void CompleteDiscovery(INodeDiscoveryEnumerator enumerator) { - _ = enumerator ?? throw new ArgumentNullException(nameof(enumerator)); + ArgumentNullException.ThrowIfNull(enumerator); //Capture all nodes from the enumerator and store them as our current peers _peers = (enumerator as NodeEnumerator)!.Peers; } ///<inheritdoc/> - public CacheNodeAdvertisment[] GetAllNodes() - { - //Capture all current peers - return _peers.ToArray(); - } + public CacheNodeAdvertisment[] GetAllNodes() => _peers.ToArray(); private sealed record class NodeEnumerator(LinkedList<CacheNodeAdvertisment> Peers, string? SelfNodeId) : INodeDiscoveryEnumerator { diff --git a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs index bd86461..47aadd9 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs @@ -31,7 +31,6 @@ using System.Threading; using System.Threading.Tasks; using System.Collections.Generic; using System.Security.Cryptography; -using System.Runtime.CompilerServices; using RestSharp; @@ -74,22 +73,7 @@ namespace VNLib.Data.Caching.Extensions /// The advertisment header for cache node discovery /// </summary> public const string X_NODE_DISCOVERY_HEADER = "X-Cache-Node-Discovery"; - - /* - * Lazy to defer errors for debuggong - */ - private static readonly Lazy<CacheSiteAdapter> SiteAdapter = new(() => ConfigureAdapter(2)); - - private static CacheSiteAdapter ConfigureAdapter(int maxClients) - { - CacheSiteAdapter adapter = new(maxClients); - //Configure the site endpoints - adapter.BuildEndpoints(ServiceEndpoints.Definition); - return adapter; - } - - private static readonly ConditionalWeakTable<FBMClientFactory, CacheClientConfiguration> ClientCacheConfig = new(); - + /// <summary> /// Gets a <see cref="FBMClientConfig"/> preconfigured object caching /// protocl @@ -101,7 +85,7 @@ namespace VNLib.Data.Caching.Extensions /// <returns>A preconfigured <see cref="FBMClientConfig"/> for object caching</returns> public static FBMClientConfig GetDefaultConfig(IUnmangedHeap heap, int maxMessageSize, TimeSpan timeout = default, ILogProvider? debugLog = null) { - return GetDefaultConfig(new FallbackFBMMemoryManager(heap), maxMessageSize, timeout, debugLog); + return GetDefaultConfig(new SharedHeapFBMMemoryManager(heap), maxMessageSize, timeout, debugLog); } /// <summary> @@ -153,236 +137,22 @@ namespace VNLib.Data.Caching.Extensions { client.Config.DebugLog?.Debug("{debug}: {data}", "[CACHE]", message); } - - /// <summary> - /// Discovers ALL possible cache nodes itteritivley, first by collecting the configuration - /// from the initial peers. - /// This will make connections to all discoverable servers - /// </summary> - /// <param name="config"></param> - /// <param name="cancellation">A token to cancel the operation</param> - /// <returns></returns> - /// <exception cref="ArgumentException"></exception> - /// <exception cref="CacheDiscoveryFailureException"></exception> - public static async Task DiscoverNodesAsync(this CacheClientConfiguration config, CancellationToken cancellation) - { - //Make sure at least one node defined - if(config?.WellKnownNodes == null || config.WellKnownNodes.Length == 0) - { - throw new ArgumentException("There must be at least one cache node defined in the client configuration"); - } - - //Get the initial advertisments that arent null - CacheNodeAdvertisment[] initialPeers = await ResolveWellKnownAsync(config, cancellation); - - if (initialPeers.Length == 0) - { - throw new CacheDiscoveryFailureException("There must be at least one available cache node to continue discovery"); - } - - await DiscoverNodesAsync(config, initialPeers, cancellation); - } /// <summary> - /// Resolves the initial well-known cache nodes into their advertisments + /// Gets the discovery manager for the current client configuration. Just a + /// convience method. /// </summary> - /// <param name="config"></param> - /// <param name="cancellation">A token to cancel the operation</param> - /// <returns>An array of resolved nodes</returns> - public static async Task<CacheNodeAdvertisment[]> ResolveWellKnownAsync(this CacheClientConfiguration config, CancellationToken cancellation) - { - _ = config?.WellKnownNodes ?? throw new ArgumentNullException(nameof(config)); - - Task<CacheNodeAdvertisment?>[] initialAdds = new Task<CacheNodeAdvertisment?>[config.WellKnownNodes.Length]; - - //Discover initial advertisments from well-known addresses - for (int i = 0; i < config.WellKnownNodes.Length; i++) - { - initialAdds[i] = DiscoverNodeConfigAsync(config.WellKnownNodes[i], config, cancellation); - } - - //Wait for all initial adds to complete - await Task.WhenAll(initialAdds); - - //Get the initial advertisments that arent null - return initialAdds.Select(static x => x.Result!).Where(static s => s != null).ToArray(); - } + /// <param name="conf"></param> + /// <returns>The new <see cref="VNCacheClusterManager"/> instance around your config</returns> + public static VNCacheClusterManager GetDiscoveryManager(this CacheClientConfiguration conf) => new(conf); /// <summary> - /// Discovers ALL possible cache nodes itteritivley from the current collection of initial peers. - /// This will make connections to all discoverable servers and update the client configuration, with all - /// discovered peers + /// Converts the cache client configuration to a cluster client /// </summary> /// <param name="config"></param> - /// <param name="initialPeers">Accepts an array of initial peers to override the endpoint discovery process</param> - /// <param name="cancellation">A token to cancel the operation</param> - /// <returns>A task that completes when all nodes have been discovered</returns> - /// <exception cref="ArgumentException"></exception> - /// <exception cref="CacheDiscoveryFailureException"></exception> - public static async Task DiscoverNodesAsync(this CacheClientConfiguration config, CacheNodeAdvertisment[] initialPeers, CancellationToken cancellation) - { - //Make sure at least one node defined - if (initialPeers == null || initialPeers.Length == 0) - { - throw new ArgumentException("There must be at least one initial peer"); - } - - //Get the discovery enumerator with the initial peers - INodeDiscoveryEnumerator enumerator = config.NodeCollection.BeginDiscovery(initialPeers); - - //Start the discovery process - await DiscoverNodesAsync(enumerator, config, config.ErrorHandler, cancellation); - - //Commit nodes - config.NodeCollection.CompleteDiscovery(enumerator); - } - - private static async Task DiscoverNodesAsync( - INodeDiscoveryEnumerator enumerator, - CacheClientConfiguration config, - ICacheDiscoveryErrorHandler? errHandler, - CancellationToken cancellation - ) - { - //Loop through servers - while (enumerator.MoveNext()) - { - //Make sure the node has a discovery endpoint - if (enumerator.Current.DiscoveryEndpoint == null) - { - //Skip this node - continue; - } - - /* - * We are allowed to save nodes that do not have a discovery endpoint, but we cannot - * discover nodes from them we can only use them as cache - */ - - //add a random delay to avoid spamming servers - await Task.Delay((int)Random.Shared.NextInt64(100, 500), cancellation); - - try - { - //Discover nodes from the current node - CacheNodeAdvertisment[]? nodes = await GetCacheNodesAsync(enumerator.Current, config, cancellation); - - if (nodes != null) - { - //Add nodes to the collection - enumerator.OnPeerDiscoveryComplete(nodes); - } - } - //Catch exceptions when an error handler is defined - catch(Exception ex) when (errHandler != null) - { - //Handle the error - errHandler.OnDiscoveryError(enumerator.Current, ex); - } - catch(Exception ex) - { - throw new CacheDiscoveryFailureException($"Failed to discovery peer node {enumerator.Current?.NodeId}, cannot continue", ex); - } - } - } - - private static async Task<CacheNodeAdvertisment?> DiscoverNodeConfigAsync(Uri serverUri, CacheClientConfiguration config, CancellationToken cancellation) - { - try - { - GetConfigRequest req = new (serverUri, config); - - //Site adapter verifies response messages so we dont need to check on the response - byte[] data = await SiteAdapter.Value.ExecuteAsync(req, cancellation).AsBytes() - ?? throw new CacheDiscoveryFailureException($"No data returned from desired cache node"); - - //Response is jwt - using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data); - - //The entire payload is just the single serialzed advertisment - using JsonDocument doc = responseJwt.GetPayload(); - - return doc.RootElement.GetProperty("sub").Deserialize<CacheNodeAdvertisment>(); - } - //Bypass cdfe when error handler is null - catch(CacheDiscoveryFailureException) when(config.ErrorHandler == null) - { - throw; - } - //Catch exceptions when an error handler is defined - catch (Exception ex) when (config.ErrorHandler != null) - { - //Handle the error - config.ErrorHandler.OnDiscoveryError(null!, ex); - return null; - } - catch (Exception ex) - { - throw new CacheDiscoveryFailureException("Failed to discover node configuration", ex); - } - } - - /// <summary> - /// Contacts the given server's discovery endpoint to discover a list of available - /// servers we can connect to - /// </summary> - /// <param name="advert">An advertisment of a server to discover other nodes from</param> - /// <param name="cancellationToken">A token to cancel the operationS</param> - /// <param name="config">The cache configuration object</param> - /// <returns>The list of active servers</returns> - /// <exception cref="SecurityException"></exception> - /// <exception cref="ArgumentException"></exception> - /// <exception cref="ArgumentNullException"></exception> - public static async Task<CacheNodeAdvertisment[]?> GetCacheNodesAsync(CacheNodeAdvertisment advert, CacheClientConfiguration config, CancellationToken cancellationToken = default) - { - _ = advert ?? throw new ArgumentNullException(nameof(advert)); - _ = config ?? throw new ArgumentNullException(nameof(config)); - _ = advert.DiscoveryEndpoint ?? throw new ArgumentException("Advertisment does not expose an advertisment endpoint"); - - DiscoveryRequest req = new (advert.DiscoveryEndpoint, config); - - //Site adapter verifies response messages so we dont need to check on the response - byte[] data = await SiteAdapter.Value.ExecuteAsync(req, cancellationToken).AsBytes() - ?? throw new InvalidOperationException($"No data returned from node {advert.NodeId}"); - - //Response is jwt - using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data); - - using JsonDocument doc = responseJwt.GetPayload(); - return doc.RootElement.GetProperty("peers").Deserialize<CacheNodeAdvertisment[]>(); - } - - /// <summary> - /// Allows for configuration of an <see cref="FBMClient"/> - /// for a connection to a cache server - /// </summary> - /// <param name="client"></param> - /// <returns>A fluent api configuration builder for the current client</returns> - public static CacheClientConfiguration GetCacheConfiguration(this FBMClientFactory client) => ClientCacheConfig.GetOrCreateValue(client); - - /// <summary> - /// Explicitly set the client cache configuration for the current client - /// </summary> - /// <param name="client"></param> - /// <param name="config">The cache node configuration</param> - /// <returns>The config instance</returns> - public static CacheClientConfiguration SetCacheConfiguration(this FBMClientFactory client, CacheClientConfiguration config) - { - ClientCacheConfig.AddOrUpdate(client, config); - return config; - } - - /// <summary> - /// Explicitly set the cache node configuration for the current client - /// </summary> - /// <param name="client"></param> - /// <param name="nodeConfig">The cache node configuration</param> - /// <returns>The config instance</returns> - public static CacheNodeConfiguration SetCacheConfiguration(this FBMClientFactory client, CacheNodeConfiguration nodeConfig) - { - ClientCacheConfig.AddOrUpdate(client, nodeConfig); - return nodeConfig; - } + /// <param name="factory">The FBM client factory instance to use</param> + /// <returns>The new cluster client instance</returns> + public static VNCacheClusterClient ToClusterClient(this CacheClientConfiguration config, FBMClientFactory factory) => new(config, factory); /// <summary> /// Waits for the client to disconnect from the server while observing @@ -396,6 +166,8 @@ namespace VNLib.Data.Caching.Extensions /// <returns>A task that complets when the connecion has been closed successfully</returns> public static async Task WaitForExitAsync(this FBMClient client, CancellationToken token = default) { + ArgumentNullException.ThrowIfNull(client); + client.LogDebug("Waiting for cache client to exit"); //Get task for cancellation @@ -420,88 +192,6 @@ namespace VNLib.Data.Caching.Extensions } /// <summary> - /// Discovers all available nodes for the current client config - /// </summary> - /// <param name="client"></param> - /// <param name="cancellation">A token to cancel the operation</param> - /// <returns>A task that completes when all nodes have been discovered</returns> - public static Task DiscoverAvailableNodesAsync(this FBMClientFactory client, CancellationToken cancellation = default) - { - //Get stored config - CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); - - //Discover all nodes - return conf.DiscoverNodesAsync(cancellation); - } - - /// <summary> - /// Connects to a random server from the servers discovered during a cache server discovery - /// </summary> - /// <param name="client"></param> - /// <param name="cancellation">A token to cancel the operation</param> - /// <returns>The server that the connection was made with</returns> - /// <exception cref="FBMException"></exception> - /// <exception cref="FBMServerNegiationException"></exception> - /// <exception cref="ArgumentException"></exception> - /// <exception cref="ArgumentNullException"></exception> - /// <exception cref="SecurityException"></exception> - /// <exception cref="ObjectDisposedException"></exception> - public static async Task<CacheNodeAdvertisment> ConnectToRandomCacheAsync(this FBMClientFactory client, CancellationToken cancellation = default) - { - //Get stored config - CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); - - //Get all available nodes, or at least the initial peers - CacheNodeAdvertisment[]? adverts = conf.NodeCollection.GetAllNodes() ?? throw new ArgumentException("No cache nodes discovered, cannot connect"); - - //Select random node from all available nodes - CacheNodeAdvertisment randomServer = adverts.SelectRandom(); - - //Connect to the random server - await ConnectToCacheAsync(client, randomServer, cancellation); - - //Return the random server we connected to - return randomServer; - } - - /// <summary> - /// Connects to the specified server on the configured cache client - /// </summary> - /// <param name="factory"></param> - /// <param name="server">The server to connect to</param> - /// <param name="token">A token to cancel the operation</param> - /// <returns>A task that resolves when the client is connected to the cache server</returns> - /// <exception cref="FBMException"></exception> - /// <exception cref="FBMServerNegiationException"></exception> - /// <exception cref="ArgumentException"></exception> - /// <exception cref="ArgumentNullException"></exception> - /// <exception cref="SecurityException"></exception> - /// <exception cref="ObjectDisposedException"></exception> - public static async Task<FBMClient> ConnectToCacheAsync(this FBMClientFactory factory, CacheNodeAdvertisment server, CancellationToken token = default) - { - _ = factory ?? throw new ArgumentNullException(nameof(factory)); - _ = server ?? throw new ArgumentNullException(nameof(server)); - - //Get stored config - CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(factory); - - //Create new client - FBMClient client = factory.CreateClient(); - - try - { - //Connect to server (no server id because client not replication server) - await ConnectToCacheAsync(client, conf, server, token); - return client; - } - catch - { - client.Dispose(); - throw; - } - } - - /// <summary> /// Connects to the specified server on the configured cache client /// </summary> /// <param name="client"></param> @@ -517,9 +207,9 @@ namespace VNLib.Data.Caching.Extensions /// <exception cref="ObjectDisposedException"></exception> public static Task ConnectToCacheAsync(this FBMClient client, CacheNodeAdvertisment server, CacheClientConfiguration explicitConfig, CancellationToken token = default) { - _ = client ?? throw new ArgumentNullException(nameof(client)); - _ = server ?? throw new ArgumentNullException(nameof(server)); - + ArgumentNullException.ThrowIfNull(client); + ArgumentNullException.ThrowIfNull(server); + //Connect to server (no server id because client not replication server) return ConnectToCacheAsync(client, explicitConfig, server, token); } @@ -546,7 +236,7 @@ namespace VNLib.Data.Caching.Extensions NegotationRequest req = new(server.ConnectEndpoint, config); //Exec negotiation - RestResponse response = await SiteAdapter.Value.ExecuteAsync(req, token); + RestResponse response = await CacheSiteAdapter.Instance.ExecuteAsync(req, token); /* * JWT will already be veified by the endpoint adapter, so we @@ -582,6 +272,12 @@ namespace VNLib.Data.Caching.Extensions Scheme = config.UseTls ? "wss://" : "ws://" }; + //if the server is specifying https urls, then attempt to upgrade to wss + if (server.ConnectEndpoint.Scheme == Uri.UriSchemeHttps) + { + uriBuilder.Scheme = "wss://"; + } + //Connect async await client.ConnectAsync(uriBuilder.Uri, token); } @@ -658,7 +354,7 @@ namespace VNLib.Data.Caching.Extensions /// <exception cref="CryptographicException"></exception> public static bool VerifyUpgradeToken(this ICacheAuthManager man, string signature, string token, bool isPeer) { - _ = man ?? throw new ArgumentNullException(nameof(man)); + ArgumentNullException.ThrowIfNull(man); //get the hash of the token byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256); @@ -704,8 +400,15 @@ namespace VNLib.Data.Caching.Extensions /// <param name="message">The advertisment message to verify</param> /// <returns>The advertisment message if successfully verified, or null otherwise</returns> /// <exception cref="FormatException"></exception> - public static CacheNodeAdvertisment? VerifyPeerAdvertisment(this ICacheAuthManager config, string message) + public static CacheNodeAdvertisment? VerifyPeerAdvertisment(this ICacheAuthManager config, string? message) { + ArgumentNullException.ThrowIfNull(config); + + if (string.IsNullOrWhiteSpace(message)) + { + return null; + } + using JsonWebToken jwt = JsonWebToken.Parse(message); //Verify the signature diff --git a/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterClient.cs b/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterClient.cs new file mode 100644 index 0000000..050e1a3 --- /dev/null +++ b/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterClient.cs @@ -0,0 +1,78 @@ +/* +* Copyright (c) 2024 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Extensions +* File: VNCacheClusterClient.cs +* +* VNCacheClusterClient.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Security; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Net.Messaging.FBM; +using VNLib.Net.Messaging.FBM.Client; +using VNLib.Data.Caching.Extensions.Clustering; + +namespace VNLib.Data.Caching.Extensions +{ + /// <summary> + /// Manages client connections to cluster nodes with discovery from <see cref="VNCacheClusterManager"/> + /// instance. + /// </summary> + /// <param name="config">The client configuration to use when discovering or connecting to cache nodes</param> + /// <param name="factory">The fbm client factory instance</param> + public class VNCacheClusterClient(CacheClientConfiguration config, FBMClientFactory factory) + : VNCacheClusterManager(config) + { + + /// <summary> + /// Connects to the specified server on the configured cache client + /// </summary> + /// <param name="factory"></param> + /// <param name="server">The server to connect to</param> + /// <param name="token">A token to cancel the operation</param> + /// <returns>A task that resolves when the client is connected to the cache server</returns> + /// <exception cref="FBMException"></exception> + /// <exception cref="FBMServerNegiationException"></exception> + /// <exception cref="ArgumentException"></exception> + /// <exception cref="ArgumentNullException"></exception> + /// <exception cref="SecurityException"></exception> + /// <exception cref="ObjectDisposedException"></exception> + public async Task<FBMClient> ConnectToCacheAsync(CacheNodeAdvertisment server, CancellationToken token = default) + { + ArgumentNullException.ThrowIfNull(server); + + FBMClient client = factory.CreateClient(); + + try + { + //Connect to server (no server id because client not replication server) + await client.ConnectToCacheAsync(server, config, token); + return client; + } + catch + { + client.Dispose(); + throw; + } + } + } +} diff --git a/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterManager.cs b/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterManager.cs new file mode 100644 index 0000000..f68968d --- /dev/null +++ b/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterManager.cs @@ -0,0 +1,306 @@ +/* +* Copyright (c) 2024 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Extensions +* File: VNCacheClusterManager.cs +* +* VNCacheClusterManager.cs is part of VNLib.Data.Caching.Extensions +* which is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Security; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using System.Collections.Generic; + +using RestSharp; + +using VNLib.Hashing.IdentityUtility; +using VNLib.Net.Rest.Client.Construction; +using VNLib.Data.Caching.Extensions.ApiModel; +using VNLib.Data.Caching.Extensions.Clustering; + +namespace VNLib.Data.Caching.Extensions +{ + + /// <summary> + /// A VNCache cluster client discovery maanger. Used to simplify the discovery + /// of cache nodes + /// </summary> + /// <param name="config">The client configuration instance</param> + public class VNCacheClusterManager(CacheClientConfiguration config) + { + /// <summary> + /// The internal collection of discovered nodes + /// </summary> + protected NodeDiscoveryCollection NodeCollection { get; } = GetNodeCollection(config); + + /// <summary> + /// Gets the collection of discovered nodes within the manager + /// </summary> + public INodeDiscoveryCollection DiscoveredNodes => NodeCollection; + + /// <summary> + /// The underlying <see cref="CacheClientConfiguration"/> instance + /// </summary> + public CacheClientConfiguration Config => config; + + /// <summary> + /// Adds an array of nodes manually to the collection of discovered cluster nodes + /// </summary> + /// <param name="nodes"></param> + public void AddManualNodes(params CacheNodeAdvertisment[] nodes) => AddManualNodes(nodes.AsEnumerable()); + + /// <summary> + /// Adds an array of nodes manually to the collection of discovered cluster nodes + /// </summary> + /// <param name="nodes"></param> + public void AddManualNodes(IEnumerable<CacheNodeAdvertisment> nodes) => NodeCollection.AddManualNodes(nodes); + + /// <summary> + /// Removes an array of nodes manually from the collection of discovered cluster nodes + /// </summary> + /// <param name="nodes"></param> + public void RemoveManualNodes(params CacheNodeAdvertisment[] nodes) => RemoveManualNodes(nodes.AsEnumerable()); + + /// <summary> + /// Removes an array of nodes manually from the collection of discovered cluster nodes + /// </summary> + /// <param name="nodes"></param> + public void RemoveManualNodes(IEnumerable<CacheNodeAdvertisment> nodes) => NodeCollection.RemoveManualNodes(nodes); + + /// <summary> + /// Resolves the initial well-known cache nodes into their advertisments + /// </summary> + /// <param name="cancellation">A token to cancel the operation</param> + /// <returns>An array of resolved nodes</returns> + public async Task<CacheNodeAdvertisment[]> ResolveWellKnownAsync(CancellationToken cancellation) + { + //Make sure at least one node defined + if (config?.WellKnownNodes == null || config.WellKnownNodes.Length == 0) + { + throw new ArgumentException("There must be at least one cache node defined in the client configuration"); + } + + Task<CacheNodeAdvertisment?>[] initialAdds = new Task<CacheNodeAdvertisment?>[config.WellKnownNodes.Length]; + + //Discover initial advertisments from well-known addresses + for (int i = 0; i < config.WellKnownNodes.Length; i++) + { + initialAdds[i] = DiscoverNodeConfigAsync(config.WellKnownNodes[i], cancellation); + } + + //Wait for all initial adds to complete + await Task.WhenAll(initialAdds); + + //Get the initial advertisments that arent null + return initialAdds.Select(static x => x.Result!).Where(static s => s != null).ToArray(); + } + + /// <summary> + /// Discovers ALL possible cache nodes itteritivley, first by collecting the configuration + /// from the initial peers. + /// This will make connections to all discoverable servers + /// </summary> + /// <param name="cancellation">A token to cancel the operation</param> + /// <returns></returns> + /// <exception cref="ArgumentException"></exception> + /// <exception cref="CacheDiscoveryFailureException"></exception> + /// <remarks> + /// This method simply combines the <see cref="ResolveWellKnownAsync"/> and <see cref="DiscoverNodesAsync"/> + /// methods into a single operation + /// </remarks> + public async Task DiscoverNodesAsync(CancellationToken cancellation) + { + //Make sure at least one node defined + if (config?.WellKnownNodes == null || config.WellKnownNodes.Length == 0) + { + throw new ArgumentException("There must be at least one cache node defined in the client configuration"); + } + + /* + * Connect to well-known nodes from the client configuration to discovery its layout. + * + */ + CacheNodeAdvertisment[] initialPeers = await ResolveWellKnownAsync(cancellation); + + if (initialPeers.Length == 0) + { + throw new CacheDiscoveryFailureException("There must be at least one available cache node to continue discovery"); + } + + await DiscoverNodesAsync(initialPeers, cancellation); + } + + /// <summary> + /// Discovers ALL possible cache nodes itteritivley from the current collection of initial peers. + /// This will make connections to all discoverable servers and update the client configuration, with all + /// discovered peers + /// </summary> + /// <param name="initialPeers">Accepts an array of initial peers to override the endpoint discovery process</param> + /// <param name="cancellation">A token to cancel the operation</param> + /// <returns>A task that completes when all nodes have been discovered</returns> + /// <exception cref="ArgumentException"></exception> + /// <exception cref="CacheDiscoveryFailureException"></exception> + public async Task DiscoverNodesAsync(CacheNodeAdvertisment[] initialPeers, CancellationToken cancellation) + { + //Make sure at least one node defined + ArgumentNullException.ThrowIfNull(initialPeers); + ArgumentOutOfRangeException.ThrowIfZero(initialPeers.Length); + + //Get the discovery enumerator with the initial peers + using INodeDiscoveryEnumerator enumerator = NodeCollection.BeginDiscovery(initialPeers); + + //Start the discovery process + await DiscoverNodesAsync(enumerator, config, config.ErrorHandler, cancellation); + + //Commit discovered nodes to stored node collection + NodeCollection.CompleteDiscovery(enumerator); + } + + private static async Task DiscoverNodesAsync( + INodeDiscoveryEnumerator enumerator, + CacheClientConfiguration config, + ICacheDiscoveryErrorHandler? errHandler, + CancellationToken cancellation + ) + { + //Loop through servers + while (enumerator.MoveNext()) + { + //Make sure the node has a discovery endpoint + if (enumerator.Current.DiscoveryEndpoint == null) + { + //Skip this node + continue; + } + + /* + * We are allowed to save nodes that do not have a discovery endpoint, but we cannot + * discover nodes from them we can only use them as cache + */ + + //add a random delay to avoid spamming servers + await Task.Delay((int)Random.Shared.NextInt64(100, 500), cancellation); + + try + { + //Discover nodes from the current node + CacheNodeAdvertisment[]? nodes = await GetCacheNodesAsync(enumerator.Current, config, cancellation); + + if (nodes != null) + { + //Add nodes to the collection + enumerator.OnPeerDiscoveryComplete(nodes); + } + } + //Catch exceptions when an error handler is defined + catch (Exception ex) when (errHandler != null) + { + //Handle the error + errHandler.OnDiscoveryError(enumerator.Current, ex); + } + catch (Exception ex) + { + throw new CacheDiscoveryFailureException($"Failed to discovery peer node {enumerator.Current?.NodeId}, cannot continue", ex); + } + } + } + + /// <summary> + /// Contacts the given server's discovery endpoint to discover a list of available + /// servers we can connect to + /// </summary> + /// <param name="advert">An advertisment of a server to discover other nodes from</param> + /// <param name="cancellationToken">A token to cancel the operationS</param> + /// <param name="config">The cache configuration object</param> + /// <returns>The list of active servers</returns> + /// <exception cref="SecurityException"></exception> + /// <exception cref="ArgumentException"></exception> + /// <exception cref="ArgumentNullException"></exception> + public static async Task<CacheNodeAdvertisment[]?> GetCacheNodesAsync(CacheNodeAdvertisment advert, CacheClientConfiguration config, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(advert); + ArgumentNullException.ThrowIfNull(config); + ArgumentNullException.ThrowIfNull(advert.DiscoveryEndpoint, nameof(advert.DiscoveryEndpoint)); + + DiscoveryRequest req = new (advert.DiscoveryEndpoint, config); + + //Site adapter verifies response messages so we dont need to check on the response + byte[] data = await CacheSiteAdapter.Instance.ExecuteAsync(req, cancellationToken).AsBytes() + ?? throw new InvalidOperationException($"No data returned from node {advert.NodeId}"); + + //Response is jwt + using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data); + + using JsonDocument doc = responseJwt.GetPayload(); + return doc.RootElement.GetProperty("peers").Deserialize<CacheNodeAdvertisment[]>(); + } + + + /* + * This method will connect to a given well-known (cache config endpoint) and discover the + * servers configuration (endpoint config) + * + * This function exists so clients only need a single endpoint to connect to, and the server + * will return it's signed configuration data (including cluster network information) + */ + private async Task<CacheNodeAdvertisment?> DiscoverNodeConfigAsync(Uri serverUri, CancellationToken cancellation) + { + try + { + GetConfigRequest req = new (serverUri, config); + + //Site adapter verifies response messages so we dont need to check on the response + byte[] data = await CacheSiteAdapter.Instance.ExecuteAsync(req, cancellation).AsBytes() + ?? throw new CacheDiscoveryFailureException($"No data returned from desired cache node"); + + //Response is jwt + using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data); + + //The entire payload is just the single serialzed advertisment + using JsonDocument doc = responseJwt.GetPayload(); + + return doc.RootElement.GetProperty("sub").Deserialize<CacheNodeAdvertisment>(); + } + //Bypass cdfe when error handler is null (avoid nesting)` + catch (CacheDiscoveryFailureException) when (config.ErrorHandler == null) + { + throw; + } + //Catch exceptions when an error handler is defined + catch (Exception ex) when (config.ErrorHandler != null) + { + //Handle the error + config.ErrorHandler.OnDiscoveryError(serverUri, ex); + return null; + } + catch (Exception ex) + { + throw new CacheDiscoveryFailureException("Failed to discover node configuration", ex); + } + } + + private static NodeDiscoveryCollection GetNodeCollection(CacheClientConfiguration config) + { + return config is CacheNodeConfiguration cnc ? new (cnc.NodeIdRef!) : new (null); + } + } +} |