aboutsummaryrefslogtreecommitdiff
path: root/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
diff options
context:
space:
mode:
Diffstat (limited to 'lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs')
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs361
1 files changed, 32 insertions, 329 deletions
diff --git a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
index bd86461..47aadd9 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
@@ -31,7 +31,6 @@ using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Security.Cryptography;
-using System.Runtime.CompilerServices;
using RestSharp;
@@ -74,22 +73,7 @@ namespace VNLib.Data.Caching.Extensions
/// The advertisment header for cache node discovery
/// </summary>
public const string X_NODE_DISCOVERY_HEADER = "X-Cache-Node-Discovery";
-
- /*
- * Lazy to defer errors for debuggong
- */
- private static readonly Lazy<CacheSiteAdapter> SiteAdapter = new(() => ConfigureAdapter(2));
-
- private static CacheSiteAdapter ConfigureAdapter(int maxClients)
- {
- CacheSiteAdapter adapter = new(maxClients);
- //Configure the site endpoints
- adapter.BuildEndpoints(ServiceEndpoints.Definition);
- return adapter;
- }
-
- private static readonly ConditionalWeakTable<FBMClientFactory, CacheClientConfiguration> ClientCacheConfig = new();
-
+
/// <summary>
/// Gets a <see cref="FBMClientConfig"/> preconfigured object caching
/// protocl
@@ -101,7 +85,7 @@ namespace VNLib.Data.Caching.Extensions
/// <returns>A preconfigured <see cref="FBMClientConfig"/> for object caching</returns>
public static FBMClientConfig GetDefaultConfig(IUnmangedHeap heap, int maxMessageSize, TimeSpan timeout = default, ILogProvider? debugLog = null)
{
- return GetDefaultConfig(new FallbackFBMMemoryManager(heap), maxMessageSize, timeout, debugLog);
+ return GetDefaultConfig(new SharedHeapFBMMemoryManager(heap), maxMessageSize, timeout, debugLog);
}
/// <summary>
@@ -153,236 +137,22 @@ namespace VNLib.Data.Caching.Extensions
{
client.Config.DebugLog?.Debug("{debug}: {data}", "[CACHE]", message);
}
-
- /// <summary>
- /// Discovers ALL possible cache nodes itteritivley, first by collecting the configuration
- /// from the initial peers.
- /// This will make connections to all discoverable servers
- /// </summary>
- /// <param name="config"></param>
- /// <param name="cancellation">A token to cancel the operation</param>
- /// <returns></returns>
- /// <exception cref="ArgumentException"></exception>
- /// <exception cref="CacheDiscoveryFailureException"></exception>
- public static async Task DiscoverNodesAsync(this CacheClientConfiguration config, CancellationToken cancellation)
- {
- //Make sure at least one node defined
- if(config?.WellKnownNodes == null || config.WellKnownNodes.Length == 0)
- {
- throw new ArgumentException("There must be at least one cache node defined in the client configuration");
- }
-
- //Get the initial advertisments that arent null
- CacheNodeAdvertisment[] initialPeers = await ResolveWellKnownAsync(config, cancellation);
-
- if (initialPeers.Length == 0)
- {
- throw new CacheDiscoveryFailureException("There must be at least one available cache node to continue discovery");
- }
-
- await DiscoverNodesAsync(config, initialPeers, cancellation);
- }
/// <summary>
- /// Resolves the initial well-known cache nodes into their advertisments
+ /// Gets the discovery manager for the current client configuration. Just a
+ /// convience method.
/// </summary>
- /// <param name="config"></param>
- /// <param name="cancellation">A token to cancel the operation</param>
- /// <returns>An array of resolved nodes</returns>
- public static async Task<CacheNodeAdvertisment[]> ResolveWellKnownAsync(this CacheClientConfiguration config, CancellationToken cancellation)
- {
- _ = config?.WellKnownNodes ?? throw new ArgumentNullException(nameof(config));
-
- Task<CacheNodeAdvertisment?>[] initialAdds = new Task<CacheNodeAdvertisment?>[config.WellKnownNodes.Length];
-
- //Discover initial advertisments from well-known addresses
- for (int i = 0; i < config.WellKnownNodes.Length; i++)
- {
- initialAdds[i] = DiscoverNodeConfigAsync(config.WellKnownNodes[i], config, cancellation);
- }
-
- //Wait for all initial adds to complete
- await Task.WhenAll(initialAdds);
-
- //Get the initial advertisments that arent null
- return initialAdds.Select(static x => x.Result!).Where(static s => s != null).ToArray();
- }
+ /// <param name="conf"></param>
+ /// <returns>The new <see cref="VNCacheClusterManager"/> instance around your config</returns>
+ public static VNCacheClusterManager GetDiscoveryManager(this CacheClientConfiguration conf) => new(conf);
/// <summary>
- /// Discovers ALL possible cache nodes itteritivley from the current collection of initial peers.
- /// This will make connections to all discoverable servers and update the client configuration, with all
- /// discovered peers
+ /// Converts the cache client configuration to a cluster client
/// </summary>
/// <param name="config"></param>
- /// <param name="initialPeers">Accepts an array of initial peers to override the endpoint discovery process</param>
- /// <param name="cancellation">A token to cancel the operation</param>
- /// <returns>A task that completes when all nodes have been discovered</returns>
- /// <exception cref="ArgumentException"></exception>
- /// <exception cref="CacheDiscoveryFailureException"></exception>
- public static async Task DiscoverNodesAsync(this CacheClientConfiguration config, CacheNodeAdvertisment[] initialPeers, CancellationToken cancellation)
- {
- //Make sure at least one node defined
- if (initialPeers == null || initialPeers.Length == 0)
- {
- throw new ArgumentException("There must be at least one initial peer");
- }
-
- //Get the discovery enumerator with the initial peers
- INodeDiscoveryEnumerator enumerator = config.NodeCollection.BeginDiscovery(initialPeers);
-
- //Start the discovery process
- await DiscoverNodesAsync(enumerator, config, config.ErrorHandler, cancellation);
-
- //Commit nodes
- config.NodeCollection.CompleteDiscovery(enumerator);
- }
-
- private static async Task DiscoverNodesAsync(
- INodeDiscoveryEnumerator enumerator,
- CacheClientConfiguration config,
- ICacheDiscoveryErrorHandler? errHandler,
- CancellationToken cancellation
- )
- {
- //Loop through servers
- while (enumerator.MoveNext())
- {
- //Make sure the node has a discovery endpoint
- if (enumerator.Current.DiscoveryEndpoint == null)
- {
- //Skip this node
- continue;
- }
-
- /*
- * We are allowed to save nodes that do not have a discovery endpoint, but we cannot
- * discover nodes from them we can only use them as cache
- */
-
- //add a random delay to avoid spamming servers
- await Task.Delay((int)Random.Shared.NextInt64(100, 500), cancellation);
-
- try
- {
- //Discover nodes from the current node
- CacheNodeAdvertisment[]? nodes = await GetCacheNodesAsync(enumerator.Current, config, cancellation);
-
- if (nodes != null)
- {
- //Add nodes to the collection
- enumerator.OnPeerDiscoveryComplete(nodes);
- }
- }
- //Catch exceptions when an error handler is defined
- catch(Exception ex) when (errHandler != null)
- {
- //Handle the error
- errHandler.OnDiscoveryError(enumerator.Current, ex);
- }
- catch(Exception ex)
- {
- throw new CacheDiscoveryFailureException($"Failed to discovery peer node {enumerator.Current?.NodeId}, cannot continue", ex);
- }
- }
- }
-
- private static async Task<CacheNodeAdvertisment?> DiscoverNodeConfigAsync(Uri serverUri, CacheClientConfiguration config, CancellationToken cancellation)
- {
- try
- {
- GetConfigRequest req = new (serverUri, config);
-
- //Site adapter verifies response messages so we dont need to check on the response
- byte[] data = await SiteAdapter.Value.ExecuteAsync(req, cancellation).AsBytes()
- ?? throw new CacheDiscoveryFailureException($"No data returned from desired cache node");
-
- //Response is jwt
- using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data);
-
- //The entire payload is just the single serialzed advertisment
- using JsonDocument doc = responseJwt.GetPayload();
-
- return doc.RootElement.GetProperty("sub").Deserialize<CacheNodeAdvertisment>();
- }
- //Bypass cdfe when error handler is null
- catch(CacheDiscoveryFailureException) when(config.ErrorHandler == null)
- {
- throw;
- }
- //Catch exceptions when an error handler is defined
- catch (Exception ex) when (config.ErrorHandler != null)
- {
- //Handle the error
- config.ErrorHandler.OnDiscoveryError(null!, ex);
- return null;
- }
- catch (Exception ex)
- {
- throw new CacheDiscoveryFailureException("Failed to discover node configuration", ex);
- }
- }
-
- /// <summary>
- /// Contacts the given server's discovery endpoint to discover a list of available
- /// servers we can connect to
- /// </summary>
- /// <param name="advert">An advertisment of a server to discover other nodes from</param>
- /// <param name="cancellationToken">A token to cancel the operationS</param>
- /// <param name="config">The cache configuration object</param>
- /// <returns>The list of active servers</returns>
- /// <exception cref="SecurityException"></exception>
- /// <exception cref="ArgumentException"></exception>
- /// <exception cref="ArgumentNullException"></exception>
- public static async Task<CacheNodeAdvertisment[]?> GetCacheNodesAsync(CacheNodeAdvertisment advert, CacheClientConfiguration config, CancellationToken cancellationToken = default)
- {
- _ = advert ?? throw new ArgumentNullException(nameof(advert));
- _ = config ?? throw new ArgumentNullException(nameof(config));
- _ = advert.DiscoveryEndpoint ?? throw new ArgumentException("Advertisment does not expose an advertisment endpoint");
-
- DiscoveryRequest req = new (advert.DiscoveryEndpoint, config);
-
- //Site adapter verifies response messages so we dont need to check on the response
- byte[] data = await SiteAdapter.Value.ExecuteAsync(req, cancellationToken).AsBytes()
- ?? throw new InvalidOperationException($"No data returned from node {advert.NodeId}");
-
- //Response is jwt
- using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data);
-
- using JsonDocument doc = responseJwt.GetPayload();
- return doc.RootElement.GetProperty("peers").Deserialize<CacheNodeAdvertisment[]>();
- }
-
- /// <summary>
- /// Allows for configuration of an <see cref="FBMClient"/>
- /// for a connection to a cache server
- /// </summary>
- /// <param name="client"></param>
- /// <returns>A fluent api configuration builder for the current client</returns>
- public static CacheClientConfiguration GetCacheConfiguration(this FBMClientFactory client) => ClientCacheConfig.GetOrCreateValue(client);
-
- /// <summary>
- /// Explicitly set the client cache configuration for the current client
- /// </summary>
- /// <param name="client"></param>
- /// <param name="config">The cache node configuration</param>
- /// <returns>The config instance</returns>
- public static CacheClientConfiguration SetCacheConfiguration(this FBMClientFactory client, CacheClientConfiguration config)
- {
- ClientCacheConfig.AddOrUpdate(client, config);
- return config;
- }
-
- /// <summary>
- /// Explicitly set the cache node configuration for the current client
- /// </summary>
- /// <param name="client"></param>
- /// <param name="nodeConfig">The cache node configuration</param>
- /// <returns>The config instance</returns>
- public static CacheNodeConfiguration SetCacheConfiguration(this FBMClientFactory client, CacheNodeConfiguration nodeConfig)
- {
- ClientCacheConfig.AddOrUpdate(client, nodeConfig);
- return nodeConfig;
- }
+ /// <param name="factory">The FBM client factory instance to use</param>
+ /// <returns>The new cluster client instance</returns>
+ public static VNCacheClusterClient ToClusterClient(this CacheClientConfiguration config, FBMClientFactory factory) => new(config, factory);
/// <summary>
/// Waits for the client to disconnect from the server while observing
@@ -396,6 +166,8 @@ namespace VNLib.Data.Caching.Extensions
/// <returns>A task that complets when the connecion has been closed successfully</returns>
public static async Task WaitForExitAsync(this FBMClient client, CancellationToken token = default)
{
+ ArgumentNullException.ThrowIfNull(client);
+
client.LogDebug("Waiting for cache client to exit");
//Get task for cancellation
@@ -420,88 +192,6 @@ namespace VNLib.Data.Caching.Extensions
}
/// <summary>
- /// Discovers all available nodes for the current client config
- /// </summary>
- /// <param name="client"></param>
- /// <param name="cancellation">A token to cancel the operation</param>
- /// <returns>A task that completes when all nodes have been discovered</returns>
- public static Task DiscoverAvailableNodesAsync(this FBMClientFactory client, CancellationToken cancellation = default)
- {
- //Get stored config
- CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
-
- //Discover all nodes
- return conf.DiscoverNodesAsync(cancellation);
- }
-
- /// <summary>
- /// Connects to a random server from the servers discovered during a cache server discovery
- /// </summary>
- /// <param name="client"></param>
- /// <param name="cancellation">A token to cancel the operation</param>
- /// <returns>The server that the connection was made with</returns>
- /// <exception cref="FBMException"></exception>
- /// <exception cref="FBMServerNegiationException"></exception>
- /// <exception cref="ArgumentException"></exception>
- /// <exception cref="ArgumentNullException"></exception>
- /// <exception cref="SecurityException"></exception>
- /// <exception cref="ObjectDisposedException"></exception>
- public static async Task<CacheNodeAdvertisment> ConnectToRandomCacheAsync(this FBMClientFactory client, CancellationToken cancellation = default)
- {
- //Get stored config
- CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
-
- //Get all available nodes, or at least the initial peers
- CacheNodeAdvertisment[]? adverts = conf.NodeCollection.GetAllNodes() ?? throw new ArgumentException("No cache nodes discovered, cannot connect");
-
- //Select random node from all available nodes
- CacheNodeAdvertisment randomServer = adverts.SelectRandom();
-
- //Connect to the random server
- await ConnectToCacheAsync(client, randomServer, cancellation);
-
- //Return the random server we connected to
- return randomServer;
- }
-
- /// <summary>
- /// Connects to the specified server on the configured cache client
- /// </summary>
- /// <param name="factory"></param>
- /// <param name="server">The server to connect to</param>
- /// <param name="token">A token to cancel the operation</param>
- /// <returns>A task that resolves when the client is connected to the cache server</returns>
- /// <exception cref="FBMException"></exception>
- /// <exception cref="FBMServerNegiationException"></exception>
- /// <exception cref="ArgumentException"></exception>
- /// <exception cref="ArgumentNullException"></exception>
- /// <exception cref="SecurityException"></exception>
- /// <exception cref="ObjectDisposedException"></exception>
- public static async Task<FBMClient> ConnectToCacheAsync(this FBMClientFactory factory, CacheNodeAdvertisment server, CancellationToken token = default)
- {
- _ = factory ?? throw new ArgumentNullException(nameof(factory));
- _ = server ?? throw new ArgumentNullException(nameof(server));
-
- //Get stored config
- CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(factory);
-
- //Create new client
- FBMClient client = factory.CreateClient();
-
- try
- {
- //Connect to server (no server id because client not replication server)
- await ConnectToCacheAsync(client, conf, server, token);
- return client;
- }
- catch
- {
- client.Dispose();
- throw;
- }
- }
-
- /// <summary>
/// Connects to the specified server on the configured cache client
/// </summary>
/// <param name="client"></param>
@@ -517,9 +207,9 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="ObjectDisposedException"></exception>
public static Task ConnectToCacheAsync(this FBMClient client, CacheNodeAdvertisment server, CacheClientConfiguration explicitConfig, CancellationToken token = default)
{
- _ = client ?? throw new ArgumentNullException(nameof(client));
- _ = server ?? throw new ArgumentNullException(nameof(server));
-
+ ArgumentNullException.ThrowIfNull(client);
+ ArgumentNullException.ThrowIfNull(server);
+
//Connect to server (no server id because client not replication server)
return ConnectToCacheAsync(client, explicitConfig, server, token);
}
@@ -546,7 +236,7 @@ namespace VNLib.Data.Caching.Extensions
NegotationRequest req = new(server.ConnectEndpoint, config);
//Exec negotiation
- RestResponse response = await SiteAdapter.Value.ExecuteAsync(req, token);
+ RestResponse response = await CacheSiteAdapter.Instance.ExecuteAsync(req, token);
/*
* JWT will already be veified by the endpoint adapter, so we
@@ -582,6 +272,12 @@ namespace VNLib.Data.Caching.Extensions
Scheme = config.UseTls ? "wss://" : "ws://"
};
+ //if the server is specifying https urls, then attempt to upgrade to wss
+ if (server.ConnectEndpoint.Scheme == Uri.UriSchemeHttps)
+ {
+ uriBuilder.Scheme = "wss://";
+ }
+
//Connect async
await client.ConnectAsync(uriBuilder.Uri, token);
}
@@ -658,7 +354,7 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="CryptographicException"></exception>
public static bool VerifyUpgradeToken(this ICacheAuthManager man, string signature, string token, bool isPeer)
{
- _ = man ?? throw new ArgumentNullException(nameof(man));
+ ArgumentNullException.ThrowIfNull(man);
//get the hash of the token
byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256);
@@ -704,8 +400,15 @@ namespace VNLib.Data.Caching.Extensions
/// <param name="message">The advertisment message to verify</param>
/// <returns>The advertisment message if successfully verified, or null otherwise</returns>
/// <exception cref="FormatException"></exception>
- public static CacheNodeAdvertisment? VerifyPeerAdvertisment(this ICacheAuthManager config, string message)
+ public static CacheNodeAdvertisment? VerifyPeerAdvertisment(this ICacheAuthManager config, string? message)
{
+ ArgumentNullException.ThrowIfNull(config);
+
+ if (string.IsNullOrWhiteSpace(message))
+ {
+ return null;
+ }
+
using JsonWebToken jwt = JsonWebToken.Parse(message);
//Verify the signature