/*
* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Data.Caching.Extensions
* File: VNCacheClusterManager.cs
*
* VNCacheClusterManager.cs is part of VNLib.Data.Caching.Extensions
* which is part of the larger VNLib collection of libraries and utilities.
*
* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
using System;
using System.Linq;
using System.Security;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using RestSharp;
using VNLib.Hashing.IdentityUtility;
using VNLib.Net.Rest.Client.Construction;
using VNLib.Data.Caching.Extensions.ApiModel;
using VNLib.Data.Caching.Extensions.Clustering;
namespace VNLib.Data.Caching.Extensions
{
///
/// A VNCache cluster client discovery maanger. Used to simplify the discovery
/// of cache nodes
///
/// The client configuration instance
public class VNCacheClusterManager(CacheClientConfiguration config)
{
///
/// The internal collection of discovered nodes
///
protected NodeDiscoveryCollection NodeCollection { get; } = GetNodeCollection(config);
///
/// Gets the collection of discovered nodes within the manager
///
public INodeDiscoveryCollection DiscoveredNodes => NodeCollection;
///
/// The underlying instance
///
public CacheClientConfiguration Config => config;
///
/// Adds an array of nodes manually to the collection of discovered cluster nodes
///
///
public void AddManualNodes(params CacheNodeAdvertisment[] nodes) => AddManualNodes(nodes.AsEnumerable());
///
/// Adds an array of nodes manually to the collection of discovered cluster nodes
///
///
public void AddManualNodes(IEnumerable nodes) => NodeCollection.AddManualNodes(nodes);
///
/// Removes an array of nodes manually from the collection of discovered cluster nodes
///
///
public void RemoveManualNodes(params CacheNodeAdvertisment[] nodes) => RemoveManualNodes(nodes.AsEnumerable());
///
/// Removes an array of nodes manually from the collection of discovered cluster nodes
///
///
public void RemoveManualNodes(IEnumerable nodes) => NodeCollection.RemoveManualNodes(nodes);
///
/// Resolves the initial well-known cache nodes into their advertisments
///
/// A token to cancel the operation
/// An array of resolved nodes
public async Task ResolveWellKnownAsync(CancellationToken cancellation)
{
//Make sure at least one node defined
if (config?.WellKnownNodes == null || config.WellKnownNodes.Length == 0)
{
throw new ArgumentException("There must be at least one cache node defined in the client configuration");
}
Task[] initialAdds = new Task[config.WellKnownNodes.Length];
//Discover initial advertisments from well-known addresses
for (int i = 0; i < config.WellKnownNodes.Length; i++)
{
initialAdds[i] = DiscoverNodeConfigAsync(config.WellKnownNodes[i], cancellation);
}
//Wait for all initial adds to complete
await Task.WhenAll(initialAdds);
//Get the initial advertisments that arent null
return initialAdds.Select(static x => x.Result!).Where(static s => s != null).ToArray();
}
///
/// Discovers ALL possible cache nodes itteritivley, first by collecting the configuration
/// from the initial peers.
/// This will make connections to all discoverable servers
///
/// A token to cancel the operation
///
///
///
///
/// This method simply combines the and
/// methods into a single operation
///
public async Task DiscoverNodesAsync(CancellationToken cancellation)
{
//Make sure at least one node defined
if (config?.WellKnownNodes == null || config.WellKnownNodes.Length == 0)
{
throw new ArgumentException("There must be at least one cache node defined in the client configuration");
}
/*
* Connect to well-known nodes from the client configuration to discovery its layout.
*
*/
CacheNodeAdvertisment[] initialPeers = await ResolveWellKnownAsync(cancellation);
if (initialPeers.Length == 0)
{
throw new CacheDiscoveryFailureException("There must be at least one available cache node to continue discovery");
}
await DiscoverNodesAsync(initialPeers, cancellation);
}
///
/// Discovers ALL possible cache nodes itteritivley from the current collection of initial peers.
/// This will make connections to all discoverable servers and update the client configuration, with all
/// discovered peers
///
/// Accepts an array of initial peers to override the endpoint discovery process
/// A token to cancel the operation
/// A task that completes when all nodes have been discovered
///
///
public async Task DiscoverNodesAsync(CacheNodeAdvertisment[] initialPeers, CancellationToken cancellation)
{
//Make sure at least one node defined
ArgumentNullException.ThrowIfNull(initialPeers);
ArgumentOutOfRangeException.ThrowIfZero(initialPeers.Length);
//Get the discovery enumerator with the initial peers
using INodeDiscoveryEnumerator enumerator = NodeCollection.BeginDiscovery(initialPeers);
//Start the discovery process
await DiscoverNodesAsync(enumerator, config, config.ErrorHandler, cancellation);
//Commit discovered nodes to stored node collection
NodeCollection.CompleteDiscovery(enumerator);
}
private static async Task DiscoverNodesAsync(
INodeDiscoveryEnumerator enumerator,
CacheClientConfiguration config,
ICacheDiscoveryErrorHandler? errHandler,
CancellationToken cancellation
)
{
//Loop through servers
while (enumerator.MoveNext())
{
//Make sure the node has a discovery endpoint
if (enumerator.Current.DiscoveryEndpoint == null)
{
//Skip this node
continue;
}
/*
* We are allowed to save nodes that do not have a discovery endpoint, but we cannot
* discover nodes from them we can only use them as cache
*/
//add a random delay to avoid spamming servers
await Task.Delay((int)Random.Shared.NextInt64(100, 500), cancellation);
try
{
//Discover nodes from the current node
CacheNodeAdvertisment[]? nodes = await GetCacheNodesAsync(enumerator.Current, config, cancellation);
if (nodes != null)
{
//Add nodes to the collection
enumerator.OnPeerDiscoveryComplete(nodes);
}
}
//Catch exceptions when an error handler is defined
catch (Exception ex) when (errHandler != null)
{
//Handle the error
errHandler.OnDiscoveryError(enumerator.Current, ex);
}
catch (Exception ex)
{
throw new CacheDiscoveryFailureException($"Failed to discovery peer node {enumerator.Current?.NodeId}, cannot continue", ex);
}
}
}
///
/// Contacts the given server's discovery endpoint to discover a list of available
/// servers we can connect to
///
/// An advertisment of a server to discover other nodes from
/// A token to cancel the operationS
/// The cache configuration object
/// The list of active servers
///
///
///
public static async Task GetCacheNodesAsync(CacheNodeAdvertisment advert, CacheClientConfiguration config, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(advert);
ArgumentNullException.ThrowIfNull(config);
ArgumentNullException.ThrowIfNull(advert.DiscoveryEndpoint, nameof(advert.DiscoveryEndpoint));
DiscoveryRequest req = new (advert.DiscoveryEndpoint, config);
//Site adapter verifies response messages so we dont need to check on the response
byte[] data = await CacheSiteAdapter.Instance.ExecuteAsync(req, cancellationToken).AsBytes()
?? throw new InvalidOperationException($"No data returned from node {advert.NodeId}");
//Response is jwt
using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data);
using JsonDocument doc = responseJwt.GetPayload();
return doc.RootElement.GetProperty("peers").Deserialize();
}
/*
* This method will connect to a given well-known (cache config endpoint) and discover the
* servers configuration (endpoint config)
*
* This function exists so clients only need a single endpoint to connect to, and the server
* will return it's signed configuration data (including cluster network information)
*/
private async Task DiscoverNodeConfigAsync(Uri serverUri, CancellationToken cancellation)
{
try
{
GetConfigRequest req = new (serverUri, config);
//Site adapter verifies response messages so we dont need to check on the response
byte[] data = await CacheSiteAdapter.Instance.ExecuteAsync(req, cancellation).AsBytes()
?? throw new CacheDiscoveryFailureException($"No data returned from desired cache node");
//Response is jwt
using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data);
//The entire payload is just the single serialzed advertisment
using JsonDocument doc = responseJwt.GetPayload();
return doc.RootElement.GetProperty("sub").Deserialize();
}
//Bypass cdfe when error handler is null (avoid nesting)`
catch (CacheDiscoveryFailureException) when (config.ErrorHandler == null)
{
throw;
}
//Catch exceptions when an error handler is defined
catch (Exception ex) when (config.ErrorHandler != null)
{
//Handle the error
config.ErrorHandler.OnDiscoveryError(serverUri, ex);
return null;
}
catch (Exception ex)
{
throw new CacheDiscoveryFailureException("Failed to discover node configuration", ex);
}
}
private static NodeDiscoveryCollection GetNodeCollection(CacheClientConfiguration config)
{
return config is CacheNodeConfiguration cnc ? new (cnc.NodeIdRef!) : new (null);
}
}
}