aboutsummaryrefslogtreecommitdiff
path: root/lib/VNLib.Data.Caching.Extensions/src
diff options
context:
space:
mode:
Diffstat (limited to 'lib/VNLib.Data.Caching.Extensions/src')
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs2
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/BrokerRegistrationRequest.cs116
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs106
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs82
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs14
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs317
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs76
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ICacheDiscoveryErrorHandler.cs41
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ICacheNodeAdvertisment.cs (renamed from lib/VNLib.Data.Caching.Extensions/src/ICachePeerAdvertisment.cs)6
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryCollection.cs62
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryEnumerator.cs42
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/NodeDiscoveryCollection.cs117
12 files changed, 498 insertions, 483 deletions
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs b/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs
index 2d02491..3020376 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs
@@ -27,7 +27,7 @@ using System.Text.Json.Serialization;
namespace VNLib.Data.Caching.Extensions
{
- public class ActiveServer : ICachePeerAdvertisment
+ public class ActiveServer : ICacheNodeAdvertisment
{
[JsonPropertyName("address")]
public string? HostName { get; set; }
diff --git a/lib/VNLib.Data.Caching.Extensions/src/BrokerRegistrationRequest.cs b/lib/VNLib.Data.Caching.Extensions/src/BrokerRegistrationRequest.cs
deleted file mode 100644
index 9ec559a..0000000
--- a/lib/VNLib.Data.Caching.Extensions/src/BrokerRegistrationRequest.cs
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
-* Copyright (c) 2023 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Data.Caching.Extensions
-* File: BrokerRegistrationRequest.cs
-*
-* BrokerRegistrationRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Collections.Generic;
-
-using VNLib.Utils;
-using VNLib.Hashing.IdentityUtility;
-
-
-namespace VNLib.Data.Caching.Extensions
-{
- /// <summary>
- /// A broker registration request message in a fluent api
- /// format. This message may be disposed when no longer in use
- /// </summary>
- public sealed class BrokerRegistrationRequest : VnDisposeable
- {
- private bool ownsKey;
- private ReadOnlyJsonWebKey? SigningKey;
-
- /// <summary>
- /// The cache server node id
- /// </summary>
- public string? NodeId { get; private set; }
- /// <summary>
- /// The broker server's address
- /// </summary>
- public Uri? BrokerAddress { get; private set; }
- /// <summary>
- /// The security token used by the broker server to
- /// authenticate during heartbeat connections
- /// </summary>
- public string? HeartbeatToken { get; private set; }
- /// <summary>
- /// The address for remote clients to use to
- /// connect to this server
- /// </summary>
- public string? RegistrationAddress { get; private set; }
-
- /// <summary>
- /// Recovers the private key from the supplied certificate
- /// </summary>
- /// <param name="jwk">The private key used to sign messages</param>
- /// <param name="ownsKey">A value that indicates if the current instance owns the key</param>
- /// <returns></returns>
- /// <exception cref="ArgumentException"></exception>
- public BrokerRegistrationRequest WithSigningKey(ReadOnlyJsonWebKey jwk, bool ownsKey)
- {
- this.ownsKey = ownsKey;
- SigningKey = jwk ?? throw new ArgumentNullException(nameof(jwk));
- return this;
- }
-
- public BrokerRegistrationRequest WithBroker(Uri brokerUri)
- {
- BrokerAddress = brokerUri;
- return this;
- }
-
- public BrokerRegistrationRequest WithRegistrationAddress(string address)
- {
- RegistrationAddress = address;
- return this;
- }
-
- public BrokerRegistrationRequest WithHeartbeatToken(string token)
- {
- HeartbeatToken = token;
- return this;
- }
-
- public BrokerRegistrationRequest WithNodeId(string nodeId)
- {
- NodeId = nodeId;
- return this;
- }
-
- internal void SignJwt(JsonWebToken jwt)
- {
- jwt.SignFromJwk(SigningKey);
- }
-
- internal IReadOnlyDictionary<string, string?> JsonHeader => SigningKey!.JwtHeader;
-
- ///<inheritdoc/>
- protected override void Free()
- {
- if (ownsKey)
- {
- SigningKey?.Dispose();
- }
- }
- }
-}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs
index 05e4928..9229c89 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs
@@ -3,9 +3,9 @@
*
* Library: VNLib
* Package: VNLib.Data.Caching.Extensions
-* File: ClientCacheConfiguration.cs
+* File: CacheClientConfiguration.cs
*
-* ClientCacheConfiguration.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* CacheClientConfiguration.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
* VNLib collection of libraries and utilities.
*
* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
@@ -22,69 +22,47 @@
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
-using System;
+using System.Linq;
using System.Collections.Generic;
-using System.Security.Cryptography;
-
-using VNLib.Hashing;
-using VNLib.Hashing.IdentityUtility;
namespace VNLib.Data.Caching.Extensions
{
- public interface ICacheJwtManager
- {
- IReadOnlyDictionary<string, string?> GetJwtHeader();
-
- void SignJwt(JsonWebToken jwt);
-
- bool VerifyCache(JsonWebToken jwt);
-
- bool VerifyBroker(JsonWebToken jwt);
- }
-
/// <summary>
/// A fluent api configuration object for configuring a <see cref="FBMClient"/>
/// to connect to cache servers.
/// </summary>
- public class CacheClientConfiguration : ICacheJwtManager, ICacheListServerRequest
+ public class CacheClientConfiguration
{
- public ReadOnlyJsonWebKey? SigningKey { get; private set; }
- public ReadOnlyJsonWebKey? VerificationKey { get; private set; }
- public ReadOnlyJsonWebKey? BrokerVerificationKey { get; private set; }
-
- public Uri? DiscoveryEndpoint { get; private set; }
- public bool UseTls { get; private set; }
- internal ICachePeerAdvertisment[]? CacheServers { get; set; }
+ /// <summary>
+ /// Stores available cache servers to be used for discovery, and connections
+ /// </summary>
+ public INodeDiscoveryCollection NodeCollection { get; } = new NodeDiscoveryCollection();
/// <summary>
- /// Imports the private key used to sign messages
+ /// The authentication manager to use for signing and verifying messages to and from the cache servers
/// </summary>
- /// <param name="jwk">The <see cref="ReadOnlyJsonWebKey"/> with a private key loaded</param>
- /// <returns>Chainable fluent object</returns>
- /// <exception cref="ArgumentException"></exception>
- /// <exception cref="CryptographicException"></exception>
- public CacheClientConfiguration WithSigningKey(ReadOnlyJsonWebKey jwk)
- {
- SigningKey = jwk ?? throw new ArgumentNullException(nameof(jwk));
- return this;
- }
+ public ICacheAuthManager AuthManager { get; private set; }
/// <summary>
- /// Imports the public key used to verify messages from the remote server
+ /// The error handler to use for handling errors that occur during the discovery process
/// </summary>
- /// <param name="jwk">The <see cref="ReadOnlyJsonWebKey"/> public key only used for message verification</param>
- /// <returns>Chainable fluent object</returns>
- /// <exception cref="ArgumentException"></exception>
- /// <exception cref="CryptographicException"></exception>
- public CacheClientConfiguration WithVerificationKey(ReadOnlyJsonWebKey jwk)
- {
- VerificationKey = jwk ?? throw new ArgumentNullException(nameof(jwk));
- return this;
- }
+ public ICacheDiscoveryErrorHandler? ErrorHandler { get; private set; }
- public CacheClientConfiguration WithBrokerVerificationKey(ReadOnlyJsonWebKey jwk)
+ /// <summary>
+ /// Specifies if all connections should use TLS
+ /// </summary>
+ public bool UseTls { get; private set; }
+
+ internal ICacheNodeAdvertisment[]? InitialPeers { get; set; }
+
+ /// <summary>
+ /// Specifies the JWT authentication manager to use for signing and verifying JWTs
+ /// </summary>
+ /// <param name="manager">The authentication manager</param>
+ /// <returns>Chainable fluent object</returns>
+ public CacheClientConfiguration WithAuthenticator(ICacheAuthManager manager)
{
- BrokerVerificationKey = jwk ?? throw new ArgumentNullException(nameof(jwk));
+ AuthManager = manager;
return this;
}
@@ -92,7 +70,6 @@ namespace VNLib.Data.Caching.Extensions
/// Specifies if all connections should be using TLS
/// </summary>
/// <param name="useTls">A value that indicates if connections should use TLS</param>
- /// <returns>Chainable fluent object</returns>
public CacheClientConfiguration WithTls(bool useTls)
{
UseTls = useTls;
@@ -100,28 +77,25 @@ namespace VNLib.Data.Caching.Extensions
}
/// <summary>
- /// Specifies the broker address to discover cache nodes from
+ /// Specifies the initial cache peers to connect to
/// </summary>
- /// <param name="brokerAddress">The address of the server broker</param>
+ /// <param name="peers">The collection of servers to discover peers from and connect to</param>
/// <returns>Chainable fluent object</returns>
- /// <exception cref="ArgumentNullException"></exception>
- public CacheClientConfiguration WithBroker(Uri brokerAddress)
+ public CacheClientConfiguration WithInitialPeers(IEnumerable<ICacheNodeAdvertisment> peers)
{
- DiscoveryEndpoint = brokerAddress ?? throw new ArgumentNullException(nameof(brokerAddress));
+ InitialPeers = peers.ToArray();
return this;
}
-
- ///<inheritdoc/>
- public void SignJwt(JsonWebToken jwt) => jwt.SignFromJwk(SigningKey!);
-
- ///<inheritdoc/>
- public bool VerifyCache(JsonWebToken jwt) => jwt.VerifyFromJwk(VerificationKey!);
-
- ///<inheritdoc/>
- public bool VerifyBroker(JsonWebToken jwt) => jwt.VerifyFromJwk(BrokerVerificationKey!);
-
- ///<inheritdoc/>
- public IReadOnlyDictionary<string, string?> GetJwtHeader() => SigningKey!.JwtHeader;
+ /// <summary>
+ /// Specifies the error handler to use for handling errors that occur during the discovery process
+ /// </summary>
+ /// <param name="handler">The error handler to use during a discovery</param>
+ /// <returns>Chainable fluent object</returns>
+ public CacheClientConfiguration WithErrorHandler(ICacheDiscoveryErrorHandler handler)
+ {
+ ErrorHandler = handler;
+ return this;
+ }
}
}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs b/lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs
deleted file mode 100644
index 76d4ad8..0000000
--- a/lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
-* Copyright (c) 2023 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Data.Caching.Extensions
-* File: ListServerRequest.cs
-*
-* ListServerRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Collections.Generic;
-
-using VNLib.Utils;
-using VNLib.Hashing.IdentityUtility;
-
-namespace VNLib.Data.Caching.Extensions
-{
- public interface ICacheListServerRequest : ICacheJwtManager
- {
- Uri DiscoveryEndpoint { get; }
- }
-
- /// <summary>
- /// A request container for a ListServer request
- /// </summary>
- public sealed class CacheListServerRequest : ICacheListServerRequest
- {
- private readonly ICacheJwtManager _manager;
-
-
- /// <summary>
- /// The address of the broker server to connect to
- /// </summary>
- public Uri DiscoveryEndpoint { get; private set; }
-
- public CacheListServerRequest(ICacheJwtManager keyManager, Uri? brokerAddress = null)
- {
- _manager = keyManager;
- DiscoveryEndpoint = brokerAddress!;
- }
-
-
- /// <summary>
- /// Sets the broker address for the request
- /// </summary>
- /// <param name="brokerAddr">The broker server's address to connect to</param>
- /// <returns>A fluent chainable value</returns>
- /// <exception cref="ArgumentNullException"></exception>
- public CacheListServerRequest WithDiscoveryEndpoint(Uri brokerAddr)
- {
- DiscoveryEndpoint = brokerAddr ?? throw new ArgumentNullException(nameof(brokerAddr));
- return this;
- }
-
- /// <inheritdoc/>
- public void SignJwt(JsonWebToken jwt) => _manager.SignJwt(jwt);
-
- /// <inheritdoc/>
- public bool VerifyCache(JsonWebToken jwt) => _manager.VerifyCache(jwt);
-
- /// <inheritdoc/>
- public bool VerifyBroker(JsonWebToken jwt) => _manager.VerifyBroker(jwt);
-
- /// <inheritdoc/>
- public IReadOnlyDictionary<string, string?> GetJwtHeader() => _manager.GetJwtHeader();
- }
-}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs
index 21a99e1..29a763c 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs
@@ -24,11 +24,13 @@
using System;
-
namespace VNLib.Data.Caching.Extensions
{
- public class CacheNodeConfiguration: CacheClientConfiguration, ICachePeerAdvertisment
+ /// <summary>
+ /// A cache configuration for cache servers (nodes)
+ /// </summary>
+ public class CacheNodeConfiguration: CacheClientConfiguration, ICacheNodeAdvertisment
{
/// <summary>
/// The address for clients to connect to
@@ -56,9 +58,13 @@ namespace VNLib.Data.Caching.Extensions
return this;
}
- public CacheNodeConfiguration EnableAdvertisment(bool enable, Uri? discoveryEndpoint)
+ /// <summary>
+ /// Enables or disables the advertisement of this node to other nodes
+ /// </summary>
+ /// <param name="discoveryEndpoint">The absolute endpoint clients will use to connect to</param>
+ public CacheNodeConfiguration EnableAdvertisment(Uri? discoveryEndpoint)
{
- BroadcastAdverisment = enable;
+ BroadcastAdverisment = discoveryEndpoint != null;
DiscoveryEndpoint = discoveryEndpoint;
return this;
}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
index 9efe16a..634b6de 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
@@ -139,91 +139,114 @@ namespace VNLib.Data.Caching.Extensions
client.Config.DebugLog?.Debug("{debug}: {data}", "[CACHE]", message);
}
+
/// <summary>
- /// Creats a new <see cref="CacheListServerRequest"/> from an existing <see cref="CacheClientConfiguration"/>
+ /// Discovers ALL possible cache nodes itteritivley from the current collection of initial peers.
+ /// This will make connections to all discoverable servers
/// </summary>
- /// <param name="conf">The prepared client configuration</param>
- /// <returns>The new <see cref="CacheListServerRequest"/></returns>
- public static CacheListServerRequest GetListMessage(this CacheClientConfiguration conf)
+ /// <param name="config"></param>
+ /// <param name="cancellation">A token to cancel the operation</param>
+ /// <returns></returns>
+ /// <exception cref="ArgumentException"></exception>
+ public static async Task DiscoverNodesAsync(this CacheClientConfiguration config, CancellationToken cancellation)
{
- return new(conf, conf.DiscoveryEndpoint);
+ //Make sure at least one node defined
+ if(config?.InitialPeers == null || config.InitialPeers.Length == 0)
+ {
+ throw new ArgumentException("There must be at least one cache server defined in the client configuration");
+ }
+
+ //Get the discovery enumerator with the initial peers
+ INodeDiscoveryEnumerator enumerator = config.NodeCollection.BeginDiscovery(config.InitialPeers);
+
+ //Start the discovery process
+ await DiscoverNodesAsync(enumerator, config.AuthManager, config.ErrorHandler, cancellation);
+
+ //Commit nodes
+ config.NodeCollection.CompleteDiscovery(enumerator);
}
- /// <summary>
- /// Discovers peer nodes from a given initial peer and returns a list of discovered nodes. If the config
- /// is for a cache peer node, the current peer is removed from the list of discovered nodes.
- /// </summary>
- /// <param name="cacheConfig"></param>
- /// <param name="initialPeer">The initial peer to discover nodes from</param>
- /// <param name="cancellation">A token to cancel the discovery operation</param>
- /// <returns>The collection of discovered nodes</returns>
- /// <exception cref="ArgumentNullException"></exception>
- public static async Task<ICachePeerAdvertisment[]?> DiscoverClusterNodesAsync(
- this CacheClientConfiguration cacheConfig,
- ICachePeerAdvertisment initialPeer,
- CancellationToken cancellation
+ private static async Task DiscoverNodesAsync(
+ INodeDiscoveryEnumerator enumerator,
+ ICacheAuthManager auth,
+ ICacheDiscoveryErrorHandler? errHandler,
+ CancellationToken cancellation
)
{
- _ = initialPeer?.DiscoveryEndpoint ?? throw new ArgumentException("Advertisment does not expose an advertisment endpoint");
-
- //Create list request
- CacheListServerRequest request = cacheConfig.GetListMessage();
+ //Loop through servers
+ while (enumerator.MoveNext())
+ {
+ //Make sure the node has a discovery endpoint
+ if (enumerator.Current.DiscoveryEndpoint == null)
+ {
+ //Skip this node
+ continue;
+ }
- //Override with the initial peer's discovery endpoint
- request.WithDiscoveryEndpoint(initialPeer.DiscoveryEndpoint);
+ /*
+ * We are allowed to save nodes that do not have a discovery endpoint, but we cannot discover nodes from them
+ * we can only use them as cache
+ */
- //Get the list of servers
- ICachePeerAdvertisment[]? servers = await ListServersAsync(request, cancellation);
+ //add a random delay to avoid spamming the server
+ await Task.Delay((int)Random.Shared.NextInt64(50, 500), cancellation);
- if (servers == null)
- {
- return null;
- }
-
- if(cacheConfig is CacheNodeConfiguration cnc)
- {
- //Filter out the current node
- return servers.Where(s => !cnc.NodeId.Equals(s.NodeId, StringComparison.OrdinalIgnoreCase)).ToArray();
+ try
+ {
+ //Discover nodes from the current node
+ ICacheNodeAdvertisment[]? nodes = await GetCacheNodesAsync(enumerator.Current, auth, cancellation);
+
+ if (nodes != null)
+ {
+ //Add nodes to the collection
+ enumerator.OnPeerDiscoveryComplete(nodes);
+ }
+ }
+ //Catch exceptions when an error handler is defined
+ catch(Exception ex) when (errHandler != null)
+ {
+ //Handle the error
+ errHandler.OnDiscoveryError(enumerator.Current, ex);
+ }
}
- else
- {
- //Do not filter
- return servers;
- }
}
/// <summary>
/// Contacts the cache broker to get a list of active servers to connect to
/// </summary>
- /// <param name="request">The request message used to connecto the broker server</param>
+ /// <param name="advert">An advertisment of a server to discover other nodes from</param>
/// <param name="cancellationToken">A token to cancel the operationS</param>
+ /// <param name="auth">The authentication manager</param>
/// <returns>The list of active servers</returns>
/// <exception cref="SecurityException"></exception>
+ /// <exception cref="ArgumentException"></exception>
/// <exception cref="ArgumentNullException"></exception>
- public static async Task<ICachePeerAdvertisment[]?> ListServersAsync(ICacheListServerRequest request, CancellationToken cancellationToken = default)
+ public static async Task<ICacheNodeAdvertisment[]?> GetCacheNodesAsync(ICacheNodeAdvertisment advert, ICacheAuthManager auth, CancellationToken cancellationToken = default)
{
- _ = request ?? throw new ArgumentNullException(nameof(request));
+ _ = advert ?? throw new ArgumentNullException(nameof(advert));
+ _ = auth ?? throw new ArgumentNullException(nameof(auth));
+ _ = advert.DiscoveryEndpoint ?? throw new ArgumentException("Advertisment does not expose an advertisment endpoint");
string jwtBody;
//Build request jwt
using (JsonWebToken requestJwt = new())
{
- requestJwt.WriteHeader(request.GetJwtHeader());
+ requestJwt.WriteHeader(auth.GetJwtHeader());
requestJwt.InitPayloadClaim()
.AddClaim("iat", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds())
.AddClaim("nonce", RandomHash.GetRandomBase32(16))
.CommitClaims();
//sign the jwt
- request.SignJwt(requestJwt);
+ auth.SignJwt(requestJwt);
//Compile the jwt
jwtBody = requestJwt.Compile();
}
//New list request
- RestRequest listRequest = new(request.DiscoveryEndpoint, Method.Post);
+ RestRequest listRequest = new(advert.DiscoveryEndpoint, Method.Post);
//Add the jwt as a string to the request body
listRequest.AddStringBody(jwtBody, DataFormat.None);
@@ -248,87 +271,18 @@ namespace VNLib.Data.Caching.Extensions
//Response is jwt
using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data);
-
+
//Verify the jwt
- if (!request.VerifyBroker(responseJwt))
+ if (!auth.VerifyJwt(responseJwt))
{
throw new SecurityException("Failed to verify the broker's challenge, cannot continue");
}
-
+
using JsonDocument doc = responseJwt.GetPayload();
return doc.RootElement.GetProperty("peers").Deserialize<Advertisment[]>();
}
/// <summary>
- /// Registers the current node with the broker
- /// </summary>
- /// <returns>A task that completes when the regitration has been made successfully</returns>
- /// <exception cref="ArgumentException"></exception>
- public static async Task RegisterWithBrokerAsync(this CacheNodeConfiguration config, string authToken)
- {
- //Recover the certificate
- ReadOnlyJsonWebKey cacheCert = config?.SigningKey ?? throw new ArgumentException(nameof(config.SigningKey));
-
- //init broker request
- using BrokerRegistrationRequest request = new();
-
- request.WithBroker(config.DiscoveryEndpoint!)
- .WithRegistrationAddress(config.ConnectEndpoint!.ToString())
- .WithNodeId(config.NodeId!)
- .WithSigningKey(cacheCert, true)
- .WithHeartbeatToken(authToken);
-
-
- //Send the request
- await RegisterWithBrokerAsync(request);
- }
-
- /// <summary>
- /// Registers the current server as active with the specified broker
- /// </summary>
- /// <param name="registration">The registration request</param>
- public static async Task RegisterWithBrokerAsync(BrokerRegistrationRequest registration)
- {
- _ = registration ?? throw new ArgumentNullException(nameof(registration));
- _ = registration.HeartbeatToken ?? throw new ArgumentException("Missing required heartbeat access token");
- _ = registration.NodeId ?? throw new ArgumentException("Missing required cache server NodeId");
- _ = registration.BrokerAddress ?? throw new ArgumentException("Broker server address has not been configured");
- _ = registration.RegistrationAddress ?? throw new ArgumentException("Missing required registration address", nameof(registration));
-
- string requestData;
- //Create the jwt for signed registration message
- using (JsonWebToken jwt = new())
- {
- //Shared jwt header
- jwt.WriteHeader(registration.JsonHeader);
- //build jwt claim
- jwt.InitPayloadClaim()
- .AddClaim("address", registration.RegistrationAddress)
- .AddClaim("sub", registration.NodeId)
- .AddClaim("token", registration.HeartbeatToken)
- .CommitClaims();
-
- //Sign the jwt
- registration.SignJwt(jwt);
- //Compile and save
- requestData = jwt.Compile();
- }
-
- //Create reg request message
- RestRequest regRequest = new(registration.BrokerAddress);
- regRequest.AddStringBody(requestData, DataFormat.None);
- regRequest.AddHeader("Content-Type", "text/plain");
-
- //Rent client
- using ClientContract cc = ClientPool.Lease();
-
- //Exec the regitration request
- RestResponse response = await cc.Resource.ExecutePutAsync(regRequest);
- response.ThrowIfError();
- }
-
-
- /// <summary>
/// Allows for configuration of an <see cref="FBMClient"/>
/// for a connection to a cache server
/// </summary>
@@ -359,32 +313,7 @@ namespace VNLib.Data.Caching.Extensions
ClientCacheConfig.AddOrUpdate(client, nodeConfig);
return nodeConfig;
}
-
- /// <summary>
- /// Discovers cache nodes in the broker configured for the current client.
- /// </summary>
- /// <param name="client"></param>
- /// <param name="token">A token to cancel the discovery</param>
- /// <returns>A task the resolves the list of active servers on the broker server</returns>
- public static Task<ICachePeerAdvertisment[]?> DiscoverCacheNodesAsync(this FBMClientWorkerBase client, CancellationToken token = default)
- {
- return client.Client.DiscoverCacheNodesAsync(token);
- }
-
- /// <summary>
- /// Discovers cache nodes in the broker configured for the current client.
- /// </summary>
- /// <param name="client"></param>
- /// <param name="token">A token to cancel the discovery </param>
- /// <returns>A task the resolves the list of active servers on the broker server</returns>
- public static async Task<ICachePeerAdvertisment[]?> DiscoverCacheNodesAsync(this FBMClient client, CancellationToken token = default)
- {
- //Get the stored client config
- CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
-
- //List servers async
- return conf.CacheServers = await ListServersAsync(conf, token);
- }
+
/// <summary>
/// Waits for the client to disconnect from the server while observing
@@ -428,15 +357,18 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="SecurityException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
- public static async Task<ICachePeerAdvertisment> ConnectToRandomCacheAsync(this FBMClient client, CancellationToken cancellation = default)
+ public static async Task<ICacheNodeAdvertisment> ConnectToRandomCacheAsync(this FBMClient client, CancellationToken cancellation = default)
{
//Get stored config
CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
- //Select random
- ICachePeerAdvertisment? randomServer = conf.CacheServers?.SelectRandom()
- ?? throw new ArgumentException("No servers detected, cannot connect");
+ //Get all available nodes, or at least the initial peers
+ ICacheNodeAdvertisment[]? adverts = conf.NodeCollection.GetAllNodes() ?? conf.InitialPeers ?? throw new ArgumentException("No cache nodes discovered, cannot connect");
+ //Select random node from all available nodes
+ ICacheNodeAdvertisment randomServer = adverts.SelectRandom();
+
+ //Connect to the random server
await ConnectToCacheAsync(client, randomServer, cancellation);
//Return the random server we connected to
@@ -456,13 +388,14 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="SecurityException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
- public static Task ConnectToCacheAsync(this FBMClient client, ICachePeerAdvertisment server, CancellationToken token = default)
+ public static Task ConnectToCacheAsync(this FBMClient client, ICacheNodeAdvertisment server, CancellationToken token = default)
{
_ = client ?? throw new ArgumentNullException(nameof(client));
_ = server ?? throw new ArgumentNullException(nameof(server));
//Get stored config
CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
+
//Connect to server (no server id because client not replication server)
return ConnectToCacheAsync(client, conf, server, token);
}
@@ -481,7 +414,7 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="SecurityException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
- public static Task ConnectToCacheAsync(this FBMClient client, ICachePeerAdvertisment server, CacheClientConfiguration explicitConfig, CancellationToken token = default)
+ public static Task ConnectToCacheAsync(this FBMClient client, ICacheNodeAdvertisment server, CacheClientConfiguration explicitConfig, CancellationToken token = default)
{
_ = client ?? throw new ArgumentNullException(nameof(client));
_ = server ?? throw new ArgumentNullException(nameof(server));
@@ -494,7 +427,7 @@ namespace VNLib.Data.Caching.Extensions
private static async Task ConnectToCacheAsync(
FBMClient client,
CacheClientConfiguration config,
- ICachePeerAdvertisment server,
+ ICacheNodeAdvertisment server,
CancellationToken token = default
)
{
@@ -513,7 +446,7 @@ namespace VNLib.Data.Caching.Extensions
//Init jwt for connecting to server
using (JsonWebToken jwt = new())
{
- jwt.WriteHeader(config.GetJwtHeader());
+ jwt.WriteHeader(config.AuthManager.GetJwtHeader());
//Init claim
JwtPayload claim = jwt.InitPayloadClaim();
@@ -532,7 +465,7 @@ namespace VNLib.Data.Caching.Extensions
claim.CommitClaims();
//Sign jwt
- config.SignJwt(jwt);
+ config.AuthManager.SignJwt(jwt);
//Compile to string
jwtMessage = jwt.Compile();
@@ -576,7 +509,7 @@ namespace VNLib.Data.Caching.Extensions
using (JsonWebToken jwt = JsonWebToken.Parse(authToken))
{
//Verify the jwt
- if (!config.VerifyCache(jwt))
+ if (!config.AuthManager.VerifyJwt(jwt))
{
throw new SecurityException("Failed to verify the cache server's negotiation message, cannot continue");
}
@@ -591,7 +524,7 @@ namespace VNLib.Data.Caching.Extensions
client.ClientSocket.Headers[HttpRequestHeader.Authorization] = authToken;
//Compute the signature of the upgrade token
- client.ClientSocket.Headers[X_UPGRADE_SIG_HEADER] = GetBase64UpgradeSingature(authToken, config.SigningKey!);
+ client.ClientSocket.Headers[X_UPGRADE_SIG_HEADER] = config.AuthManager.GetBase64UpgradeSingature(authToken);
//Check to see if adversize self is enabled
if (cnc?.BroadcastAdverisment == true)
@@ -660,38 +593,16 @@ namespace VNLib.Data.Caching.Extensions
* compute a signature of the upgrade token and send it to the server to prove we hold the private key.
*/
- private static string GetBase64UpgradeSingature(string? token, ReadOnlyJsonWebKey key)
+ private static string GetBase64UpgradeSingature(this ICacheAuthManager man, string? token)
{
- //try to get the ecdsa key first
- using ECDsa? ec = key.GetECDsaPrivateKey();
-
- if(ec != null)
- {
- //Compute hash of the token
- byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256);
-
- //Sign the hash
- byte[] sig = ec.SignHash(hash, DSASignatureFormat.IeeeP1363FixedFieldConcatenation);
-
- //Return the base64 string
- return Convert.ToBase64String(sig);
- }
-
- //Check rsa next
- using RSA? rsa = key.GetRSAPrivateKey();
- if(rsa != null)
- {
- //Compute hash of the token
- byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256);
+ //Compute hash of the token
+ byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256);
- //Sign the hash
- byte[] sig = rsa.SignHash(hash, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1);
+ //Sign the hash
+ byte[] sig = man.SignMessageHash(hash, HashAlg.SHA256);
- //Return the base64 string
- return Convert.ToBase64String(sig);
- }
-
- throw new CryptographicException("Cache JKW does not export a supported private key for upgrade challenges");
+ //Return the base64 string
+ return Convert.ToBase64String(sig);
}
/// <summary>
@@ -704,21 +615,21 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="CryptographicException"></exception>
public static bool VerifyUpgradeToken(this CacheClientConfiguration nodeConfig, string signature, string token)
{
- return VerifyUpgradeToken(signature, token, nodeConfig.VerificationKey);
+ return VerifyUpgradeToken(nodeConfig.AuthManager, signature, token);
}
/// <summary>
/// Verifies the signed auth token against the given verification key
/// </summary>
+ /// <param name="man"></param>
/// <param name="signature">The base64 signature of the token</param>
/// <param name="token">The raw token to compute the hash of</param>
- /// <param name="verifcationKey">The key used to verify the singature with</param>
/// <returns>True if the singature matches, false otherwise</returns>
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="CryptographicException"></exception>
- public static bool VerifyUpgradeToken(string signature, string token, ReadOnlyJsonWebKey verifcationKey)
+ public static bool VerifyUpgradeToken(this ICacheAuthManager man, string signature, string token)
{
- _ = verifcationKey ?? throw new ArgumentNullException(nameof(verifcationKey));
+ _ = man ?? throw new ArgumentNullException(nameof(man));
//get the hash of the token
byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256);
@@ -726,23 +637,7 @@ namespace VNLib.Data.Caching.Extensions
//decode the signature
byte[] sig = Convert.FromBase64String(signature);
- //try to get the ecdsa key first
- using ECDsa? ec = verifcationKey.GetECDsaPublicKey();
- if(ec != null)
- {
- //Verify the signature
- return ec.VerifyHash(hash, sig, DSASignatureFormat.IeeeP1363FixedFieldConcatenation);
- }
-
- //Check rsa next
- using RSA? rsa = verifcationKey.GetRSAPublicKey();
- if(rsa != null)
- {
- //Verify the signature
- return rsa.VerifyHash(hash, sig, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1);
- }
-
- throw new CryptographicException("Cache JKW does not export a supported public key for upgrade challenges");
+ return man.VerifyMessageHash(hash, HashAlg.SHA256, sig);
}
private static string GetAdvertismentHeader(CacheNodeConfiguration nodeConfiguration)
@@ -756,7 +651,7 @@ namespace VNLib.Data.Caching.Extensions
using JsonWebToken jwt = new();
//Get the jwt header
- jwt.WriteHeader(nodeConfiguration.GetJwtHeader());
+ jwt.WriteHeader(nodeConfiguration.AuthManager.GetJwtHeader());
jwt.InitPayloadClaim()
.AddClaim("nonce", RandomHash.GetRandomBase32(16))
@@ -768,7 +663,7 @@ namespace VNLib.Data.Caching.Extensions
.CommitClaims();
//Sign message
- nodeConfiguration.SignJwt(jwt);
+ nodeConfiguration.AuthManager.SignJwt(jwt);
return jwt.Compile();
}
@@ -780,12 +675,12 @@ namespace VNLib.Data.Caching.Extensions
/// <param name="message">The advertisment message to verify</param>
/// <returns>The advertisment message if successfully verified, or null otherwise</returns>
/// <exception cref="FormatException"></exception>
- public static ICachePeerAdvertisment? VerifyPeerAdvertisment(this ICacheJwtManager config, string message)
+ public static ICacheNodeAdvertisment? VerifyPeerAdvertisment(this ICacheAuthManager config, string message)
{
using JsonWebToken jwt = JsonWebToken.Parse(message);
//Verify the signature
- if (!config.VerifyCache(jwt))
+ if (!config.VerifyJwt(jwt))
{
return null;
}
@@ -800,7 +695,7 @@ namespace VNLib.Data.Caching.Extensions
/// </summary>
/// <param name="servers"></param>
/// <returns>A server selected at random</returns>
- public static ICachePeerAdvertisment SelectRandom(this ICollection<ICachePeerAdvertisment> servers)
+ public static ICacheNodeAdvertisment SelectRandom(this ICollection<ICacheNodeAdvertisment> servers)
{
//select random server
int randServer = RandomNumberGenerator.GetInt32(0, servers.Count);
@@ -808,7 +703,7 @@ namespace VNLib.Data.Caching.Extensions
}
- private class Advertisment : ICachePeerAdvertisment
+ private class Advertisment : ICacheNodeAdvertisment
{
[JsonIgnore]
public Uri? ConnectEndpoint { get; set; }
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs b/lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs
new file mode 100644
index 0000000..e3ab868
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs
@@ -0,0 +1,76 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: ClientCacheConfiguration.cs
+*
+* ClientCacheConfiguration.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Collections.Generic;
+
+using VNLib.Hashing;
+using VNLib.Hashing.IdentityUtility;
+
+namespace VNLib.Data.Caching.Extensions
+{
+ /// <summary>
+ /// Provides authentication services for cache clients and
+ /// servers.
+ /// </summary>
+ public interface ICacheAuthManager
+ {
+ /// <summary>
+ /// Gets the JWT header to use for signing messages with the
+ /// given key
+ /// </summary>
+ /// <returns>The JWT header collection</returns>
+ IReadOnlyDictionary<string, string?> GetJwtHeader();
+
+ /// <summary>
+ /// Signs the given JWT
+ /// </summary>
+ /// <param name="jwt">The message to sign</param>
+ void SignJwt(JsonWebToken jwt);
+
+ /// <summary>
+ /// Verifies the given JWT
+ /// </summary>
+ /// <param name="jwt">The message to verify authenticity</param>
+ /// <returns>True of the JWT could be verified, false otherwise</returns>
+ bool VerifyJwt(JsonWebToken jwt);
+
+ /// <summary>
+ /// Signs the given message hash
+ /// </summary>
+ /// <param name="hash">The message hash to sign</param>
+ /// <param name="alg">The algorithm used to sign the message hash</param>
+ /// <returns>The signature of the hash</returns>
+ byte[] SignMessageHash(byte[] hash, HashAlg alg);
+
+ /// <summary>
+ /// Verifies the given message hash against the signature.
+ /// </summary>
+ /// <param name="hash">The message hash to compare</param>
+ /// <param name="alg">The algorithm used to produce the message hash</param>
+ /// <param name="signature">The message signature to verify the message against</param>
+ /// <returns>True of the signature could be verified</returns>
+ bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature);
+ }
+}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ICacheDiscoveryErrorHandler.cs b/lib/VNLib.Data.Caching.Extensions/src/ICacheDiscoveryErrorHandler.cs
new file mode 100644
index 0000000..3493d48
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/ICacheDiscoveryErrorHandler.cs
@@ -0,0 +1,41 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: ClientCacheConfiguration.cs
+*
+* ClientCacheConfiguration.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+
+namespace VNLib.Data.Caching.Extensions
+{
+ /// <summary>
+ /// Represents an type that will handle errors that occur during the discovery process
+ /// </summary>
+ public interface ICacheDiscoveryErrorHandler
+ {
+ /// <summary>
+ /// Invoked when an error occurs during the discovery process
+ /// </summary>
+ /// <param name="errorNode">The node that the error occured on</param>
+ /// <param name="ex">The exception that caused the invocation</param>
+ void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex);
+ }
+}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ICachePeerAdvertisment.cs b/lib/VNLib.Data.Caching.Extensions/src/ICacheNodeAdvertisment.cs
index acf883e..fc29955 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/ICachePeerAdvertisment.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/ICacheNodeAdvertisment.cs
@@ -3,9 +3,9 @@
*
* Library: VNLib
* Package: VNLib.Data.Caching.Extensions
-* File: ICachePeerAdvertisment.cs
+* File: ICacheNodeAdvertisment.cs
*
-* ICachePeerAdvertisment.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* ICacheNodeAdvertisment.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
* VNLib collection of libraries and utilities.
*
* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
@@ -30,7 +30,7 @@ namespace VNLib.Data.Caching.Extensions
/// <summary>
/// Represents a node that can be advertised to clients
/// </summary>
- public interface ICachePeerAdvertisment
+ public interface ICacheNodeAdvertisment
{
/// <summary>
/// The endpoint for clients to connect to to access the cache
diff --git a/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryCollection.cs b/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryCollection.cs
new file mode 100644
index 0000000..9adebdc
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryCollection.cs
@@ -0,0 +1,62 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: INodeDiscoveryCollection.cs
+*
+* INodeDiscoveryCollection.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+
+using System.Collections.Generic;
+
+namespace VNLib.Data.Caching.Extensions
+{
+ /// <summary>
+ /// Represents a collection of discovered nodes
+ /// </summary>
+#pragma warning disable CA1711 // Identifiers should not have incorrect suffix
+ public interface INodeDiscoveryCollection
+#pragma warning restore CA1711 // Identifiers should not have incorrect suffix
+ {
+ /// <summary>
+ /// Begins a new discovery and gets an enumerator for the discovery process
+ /// </summary>
+ /// <returns>An enumerator that simplifies discovery of unique nodes</returns>
+ INodeDiscoveryEnumerator BeginDiscovery();
+
+ /// <summary>
+ /// Begins a new discovery and gets an enumerator for the discovery process
+ /// </summary>
+ /// <param name="initialPeers">An initial collection of peers to add to the enumeration</param>
+ /// <returns>An enumerator that simplifies discovery of unique nodes</returns>
+ INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICacheNodeAdvertisment> initialPeers);
+
+ /// <summary>
+ /// Gets a snapshot of all discovered nodes in the current collection.
+ /// </summary>
+ /// <returns>The current collection of notes</returns>
+ ICacheNodeAdvertisment[] GetAllNodes();
+
+ /// <summary>
+ /// Completes a discovery process and updates the collection with the results
+ /// </summary>
+ /// <param name="enumerator">The enumerator used to collect discovered nodes</param>
+ void CompleteDiscovery(INodeDiscoveryEnumerator enumerator);
+ }
+}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryEnumerator.cs b/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryEnumerator.cs
new file mode 100644
index 0000000..f6d5f40
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryEnumerator.cs
@@ -0,0 +1,42 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: INodeDiscoveryEnumerator.cs
+*
+* INodeDiscoveryEnumerator.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+
+using System.Collections.Generic;
+
+
+namespace VNLib.Data.Caching.Extensions
+{
+ /// <summary>
+ /// A custom enumerator for the node discovery process
+ /// </summary>
+ public interface INodeDiscoveryEnumerator : IEnumerator<ICacheNodeAdvertisment>
+ {
+ /// <summary>
+ /// Adds the specified peer to the collection of discovered peers
+ /// </summary>
+ /// <param name="discoveredPeers">The peer collection</param>
+ void OnPeerDiscoveryComplete(IEnumerable<ICacheNodeAdvertisment> discoveredPeers);
+ }
+}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/NodeDiscoveryCollection.cs b/lib/VNLib.Data.Caching.Extensions/src/NodeDiscoveryCollection.cs
new file mode 100644
index 0000000..305f5de
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/NodeDiscoveryCollection.cs
@@ -0,0 +1,117 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: INodeDiscoveryCollection.cs
+*
+* INodeDiscoveryCollection.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Linq;
+using System.Collections.Generic;
+using System.Collections;
+
+namespace VNLib.Data.Caching.Extensions
+{
+ /// <summary>
+ /// Represents a collection of available cache nodes from a discovery process
+ /// </summary>
+ public sealed class NodeDiscoveryCollection : INodeDiscoveryCollection
+ {
+ private LinkedList<ICacheNodeAdvertisment> _peers;
+
+ /// <summary>
+ /// Initializes a new empty <see cref="NodeDiscoveryCollection"/>
+ /// </summary>
+ public NodeDiscoveryCollection()
+ {
+ _peers = new();
+ }
+
+ ///<inheritdoc/>
+ public INodeDiscoveryEnumerator BeginDiscovery()
+ {
+ return new NodeEnumerator(new());
+ }
+
+ ///<inheritdoc/>
+ public INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICacheNodeAdvertisment> initialPeers)
+ {
+ //Init new enumerator with the initial peers
+ return new NodeEnumerator(new(initialPeers));
+ }
+
+ ///<inheritdoc/>
+ public void CompleteDiscovery(INodeDiscoveryEnumerator enumerator)
+ {
+ _ = enumerator ?? throw new ArgumentNullException(nameof(enumerator));
+
+ //Capture all nodes from the enumerator and store them as our current peers
+ _peers = (enumerator as NodeEnumerator)!.Peers;
+ }
+
+ ///<inheritdoc/>
+ public ICacheNodeAdvertisment[] GetAllNodes()
+ {
+ //Capture all current peers
+ return _peers.ToArray();
+ }
+
+ private sealed record class NodeEnumerator(LinkedList<ICacheNodeAdvertisment> Peers) : INodeDiscoveryEnumerator
+ {
+ //Keep track of the current node in the collection so we can move down the list
+ private LinkedListNode<ICacheNodeAdvertisment>? _currentNode = Peers.First;
+
+ public ICacheNodeAdvertisment Current => _currentNode?.Value;
+ object IEnumerator.Current => _currentNode?.Value;
+
+
+ ///<inheritdoc/>
+ public bool MoveNext()
+ {
+ //Move to the next peer in the collection
+ _currentNode = _currentNode?.Next;
+
+ return _currentNode?.Value != null;
+ }
+
+ ///<inheritdoc/>
+ public void OnPeerDiscoveryComplete(IEnumerable<ICacheNodeAdvertisment> discoveredPeers)
+ {
+ //Get only the peers from the discovery that are not already in the collection
+ IEnumerable<ICacheNodeAdvertisment> newPeers = discoveredPeers.Except(Peers);
+
+ //Add them to the end of the collection
+ foreach (ICacheNodeAdvertisment ad in newPeers)
+ {
+ Peers.AddLast(ad);
+ }
+ }
+
+ public void Reset()
+ {
+ //Go to the first node
+ _currentNode = Peers.First;
+ }
+
+ public void Dispose()
+ { }
+ }
+ }
+}