aboutsummaryrefslogtreecommitdiff
path: root/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
diff options
context:
space:
mode:
Diffstat (limited to 'lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs')
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs317
1 files changed, 106 insertions, 211 deletions
diff --git a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
index 9efe16a..634b6de 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
@@ -139,91 +139,114 @@ namespace VNLib.Data.Caching.Extensions
client.Config.DebugLog?.Debug("{debug}: {data}", "[CACHE]", message);
}
+
/// <summary>
- /// Creats a new <see cref="CacheListServerRequest"/> from an existing <see cref="CacheClientConfiguration"/>
+ /// Discovers ALL possible cache nodes itteritivley from the current collection of initial peers.
+ /// This will make connections to all discoverable servers
/// </summary>
- /// <param name="conf">The prepared client configuration</param>
- /// <returns>The new <see cref="CacheListServerRequest"/></returns>
- public static CacheListServerRequest GetListMessage(this CacheClientConfiguration conf)
+ /// <param name="config"></param>
+ /// <param name="cancellation">A token to cancel the operation</param>
+ /// <returns></returns>
+ /// <exception cref="ArgumentException"></exception>
+ public static async Task DiscoverNodesAsync(this CacheClientConfiguration config, CancellationToken cancellation)
{
- return new(conf, conf.DiscoveryEndpoint);
+ //Make sure at least one node defined
+ if(config?.InitialPeers == null || config.InitialPeers.Length == 0)
+ {
+ throw new ArgumentException("There must be at least one cache server defined in the client configuration");
+ }
+
+ //Get the discovery enumerator with the initial peers
+ INodeDiscoveryEnumerator enumerator = config.NodeCollection.BeginDiscovery(config.InitialPeers);
+
+ //Start the discovery process
+ await DiscoverNodesAsync(enumerator, config.AuthManager, config.ErrorHandler, cancellation);
+
+ //Commit nodes
+ config.NodeCollection.CompleteDiscovery(enumerator);
}
- /// <summary>
- /// Discovers peer nodes from a given initial peer and returns a list of discovered nodes. If the config
- /// is for a cache peer node, the current peer is removed from the list of discovered nodes.
- /// </summary>
- /// <param name="cacheConfig"></param>
- /// <param name="initialPeer">The initial peer to discover nodes from</param>
- /// <param name="cancellation">A token to cancel the discovery operation</param>
- /// <returns>The collection of discovered nodes</returns>
- /// <exception cref="ArgumentNullException"></exception>
- public static async Task<ICachePeerAdvertisment[]?> DiscoverClusterNodesAsync(
- this CacheClientConfiguration cacheConfig,
- ICachePeerAdvertisment initialPeer,
- CancellationToken cancellation
+ private static async Task DiscoverNodesAsync(
+ INodeDiscoveryEnumerator enumerator,
+ ICacheAuthManager auth,
+ ICacheDiscoveryErrorHandler? errHandler,
+ CancellationToken cancellation
)
{
- _ = initialPeer?.DiscoveryEndpoint ?? throw new ArgumentException("Advertisment does not expose an advertisment endpoint");
-
- //Create list request
- CacheListServerRequest request = cacheConfig.GetListMessage();
+ //Loop through servers
+ while (enumerator.MoveNext())
+ {
+ //Make sure the node has a discovery endpoint
+ if (enumerator.Current.DiscoveryEndpoint == null)
+ {
+ //Skip this node
+ continue;
+ }
- //Override with the initial peer's discovery endpoint
- request.WithDiscoveryEndpoint(initialPeer.DiscoveryEndpoint);
+ /*
+ * We are allowed to save nodes that do not have a discovery endpoint, but we cannot discover nodes from them
+ * we can only use them as cache
+ */
- //Get the list of servers
- ICachePeerAdvertisment[]? servers = await ListServersAsync(request, cancellation);
+ //add a random delay to avoid spamming the server
+ await Task.Delay((int)Random.Shared.NextInt64(50, 500), cancellation);
- if (servers == null)
- {
- return null;
- }
-
- if(cacheConfig is CacheNodeConfiguration cnc)
- {
- //Filter out the current node
- return servers.Where(s => !cnc.NodeId.Equals(s.NodeId, StringComparison.OrdinalIgnoreCase)).ToArray();
+ try
+ {
+ //Discover nodes from the current node
+ ICacheNodeAdvertisment[]? nodes = await GetCacheNodesAsync(enumerator.Current, auth, cancellation);
+
+ if (nodes != null)
+ {
+ //Add nodes to the collection
+ enumerator.OnPeerDiscoveryComplete(nodes);
+ }
+ }
+ //Catch exceptions when an error handler is defined
+ catch(Exception ex) when (errHandler != null)
+ {
+ //Handle the error
+ errHandler.OnDiscoveryError(enumerator.Current, ex);
+ }
}
- else
- {
- //Do not filter
- return servers;
- }
}
/// <summary>
/// Contacts the cache broker to get a list of active servers to connect to
/// </summary>
- /// <param name="request">The request message used to connecto the broker server</param>
+ /// <param name="advert">An advertisment of a server to discover other nodes from</param>
/// <param name="cancellationToken">A token to cancel the operationS</param>
+ /// <param name="auth">The authentication manager</param>
/// <returns>The list of active servers</returns>
/// <exception cref="SecurityException"></exception>
+ /// <exception cref="ArgumentException"></exception>
/// <exception cref="ArgumentNullException"></exception>
- public static async Task<ICachePeerAdvertisment[]?> ListServersAsync(ICacheListServerRequest request, CancellationToken cancellationToken = default)
+ public static async Task<ICacheNodeAdvertisment[]?> GetCacheNodesAsync(ICacheNodeAdvertisment advert, ICacheAuthManager auth, CancellationToken cancellationToken = default)
{
- _ = request ?? throw new ArgumentNullException(nameof(request));
+ _ = advert ?? throw new ArgumentNullException(nameof(advert));
+ _ = auth ?? throw new ArgumentNullException(nameof(auth));
+ _ = advert.DiscoveryEndpoint ?? throw new ArgumentException("Advertisment does not expose an advertisment endpoint");
string jwtBody;
//Build request jwt
using (JsonWebToken requestJwt = new())
{
- requestJwt.WriteHeader(request.GetJwtHeader());
+ requestJwt.WriteHeader(auth.GetJwtHeader());
requestJwt.InitPayloadClaim()
.AddClaim("iat", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds())
.AddClaim("nonce", RandomHash.GetRandomBase32(16))
.CommitClaims();
//sign the jwt
- request.SignJwt(requestJwt);
+ auth.SignJwt(requestJwt);
//Compile the jwt
jwtBody = requestJwt.Compile();
}
//New list request
- RestRequest listRequest = new(request.DiscoveryEndpoint, Method.Post);
+ RestRequest listRequest = new(advert.DiscoveryEndpoint, Method.Post);
//Add the jwt as a string to the request body
listRequest.AddStringBody(jwtBody, DataFormat.None);
@@ -248,87 +271,18 @@ namespace VNLib.Data.Caching.Extensions
//Response is jwt
using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data);
-
+
//Verify the jwt
- if (!request.VerifyBroker(responseJwt))
+ if (!auth.VerifyJwt(responseJwt))
{
throw new SecurityException("Failed to verify the broker's challenge, cannot continue");
}
-
+
using JsonDocument doc = responseJwt.GetPayload();
return doc.RootElement.GetProperty("peers").Deserialize<Advertisment[]>();
}
/// <summary>
- /// Registers the current node with the broker
- /// </summary>
- /// <returns>A task that completes when the regitration has been made successfully</returns>
- /// <exception cref="ArgumentException"></exception>
- public static async Task RegisterWithBrokerAsync(this CacheNodeConfiguration config, string authToken)
- {
- //Recover the certificate
- ReadOnlyJsonWebKey cacheCert = config?.SigningKey ?? throw new ArgumentException(nameof(config.SigningKey));
-
- //init broker request
- using BrokerRegistrationRequest request = new();
-
- request.WithBroker(config.DiscoveryEndpoint!)
- .WithRegistrationAddress(config.ConnectEndpoint!.ToString())
- .WithNodeId(config.NodeId!)
- .WithSigningKey(cacheCert, true)
- .WithHeartbeatToken(authToken);
-
-
- //Send the request
- await RegisterWithBrokerAsync(request);
- }
-
- /// <summary>
- /// Registers the current server as active with the specified broker
- /// </summary>
- /// <param name="registration">The registration request</param>
- public static async Task RegisterWithBrokerAsync(BrokerRegistrationRequest registration)
- {
- _ = registration ?? throw new ArgumentNullException(nameof(registration));
- _ = registration.HeartbeatToken ?? throw new ArgumentException("Missing required heartbeat access token");
- _ = registration.NodeId ?? throw new ArgumentException("Missing required cache server NodeId");
- _ = registration.BrokerAddress ?? throw new ArgumentException("Broker server address has not been configured");
- _ = registration.RegistrationAddress ?? throw new ArgumentException("Missing required registration address", nameof(registration));
-
- string requestData;
- //Create the jwt for signed registration message
- using (JsonWebToken jwt = new())
- {
- //Shared jwt header
- jwt.WriteHeader(registration.JsonHeader);
- //build jwt claim
- jwt.InitPayloadClaim()
- .AddClaim("address", registration.RegistrationAddress)
- .AddClaim("sub", registration.NodeId)
- .AddClaim("token", registration.HeartbeatToken)
- .CommitClaims();
-
- //Sign the jwt
- registration.SignJwt(jwt);
- //Compile and save
- requestData = jwt.Compile();
- }
-
- //Create reg request message
- RestRequest regRequest = new(registration.BrokerAddress);
- regRequest.AddStringBody(requestData, DataFormat.None);
- regRequest.AddHeader("Content-Type", "text/plain");
-
- //Rent client
- using ClientContract cc = ClientPool.Lease();
-
- //Exec the regitration request
- RestResponse response = await cc.Resource.ExecutePutAsync(regRequest);
- response.ThrowIfError();
- }
-
-
- /// <summary>
/// Allows for configuration of an <see cref="FBMClient"/>
/// for a connection to a cache server
/// </summary>
@@ -359,32 +313,7 @@ namespace VNLib.Data.Caching.Extensions
ClientCacheConfig.AddOrUpdate(client, nodeConfig);
return nodeConfig;
}
-
- /// <summary>
- /// Discovers cache nodes in the broker configured for the current client.
- /// </summary>
- /// <param name="client"></param>
- /// <param name="token">A token to cancel the discovery</param>
- /// <returns>A task the resolves the list of active servers on the broker server</returns>
- public static Task<ICachePeerAdvertisment[]?> DiscoverCacheNodesAsync(this FBMClientWorkerBase client, CancellationToken token = default)
- {
- return client.Client.DiscoverCacheNodesAsync(token);
- }
-
- /// <summary>
- /// Discovers cache nodes in the broker configured for the current client.
- /// </summary>
- /// <param name="client"></param>
- /// <param name="token">A token to cancel the discovery </param>
- /// <returns>A task the resolves the list of active servers on the broker server</returns>
- public static async Task<ICachePeerAdvertisment[]?> DiscoverCacheNodesAsync(this FBMClient client, CancellationToken token = default)
- {
- //Get the stored client config
- CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
-
- //List servers async
- return conf.CacheServers = await ListServersAsync(conf, token);
- }
+
/// <summary>
/// Waits for the client to disconnect from the server while observing
@@ -428,15 +357,18 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="SecurityException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
- public static async Task<ICachePeerAdvertisment> ConnectToRandomCacheAsync(this FBMClient client, CancellationToken cancellation = default)
+ public static async Task<ICacheNodeAdvertisment> ConnectToRandomCacheAsync(this FBMClient client, CancellationToken cancellation = default)
{
//Get stored config
CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
- //Select random
- ICachePeerAdvertisment? randomServer = conf.CacheServers?.SelectRandom()
- ?? throw new ArgumentException("No servers detected, cannot connect");
+ //Get all available nodes, or at least the initial peers
+ ICacheNodeAdvertisment[]? adverts = conf.NodeCollection.GetAllNodes() ?? conf.InitialPeers ?? throw new ArgumentException("No cache nodes discovered, cannot connect");
+ //Select random node from all available nodes
+ ICacheNodeAdvertisment randomServer = adverts.SelectRandom();
+
+ //Connect to the random server
await ConnectToCacheAsync(client, randomServer, cancellation);
//Return the random server we connected to
@@ -456,13 +388,14 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="SecurityException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
- public static Task ConnectToCacheAsync(this FBMClient client, ICachePeerAdvertisment server, CancellationToken token = default)
+ public static Task ConnectToCacheAsync(this FBMClient client, ICacheNodeAdvertisment server, CancellationToken token = default)
{
_ = client ?? throw new ArgumentNullException(nameof(client));
_ = server ?? throw new ArgumentNullException(nameof(server));
//Get stored config
CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
+
//Connect to server (no server id because client not replication server)
return ConnectToCacheAsync(client, conf, server, token);
}
@@ -481,7 +414,7 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="SecurityException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
- public static Task ConnectToCacheAsync(this FBMClient client, ICachePeerAdvertisment server, CacheClientConfiguration explicitConfig, CancellationToken token = default)
+ public static Task ConnectToCacheAsync(this FBMClient client, ICacheNodeAdvertisment server, CacheClientConfiguration explicitConfig, CancellationToken token = default)
{
_ = client ?? throw new ArgumentNullException(nameof(client));
_ = server ?? throw new ArgumentNullException(nameof(server));
@@ -494,7 +427,7 @@ namespace VNLib.Data.Caching.Extensions
private static async Task ConnectToCacheAsync(
FBMClient client,
CacheClientConfiguration config,
- ICachePeerAdvertisment server,
+ ICacheNodeAdvertisment server,
CancellationToken token = default
)
{
@@ -513,7 +446,7 @@ namespace VNLib.Data.Caching.Extensions
//Init jwt for connecting to server
using (JsonWebToken jwt = new())
{
- jwt.WriteHeader(config.GetJwtHeader());
+ jwt.WriteHeader(config.AuthManager.GetJwtHeader());
//Init claim
JwtPayload claim = jwt.InitPayloadClaim();
@@ -532,7 +465,7 @@ namespace VNLib.Data.Caching.Extensions
claim.CommitClaims();
//Sign jwt
- config.SignJwt(jwt);
+ config.AuthManager.SignJwt(jwt);
//Compile to string
jwtMessage = jwt.Compile();
@@ -576,7 +509,7 @@ namespace VNLib.Data.Caching.Extensions
using (JsonWebToken jwt = JsonWebToken.Parse(authToken))
{
//Verify the jwt
- if (!config.VerifyCache(jwt))
+ if (!config.AuthManager.VerifyJwt(jwt))
{
throw new SecurityException("Failed to verify the cache server's negotiation message, cannot continue");
}
@@ -591,7 +524,7 @@ namespace VNLib.Data.Caching.Extensions
client.ClientSocket.Headers[HttpRequestHeader.Authorization] = authToken;
//Compute the signature of the upgrade token
- client.ClientSocket.Headers[X_UPGRADE_SIG_HEADER] = GetBase64UpgradeSingature(authToken, config.SigningKey!);
+ client.ClientSocket.Headers[X_UPGRADE_SIG_HEADER] = config.AuthManager.GetBase64UpgradeSingature(authToken);
//Check to see if adversize self is enabled
if (cnc?.BroadcastAdverisment == true)
@@ -660,38 +593,16 @@ namespace VNLib.Data.Caching.Extensions
* compute a signature of the upgrade token and send it to the server to prove we hold the private key.
*/
- private static string GetBase64UpgradeSingature(string? token, ReadOnlyJsonWebKey key)
+ private static string GetBase64UpgradeSingature(this ICacheAuthManager man, string? token)
{
- //try to get the ecdsa key first
- using ECDsa? ec = key.GetECDsaPrivateKey();
-
- if(ec != null)
- {
- //Compute hash of the token
- byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256);
-
- //Sign the hash
- byte[] sig = ec.SignHash(hash, DSASignatureFormat.IeeeP1363FixedFieldConcatenation);
-
- //Return the base64 string
- return Convert.ToBase64String(sig);
- }
-
- //Check rsa next
- using RSA? rsa = key.GetRSAPrivateKey();
- if(rsa != null)
- {
- //Compute hash of the token
- byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256);
+ //Compute hash of the token
+ byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256);
- //Sign the hash
- byte[] sig = rsa.SignHash(hash, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1);
+ //Sign the hash
+ byte[] sig = man.SignMessageHash(hash, HashAlg.SHA256);
- //Return the base64 string
- return Convert.ToBase64String(sig);
- }
-
- throw new CryptographicException("Cache JKW does not export a supported private key for upgrade challenges");
+ //Return the base64 string
+ return Convert.ToBase64String(sig);
}
/// <summary>
@@ -704,21 +615,21 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="CryptographicException"></exception>
public static bool VerifyUpgradeToken(this CacheClientConfiguration nodeConfig, string signature, string token)
{
- return VerifyUpgradeToken(signature, token, nodeConfig.VerificationKey);
+ return VerifyUpgradeToken(nodeConfig.AuthManager, signature, token);
}
/// <summary>
/// Verifies the signed auth token against the given verification key
/// </summary>
+ /// <param name="man"></param>
/// <param name="signature">The base64 signature of the token</param>
/// <param name="token">The raw token to compute the hash of</param>
- /// <param name="verifcationKey">The key used to verify the singature with</param>
/// <returns>True if the singature matches, false otherwise</returns>
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="CryptographicException"></exception>
- public static bool VerifyUpgradeToken(string signature, string token, ReadOnlyJsonWebKey verifcationKey)
+ public static bool VerifyUpgradeToken(this ICacheAuthManager man, string signature, string token)
{
- _ = verifcationKey ?? throw new ArgumentNullException(nameof(verifcationKey));
+ _ = man ?? throw new ArgumentNullException(nameof(man));
//get the hash of the token
byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256);
@@ -726,23 +637,7 @@ namespace VNLib.Data.Caching.Extensions
//decode the signature
byte[] sig = Convert.FromBase64String(signature);
- //try to get the ecdsa key first
- using ECDsa? ec = verifcationKey.GetECDsaPublicKey();
- if(ec != null)
- {
- //Verify the signature
- return ec.VerifyHash(hash, sig, DSASignatureFormat.IeeeP1363FixedFieldConcatenation);
- }
-
- //Check rsa next
- using RSA? rsa = verifcationKey.GetRSAPublicKey();
- if(rsa != null)
- {
- //Verify the signature
- return rsa.VerifyHash(hash, sig, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1);
- }
-
- throw new CryptographicException("Cache JKW does not export a supported public key for upgrade challenges");
+ return man.VerifyMessageHash(hash, HashAlg.SHA256, sig);
}
private static string GetAdvertismentHeader(CacheNodeConfiguration nodeConfiguration)
@@ -756,7 +651,7 @@ namespace VNLib.Data.Caching.Extensions
using JsonWebToken jwt = new();
//Get the jwt header
- jwt.WriteHeader(nodeConfiguration.GetJwtHeader());
+ jwt.WriteHeader(nodeConfiguration.AuthManager.GetJwtHeader());
jwt.InitPayloadClaim()
.AddClaim("nonce", RandomHash.GetRandomBase32(16))
@@ -768,7 +663,7 @@ namespace VNLib.Data.Caching.Extensions
.CommitClaims();
//Sign message
- nodeConfiguration.SignJwt(jwt);
+ nodeConfiguration.AuthManager.SignJwt(jwt);
return jwt.Compile();
}
@@ -780,12 +675,12 @@ namespace VNLib.Data.Caching.Extensions
/// <param name="message">The advertisment message to verify</param>
/// <returns>The advertisment message if successfully verified, or null otherwise</returns>
/// <exception cref="FormatException"></exception>
- public static ICachePeerAdvertisment? VerifyPeerAdvertisment(this ICacheJwtManager config, string message)
+ public static ICacheNodeAdvertisment? VerifyPeerAdvertisment(this ICacheAuthManager config, string message)
{
using JsonWebToken jwt = JsonWebToken.Parse(message);
//Verify the signature
- if (!config.VerifyCache(jwt))
+ if (!config.VerifyJwt(jwt))
{
return null;
}
@@ -800,7 +695,7 @@ namespace VNLib.Data.Caching.Extensions
/// </summary>
/// <param name="servers"></param>
/// <returns>A server selected at random</returns>
- public static ICachePeerAdvertisment SelectRandom(this ICollection<ICachePeerAdvertisment> servers)
+ public static ICacheNodeAdvertisment SelectRandom(this ICollection<ICacheNodeAdvertisment> servers)
{
//select random server
int randServer = RandomNumberGenerator.GetInt32(0, servers.Count);
@@ -808,7 +703,7 @@ namespace VNLib.Data.Caching.Extensions
}
- private class Advertisment : ICachePeerAdvertisment
+ private class Advertisment : ICacheNodeAdvertisment
{
[JsonIgnore]
public Uri? ConnectEndpoint { get; set; }