diff options
Diffstat (limited to 'lib/VNLib.Data.Caching.Extensions')
-rw-r--r-- | lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs | 2 | ||||
-rw-r--r-- | lib/VNLib.Data.Caching.Extensions/src/BrokerRegistrationRequest.cs | 116 | ||||
-rw-r--r-- | lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs | 106 | ||||
-rw-r--r-- | lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs | 82 | ||||
-rw-r--r-- | lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs | 14 | ||||
-rw-r--r-- | lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs | 317 | ||||
-rw-r--r-- | lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs | 76 | ||||
-rw-r--r-- | lib/VNLib.Data.Caching.Extensions/src/ICacheDiscoveryErrorHandler.cs | 41 | ||||
-rw-r--r-- | lib/VNLib.Data.Caching.Extensions/src/ICacheNodeAdvertisment.cs (renamed from lib/VNLib.Data.Caching.Extensions/src/ICachePeerAdvertisment.cs) | 6 | ||||
-rw-r--r-- | lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryCollection.cs | 62 | ||||
-rw-r--r-- | lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryEnumerator.cs | 42 | ||||
-rw-r--r-- | lib/VNLib.Data.Caching.Extensions/src/NodeDiscoveryCollection.cs | 117 |
12 files changed, 498 insertions, 483 deletions
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs b/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs index 2d02491..3020376 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs @@ -27,7 +27,7 @@ using System.Text.Json.Serialization; namespace VNLib.Data.Caching.Extensions { - public class ActiveServer : ICachePeerAdvertisment + public class ActiveServer : ICacheNodeAdvertisment { [JsonPropertyName("address")] public string? HostName { get; set; } diff --git a/lib/VNLib.Data.Caching.Extensions/src/BrokerRegistrationRequest.cs b/lib/VNLib.Data.Caching.Extensions/src/BrokerRegistrationRequest.cs deleted file mode 100644 index 9ec559a..0000000 --- a/lib/VNLib.Data.Caching.Extensions/src/BrokerRegistrationRequest.cs +++ /dev/null @@ -1,116 +0,0 @@ -/* -* Copyright (c) 2023 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Data.Caching.Extensions -* File: BrokerRegistrationRequest.cs -* -* BrokerRegistrationRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Collections.Generic; - -using VNLib.Utils; -using VNLib.Hashing.IdentityUtility; - - -namespace VNLib.Data.Caching.Extensions -{ - /// <summary> - /// A broker registration request message in a fluent api - /// format. This message may be disposed when no longer in use - /// </summary> - public sealed class BrokerRegistrationRequest : VnDisposeable - { - private bool ownsKey; - private ReadOnlyJsonWebKey? SigningKey; - - /// <summary> - /// The cache server node id - /// </summary> - public string? NodeId { get; private set; } - /// <summary> - /// The broker server's address - /// </summary> - public Uri? BrokerAddress { get; private set; } - /// <summary> - /// The security token used by the broker server to - /// authenticate during heartbeat connections - /// </summary> - public string? HeartbeatToken { get; private set; } - /// <summary> - /// The address for remote clients to use to - /// connect to this server - /// </summary> - public string? RegistrationAddress { get; private set; } - - /// <summary> - /// Recovers the private key from the supplied certificate - /// </summary> - /// <param name="jwk">The private key used to sign messages</param> - /// <param name="ownsKey">A value that indicates if the current instance owns the key</param> - /// <returns></returns> - /// <exception cref="ArgumentException"></exception> - public BrokerRegistrationRequest WithSigningKey(ReadOnlyJsonWebKey jwk, bool ownsKey) - { - this.ownsKey = ownsKey; - SigningKey = jwk ?? throw new ArgumentNullException(nameof(jwk)); - return this; - } - - public BrokerRegistrationRequest WithBroker(Uri brokerUri) - { - BrokerAddress = brokerUri; - return this; - } - - public BrokerRegistrationRequest WithRegistrationAddress(string address) - { - RegistrationAddress = address; - return this; - } - - public BrokerRegistrationRequest WithHeartbeatToken(string token) - { - HeartbeatToken = token; - return this; - } - - public BrokerRegistrationRequest WithNodeId(string nodeId) - { - NodeId = nodeId; - return this; - } - - internal void SignJwt(JsonWebToken jwt) - { - jwt.SignFromJwk(SigningKey); - } - - internal IReadOnlyDictionary<string, string?> JsonHeader => SigningKey!.JwtHeader; - - ///<inheritdoc/> - protected override void Free() - { - if (ownsKey) - { - SigningKey?.Dispose(); - } - } - } -} diff --git a/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs index 05e4928..9229c89 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs @@ -3,9 +3,9 @@ * * Library: VNLib * Package: VNLib.Data.Caching.Extensions -* File: ClientCacheConfiguration.cs +* File: CacheClientConfiguration.cs * -* ClientCacheConfiguration.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* CacheClientConfiguration.cs is part of VNLib.Data.Caching.Extensions which is part of the larger * VNLib collection of libraries and utilities. * * VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify @@ -22,69 +22,47 @@ * along with this program. If not, see https://www.gnu.org/licenses/. */ -using System; +using System.Linq; using System.Collections.Generic; -using System.Security.Cryptography; - -using VNLib.Hashing; -using VNLib.Hashing.IdentityUtility; namespace VNLib.Data.Caching.Extensions { - public interface ICacheJwtManager - { - IReadOnlyDictionary<string, string?> GetJwtHeader(); - - void SignJwt(JsonWebToken jwt); - - bool VerifyCache(JsonWebToken jwt); - - bool VerifyBroker(JsonWebToken jwt); - } - /// <summary> /// A fluent api configuration object for configuring a <see cref="FBMClient"/> /// to connect to cache servers. /// </summary> - public class CacheClientConfiguration : ICacheJwtManager, ICacheListServerRequest + public class CacheClientConfiguration { - public ReadOnlyJsonWebKey? SigningKey { get; private set; } - public ReadOnlyJsonWebKey? VerificationKey { get; private set; } - public ReadOnlyJsonWebKey? BrokerVerificationKey { get; private set; } - - public Uri? DiscoveryEndpoint { get; private set; } - public bool UseTls { get; private set; } - internal ICachePeerAdvertisment[]? CacheServers { get; set; } + /// <summary> + /// Stores available cache servers to be used for discovery, and connections + /// </summary> + public INodeDiscoveryCollection NodeCollection { get; } = new NodeDiscoveryCollection(); /// <summary> - /// Imports the private key used to sign messages + /// The authentication manager to use for signing and verifying messages to and from the cache servers /// </summary> - /// <param name="jwk">The <see cref="ReadOnlyJsonWebKey"/> with a private key loaded</param> - /// <returns>Chainable fluent object</returns> - /// <exception cref="ArgumentException"></exception> - /// <exception cref="CryptographicException"></exception> - public CacheClientConfiguration WithSigningKey(ReadOnlyJsonWebKey jwk) - { - SigningKey = jwk ?? throw new ArgumentNullException(nameof(jwk)); - return this; - } + public ICacheAuthManager AuthManager { get; private set; } /// <summary> - /// Imports the public key used to verify messages from the remote server + /// The error handler to use for handling errors that occur during the discovery process /// </summary> - /// <param name="jwk">The <see cref="ReadOnlyJsonWebKey"/> public key only used for message verification</param> - /// <returns>Chainable fluent object</returns> - /// <exception cref="ArgumentException"></exception> - /// <exception cref="CryptographicException"></exception> - public CacheClientConfiguration WithVerificationKey(ReadOnlyJsonWebKey jwk) - { - VerificationKey = jwk ?? throw new ArgumentNullException(nameof(jwk)); - return this; - } + public ICacheDiscoveryErrorHandler? ErrorHandler { get; private set; } - public CacheClientConfiguration WithBrokerVerificationKey(ReadOnlyJsonWebKey jwk) + /// <summary> + /// Specifies if all connections should use TLS + /// </summary> + public bool UseTls { get; private set; } + + internal ICacheNodeAdvertisment[]? InitialPeers { get; set; } + + /// <summary> + /// Specifies the JWT authentication manager to use for signing and verifying JWTs + /// </summary> + /// <param name="manager">The authentication manager</param> + /// <returns>Chainable fluent object</returns> + public CacheClientConfiguration WithAuthenticator(ICacheAuthManager manager) { - BrokerVerificationKey = jwk ?? throw new ArgumentNullException(nameof(jwk)); + AuthManager = manager; return this; } @@ -92,7 +70,6 @@ namespace VNLib.Data.Caching.Extensions /// Specifies if all connections should be using TLS /// </summary> /// <param name="useTls">A value that indicates if connections should use TLS</param> - /// <returns>Chainable fluent object</returns> public CacheClientConfiguration WithTls(bool useTls) { UseTls = useTls; @@ -100,28 +77,25 @@ namespace VNLib.Data.Caching.Extensions } /// <summary> - /// Specifies the broker address to discover cache nodes from + /// Specifies the initial cache peers to connect to /// </summary> - /// <param name="brokerAddress">The address of the server broker</param> + /// <param name="peers">The collection of servers to discover peers from and connect to</param> /// <returns>Chainable fluent object</returns> - /// <exception cref="ArgumentNullException"></exception> - public CacheClientConfiguration WithBroker(Uri brokerAddress) + public CacheClientConfiguration WithInitialPeers(IEnumerable<ICacheNodeAdvertisment> peers) { - DiscoveryEndpoint = brokerAddress ?? throw new ArgumentNullException(nameof(brokerAddress)); + InitialPeers = peers.ToArray(); return this; } - - ///<inheritdoc/> - public void SignJwt(JsonWebToken jwt) => jwt.SignFromJwk(SigningKey!); - - ///<inheritdoc/> - public bool VerifyCache(JsonWebToken jwt) => jwt.VerifyFromJwk(VerificationKey!); - - ///<inheritdoc/> - public bool VerifyBroker(JsonWebToken jwt) => jwt.VerifyFromJwk(BrokerVerificationKey!); - - ///<inheritdoc/> - public IReadOnlyDictionary<string, string?> GetJwtHeader() => SigningKey!.JwtHeader; + /// <summary> + /// Specifies the error handler to use for handling errors that occur during the discovery process + /// </summary> + /// <param name="handler">The error handler to use during a discovery</param> + /// <returns>Chainable fluent object</returns> + public CacheClientConfiguration WithErrorHandler(ICacheDiscoveryErrorHandler handler) + { + ErrorHandler = handler; + return this; + } } } diff --git a/lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs b/lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs deleted file mode 100644 index 76d4ad8..0000000 --- a/lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs +++ /dev/null @@ -1,82 +0,0 @@ -/* -* Copyright (c) 2023 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Data.Caching.Extensions -* File: ListServerRequest.cs -* -* ListServerRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Collections.Generic; - -using VNLib.Utils; -using VNLib.Hashing.IdentityUtility; - -namespace VNLib.Data.Caching.Extensions -{ - public interface ICacheListServerRequest : ICacheJwtManager - { - Uri DiscoveryEndpoint { get; } - } - - /// <summary> - /// A request container for a ListServer request - /// </summary> - public sealed class CacheListServerRequest : ICacheListServerRequest - { - private readonly ICacheJwtManager _manager; - - - /// <summary> - /// The address of the broker server to connect to - /// </summary> - public Uri DiscoveryEndpoint { get; private set; } - - public CacheListServerRequest(ICacheJwtManager keyManager, Uri? brokerAddress = null) - { - _manager = keyManager; - DiscoveryEndpoint = brokerAddress!; - } - - - /// <summary> - /// Sets the broker address for the request - /// </summary> - /// <param name="brokerAddr">The broker server's address to connect to</param> - /// <returns>A fluent chainable value</returns> - /// <exception cref="ArgumentNullException"></exception> - public CacheListServerRequest WithDiscoveryEndpoint(Uri brokerAddr) - { - DiscoveryEndpoint = brokerAddr ?? throw new ArgumentNullException(nameof(brokerAddr)); - return this; - } - - /// <inheritdoc/> - public void SignJwt(JsonWebToken jwt) => _manager.SignJwt(jwt); - - /// <inheritdoc/> - public bool VerifyCache(JsonWebToken jwt) => _manager.VerifyCache(jwt); - - /// <inheritdoc/> - public bool VerifyBroker(JsonWebToken jwt) => _manager.VerifyBroker(jwt); - - /// <inheritdoc/> - public IReadOnlyDictionary<string, string?> GetJwtHeader() => _manager.GetJwtHeader(); - } -} diff --git a/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs index 21a99e1..29a763c 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs @@ -24,11 +24,13 @@ using System; - namespace VNLib.Data.Caching.Extensions { - public class CacheNodeConfiguration: CacheClientConfiguration, ICachePeerAdvertisment + /// <summary> + /// A cache configuration for cache servers (nodes) + /// </summary> + public class CacheNodeConfiguration: CacheClientConfiguration, ICacheNodeAdvertisment { /// <summary> /// The address for clients to connect to @@ -56,9 +58,13 @@ namespace VNLib.Data.Caching.Extensions return this; } - public CacheNodeConfiguration EnableAdvertisment(bool enable, Uri? discoveryEndpoint) + /// <summary> + /// Enables or disables the advertisement of this node to other nodes + /// </summary> + /// <param name="discoveryEndpoint">The absolute endpoint clients will use to connect to</param> + public CacheNodeConfiguration EnableAdvertisment(Uri? discoveryEndpoint) { - BroadcastAdverisment = enable; + BroadcastAdverisment = discoveryEndpoint != null; DiscoveryEndpoint = discoveryEndpoint; return this; } diff --git a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs index 9efe16a..634b6de 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs @@ -139,91 +139,114 @@ namespace VNLib.Data.Caching.Extensions client.Config.DebugLog?.Debug("{debug}: {data}", "[CACHE]", message); } + /// <summary> - /// Creats a new <see cref="CacheListServerRequest"/> from an existing <see cref="CacheClientConfiguration"/> + /// Discovers ALL possible cache nodes itteritivley from the current collection of initial peers. + /// This will make connections to all discoverable servers /// </summary> - /// <param name="conf">The prepared client configuration</param> - /// <returns>The new <see cref="CacheListServerRequest"/></returns> - public static CacheListServerRequest GetListMessage(this CacheClientConfiguration conf) + /// <param name="config"></param> + /// <param name="cancellation">A token to cancel the operation</param> + /// <returns></returns> + /// <exception cref="ArgumentException"></exception> + public static async Task DiscoverNodesAsync(this CacheClientConfiguration config, CancellationToken cancellation) { - return new(conf, conf.DiscoveryEndpoint); + //Make sure at least one node defined + if(config?.InitialPeers == null || config.InitialPeers.Length == 0) + { + throw new ArgumentException("There must be at least one cache server defined in the client configuration"); + } + + //Get the discovery enumerator with the initial peers + INodeDiscoveryEnumerator enumerator = config.NodeCollection.BeginDiscovery(config.InitialPeers); + + //Start the discovery process + await DiscoverNodesAsync(enumerator, config.AuthManager, config.ErrorHandler, cancellation); + + //Commit nodes + config.NodeCollection.CompleteDiscovery(enumerator); } - /// <summary> - /// Discovers peer nodes from a given initial peer and returns a list of discovered nodes. If the config - /// is for a cache peer node, the current peer is removed from the list of discovered nodes. - /// </summary> - /// <param name="cacheConfig"></param> - /// <param name="initialPeer">The initial peer to discover nodes from</param> - /// <param name="cancellation">A token to cancel the discovery operation</param> - /// <returns>The collection of discovered nodes</returns> - /// <exception cref="ArgumentNullException"></exception> - public static async Task<ICachePeerAdvertisment[]?> DiscoverClusterNodesAsync( - this CacheClientConfiguration cacheConfig, - ICachePeerAdvertisment initialPeer, - CancellationToken cancellation + private static async Task DiscoverNodesAsync( + INodeDiscoveryEnumerator enumerator, + ICacheAuthManager auth, + ICacheDiscoveryErrorHandler? errHandler, + CancellationToken cancellation ) { - _ = initialPeer?.DiscoveryEndpoint ?? throw new ArgumentException("Advertisment does not expose an advertisment endpoint"); - - //Create list request - CacheListServerRequest request = cacheConfig.GetListMessage(); + //Loop through servers + while (enumerator.MoveNext()) + { + //Make sure the node has a discovery endpoint + if (enumerator.Current.DiscoveryEndpoint == null) + { + //Skip this node + continue; + } - //Override with the initial peer's discovery endpoint - request.WithDiscoveryEndpoint(initialPeer.DiscoveryEndpoint); + /* + * We are allowed to save nodes that do not have a discovery endpoint, but we cannot discover nodes from them + * we can only use them as cache + */ - //Get the list of servers - ICachePeerAdvertisment[]? servers = await ListServersAsync(request, cancellation); + //add a random delay to avoid spamming the server + await Task.Delay((int)Random.Shared.NextInt64(50, 500), cancellation); - if (servers == null) - { - return null; - } - - if(cacheConfig is CacheNodeConfiguration cnc) - { - //Filter out the current node - return servers.Where(s => !cnc.NodeId.Equals(s.NodeId, StringComparison.OrdinalIgnoreCase)).ToArray(); + try + { + //Discover nodes from the current node + ICacheNodeAdvertisment[]? nodes = await GetCacheNodesAsync(enumerator.Current, auth, cancellation); + + if (nodes != null) + { + //Add nodes to the collection + enumerator.OnPeerDiscoveryComplete(nodes); + } + } + //Catch exceptions when an error handler is defined + catch(Exception ex) when (errHandler != null) + { + //Handle the error + errHandler.OnDiscoveryError(enumerator.Current, ex); + } } - else - { - //Do not filter - return servers; - } } /// <summary> /// Contacts the cache broker to get a list of active servers to connect to /// </summary> - /// <param name="request">The request message used to connecto the broker server</param> + /// <param name="advert">An advertisment of a server to discover other nodes from</param> /// <param name="cancellationToken">A token to cancel the operationS</param> + /// <param name="auth">The authentication manager</param> /// <returns>The list of active servers</returns> /// <exception cref="SecurityException"></exception> + /// <exception cref="ArgumentException"></exception> /// <exception cref="ArgumentNullException"></exception> - public static async Task<ICachePeerAdvertisment[]?> ListServersAsync(ICacheListServerRequest request, CancellationToken cancellationToken = default) + public static async Task<ICacheNodeAdvertisment[]?> GetCacheNodesAsync(ICacheNodeAdvertisment advert, ICacheAuthManager auth, CancellationToken cancellationToken = default) { - _ = request ?? throw new ArgumentNullException(nameof(request)); + _ = advert ?? throw new ArgumentNullException(nameof(advert)); + _ = auth ?? throw new ArgumentNullException(nameof(auth)); + _ = advert.DiscoveryEndpoint ?? throw new ArgumentException("Advertisment does not expose an advertisment endpoint"); string jwtBody; //Build request jwt using (JsonWebToken requestJwt = new()) { - requestJwt.WriteHeader(request.GetJwtHeader()); + requestJwt.WriteHeader(auth.GetJwtHeader()); requestJwt.InitPayloadClaim() .AddClaim("iat", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()) .AddClaim("nonce", RandomHash.GetRandomBase32(16)) .CommitClaims(); //sign the jwt - request.SignJwt(requestJwt); + auth.SignJwt(requestJwt); //Compile the jwt jwtBody = requestJwt.Compile(); } //New list request - RestRequest listRequest = new(request.DiscoveryEndpoint, Method.Post); + RestRequest listRequest = new(advert.DiscoveryEndpoint, Method.Post); //Add the jwt as a string to the request body listRequest.AddStringBody(jwtBody, DataFormat.None); @@ -248,87 +271,18 @@ namespace VNLib.Data.Caching.Extensions //Response is jwt using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data); - + //Verify the jwt - if (!request.VerifyBroker(responseJwt)) + if (!auth.VerifyJwt(responseJwt)) { throw new SecurityException("Failed to verify the broker's challenge, cannot continue"); } - + using JsonDocument doc = responseJwt.GetPayload(); return doc.RootElement.GetProperty("peers").Deserialize<Advertisment[]>(); } /// <summary> - /// Registers the current node with the broker - /// </summary> - /// <returns>A task that completes when the regitration has been made successfully</returns> - /// <exception cref="ArgumentException"></exception> - public static async Task RegisterWithBrokerAsync(this CacheNodeConfiguration config, string authToken) - { - //Recover the certificate - ReadOnlyJsonWebKey cacheCert = config?.SigningKey ?? throw new ArgumentException(nameof(config.SigningKey)); - - //init broker request - using BrokerRegistrationRequest request = new(); - - request.WithBroker(config.DiscoveryEndpoint!) - .WithRegistrationAddress(config.ConnectEndpoint!.ToString()) - .WithNodeId(config.NodeId!) - .WithSigningKey(cacheCert, true) - .WithHeartbeatToken(authToken); - - - //Send the request - await RegisterWithBrokerAsync(request); - } - - /// <summary> - /// Registers the current server as active with the specified broker - /// </summary> - /// <param name="registration">The registration request</param> - public static async Task RegisterWithBrokerAsync(BrokerRegistrationRequest registration) - { - _ = registration ?? throw new ArgumentNullException(nameof(registration)); - _ = registration.HeartbeatToken ?? throw new ArgumentException("Missing required heartbeat access token"); - _ = registration.NodeId ?? throw new ArgumentException("Missing required cache server NodeId"); - _ = registration.BrokerAddress ?? throw new ArgumentException("Broker server address has not been configured"); - _ = registration.RegistrationAddress ?? throw new ArgumentException("Missing required registration address", nameof(registration)); - - string requestData; - //Create the jwt for signed registration message - using (JsonWebToken jwt = new()) - { - //Shared jwt header - jwt.WriteHeader(registration.JsonHeader); - //build jwt claim - jwt.InitPayloadClaim() - .AddClaim("address", registration.RegistrationAddress) - .AddClaim("sub", registration.NodeId) - .AddClaim("token", registration.HeartbeatToken) - .CommitClaims(); - - //Sign the jwt - registration.SignJwt(jwt); - //Compile and save - requestData = jwt.Compile(); - } - - //Create reg request message - RestRequest regRequest = new(registration.BrokerAddress); - regRequest.AddStringBody(requestData, DataFormat.None); - regRequest.AddHeader("Content-Type", "text/plain"); - - //Rent client - using ClientContract cc = ClientPool.Lease(); - - //Exec the regitration request - RestResponse response = await cc.Resource.ExecutePutAsync(regRequest); - response.ThrowIfError(); - } - - - /// <summary> /// Allows for configuration of an <see cref="FBMClient"/> /// for a connection to a cache server /// </summary> @@ -359,32 +313,7 @@ namespace VNLib.Data.Caching.Extensions ClientCacheConfig.AddOrUpdate(client, nodeConfig); return nodeConfig; } - - /// <summary> - /// Discovers cache nodes in the broker configured for the current client. - /// </summary> - /// <param name="client"></param> - /// <param name="token">A token to cancel the discovery</param> - /// <returns>A task the resolves the list of active servers on the broker server</returns> - public static Task<ICachePeerAdvertisment[]?> DiscoverCacheNodesAsync(this FBMClientWorkerBase client, CancellationToken token = default) - { - return client.Client.DiscoverCacheNodesAsync(token); - } - - /// <summary> - /// Discovers cache nodes in the broker configured for the current client. - /// </summary> - /// <param name="client"></param> - /// <param name="token">A token to cancel the discovery </param> - /// <returns>A task the resolves the list of active servers on the broker server</returns> - public static async Task<ICachePeerAdvertisment[]?> DiscoverCacheNodesAsync(this FBMClient client, CancellationToken token = default) - { - //Get the stored client config - CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); - - //List servers async - return conf.CacheServers = await ListServersAsync(conf, token); - } + /// <summary> /// Waits for the client to disconnect from the server while observing @@ -428,15 +357,18 @@ namespace VNLib.Data.Caching.Extensions /// <exception cref="ArgumentNullException"></exception> /// <exception cref="SecurityException"></exception> /// <exception cref="ObjectDisposedException"></exception> - public static async Task<ICachePeerAdvertisment> ConnectToRandomCacheAsync(this FBMClient client, CancellationToken cancellation = default) + public static async Task<ICacheNodeAdvertisment> ConnectToRandomCacheAsync(this FBMClient client, CancellationToken cancellation = default) { //Get stored config CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); - //Select random - ICachePeerAdvertisment? randomServer = conf.CacheServers?.SelectRandom() - ?? throw new ArgumentException("No servers detected, cannot connect"); + //Get all available nodes, or at least the initial peers + ICacheNodeAdvertisment[]? adverts = conf.NodeCollection.GetAllNodes() ?? conf.InitialPeers ?? throw new ArgumentException("No cache nodes discovered, cannot connect"); + //Select random node from all available nodes + ICacheNodeAdvertisment randomServer = adverts.SelectRandom(); + + //Connect to the random server await ConnectToCacheAsync(client, randomServer, cancellation); //Return the random server we connected to @@ -456,13 +388,14 @@ namespace VNLib.Data.Caching.Extensions /// <exception cref="ArgumentNullException"></exception> /// <exception cref="SecurityException"></exception> /// <exception cref="ObjectDisposedException"></exception> - public static Task ConnectToCacheAsync(this FBMClient client, ICachePeerAdvertisment server, CancellationToken token = default) + public static Task ConnectToCacheAsync(this FBMClient client, ICacheNodeAdvertisment server, CancellationToken token = default) { _ = client ?? throw new ArgumentNullException(nameof(client)); _ = server ?? throw new ArgumentNullException(nameof(server)); //Get stored config CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); + //Connect to server (no server id because client not replication server) return ConnectToCacheAsync(client, conf, server, token); } @@ -481,7 +414,7 @@ namespace VNLib.Data.Caching.Extensions /// <exception cref="ArgumentNullException"></exception> /// <exception cref="SecurityException"></exception> /// <exception cref="ObjectDisposedException"></exception> - public static Task ConnectToCacheAsync(this FBMClient client, ICachePeerAdvertisment server, CacheClientConfiguration explicitConfig, CancellationToken token = default) + public static Task ConnectToCacheAsync(this FBMClient client, ICacheNodeAdvertisment server, CacheClientConfiguration explicitConfig, CancellationToken token = default) { _ = client ?? throw new ArgumentNullException(nameof(client)); _ = server ?? throw new ArgumentNullException(nameof(server)); @@ -494,7 +427,7 @@ namespace VNLib.Data.Caching.Extensions private static async Task ConnectToCacheAsync( FBMClient client, CacheClientConfiguration config, - ICachePeerAdvertisment server, + ICacheNodeAdvertisment server, CancellationToken token = default ) { @@ -513,7 +446,7 @@ namespace VNLib.Data.Caching.Extensions //Init jwt for connecting to server using (JsonWebToken jwt = new()) { - jwt.WriteHeader(config.GetJwtHeader()); + jwt.WriteHeader(config.AuthManager.GetJwtHeader()); //Init claim JwtPayload claim = jwt.InitPayloadClaim(); @@ -532,7 +465,7 @@ namespace VNLib.Data.Caching.Extensions claim.CommitClaims(); //Sign jwt - config.SignJwt(jwt); + config.AuthManager.SignJwt(jwt); //Compile to string jwtMessage = jwt.Compile(); @@ -576,7 +509,7 @@ namespace VNLib.Data.Caching.Extensions using (JsonWebToken jwt = JsonWebToken.Parse(authToken)) { //Verify the jwt - if (!config.VerifyCache(jwt)) + if (!config.AuthManager.VerifyJwt(jwt)) { throw new SecurityException("Failed to verify the cache server's negotiation message, cannot continue"); } @@ -591,7 +524,7 @@ namespace VNLib.Data.Caching.Extensions client.ClientSocket.Headers[HttpRequestHeader.Authorization] = authToken; //Compute the signature of the upgrade token - client.ClientSocket.Headers[X_UPGRADE_SIG_HEADER] = GetBase64UpgradeSingature(authToken, config.SigningKey!); + client.ClientSocket.Headers[X_UPGRADE_SIG_HEADER] = config.AuthManager.GetBase64UpgradeSingature(authToken); //Check to see if adversize self is enabled if (cnc?.BroadcastAdverisment == true) @@ -660,38 +593,16 @@ namespace VNLib.Data.Caching.Extensions * compute a signature of the upgrade token and send it to the server to prove we hold the private key. */ - private static string GetBase64UpgradeSingature(string? token, ReadOnlyJsonWebKey key) + private static string GetBase64UpgradeSingature(this ICacheAuthManager man, string? token) { - //try to get the ecdsa key first - using ECDsa? ec = key.GetECDsaPrivateKey(); - - if(ec != null) - { - //Compute hash of the token - byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256); - - //Sign the hash - byte[] sig = ec.SignHash(hash, DSASignatureFormat.IeeeP1363FixedFieldConcatenation); - - //Return the base64 string - return Convert.ToBase64String(sig); - } - - //Check rsa next - using RSA? rsa = key.GetRSAPrivateKey(); - if(rsa != null) - { - //Compute hash of the token - byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256); + //Compute hash of the token + byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256); - //Sign the hash - byte[] sig = rsa.SignHash(hash, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1); + //Sign the hash + byte[] sig = man.SignMessageHash(hash, HashAlg.SHA256); - //Return the base64 string - return Convert.ToBase64String(sig); - } - - throw new CryptographicException("Cache JKW does not export a supported private key for upgrade challenges"); + //Return the base64 string + return Convert.ToBase64String(sig); } /// <summary> @@ -704,21 +615,21 @@ namespace VNLib.Data.Caching.Extensions /// <exception cref="CryptographicException"></exception> public static bool VerifyUpgradeToken(this CacheClientConfiguration nodeConfig, string signature, string token) { - return VerifyUpgradeToken(signature, token, nodeConfig.VerificationKey); + return VerifyUpgradeToken(nodeConfig.AuthManager, signature, token); } /// <summary> /// Verifies the signed auth token against the given verification key /// </summary> + /// <param name="man"></param> /// <param name="signature">The base64 signature of the token</param> /// <param name="token">The raw token to compute the hash of</param> - /// <param name="verifcationKey">The key used to verify the singature with</param> /// <returns>True if the singature matches, false otherwise</returns> /// <exception cref="ArgumentNullException"></exception> /// <exception cref="CryptographicException"></exception> - public static bool VerifyUpgradeToken(string signature, string token, ReadOnlyJsonWebKey verifcationKey) + public static bool VerifyUpgradeToken(this ICacheAuthManager man, string signature, string token) { - _ = verifcationKey ?? throw new ArgumentNullException(nameof(verifcationKey)); + _ = man ?? throw new ArgumentNullException(nameof(man)); //get the hash of the token byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256); @@ -726,23 +637,7 @@ namespace VNLib.Data.Caching.Extensions //decode the signature byte[] sig = Convert.FromBase64String(signature); - //try to get the ecdsa key first - using ECDsa? ec = verifcationKey.GetECDsaPublicKey(); - if(ec != null) - { - //Verify the signature - return ec.VerifyHash(hash, sig, DSASignatureFormat.IeeeP1363FixedFieldConcatenation); - } - - //Check rsa next - using RSA? rsa = verifcationKey.GetRSAPublicKey(); - if(rsa != null) - { - //Verify the signature - return rsa.VerifyHash(hash, sig, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1); - } - - throw new CryptographicException("Cache JKW does not export a supported public key for upgrade challenges"); + return man.VerifyMessageHash(hash, HashAlg.SHA256, sig); } private static string GetAdvertismentHeader(CacheNodeConfiguration nodeConfiguration) @@ -756,7 +651,7 @@ namespace VNLib.Data.Caching.Extensions using JsonWebToken jwt = new(); //Get the jwt header - jwt.WriteHeader(nodeConfiguration.GetJwtHeader()); + jwt.WriteHeader(nodeConfiguration.AuthManager.GetJwtHeader()); jwt.InitPayloadClaim() .AddClaim("nonce", RandomHash.GetRandomBase32(16)) @@ -768,7 +663,7 @@ namespace VNLib.Data.Caching.Extensions .CommitClaims(); //Sign message - nodeConfiguration.SignJwt(jwt); + nodeConfiguration.AuthManager.SignJwt(jwt); return jwt.Compile(); } @@ -780,12 +675,12 @@ namespace VNLib.Data.Caching.Extensions /// <param name="message">The advertisment message to verify</param> /// <returns>The advertisment message if successfully verified, or null otherwise</returns> /// <exception cref="FormatException"></exception> - public static ICachePeerAdvertisment? VerifyPeerAdvertisment(this ICacheJwtManager config, string message) + public static ICacheNodeAdvertisment? VerifyPeerAdvertisment(this ICacheAuthManager config, string message) { using JsonWebToken jwt = JsonWebToken.Parse(message); //Verify the signature - if (!config.VerifyCache(jwt)) + if (!config.VerifyJwt(jwt)) { return null; } @@ -800,7 +695,7 @@ namespace VNLib.Data.Caching.Extensions /// </summary> /// <param name="servers"></param> /// <returns>A server selected at random</returns> - public static ICachePeerAdvertisment SelectRandom(this ICollection<ICachePeerAdvertisment> servers) + public static ICacheNodeAdvertisment SelectRandom(this ICollection<ICacheNodeAdvertisment> servers) { //select random server int randServer = RandomNumberGenerator.GetInt32(0, servers.Count); @@ -808,7 +703,7 @@ namespace VNLib.Data.Caching.Extensions } - private class Advertisment : ICachePeerAdvertisment + private class Advertisment : ICacheNodeAdvertisment { [JsonIgnore] public Uri? ConnectEndpoint { get; set; } diff --git a/lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs b/lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs new file mode 100644 index 0000000..e3ab868 --- /dev/null +++ b/lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs @@ -0,0 +1,76 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Extensions +* File: ClientCacheConfiguration.cs +* +* ClientCacheConfiguration.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Collections.Generic; + +using VNLib.Hashing; +using VNLib.Hashing.IdentityUtility; + +namespace VNLib.Data.Caching.Extensions +{ + /// <summary> + /// Provides authentication services for cache clients and + /// servers. + /// </summary> + public interface ICacheAuthManager + { + /// <summary> + /// Gets the JWT header to use for signing messages with the + /// given key + /// </summary> + /// <returns>The JWT header collection</returns> + IReadOnlyDictionary<string, string?> GetJwtHeader(); + + /// <summary> + /// Signs the given JWT + /// </summary> + /// <param name="jwt">The message to sign</param> + void SignJwt(JsonWebToken jwt); + + /// <summary> + /// Verifies the given JWT + /// </summary> + /// <param name="jwt">The message to verify authenticity</param> + /// <returns>True of the JWT could be verified, false otherwise</returns> + bool VerifyJwt(JsonWebToken jwt); + + /// <summary> + /// Signs the given message hash + /// </summary> + /// <param name="hash">The message hash to sign</param> + /// <param name="alg">The algorithm used to sign the message hash</param> + /// <returns>The signature of the hash</returns> + byte[] SignMessageHash(byte[] hash, HashAlg alg); + + /// <summary> + /// Verifies the given message hash against the signature. + /// </summary> + /// <param name="hash">The message hash to compare</param> + /// <param name="alg">The algorithm used to produce the message hash</param> + /// <param name="signature">The message signature to verify the message against</param> + /// <returns>True of the signature could be verified</returns> + bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature); + } +} diff --git a/lib/VNLib.Data.Caching.Extensions/src/ICacheDiscoveryErrorHandler.cs b/lib/VNLib.Data.Caching.Extensions/src/ICacheDiscoveryErrorHandler.cs new file mode 100644 index 0000000..3493d48 --- /dev/null +++ b/lib/VNLib.Data.Caching.Extensions/src/ICacheDiscoveryErrorHandler.cs @@ -0,0 +1,41 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Extensions +* File: ClientCacheConfiguration.cs +* +* ClientCacheConfiguration.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; + +namespace VNLib.Data.Caching.Extensions +{ + /// <summary> + /// Represents an type that will handle errors that occur during the discovery process + /// </summary> + public interface ICacheDiscoveryErrorHandler + { + /// <summary> + /// Invoked when an error occurs during the discovery process + /// </summary> + /// <param name="errorNode">The node that the error occured on</param> + /// <param name="ex">The exception that caused the invocation</param> + void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex); + } +} diff --git a/lib/VNLib.Data.Caching.Extensions/src/ICachePeerAdvertisment.cs b/lib/VNLib.Data.Caching.Extensions/src/ICacheNodeAdvertisment.cs index acf883e..fc29955 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/ICachePeerAdvertisment.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/ICacheNodeAdvertisment.cs @@ -3,9 +3,9 @@ * * Library: VNLib * Package: VNLib.Data.Caching.Extensions -* File: ICachePeerAdvertisment.cs +* File: ICacheNodeAdvertisment.cs * -* ICachePeerAdvertisment.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* ICacheNodeAdvertisment.cs is part of VNLib.Data.Caching.Extensions which is part of the larger * VNLib collection of libraries and utilities. * * VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify @@ -30,7 +30,7 @@ namespace VNLib.Data.Caching.Extensions /// <summary> /// Represents a node that can be advertised to clients /// </summary> - public interface ICachePeerAdvertisment + public interface ICacheNodeAdvertisment { /// <summary> /// The endpoint for clients to connect to to access the cache diff --git a/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryCollection.cs b/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryCollection.cs new file mode 100644 index 0000000..9adebdc --- /dev/null +++ b/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryCollection.cs @@ -0,0 +1,62 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Extensions +* File: INodeDiscoveryCollection.cs +* +* INodeDiscoveryCollection.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + + +using System.Collections.Generic; + +namespace VNLib.Data.Caching.Extensions +{ + /// <summary> + /// Represents a collection of discovered nodes + /// </summary> +#pragma warning disable CA1711 // Identifiers should not have incorrect suffix + public interface INodeDiscoveryCollection +#pragma warning restore CA1711 // Identifiers should not have incorrect suffix + { + /// <summary> + /// Begins a new discovery and gets an enumerator for the discovery process + /// </summary> + /// <returns>An enumerator that simplifies discovery of unique nodes</returns> + INodeDiscoveryEnumerator BeginDiscovery(); + + /// <summary> + /// Begins a new discovery and gets an enumerator for the discovery process + /// </summary> + /// <param name="initialPeers">An initial collection of peers to add to the enumeration</param> + /// <returns>An enumerator that simplifies discovery of unique nodes</returns> + INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICacheNodeAdvertisment> initialPeers); + + /// <summary> + /// Gets a snapshot of all discovered nodes in the current collection. + /// </summary> + /// <returns>The current collection of notes</returns> + ICacheNodeAdvertisment[] GetAllNodes(); + + /// <summary> + /// Completes a discovery process and updates the collection with the results + /// </summary> + /// <param name="enumerator">The enumerator used to collect discovered nodes</param> + void CompleteDiscovery(INodeDiscoveryEnumerator enumerator); + } +} diff --git a/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryEnumerator.cs b/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryEnumerator.cs new file mode 100644 index 0000000..f6d5f40 --- /dev/null +++ b/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryEnumerator.cs @@ -0,0 +1,42 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Extensions +* File: INodeDiscoveryEnumerator.cs +* +* INodeDiscoveryEnumerator.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + + +using System.Collections.Generic; + + +namespace VNLib.Data.Caching.Extensions +{ + /// <summary> + /// A custom enumerator for the node discovery process + /// </summary> + public interface INodeDiscoveryEnumerator : IEnumerator<ICacheNodeAdvertisment> + { + /// <summary> + /// Adds the specified peer to the collection of discovered peers + /// </summary> + /// <param name="discoveredPeers">The peer collection</param> + void OnPeerDiscoveryComplete(IEnumerable<ICacheNodeAdvertisment> discoveredPeers); + } +} diff --git a/lib/VNLib.Data.Caching.Extensions/src/NodeDiscoveryCollection.cs b/lib/VNLib.Data.Caching.Extensions/src/NodeDiscoveryCollection.cs new file mode 100644 index 0000000..305f5de --- /dev/null +++ b/lib/VNLib.Data.Caching.Extensions/src/NodeDiscoveryCollection.cs @@ -0,0 +1,117 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Extensions +* File: INodeDiscoveryCollection.cs +* +* INodeDiscoveryCollection.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Collections.Generic; +using System.Collections; + +namespace VNLib.Data.Caching.Extensions +{ + /// <summary> + /// Represents a collection of available cache nodes from a discovery process + /// </summary> + public sealed class NodeDiscoveryCollection : INodeDiscoveryCollection + { + private LinkedList<ICacheNodeAdvertisment> _peers; + + /// <summary> + /// Initializes a new empty <see cref="NodeDiscoveryCollection"/> + /// </summary> + public NodeDiscoveryCollection() + { + _peers = new(); + } + + ///<inheritdoc/> + public INodeDiscoveryEnumerator BeginDiscovery() + { + return new NodeEnumerator(new()); + } + + ///<inheritdoc/> + public INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICacheNodeAdvertisment> initialPeers) + { + //Init new enumerator with the initial peers + return new NodeEnumerator(new(initialPeers)); + } + + ///<inheritdoc/> + public void CompleteDiscovery(INodeDiscoveryEnumerator enumerator) + { + _ = enumerator ?? throw new ArgumentNullException(nameof(enumerator)); + + //Capture all nodes from the enumerator and store them as our current peers + _peers = (enumerator as NodeEnumerator)!.Peers; + } + + ///<inheritdoc/> + public ICacheNodeAdvertisment[] GetAllNodes() + { + //Capture all current peers + return _peers.ToArray(); + } + + private sealed record class NodeEnumerator(LinkedList<ICacheNodeAdvertisment> Peers) : INodeDiscoveryEnumerator + { + //Keep track of the current node in the collection so we can move down the list + private LinkedListNode<ICacheNodeAdvertisment>? _currentNode = Peers.First; + + public ICacheNodeAdvertisment Current => _currentNode?.Value; + object IEnumerator.Current => _currentNode?.Value; + + + ///<inheritdoc/> + public bool MoveNext() + { + //Move to the next peer in the collection + _currentNode = _currentNode?.Next; + + return _currentNode?.Value != null; + } + + ///<inheritdoc/> + public void OnPeerDiscoveryComplete(IEnumerable<ICacheNodeAdvertisment> discoveredPeers) + { + //Get only the peers from the discovery that are not already in the collection + IEnumerable<ICacheNodeAdvertisment> newPeers = discoveredPeers.Except(Peers); + + //Add them to the end of the collection + foreach (ICacheNodeAdvertisment ad in newPeers) + { + Peers.AddLast(ad); + } + } + + public void Reset() + { + //Go to the first node + _currentNode = Peers.First; + } + + public void Dispose() + { } + } + } +} |