From 4d8cfc10382105b0acbd94df93ad3d05ff91db54 Mon Sep 17 00:00:00 2001 From: vnugent Date: Wed, 6 Mar 2024 21:30:58 -0500 Subject: refactor: #2 Centralize server state, default discovery endpoints & more --- .../src/FBMDataCacheExtensions.cs | 361 ++------------------- 1 file changed, 32 insertions(+), 329 deletions(-) (limited to 'lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs') diff --git a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs index bd86461..47aadd9 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs @@ -31,7 +31,6 @@ using System.Threading; using System.Threading.Tasks; using System.Collections.Generic; using System.Security.Cryptography; -using System.Runtime.CompilerServices; using RestSharp; @@ -74,22 +73,7 @@ namespace VNLib.Data.Caching.Extensions /// The advertisment header for cache node discovery /// public const string X_NODE_DISCOVERY_HEADER = "X-Cache-Node-Discovery"; - - /* - * Lazy to defer errors for debuggong - */ - private static readonly Lazy SiteAdapter = new(() => ConfigureAdapter(2)); - - private static CacheSiteAdapter ConfigureAdapter(int maxClients) - { - CacheSiteAdapter adapter = new(maxClients); - //Configure the site endpoints - adapter.BuildEndpoints(ServiceEndpoints.Definition); - return adapter; - } - - private static readonly ConditionalWeakTable ClientCacheConfig = new(); - + /// /// Gets a preconfigured object caching /// protocl @@ -101,7 +85,7 @@ namespace VNLib.Data.Caching.Extensions /// A preconfigured for object caching public static FBMClientConfig GetDefaultConfig(IUnmangedHeap heap, int maxMessageSize, TimeSpan timeout = default, ILogProvider? debugLog = null) { - return GetDefaultConfig(new FallbackFBMMemoryManager(heap), maxMessageSize, timeout, debugLog); + return GetDefaultConfig(new SharedHeapFBMMemoryManager(heap), maxMessageSize, timeout, debugLog); } /// @@ -153,236 +137,22 @@ namespace VNLib.Data.Caching.Extensions { client.Config.DebugLog?.Debug("{debug}: {data}", "[CACHE]", message); } - - /// - /// Discovers ALL possible cache nodes itteritivley, first by collecting the configuration - /// from the initial peers. - /// This will make connections to all discoverable servers - /// - /// - /// A token to cancel the operation - /// - /// - /// - public static async Task DiscoverNodesAsync(this CacheClientConfiguration config, CancellationToken cancellation) - { - //Make sure at least one node defined - if(config?.WellKnownNodes == null || config.WellKnownNodes.Length == 0) - { - throw new ArgumentException("There must be at least one cache node defined in the client configuration"); - } - - //Get the initial advertisments that arent null - CacheNodeAdvertisment[] initialPeers = await ResolveWellKnownAsync(config, cancellation); - - if (initialPeers.Length == 0) - { - throw new CacheDiscoveryFailureException("There must be at least one available cache node to continue discovery"); - } - - await DiscoverNodesAsync(config, initialPeers, cancellation); - } /// - /// Resolves the initial well-known cache nodes into their advertisments + /// Gets the discovery manager for the current client configuration. Just a + /// convience method. /// - /// - /// A token to cancel the operation - /// An array of resolved nodes - public static async Task ResolveWellKnownAsync(this CacheClientConfiguration config, CancellationToken cancellation) - { - _ = config?.WellKnownNodes ?? throw new ArgumentNullException(nameof(config)); - - Task[] initialAdds = new Task[config.WellKnownNodes.Length]; - - //Discover initial advertisments from well-known addresses - for (int i = 0; i < config.WellKnownNodes.Length; i++) - { - initialAdds[i] = DiscoverNodeConfigAsync(config.WellKnownNodes[i], config, cancellation); - } - - //Wait for all initial adds to complete - await Task.WhenAll(initialAdds); - - //Get the initial advertisments that arent null - return initialAdds.Select(static x => x.Result!).Where(static s => s != null).ToArray(); - } + /// + /// The new instance around your config + public static VNCacheClusterManager GetDiscoveryManager(this CacheClientConfiguration conf) => new(conf); /// - /// Discovers ALL possible cache nodes itteritivley from the current collection of initial peers. - /// This will make connections to all discoverable servers and update the client configuration, with all - /// discovered peers + /// Converts the cache client configuration to a cluster client /// /// - /// Accepts an array of initial peers to override the endpoint discovery process - /// A token to cancel the operation - /// A task that completes when all nodes have been discovered - /// - /// - public static async Task DiscoverNodesAsync(this CacheClientConfiguration config, CacheNodeAdvertisment[] initialPeers, CancellationToken cancellation) - { - //Make sure at least one node defined - if (initialPeers == null || initialPeers.Length == 0) - { - throw new ArgumentException("There must be at least one initial peer"); - } - - //Get the discovery enumerator with the initial peers - INodeDiscoveryEnumerator enumerator = config.NodeCollection.BeginDiscovery(initialPeers); - - //Start the discovery process - await DiscoverNodesAsync(enumerator, config, config.ErrorHandler, cancellation); - - //Commit nodes - config.NodeCollection.CompleteDiscovery(enumerator); - } - - private static async Task DiscoverNodesAsync( - INodeDiscoveryEnumerator enumerator, - CacheClientConfiguration config, - ICacheDiscoveryErrorHandler? errHandler, - CancellationToken cancellation - ) - { - //Loop through servers - while (enumerator.MoveNext()) - { - //Make sure the node has a discovery endpoint - if (enumerator.Current.DiscoveryEndpoint == null) - { - //Skip this node - continue; - } - - /* - * We are allowed to save nodes that do not have a discovery endpoint, but we cannot - * discover nodes from them we can only use them as cache - */ - - //add a random delay to avoid spamming servers - await Task.Delay((int)Random.Shared.NextInt64(100, 500), cancellation); - - try - { - //Discover nodes from the current node - CacheNodeAdvertisment[]? nodes = await GetCacheNodesAsync(enumerator.Current, config, cancellation); - - if (nodes != null) - { - //Add nodes to the collection - enumerator.OnPeerDiscoveryComplete(nodes); - } - } - //Catch exceptions when an error handler is defined - catch(Exception ex) when (errHandler != null) - { - //Handle the error - errHandler.OnDiscoveryError(enumerator.Current, ex); - } - catch(Exception ex) - { - throw new CacheDiscoveryFailureException($"Failed to discovery peer node {enumerator.Current?.NodeId}, cannot continue", ex); - } - } - } - - private static async Task DiscoverNodeConfigAsync(Uri serverUri, CacheClientConfiguration config, CancellationToken cancellation) - { - try - { - GetConfigRequest req = new (serverUri, config); - - //Site adapter verifies response messages so we dont need to check on the response - byte[] data = await SiteAdapter.Value.ExecuteAsync(req, cancellation).AsBytes() - ?? throw new CacheDiscoveryFailureException($"No data returned from desired cache node"); - - //Response is jwt - using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data); - - //The entire payload is just the single serialzed advertisment - using JsonDocument doc = responseJwt.GetPayload(); - - return doc.RootElement.GetProperty("sub").Deserialize(); - } - //Bypass cdfe when error handler is null - catch(CacheDiscoveryFailureException) when(config.ErrorHandler == null) - { - throw; - } - //Catch exceptions when an error handler is defined - catch (Exception ex) when (config.ErrorHandler != null) - { - //Handle the error - config.ErrorHandler.OnDiscoveryError(null!, ex); - return null; - } - catch (Exception ex) - { - throw new CacheDiscoveryFailureException("Failed to discover node configuration", ex); - } - } - - /// - /// Contacts the given server's discovery endpoint to discover a list of available - /// servers we can connect to - /// - /// An advertisment of a server to discover other nodes from - /// A token to cancel the operationS - /// The cache configuration object - /// The list of active servers - /// - /// - /// - public static async Task GetCacheNodesAsync(CacheNodeAdvertisment advert, CacheClientConfiguration config, CancellationToken cancellationToken = default) - { - _ = advert ?? throw new ArgumentNullException(nameof(advert)); - _ = config ?? throw new ArgumentNullException(nameof(config)); - _ = advert.DiscoveryEndpoint ?? throw new ArgumentException("Advertisment does not expose an advertisment endpoint"); - - DiscoveryRequest req = new (advert.DiscoveryEndpoint, config); - - //Site adapter verifies response messages so we dont need to check on the response - byte[] data = await SiteAdapter.Value.ExecuteAsync(req, cancellationToken).AsBytes() - ?? throw new InvalidOperationException($"No data returned from node {advert.NodeId}"); - - //Response is jwt - using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data); - - using JsonDocument doc = responseJwt.GetPayload(); - return doc.RootElement.GetProperty("peers").Deserialize(); - } - - /// - /// Allows for configuration of an - /// for a connection to a cache server - /// - /// - /// A fluent api configuration builder for the current client - public static CacheClientConfiguration GetCacheConfiguration(this FBMClientFactory client) => ClientCacheConfig.GetOrCreateValue(client); - - /// - /// Explicitly set the client cache configuration for the current client - /// - /// - /// The cache node configuration - /// The config instance - public static CacheClientConfiguration SetCacheConfiguration(this FBMClientFactory client, CacheClientConfiguration config) - { - ClientCacheConfig.AddOrUpdate(client, config); - return config; - } - - /// - /// Explicitly set the cache node configuration for the current client - /// - /// - /// The cache node configuration - /// The config instance - public static CacheNodeConfiguration SetCacheConfiguration(this FBMClientFactory client, CacheNodeConfiguration nodeConfig) - { - ClientCacheConfig.AddOrUpdate(client, nodeConfig); - return nodeConfig; - } + /// The FBM client factory instance to use + /// The new cluster client instance + public static VNCacheClusterClient ToClusterClient(this CacheClientConfiguration config, FBMClientFactory factory) => new(config, factory); /// /// Waits for the client to disconnect from the server while observing @@ -396,6 +166,8 @@ namespace VNLib.Data.Caching.Extensions /// A task that complets when the connecion has been closed successfully public static async Task WaitForExitAsync(this FBMClient client, CancellationToken token = default) { + ArgumentNullException.ThrowIfNull(client); + client.LogDebug("Waiting for cache client to exit"); //Get task for cancellation @@ -419,88 +191,6 @@ namespace VNLib.Data.Caching.Extensions } } - /// - /// Discovers all available nodes for the current client config - /// - /// - /// A token to cancel the operation - /// A task that completes when all nodes have been discovered - public static Task DiscoverAvailableNodesAsync(this FBMClientFactory client, CancellationToken cancellation = default) - { - //Get stored config - CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); - - //Discover all nodes - return conf.DiscoverNodesAsync(cancellation); - } - - /// - /// Connects to a random server from the servers discovered during a cache server discovery - /// - /// - /// A token to cancel the operation - /// The server that the connection was made with - /// - /// - /// - /// - /// - /// - public static async Task ConnectToRandomCacheAsync(this FBMClientFactory client, CancellationToken cancellation = default) - { - //Get stored config - CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); - - //Get all available nodes, or at least the initial peers - CacheNodeAdvertisment[]? adverts = conf.NodeCollection.GetAllNodes() ?? throw new ArgumentException("No cache nodes discovered, cannot connect"); - - //Select random node from all available nodes - CacheNodeAdvertisment randomServer = adverts.SelectRandom(); - - //Connect to the random server - await ConnectToCacheAsync(client, randomServer, cancellation); - - //Return the random server we connected to - return randomServer; - } - - /// - /// Connects to the specified server on the configured cache client - /// - /// - /// The server to connect to - /// A token to cancel the operation - /// A task that resolves when the client is connected to the cache server - /// - /// - /// - /// - /// - /// - public static async Task ConnectToCacheAsync(this FBMClientFactory factory, CacheNodeAdvertisment server, CancellationToken token = default) - { - _ = factory ?? throw new ArgumentNullException(nameof(factory)); - _ = server ?? throw new ArgumentNullException(nameof(server)); - - //Get stored config - CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(factory); - - //Create new client - FBMClient client = factory.CreateClient(); - - try - { - //Connect to server (no server id because client not replication server) - await ConnectToCacheAsync(client, conf, server, token); - return client; - } - catch - { - client.Dispose(); - throw; - } - } - /// /// Connects to the specified server on the configured cache client /// @@ -517,9 +207,9 @@ namespace VNLib.Data.Caching.Extensions /// public static Task ConnectToCacheAsync(this FBMClient client, CacheNodeAdvertisment server, CacheClientConfiguration explicitConfig, CancellationToken token = default) { - _ = client ?? throw new ArgumentNullException(nameof(client)); - _ = server ?? throw new ArgumentNullException(nameof(server)); - + ArgumentNullException.ThrowIfNull(client); + ArgumentNullException.ThrowIfNull(server); + //Connect to server (no server id because client not replication server) return ConnectToCacheAsync(client, explicitConfig, server, token); } @@ -546,7 +236,7 @@ namespace VNLib.Data.Caching.Extensions NegotationRequest req = new(server.ConnectEndpoint, config); //Exec negotiation - RestResponse response = await SiteAdapter.Value.ExecuteAsync(req, token); + RestResponse response = await CacheSiteAdapter.Instance.ExecuteAsync(req, token); /* * JWT will already be veified by the endpoint adapter, so we @@ -582,6 +272,12 @@ namespace VNLib.Data.Caching.Extensions Scheme = config.UseTls ? "wss://" : "ws://" }; + //if the server is specifying https urls, then attempt to upgrade to wss + if (server.ConnectEndpoint.Scheme == Uri.UriSchemeHttps) + { + uriBuilder.Scheme = "wss://"; + } + //Connect async await client.ConnectAsync(uriBuilder.Uri, token); } @@ -658,7 +354,7 @@ namespace VNLib.Data.Caching.Extensions /// public static bool VerifyUpgradeToken(this ICacheAuthManager man, string signature, string token, bool isPeer) { - _ = man ?? throw new ArgumentNullException(nameof(man)); + ArgumentNullException.ThrowIfNull(man); //get the hash of the token byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256); @@ -704,8 +400,15 @@ namespace VNLib.Data.Caching.Extensions /// The advertisment message to verify /// The advertisment message if successfully verified, or null otherwise /// - public static CacheNodeAdvertisment? VerifyPeerAdvertisment(this ICacheAuthManager config, string message) + public static CacheNodeAdvertisment? VerifyPeerAdvertisment(this ICacheAuthManager config, string? message) { + ArgumentNullException.ThrowIfNull(config); + + if (string.IsNullOrWhiteSpace(message)) + { + return null; + } + using JsonWebToken jwt = JsonWebToken.Parse(message); //Verify the signature -- cgit