aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs2
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/BrokerRegistrationRequest.cs116
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs106
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs82
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs14
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs317
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs76
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ICacheDiscoveryErrorHandler.cs41
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ICacheNodeAdvertisment.cs (renamed from lib/VNLib.Data.Caching.Extensions/src/ICachePeerAdvertisment.cs)6
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryCollection.cs62
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryEnumerator.cs42
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/NodeDiscoveryCollection.cs117
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs38
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs65
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs132
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs48
16 files changed, 727 insertions, 537 deletions
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs b/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs
index 2d02491..3020376 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs
@@ -27,7 +27,7 @@ using System.Text.Json.Serialization;
namespace VNLib.Data.Caching.Extensions
{
- public class ActiveServer : ICachePeerAdvertisment
+ public class ActiveServer : ICacheNodeAdvertisment
{
[JsonPropertyName("address")]
public string? HostName { get; set; }
diff --git a/lib/VNLib.Data.Caching.Extensions/src/BrokerRegistrationRequest.cs b/lib/VNLib.Data.Caching.Extensions/src/BrokerRegistrationRequest.cs
deleted file mode 100644
index 9ec559a..0000000
--- a/lib/VNLib.Data.Caching.Extensions/src/BrokerRegistrationRequest.cs
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
-* Copyright (c) 2023 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Data.Caching.Extensions
-* File: BrokerRegistrationRequest.cs
-*
-* BrokerRegistrationRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Collections.Generic;
-
-using VNLib.Utils;
-using VNLib.Hashing.IdentityUtility;
-
-
-namespace VNLib.Data.Caching.Extensions
-{
- /// <summary>
- /// A broker registration request message in a fluent api
- /// format. This message may be disposed when no longer in use
- /// </summary>
- public sealed class BrokerRegistrationRequest : VnDisposeable
- {
- private bool ownsKey;
- private ReadOnlyJsonWebKey? SigningKey;
-
- /// <summary>
- /// The cache server node id
- /// </summary>
- public string? NodeId { get; private set; }
- /// <summary>
- /// The broker server's address
- /// </summary>
- public Uri? BrokerAddress { get; private set; }
- /// <summary>
- /// The security token used by the broker server to
- /// authenticate during heartbeat connections
- /// </summary>
- public string? HeartbeatToken { get; private set; }
- /// <summary>
- /// The address for remote clients to use to
- /// connect to this server
- /// </summary>
- public string? RegistrationAddress { get; private set; }
-
- /// <summary>
- /// Recovers the private key from the supplied certificate
- /// </summary>
- /// <param name="jwk">The private key used to sign messages</param>
- /// <param name="ownsKey">A value that indicates if the current instance owns the key</param>
- /// <returns></returns>
- /// <exception cref="ArgumentException"></exception>
- public BrokerRegistrationRequest WithSigningKey(ReadOnlyJsonWebKey jwk, bool ownsKey)
- {
- this.ownsKey = ownsKey;
- SigningKey = jwk ?? throw new ArgumentNullException(nameof(jwk));
- return this;
- }
-
- public BrokerRegistrationRequest WithBroker(Uri brokerUri)
- {
- BrokerAddress = brokerUri;
- return this;
- }
-
- public BrokerRegistrationRequest WithRegistrationAddress(string address)
- {
- RegistrationAddress = address;
- return this;
- }
-
- public BrokerRegistrationRequest WithHeartbeatToken(string token)
- {
- HeartbeatToken = token;
- return this;
- }
-
- public BrokerRegistrationRequest WithNodeId(string nodeId)
- {
- NodeId = nodeId;
- return this;
- }
-
- internal void SignJwt(JsonWebToken jwt)
- {
- jwt.SignFromJwk(SigningKey);
- }
-
- internal IReadOnlyDictionary<string, string?> JsonHeader => SigningKey!.JwtHeader;
-
- ///<inheritdoc/>
- protected override void Free()
- {
- if (ownsKey)
- {
- SigningKey?.Dispose();
- }
- }
- }
-}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs
index 05e4928..9229c89 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs
@@ -3,9 +3,9 @@
*
* Library: VNLib
* Package: VNLib.Data.Caching.Extensions
-* File: ClientCacheConfiguration.cs
+* File: CacheClientConfiguration.cs
*
-* ClientCacheConfiguration.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* CacheClientConfiguration.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
* VNLib collection of libraries and utilities.
*
* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
@@ -22,69 +22,47 @@
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
-using System;
+using System.Linq;
using System.Collections.Generic;
-using System.Security.Cryptography;
-
-using VNLib.Hashing;
-using VNLib.Hashing.IdentityUtility;
namespace VNLib.Data.Caching.Extensions
{
- public interface ICacheJwtManager
- {
- IReadOnlyDictionary<string, string?> GetJwtHeader();
-
- void SignJwt(JsonWebToken jwt);
-
- bool VerifyCache(JsonWebToken jwt);
-
- bool VerifyBroker(JsonWebToken jwt);
- }
-
/// <summary>
/// A fluent api configuration object for configuring a <see cref="FBMClient"/>
/// to connect to cache servers.
/// </summary>
- public class CacheClientConfiguration : ICacheJwtManager, ICacheListServerRequest
+ public class CacheClientConfiguration
{
- public ReadOnlyJsonWebKey? SigningKey { get; private set; }
- public ReadOnlyJsonWebKey? VerificationKey { get; private set; }
- public ReadOnlyJsonWebKey? BrokerVerificationKey { get; private set; }
-
- public Uri? DiscoveryEndpoint { get; private set; }
- public bool UseTls { get; private set; }
- internal ICachePeerAdvertisment[]? CacheServers { get; set; }
+ /// <summary>
+ /// Stores available cache servers to be used for discovery, and connections
+ /// </summary>
+ public INodeDiscoveryCollection NodeCollection { get; } = new NodeDiscoveryCollection();
/// <summary>
- /// Imports the private key used to sign messages
+ /// The authentication manager to use for signing and verifying messages to and from the cache servers
/// </summary>
- /// <param name="jwk">The <see cref="ReadOnlyJsonWebKey"/> with a private key loaded</param>
- /// <returns>Chainable fluent object</returns>
- /// <exception cref="ArgumentException"></exception>
- /// <exception cref="CryptographicException"></exception>
- public CacheClientConfiguration WithSigningKey(ReadOnlyJsonWebKey jwk)
- {
- SigningKey = jwk ?? throw new ArgumentNullException(nameof(jwk));
- return this;
- }
+ public ICacheAuthManager AuthManager { get; private set; }
/// <summary>
- /// Imports the public key used to verify messages from the remote server
+ /// The error handler to use for handling errors that occur during the discovery process
/// </summary>
- /// <param name="jwk">The <see cref="ReadOnlyJsonWebKey"/> public key only used for message verification</param>
- /// <returns>Chainable fluent object</returns>
- /// <exception cref="ArgumentException"></exception>
- /// <exception cref="CryptographicException"></exception>
- public CacheClientConfiguration WithVerificationKey(ReadOnlyJsonWebKey jwk)
- {
- VerificationKey = jwk ?? throw new ArgumentNullException(nameof(jwk));
- return this;
- }
+ public ICacheDiscoveryErrorHandler? ErrorHandler { get; private set; }
- public CacheClientConfiguration WithBrokerVerificationKey(ReadOnlyJsonWebKey jwk)
+ /// <summary>
+ /// Specifies if all connections should use TLS
+ /// </summary>
+ public bool UseTls { get; private set; }
+
+ internal ICacheNodeAdvertisment[]? InitialPeers { get; set; }
+
+ /// <summary>
+ /// Specifies the JWT authentication manager to use for signing and verifying JWTs
+ /// </summary>
+ /// <param name="manager">The authentication manager</param>
+ /// <returns>Chainable fluent object</returns>
+ public CacheClientConfiguration WithAuthenticator(ICacheAuthManager manager)
{
- BrokerVerificationKey = jwk ?? throw new ArgumentNullException(nameof(jwk));
+ AuthManager = manager;
return this;
}
@@ -92,7 +70,6 @@ namespace VNLib.Data.Caching.Extensions
/// Specifies if all connections should be using TLS
/// </summary>
/// <param name="useTls">A value that indicates if connections should use TLS</param>
- /// <returns>Chainable fluent object</returns>
public CacheClientConfiguration WithTls(bool useTls)
{
UseTls = useTls;
@@ -100,28 +77,25 @@ namespace VNLib.Data.Caching.Extensions
}
/// <summary>
- /// Specifies the broker address to discover cache nodes from
+ /// Specifies the initial cache peers to connect to
/// </summary>
- /// <param name="brokerAddress">The address of the server broker</param>
+ /// <param name="peers">The collection of servers to discover peers from and connect to</param>
/// <returns>Chainable fluent object</returns>
- /// <exception cref="ArgumentNullException"></exception>
- public CacheClientConfiguration WithBroker(Uri brokerAddress)
+ public CacheClientConfiguration WithInitialPeers(IEnumerable<ICacheNodeAdvertisment> peers)
{
- DiscoveryEndpoint = brokerAddress ?? throw new ArgumentNullException(nameof(brokerAddress));
+ InitialPeers = peers.ToArray();
return this;
}
-
- ///<inheritdoc/>
- public void SignJwt(JsonWebToken jwt) => jwt.SignFromJwk(SigningKey!);
-
- ///<inheritdoc/>
- public bool VerifyCache(JsonWebToken jwt) => jwt.VerifyFromJwk(VerificationKey!);
-
- ///<inheritdoc/>
- public bool VerifyBroker(JsonWebToken jwt) => jwt.VerifyFromJwk(BrokerVerificationKey!);
-
- ///<inheritdoc/>
- public IReadOnlyDictionary<string, string?> GetJwtHeader() => SigningKey!.JwtHeader;
+ /// <summary>
+ /// Specifies the error handler to use for handling errors that occur during the discovery process
+ /// </summary>
+ /// <param name="handler">The error handler to use during a discovery</param>
+ /// <returns>Chainable fluent object</returns>
+ public CacheClientConfiguration WithErrorHandler(ICacheDiscoveryErrorHandler handler)
+ {
+ ErrorHandler = handler;
+ return this;
+ }
}
}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs b/lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs
deleted file mode 100644
index 76d4ad8..0000000
--- a/lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
-* Copyright (c) 2023 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Data.Caching.Extensions
-* File: ListServerRequest.cs
-*
-* ListServerRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Collections.Generic;
-
-using VNLib.Utils;
-using VNLib.Hashing.IdentityUtility;
-
-namespace VNLib.Data.Caching.Extensions
-{
- public interface ICacheListServerRequest : ICacheJwtManager
- {
- Uri DiscoveryEndpoint { get; }
- }
-
- /// <summary>
- /// A request container for a ListServer request
- /// </summary>
- public sealed class CacheListServerRequest : ICacheListServerRequest
- {
- private readonly ICacheJwtManager _manager;
-
-
- /// <summary>
- /// The address of the broker server to connect to
- /// </summary>
- public Uri DiscoveryEndpoint { get; private set; }
-
- public CacheListServerRequest(ICacheJwtManager keyManager, Uri? brokerAddress = null)
- {
- _manager = keyManager;
- DiscoveryEndpoint = brokerAddress!;
- }
-
-
- /// <summary>
- /// Sets the broker address for the request
- /// </summary>
- /// <param name="brokerAddr">The broker server's address to connect to</param>
- /// <returns>A fluent chainable value</returns>
- /// <exception cref="ArgumentNullException"></exception>
- public CacheListServerRequest WithDiscoveryEndpoint(Uri brokerAddr)
- {
- DiscoveryEndpoint = brokerAddr ?? throw new ArgumentNullException(nameof(brokerAddr));
- return this;
- }
-
- /// <inheritdoc/>
- public void SignJwt(JsonWebToken jwt) => _manager.SignJwt(jwt);
-
- /// <inheritdoc/>
- public bool VerifyCache(JsonWebToken jwt) => _manager.VerifyCache(jwt);
-
- /// <inheritdoc/>
- public bool VerifyBroker(JsonWebToken jwt) => _manager.VerifyBroker(jwt);
-
- /// <inheritdoc/>
- public IReadOnlyDictionary<string, string?> GetJwtHeader() => _manager.GetJwtHeader();
- }
-}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs
index 21a99e1..29a763c 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs
@@ -24,11 +24,13 @@
using System;
-
namespace VNLib.Data.Caching.Extensions
{
- public class CacheNodeConfiguration: CacheClientConfiguration, ICachePeerAdvertisment
+ /// <summary>
+ /// A cache configuration for cache servers (nodes)
+ /// </summary>
+ public class CacheNodeConfiguration: CacheClientConfiguration, ICacheNodeAdvertisment
{
/// <summary>
/// The address for clients to connect to
@@ -56,9 +58,13 @@ namespace VNLib.Data.Caching.Extensions
return this;
}
- public CacheNodeConfiguration EnableAdvertisment(bool enable, Uri? discoveryEndpoint)
+ /// <summary>
+ /// Enables or disables the advertisement of this node to other nodes
+ /// </summary>
+ /// <param name="discoveryEndpoint">The absolute endpoint clients will use to connect to</param>
+ public CacheNodeConfiguration EnableAdvertisment(Uri? discoveryEndpoint)
{
- BroadcastAdverisment = enable;
+ BroadcastAdverisment = discoveryEndpoint != null;
DiscoveryEndpoint = discoveryEndpoint;
return this;
}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
index 9efe16a..634b6de 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
@@ -139,91 +139,114 @@ namespace VNLib.Data.Caching.Extensions
client.Config.DebugLog?.Debug("{debug}: {data}", "[CACHE]", message);
}
+
/// <summary>
- /// Creats a new <see cref="CacheListServerRequest"/> from an existing <see cref="CacheClientConfiguration"/>
+ /// Discovers ALL possible cache nodes itteritivley from the current collection of initial peers.
+ /// This will make connections to all discoverable servers
/// </summary>
- /// <param name="conf">The prepared client configuration</param>
- /// <returns>The new <see cref="CacheListServerRequest"/></returns>
- public static CacheListServerRequest GetListMessage(this CacheClientConfiguration conf)
+ /// <param name="config"></param>
+ /// <param name="cancellation">A token to cancel the operation</param>
+ /// <returns></returns>
+ /// <exception cref="ArgumentException"></exception>
+ public static async Task DiscoverNodesAsync(this CacheClientConfiguration config, CancellationToken cancellation)
{
- return new(conf, conf.DiscoveryEndpoint);
+ //Make sure at least one node defined
+ if(config?.InitialPeers == null || config.InitialPeers.Length == 0)
+ {
+ throw new ArgumentException("There must be at least one cache server defined in the client configuration");
+ }
+
+ //Get the discovery enumerator with the initial peers
+ INodeDiscoveryEnumerator enumerator = config.NodeCollection.BeginDiscovery(config.InitialPeers);
+
+ //Start the discovery process
+ await DiscoverNodesAsync(enumerator, config.AuthManager, config.ErrorHandler, cancellation);
+
+ //Commit nodes
+ config.NodeCollection.CompleteDiscovery(enumerator);
}
- /// <summary>
- /// Discovers peer nodes from a given initial peer and returns a list of discovered nodes. If the config
- /// is for a cache peer node, the current peer is removed from the list of discovered nodes.
- /// </summary>
- /// <param name="cacheConfig"></param>
- /// <param name="initialPeer">The initial peer to discover nodes from</param>
- /// <param name="cancellation">A token to cancel the discovery operation</param>
- /// <returns>The collection of discovered nodes</returns>
- /// <exception cref="ArgumentNullException"></exception>
- public static async Task<ICachePeerAdvertisment[]?> DiscoverClusterNodesAsync(
- this CacheClientConfiguration cacheConfig,
- ICachePeerAdvertisment initialPeer,
- CancellationToken cancellation
+ private static async Task DiscoverNodesAsync(
+ INodeDiscoveryEnumerator enumerator,
+ ICacheAuthManager auth,
+ ICacheDiscoveryErrorHandler? errHandler,
+ CancellationToken cancellation
)
{
- _ = initialPeer?.DiscoveryEndpoint ?? throw new ArgumentException("Advertisment does not expose an advertisment endpoint");
-
- //Create list request
- CacheListServerRequest request = cacheConfig.GetListMessage();
+ //Loop through servers
+ while (enumerator.MoveNext())
+ {
+ //Make sure the node has a discovery endpoint
+ if (enumerator.Current.DiscoveryEndpoint == null)
+ {
+ //Skip this node
+ continue;
+ }
- //Override with the initial peer's discovery endpoint
- request.WithDiscoveryEndpoint(initialPeer.DiscoveryEndpoint);
+ /*
+ * We are allowed to save nodes that do not have a discovery endpoint, but we cannot discover nodes from them
+ * we can only use them as cache
+ */
- //Get the list of servers
- ICachePeerAdvertisment[]? servers = await ListServersAsync(request, cancellation);
+ //add a random delay to avoid spamming the server
+ await Task.Delay((int)Random.Shared.NextInt64(50, 500), cancellation);
- if (servers == null)
- {
- return null;
- }
-
- if(cacheConfig is CacheNodeConfiguration cnc)
- {
- //Filter out the current node
- return servers.Where(s => !cnc.NodeId.Equals(s.NodeId, StringComparison.OrdinalIgnoreCase)).ToArray();
+ try
+ {
+ //Discover nodes from the current node
+ ICacheNodeAdvertisment[]? nodes = await GetCacheNodesAsync(enumerator.Current, auth, cancellation);
+
+ if (nodes != null)
+ {
+ //Add nodes to the collection
+ enumerator.OnPeerDiscoveryComplete(nodes);
+ }
+ }
+ //Catch exceptions when an error handler is defined
+ catch(Exception ex) when (errHandler != null)
+ {
+ //Handle the error
+ errHandler.OnDiscoveryError(enumerator.Current, ex);
+ }
}
- else
- {
- //Do not filter
- return servers;
- }
}
/// <summary>
/// Contacts the cache broker to get a list of active servers to connect to
/// </summary>
- /// <param name="request">The request message used to connecto the broker server</param>
+ /// <param name="advert">An advertisment of a server to discover other nodes from</param>
/// <param name="cancellationToken">A token to cancel the operationS</param>
+ /// <param name="auth">The authentication manager</param>
/// <returns>The list of active servers</returns>
/// <exception cref="SecurityException"></exception>
+ /// <exception cref="ArgumentException"></exception>
/// <exception cref="ArgumentNullException"></exception>
- public static async Task<ICachePeerAdvertisment[]?> ListServersAsync(ICacheListServerRequest request, CancellationToken cancellationToken = default)
+ public static async Task<ICacheNodeAdvertisment[]?> GetCacheNodesAsync(ICacheNodeAdvertisment advert, ICacheAuthManager auth, CancellationToken cancellationToken = default)
{
- _ = request ?? throw new ArgumentNullException(nameof(request));
+ _ = advert ?? throw new ArgumentNullException(nameof(advert));
+ _ = auth ?? throw new ArgumentNullException(nameof(auth));
+ _ = advert.DiscoveryEndpoint ?? throw new ArgumentException("Advertisment does not expose an advertisment endpoint");
string jwtBody;
//Build request jwt
using (JsonWebToken requestJwt = new())
{
- requestJwt.WriteHeader(request.GetJwtHeader());
+ requestJwt.WriteHeader(auth.GetJwtHeader());
requestJwt.InitPayloadClaim()
.AddClaim("iat", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds())
.AddClaim("nonce", RandomHash.GetRandomBase32(16))
.CommitClaims();
//sign the jwt
- request.SignJwt(requestJwt);
+ auth.SignJwt(requestJwt);
//Compile the jwt
jwtBody = requestJwt.Compile();
}
//New list request
- RestRequest listRequest = new(request.DiscoveryEndpoint, Method.Post);
+ RestRequest listRequest = new(advert.DiscoveryEndpoint, Method.Post);
//Add the jwt as a string to the request body
listRequest.AddStringBody(jwtBody, DataFormat.None);
@@ -248,87 +271,18 @@ namespace VNLib.Data.Caching.Extensions
//Response is jwt
using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data);
-
+
//Verify the jwt
- if (!request.VerifyBroker(responseJwt))
+ if (!auth.VerifyJwt(responseJwt))
{
throw new SecurityException("Failed to verify the broker's challenge, cannot continue");
}
-
+
using JsonDocument doc = responseJwt.GetPayload();
return doc.RootElement.GetProperty("peers").Deserialize<Advertisment[]>();
}
/// <summary>
- /// Registers the current node with the broker
- /// </summary>
- /// <returns>A task that completes when the regitration has been made successfully</returns>
- /// <exception cref="ArgumentException"></exception>
- public static async Task RegisterWithBrokerAsync(this CacheNodeConfiguration config, string authToken)
- {
- //Recover the certificate
- ReadOnlyJsonWebKey cacheCert = config?.SigningKey ?? throw new ArgumentException(nameof(config.SigningKey));
-
- //init broker request
- using BrokerRegistrationRequest request = new();
-
- request.WithBroker(config.DiscoveryEndpoint!)
- .WithRegistrationAddress(config.ConnectEndpoint!.ToString())
- .WithNodeId(config.NodeId!)
- .WithSigningKey(cacheCert, true)
- .WithHeartbeatToken(authToken);
-
-
- //Send the request
- await RegisterWithBrokerAsync(request);
- }
-
- /// <summary>
- /// Registers the current server as active with the specified broker
- /// </summary>
- /// <param name="registration">The registration request</param>
- public static async Task RegisterWithBrokerAsync(BrokerRegistrationRequest registration)
- {
- _ = registration ?? throw new ArgumentNullException(nameof(registration));
- _ = registration.HeartbeatToken ?? throw new ArgumentException("Missing required heartbeat access token");
- _ = registration.NodeId ?? throw new ArgumentException("Missing required cache server NodeId");
- _ = registration.BrokerAddress ?? throw new ArgumentException("Broker server address has not been configured");
- _ = registration.RegistrationAddress ?? throw new ArgumentException("Missing required registration address", nameof(registration));
-
- string requestData;
- //Create the jwt for signed registration message
- using (JsonWebToken jwt = new())
- {
- //Shared jwt header
- jwt.WriteHeader(registration.JsonHeader);
- //build jwt claim
- jwt.InitPayloadClaim()
- .AddClaim("address", registration.RegistrationAddress)
- .AddClaim("sub", registration.NodeId)
- .AddClaim("token", registration.HeartbeatToken)
- .CommitClaims();
-
- //Sign the jwt
- registration.SignJwt(jwt);
- //Compile and save
- requestData = jwt.Compile();
- }
-
- //Create reg request message
- RestRequest regRequest = new(registration.BrokerAddress);
- regRequest.AddStringBody(requestData, DataFormat.None);
- regRequest.AddHeader("Content-Type", "text/plain");
-
- //Rent client
- using ClientContract cc = ClientPool.Lease();
-
- //Exec the regitration request
- RestResponse response = await cc.Resource.ExecutePutAsync(regRequest);
- response.ThrowIfError();
- }
-
-
- /// <summary>
/// Allows for configuration of an <see cref="FBMClient"/>
/// for a connection to a cache server
/// </summary>
@@ -359,32 +313,7 @@ namespace VNLib.Data.Caching.Extensions
ClientCacheConfig.AddOrUpdate(client, nodeConfig);
return nodeConfig;
}
-
- /// <summary>
- /// Discovers cache nodes in the broker configured for the current client.
- /// </summary>
- /// <param name="client"></param>
- /// <param name="token">A token to cancel the discovery</param>
- /// <returns>A task the resolves the list of active servers on the broker server</returns>
- public static Task<ICachePeerAdvertisment[]?> DiscoverCacheNodesAsync(this FBMClientWorkerBase client, CancellationToken token = default)
- {
- return client.Client.DiscoverCacheNodesAsync(token);
- }
-
- /// <summary>
- /// Discovers cache nodes in the broker configured for the current client.
- /// </summary>
- /// <param name="client"></param>
- /// <param name="token">A token to cancel the discovery </param>
- /// <returns>A task the resolves the list of active servers on the broker server</returns>
- public static async Task<ICachePeerAdvertisment[]?> DiscoverCacheNodesAsync(this FBMClient client, CancellationToken token = default)
- {
- //Get the stored client config
- CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
-
- //List servers async
- return conf.CacheServers = await ListServersAsync(conf, token);
- }
+
/// <summary>
/// Waits for the client to disconnect from the server while observing
@@ -428,15 +357,18 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="SecurityException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
- public static async Task<ICachePeerAdvertisment> ConnectToRandomCacheAsync(this FBMClient client, CancellationToken cancellation = default)
+ public static async Task<ICacheNodeAdvertisment> ConnectToRandomCacheAsync(this FBMClient client, CancellationToken cancellation = default)
{
//Get stored config
CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
- //Select random
- ICachePeerAdvertisment? randomServer = conf.CacheServers?.SelectRandom()
- ?? throw new ArgumentException("No servers detected, cannot connect");
+ //Get all available nodes, or at least the initial peers
+ ICacheNodeAdvertisment[]? adverts = conf.NodeCollection.GetAllNodes() ?? conf.InitialPeers ?? throw new ArgumentException("No cache nodes discovered, cannot connect");
+ //Select random node from all available nodes
+ ICacheNodeAdvertisment randomServer = adverts.SelectRandom();
+
+ //Connect to the random server
await ConnectToCacheAsync(client, randomServer, cancellation);
//Return the random server we connected to
@@ -456,13 +388,14 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="SecurityException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
- public static Task ConnectToCacheAsync(this FBMClient client, ICachePeerAdvertisment server, CancellationToken token = default)
+ public static Task ConnectToCacheAsync(this FBMClient client, ICacheNodeAdvertisment server, CancellationToken token = default)
{
_ = client ?? throw new ArgumentNullException(nameof(client));
_ = server ?? throw new ArgumentNullException(nameof(server));
//Get stored config
CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
+
//Connect to server (no server id because client not replication server)
return ConnectToCacheAsync(client, conf, server, token);
}
@@ -481,7 +414,7 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="SecurityException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
- public static Task ConnectToCacheAsync(this FBMClient client, ICachePeerAdvertisment server, CacheClientConfiguration explicitConfig, CancellationToken token = default)
+ public static Task ConnectToCacheAsync(this FBMClient client, ICacheNodeAdvertisment server, CacheClientConfiguration explicitConfig, CancellationToken token = default)
{
_ = client ?? throw new ArgumentNullException(nameof(client));
_ = server ?? throw new ArgumentNullException(nameof(server));
@@ -494,7 +427,7 @@ namespace VNLib.Data.Caching.Extensions
private static async Task ConnectToCacheAsync(
FBMClient client,
CacheClientConfiguration config,
- ICachePeerAdvertisment server,
+ ICacheNodeAdvertisment server,
CancellationToken token = default
)
{
@@ -513,7 +446,7 @@ namespace VNLib.Data.Caching.Extensions
//Init jwt for connecting to server
using (JsonWebToken jwt = new())
{
- jwt.WriteHeader(config.GetJwtHeader());
+ jwt.WriteHeader(config.AuthManager.GetJwtHeader());
//Init claim
JwtPayload claim = jwt.InitPayloadClaim();
@@ -532,7 +465,7 @@ namespace VNLib.Data.Caching.Extensions
claim.CommitClaims();
//Sign jwt
- config.SignJwt(jwt);
+ config.AuthManager.SignJwt(jwt);
//Compile to string
jwtMessage = jwt.Compile();
@@ -576,7 +509,7 @@ namespace VNLib.Data.Caching.Extensions
using (JsonWebToken jwt = JsonWebToken.Parse(authToken))
{
//Verify the jwt
- if (!config.VerifyCache(jwt))
+ if (!config.AuthManager.VerifyJwt(jwt))
{
throw new SecurityException("Failed to verify the cache server's negotiation message, cannot continue");
}
@@ -591,7 +524,7 @@ namespace VNLib.Data.Caching.Extensions
client.ClientSocket.Headers[HttpRequestHeader.Authorization] = authToken;
//Compute the signature of the upgrade token
- client.ClientSocket.Headers[X_UPGRADE_SIG_HEADER] = GetBase64UpgradeSingature(authToken, config.SigningKey!);
+ client.ClientSocket.Headers[X_UPGRADE_SIG_HEADER] = config.AuthManager.GetBase64UpgradeSingature(authToken);
//Check to see if adversize self is enabled
if (cnc?.BroadcastAdverisment == true)
@@ -660,38 +593,16 @@ namespace VNLib.Data.Caching.Extensions
* compute a signature of the upgrade token and send it to the server to prove we hold the private key.
*/
- private static string GetBase64UpgradeSingature(string? token, ReadOnlyJsonWebKey key)
+ private static string GetBase64UpgradeSingature(this ICacheAuthManager man, string? token)
{
- //try to get the ecdsa key first
- using ECDsa? ec = key.GetECDsaPrivateKey();
-
- if(ec != null)
- {
- //Compute hash of the token
- byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256);
-
- //Sign the hash
- byte[] sig = ec.SignHash(hash, DSASignatureFormat.IeeeP1363FixedFieldConcatenation);
-
- //Return the base64 string
- return Convert.ToBase64String(sig);
- }
-
- //Check rsa next
- using RSA? rsa = key.GetRSAPrivateKey();
- if(rsa != null)
- {
- //Compute hash of the token
- byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256);
+ //Compute hash of the token
+ byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256);
- //Sign the hash
- byte[] sig = rsa.SignHash(hash, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1);
+ //Sign the hash
+ byte[] sig = man.SignMessageHash(hash, HashAlg.SHA256);
- //Return the base64 string
- return Convert.ToBase64String(sig);
- }
-
- throw new CryptographicException("Cache JKW does not export a supported private key for upgrade challenges");
+ //Return the base64 string
+ return Convert.ToBase64String(sig);
}
/// <summary>
@@ -704,21 +615,21 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="CryptographicException"></exception>
public static bool VerifyUpgradeToken(this CacheClientConfiguration nodeConfig, string signature, string token)
{
- return VerifyUpgradeToken(signature, token, nodeConfig.VerificationKey);
+ return VerifyUpgradeToken(nodeConfig.AuthManager, signature, token);
}
/// <summary>
/// Verifies the signed auth token against the given verification key
/// </summary>
+ /// <param name="man"></param>
/// <param name="signature">The base64 signature of the token</param>
/// <param name="token">The raw token to compute the hash of</param>
- /// <param name="verifcationKey">The key used to verify the singature with</param>
/// <returns>True if the singature matches, false otherwise</returns>
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="CryptographicException"></exception>
- public static bool VerifyUpgradeToken(string signature, string token, ReadOnlyJsonWebKey verifcationKey)
+ public static bool VerifyUpgradeToken(this ICacheAuthManager man, string signature, string token)
{
- _ = verifcationKey ?? throw new ArgumentNullException(nameof(verifcationKey));
+ _ = man ?? throw new ArgumentNullException(nameof(man));
//get the hash of the token
byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256);
@@ -726,23 +637,7 @@ namespace VNLib.Data.Caching.Extensions
//decode the signature
byte[] sig = Convert.FromBase64String(signature);
- //try to get the ecdsa key first
- using ECDsa? ec = verifcationKey.GetECDsaPublicKey();
- if(ec != null)
- {
- //Verify the signature
- return ec.VerifyHash(hash, sig, DSASignatureFormat.IeeeP1363FixedFieldConcatenation);
- }
-
- //Check rsa next
- using RSA? rsa = verifcationKey.GetRSAPublicKey();
- if(rsa != null)
- {
- //Verify the signature
- return rsa.VerifyHash(hash, sig, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1);
- }
-
- throw new CryptographicException("Cache JKW does not export a supported public key for upgrade challenges");
+ return man.VerifyMessageHash(hash, HashAlg.SHA256, sig);
}
private static string GetAdvertismentHeader(CacheNodeConfiguration nodeConfiguration)
@@ -756,7 +651,7 @@ namespace VNLib.Data.Caching.Extensions
using JsonWebToken jwt = new();
//Get the jwt header
- jwt.WriteHeader(nodeConfiguration.GetJwtHeader());
+ jwt.WriteHeader(nodeConfiguration.AuthManager.GetJwtHeader());
jwt.InitPayloadClaim()
.AddClaim("nonce", RandomHash.GetRandomBase32(16))
@@ -768,7 +663,7 @@ namespace VNLib.Data.Caching.Extensions
.CommitClaims();
//Sign message
- nodeConfiguration.SignJwt(jwt);
+ nodeConfiguration.AuthManager.SignJwt(jwt);
return jwt.Compile();
}
@@ -780,12 +675,12 @@ namespace VNLib.Data.Caching.Extensions
/// <param name="message">The advertisment message to verify</param>
/// <returns>The advertisment message if successfully verified, or null otherwise</returns>
/// <exception cref="FormatException"></exception>
- public static ICachePeerAdvertisment? VerifyPeerAdvertisment(this ICacheJwtManager config, string message)
+ public static ICacheNodeAdvertisment? VerifyPeerAdvertisment(this ICacheAuthManager config, string message)
{
using JsonWebToken jwt = JsonWebToken.Parse(message);
//Verify the signature
- if (!config.VerifyCache(jwt))
+ if (!config.VerifyJwt(jwt))
{
return null;
}
@@ -800,7 +695,7 @@ namespace VNLib.Data.Caching.Extensions
/// </summary>
/// <param name="servers"></param>
/// <returns>A server selected at random</returns>
- public static ICachePeerAdvertisment SelectRandom(this ICollection<ICachePeerAdvertisment> servers)
+ public static ICacheNodeAdvertisment SelectRandom(this ICollection<ICacheNodeAdvertisment> servers)
{
//select random server
int randServer = RandomNumberGenerator.GetInt32(0, servers.Count);
@@ -808,7 +703,7 @@ namespace VNLib.Data.Caching.Extensions
}
- private class Advertisment : ICachePeerAdvertisment
+ private class Advertisment : ICacheNodeAdvertisment
{
[JsonIgnore]
public Uri? ConnectEndpoint { get; set; }
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs b/lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs
new file mode 100644
index 0000000..e3ab868
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs
@@ -0,0 +1,76 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: ClientCacheConfiguration.cs
+*
+* ClientCacheConfiguration.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Collections.Generic;
+
+using VNLib.Hashing;
+using VNLib.Hashing.IdentityUtility;
+
+namespace VNLib.Data.Caching.Extensions
+{
+ /// <summary>
+ /// Provides authentication services for cache clients and
+ /// servers.
+ /// </summary>
+ public interface ICacheAuthManager
+ {
+ /// <summary>
+ /// Gets the JWT header to use for signing messages with the
+ /// given key
+ /// </summary>
+ /// <returns>The JWT header collection</returns>
+ IReadOnlyDictionary<string, string?> GetJwtHeader();
+
+ /// <summary>
+ /// Signs the given JWT
+ /// </summary>
+ /// <param name="jwt">The message to sign</param>
+ void SignJwt(JsonWebToken jwt);
+
+ /// <summary>
+ /// Verifies the given JWT
+ /// </summary>
+ /// <param name="jwt">The message to verify authenticity</param>
+ /// <returns>True of the JWT could be verified, false otherwise</returns>
+ bool VerifyJwt(JsonWebToken jwt);
+
+ /// <summary>
+ /// Signs the given message hash
+ /// </summary>
+ /// <param name="hash">The message hash to sign</param>
+ /// <param name="alg">The algorithm used to sign the message hash</param>
+ /// <returns>The signature of the hash</returns>
+ byte[] SignMessageHash(byte[] hash, HashAlg alg);
+
+ /// <summary>
+ /// Verifies the given message hash against the signature.
+ /// </summary>
+ /// <param name="hash">The message hash to compare</param>
+ /// <param name="alg">The algorithm used to produce the message hash</param>
+ /// <param name="signature">The message signature to verify the message against</param>
+ /// <returns>True of the signature could be verified</returns>
+ bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature);
+ }
+}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ICacheDiscoveryErrorHandler.cs b/lib/VNLib.Data.Caching.Extensions/src/ICacheDiscoveryErrorHandler.cs
new file mode 100644
index 0000000..3493d48
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/ICacheDiscoveryErrorHandler.cs
@@ -0,0 +1,41 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: ClientCacheConfiguration.cs
+*
+* ClientCacheConfiguration.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+
+namespace VNLib.Data.Caching.Extensions
+{
+ /// <summary>
+ /// Represents an type that will handle errors that occur during the discovery process
+ /// </summary>
+ public interface ICacheDiscoveryErrorHandler
+ {
+ /// <summary>
+ /// Invoked when an error occurs during the discovery process
+ /// </summary>
+ /// <param name="errorNode">The node that the error occured on</param>
+ /// <param name="ex">The exception that caused the invocation</param>
+ void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex);
+ }
+}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ICachePeerAdvertisment.cs b/lib/VNLib.Data.Caching.Extensions/src/ICacheNodeAdvertisment.cs
index acf883e..fc29955 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/ICachePeerAdvertisment.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/ICacheNodeAdvertisment.cs
@@ -3,9 +3,9 @@
*
* Library: VNLib
* Package: VNLib.Data.Caching.Extensions
-* File: ICachePeerAdvertisment.cs
+* File: ICacheNodeAdvertisment.cs
*
-* ICachePeerAdvertisment.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* ICacheNodeAdvertisment.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
* VNLib collection of libraries and utilities.
*
* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
@@ -30,7 +30,7 @@ namespace VNLib.Data.Caching.Extensions
/// <summary>
/// Represents a node that can be advertised to clients
/// </summary>
- public interface ICachePeerAdvertisment
+ public interface ICacheNodeAdvertisment
{
/// <summary>
/// The endpoint for clients to connect to to access the cache
diff --git a/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryCollection.cs b/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryCollection.cs
new file mode 100644
index 0000000..9adebdc
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryCollection.cs
@@ -0,0 +1,62 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: INodeDiscoveryCollection.cs
+*
+* INodeDiscoveryCollection.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+
+using System.Collections.Generic;
+
+namespace VNLib.Data.Caching.Extensions
+{
+ /// <summary>
+ /// Represents a collection of discovered nodes
+ /// </summary>
+#pragma warning disable CA1711 // Identifiers should not have incorrect suffix
+ public interface INodeDiscoveryCollection
+#pragma warning restore CA1711 // Identifiers should not have incorrect suffix
+ {
+ /// <summary>
+ /// Begins a new discovery and gets an enumerator for the discovery process
+ /// </summary>
+ /// <returns>An enumerator that simplifies discovery of unique nodes</returns>
+ INodeDiscoveryEnumerator BeginDiscovery();
+
+ /// <summary>
+ /// Begins a new discovery and gets an enumerator for the discovery process
+ /// </summary>
+ /// <param name="initialPeers">An initial collection of peers to add to the enumeration</param>
+ /// <returns>An enumerator that simplifies discovery of unique nodes</returns>
+ INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICacheNodeAdvertisment> initialPeers);
+
+ /// <summary>
+ /// Gets a snapshot of all discovered nodes in the current collection.
+ /// </summary>
+ /// <returns>The current collection of notes</returns>
+ ICacheNodeAdvertisment[] GetAllNodes();
+
+ /// <summary>
+ /// Completes a discovery process and updates the collection with the results
+ /// </summary>
+ /// <param name="enumerator">The enumerator used to collect discovered nodes</param>
+ void CompleteDiscovery(INodeDiscoveryEnumerator enumerator);
+ }
+}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryEnumerator.cs b/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryEnumerator.cs
new file mode 100644
index 0000000..f6d5f40
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryEnumerator.cs
@@ -0,0 +1,42 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: INodeDiscoveryEnumerator.cs
+*
+* INodeDiscoveryEnumerator.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+
+using System.Collections.Generic;
+
+
+namespace VNLib.Data.Caching.Extensions
+{
+ /// <summary>
+ /// A custom enumerator for the node discovery process
+ /// </summary>
+ public interface INodeDiscoveryEnumerator : IEnumerator<ICacheNodeAdvertisment>
+ {
+ /// <summary>
+ /// Adds the specified peer to the collection of discovered peers
+ /// </summary>
+ /// <param name="discoveredPeers">The peer collection</param>
+ void OnPeerDiscoveryComplete(IEnumerable<ICacheNodeAdvertisment> discoveredPeers);
+ }
+}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/NodeDiscoveryCollection.cs b/lib/VNLib.Data.Caching.Extensions/src/NodeDiscoveryCollection.cs
new file mode 100644
index 0000000..305f5de
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/NodeDiscoveryCollection.cs
@@ -0,0 +1,117 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: INodeDiscoveryCollection.cs
+*
+* INodeDiscoveryCollection.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Linq;
+using System.Collections.Generic;
+using System.Collections;
+
+namespace VNLib.Data.Caching.Extensions
+{
+ /// <summary>
+ /// Represents a collection of available cache nodes from a discovery process
+ /// </summary>
+ public sealed class NodeDiscoveryCollection : INodeDiscoveryCollection
+ {
+ private LinkedList<ICacheNodeAdvertisment> _peers;
+
+ /// <summary>
+ /// Initializes a new empty <see cref="NodeDiscoveryCollection"/>
+ /// </summary>
+ public NodeDiscoveryCollection()
+ {
+ _peers = new();
+ }
+
+ ///<inheritdoc/>
+ public INodeDiscoveryEnumerator BeginDiscovery()
+ {
+ return new NodeEnumerator(new());
+ }
+
+ ///<inheritdoc/>
+ public INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICacheNodeAdvertisment> initialPeers)
+ {
+ //Init new enumerator with the initial peers
+ return new NodeEnumerator(new(initialPeers));
+ }
+
+ ///<inheritdoc/>
+ public void CompleteDiscovery(INodeDiscoveryEnumerator enumerator)
+ {
+ _ = enumerator ?? throw new ArgumentNullException(nameof(enumerator));
+
+ //Capture all nodes from the enumerator and store them as our current peers
+ _peers = (enumerator as NodeEnumerator)!.Peers;
+ }
+
+ ///<inheritdoc/>
+ public ICacheNodeAdvertisment[] GetAllNodes()
+ {
+ //Capture all current peers
+ return _peers.ToArray();
+ }
+
+ private sealed record class NodeEnumerator(LinkedList<ICacheNodeAdvertisment> Peers) : INodeDiscoveryEnumerator
+ {
+ //Keep track of the current node in the collection so we can move down the list
+ private LinkedListNode<ICacheNodeAdvertisment>? _currentNode = Peers.First;
+
+ public ICacheNodeAdvertisment Current => _currentNode?.Value;
+ object IEnumerator.Current => _currentNode?.Value;
+
+
+ ///<inheritdoc/>
+ public bool MoveNext()
+ {
+ //Move to the next peer in the collection
+ _currentNode = _currentNode?.Next;
+
+ return _currentNode?.Value != null;
+ }
+
+ ///<inheritdoc/>
+ public void OnPeerDiscoveryComplete(IEnumerable<ICacheNodeAdvertisment> discoveredPeers)
+ {
+ //Get only the peers from the discovery that are not already in the collection
+ IEnumerable<ICacheNodeAdvertisment> newPeers = discoveredPeers.Except(Peers);
+
+ //Add them to the end of the collection
+ foreach (ICacheNodeAdvertisment ad in newPeers)
+ {
+ Peers.AddLast(ad);
+ }
+ }
+
+ public void Reset()
+ {
+ //Go to the first node
+ _currentNode = Peers.First;
+ }
+
+ public void Dispose()
+ { }
+ }
+ }
+}
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs
index f69c2a4..0fe0663 100644
--- a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs
@@ -42,7 +42,6 @@ using System;
using System.Threading;
using System.Threading.Tasks;
-using VNLib.Utils.Async;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
using VNLib.Net.Messaging.FBM.Server;
@@ -65,7 +64,7 @@ namespace VNLib.Data.Caching.ObjectCache
/// <summary>
/// A queue that stores update and delete events
/// </summary>
- public AsyncQueue<ChangeEvent> EventQueue { get; }
+ public ICacheListenerEventQueue EventQueue { get; }
/// <summary>
/// The Cache store to access data blobs
@@ -77,18 +76,17 @@ namespace VNLib.Data.Caching.ObjectCache
/// Initialzies a new <see cref="BlobCacheListener"/>
/// </summary>
/// <param name="cache">The cache table to work from</param>
+ /// <param name="queue">The event queue to publish changes to</param>
/// <param name="log">Writes error and debug logging information</param>
/// <param name="heap">The heap to alloc FBM buffers and <see cref="CacheEntry"/> cache buffers from</param>
- /// <param name="singleReader">A value that indicates if a single thread is processing events</param>
/// <exception cref="ArgumentNullException"></exception>
- public BlobCacheListener(IBlobCacheTable cache, ILogProvider log, IUnmangedHeap heap, bool singleReader)
+ public BlobCacheListener(IBlobCacheTable cache, ICacheListenerEventQueue queue, ILogProvider log, IUnmangedHeap heap)
{
Log = log;
Cache = cache ?? throw new ArgumentNullException(nameof(cache));
- //Writes may happen from multple threads with bucket design and no lock
- EventQueue = new(false, singleReader);
+ EventQueue = queue ?? throw new ArgumentNullException(nameof(queue));
InitListener(heap);
}
@@ -161,32 +159,25 @@ namespace VNLib.Data.Caching.ObjectCache
}
}
- static async Task DequeAsync(AsyncQueue<ChangeEvent> queue, FBMContext context, CancellationToken exitToken)
- {
- //Wait for a new message to process
- ChangeEvent ev = await queue.DequeueAsync(exitToken);
-
- //Set the response
- SetResponse(ev, context);
- }
-
- //If no event bus is registered, then this is not a legal command
- if (userState is not AsyncQueue<ChangeEvent> eventBus)
+ //Determine if the queue is enabled for the user
+ if(!EventQueue.IsEnabled(userState!))
{
context.CloseResponse(ResponseCodes.NotFound);
-
return;
}
//try to deq without awaiting
- if (eventBus.TryDequeue(out ChangeEvent? change))
+ if (EventQueue.TryDequeue(userState!, out ChangeEvent? change))
{
SetResponse(change, context);
}
else
{
- //Process async
- await DequeAsync(eventBus, context, exitToken);
+ //Wait for a new message to process
+ ChangeEvent ev = await EventQueue.DequeueAsync(userState!, exitToken);
+
+ //Set the response
+ SetResponse(ev, context);
}
return;
@@ -257,10 +248,7 @@ namespace VNLib.Data.Caching.ObjectCache
private void EnqueEvent(ChangeEvent change)
{
- if (!EventQueue.TryEnque(change))
- {
- Log.Warn("Change event {ev} was not enqued because the event queue is overflowing!", change.CurrentId);
- }
+ EventQueue.PublishEvent(change);
}
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs b/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs
new file mode 100644
index 0000000..06be4fa
--- /dev/null
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs
@@ -0,0 +1,65 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.ObjectCache
+* File: ICacheListenerEventQueue.cs
+*
+* ICacheListenerEventQueue.cs is part of VNLib.Data.Caching.ObjectCache which
+* is part of the larger VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace VNLib.Data.Caching.ObjectCache
+{
+ /// <summary>
+ /// Represents a single client's event queue
+ /// </summary>
+ public interface ICacheListenerEventQueue
+ {
+ /// <summary>
+ /// Determines if the queue is enabled for the given user state
+ /// </summary>
+ /// <param name="userState">The unique state of the connection</param>
+ /// <returns>True if event queuing is enabled</returns>
+ bool IsEnabled(object userState);
+
+ /// <summary>
+ /// Attempts to dequeue a single event from the queue without blocking
+ /// </summary>
+ /// <param name="userState">A user state object to associate with the wait operation</param>
+ /// <param name="changeEvent">The dequeued event if successfully dequeued</param>
+ /// <returns>True if an event was waiting and could be dequeued, false otherwise</returns>
+ bool TryDequeue(object userState, out ChangeEvent changeEvent);
+
+ /// <summary>
+ /// Waits asynchronously for an event to be dequeued
+ /// </summary>
+ /// <param name="userState">A user state object to associate with the wait operation</param>
+ /// <param name="cancellation">A token to cancel the wait operation</param>
+ /// <returns>The <see cref="ChangeEvent"/> that as a result of the dequeue operation</returns>
+ ValueTask<ChangeEvent> DequeueAsync(object userState, CancellationToken cancellation);
+
+ /// <summary>
+ /// Publishes an event to the queue
+ /// </summary>
+ /// <param name="changeEvent">The change event to publish</param>
+ void PublishEvent(ChangeEvent changeEvent);
+ }
+}
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
index 9f5ccfe..f4f059b 100644
--- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
@@ -31,15 +31,17 @@ using System.Net.WebSockets;
using System.Collections.Generic;
using System.Security.Cryptography;
+using VNLib.Hashing;
+using VNLib.Hashing.IdentityUtility;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
-using VNLib.Hashing.IdentityUtility;
using VNLib.Data.Caching;
using VNLib.Data.Caching.Extensions;
using VNLib.Data.Caching.ObjectCache;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Extensions.Loading;
+
namespace VNLib.Plugins.Extensions.VNCache
{
public interface ICacheRefreshPolicy
@@ -49,6 +51,7 @@ namespace VNLib.Plugins.Extensions.VNCache
TimeSpan RefreshInterval { get; }
}
+
/// <summary>
/// A base class that manages
/// </summary>
@@ -81,8 +84,6 @@ namespace VNLib.Plugins.Extensions.VNCache
_config = config;
- Uri brokerUri = new(config.BrokerAddress!);
-
//Init the client with default settings
FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(MemoryUtil.Shared, config.MaxMessageSize!.Value, config.RequestTimeout, debugLog);
@@ -90,30 +91,18 @@ namespace VNLib.Plugins.Extensions.VNCache
//Add the configuration to the client
Client.GetCacheConfiguration()
- .WithBroker(brokerUri)
- .WithTls(brokerUri.Scheme == Uri.UriSchemeHttps);
+ .WithTls(config.UseTls)
+ .WithInitialPeers(config.InitialNodes!);
}
-
- public virtual async Task ConfigureServiceAsync(PluginBase plugin)
+ public Task ConfigureServiceAsync(PluginBase plugin)
{
- //Get keys async
- Task<ReadOnlyJsonWebKey?> clientPrivTask = plugin.TryGetSecretAsync("client_private_key").ToJsonWebKey();
- Task<ReadOnlyJsonWebKey?> brokerPubTask = plugin.TryGetSecretAsync("broker_public_key").ToJsonWebKey();
- Task<ReadOnlyJsonWebKey?> cachePubTask = plugin.TryGetSecretAsync("cache_public_key").ToJsonWebKey();
-
- //Wait for all tasks to complete
- _ = await Task.WhenAll(clientPrivTask, brokerPubTask, cachePubTask);
-
- ReadOnlyJsonWebKey clientPriv = await clientPrivTask ?? throw new KeyNotFoundException("Missing required secret client_private_key");
- ReadOnlyJsonWebKey brokerPub = await brokerPubTask ?? throw new KeyNotFoundException("Missing required secret broker_public_key");
- ReadOnlyJsonWebKey cachePub = await cachePubTask ?? throw new KeyNotFoundException("Missing required secret cache_public_key");
-
- //Connection authentication methods
+ //Set authenticator
Client.GetCacheConfiguration()
- .WithVerificationKey(cachePub)
- .WithSigningKey(clientPriv)
- .WithBrokerVerificationKey(brokerPub);
+ .WithAuthenticator(new AuthManager(plugin))
+ .WithErrorHandler(new DiscoveryErrHAndler(plugin.Log));
+
+ return Task.CompletedTask;
}
/*
@@ -127,7 +116,7 @@ namespace VNLib.Plugins.Extensions.VNCache
while (true)
{
//Load the server list
- ICachePeerAdvertisment[]? servers;
+ ICacheNodeAdvertisment[]? servers;
while (true)
{
try
@@ -163,7 +152,7 @@ namespace VNLib.Plugins.Extensions.VNCache
pluginLog.Debug("Connecting to random cache server");
//Connect to a random server
- ICachePeerAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken);
+ ICacheNodeAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken);
pluginLog.Debug("Connected to cache server {s}", selected.NodeId);
//Set connection status flag
@@ -255,5 +244,98 @@ namespace VNLib.Plugins.Extensions.VNCache
? throw new InvalidOperationException("The underlying client is not connected to a cache node")
: Client!.AddOrUpdateObjectAsync(key, newKey, value, serialzer, cancellation);
}
+
+
+ private sealed class AuthManager : ICacheAuthManager
+ {
+
+ private IAsyncLazy<ReadOnlyJsonWebKey> _sigKey;
+ private IAsyncLazy<ReadOnlyJsonWebKey> _verKey;
+
+ public AuthManager(PluginBase plugin)
+ {
+ //Lazy load keys
+
+ //Get the signing key
+ _sigKey = plugin.GetSecretAsync("client_private_key").ToLazy(static r => r.GetJsonWebKey());
+
+ //Lazy load cache public key
+ _verKey = plugin.GetSecretAsync("cache_public_key").ToLazy(static r => r.GetJsonWebKey());
+ }
+
+ public async Task AwaitLazyKeyLoad()
+ {
+ await _sigKey;
+ await _verKey;
+ }
+
+ ///<inheritdoc/>
+ public IReadOnlyDictionary<string, string?> GetJwtHeader()
+ {
+ //Get the signing key jwt header
+ return _sigKey.Value.JwtHeader;
+ }
+
+ ///<inheritdoc/>
+ public void SignJwt(JsonWebToken jwt)
+ {
+ //Sign the jwt with signing key
+ jwt.SignFromJwk(_sigKey.Value);
+ }
+
+ ///<inheritdoc/>
+ public byte[] SignMessageHash(byte[] hash, HashAlg alg)
+ {
+ //try to get the rsa alg for the signing key
+ using RSA? rsa = _sigKey.Value.GetRSAPublicKey();
+ if(rsa != null)
+ {
+ return rsa.SignHash(hash, alg.GetAlgName(), RSASignaturePadding.Pkcs1);
+ }
+
+ //try to get the ecdsa alg for the signing key
+ using ECDsa? ecdsa = _sigKey.Value.GetECDsaPublicKey();
+ if(ecdsa != null)
+ {
+ return ecdsa.SignHash(hash);
+ }
+
+ throw new NotSupportedException("The signing key is not a valid RSA or ECDSA key");
+ }
+
+ ///<inheritdoc/>
+ public bool VerifyJwt(JsonWebToken jwt)
+ {
+ return jwt.VerifyFromJwk(_verKey.Value);
+ }
+
+ ///<inheritdoc/>
+ public bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature)
+ {
+ //try to get the rsa alg for the signing key
+ using RSA? rsa = _verKey.Value.GetRSAPublicKey();
+ if (rsa != null)
+ {
+ return rsa.VerifyHash(hash, signature, alg.GetAlgName(), RSASignaturePadding.Pkcs1);
+ }
+
+ //try to get the ecdsa alg for the signing key
+ using ECDsa? ecdsa = _verKey.Value.GetECDsaPublicKey();
+ if (ecdsa != null)
+ {
+ return ecdsa.VerifyHash(hash, signature);
+ }
+
+ throw new NotSupportedException("The current key is not an RSA or ECDSA key and is not supported");
+ }
+ }
+
+ private sealed record class DiscoveryErrHAndler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
+ {
+ public void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex)
+ {
+ Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", errorNode.NodeId, ex);
+ }
+ }
}
} \ No newline at end of file
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs
index 1d888ec..64d3e07 100644
--- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs
@@ -25,6 +25,7 @@
using System;
using System.Text.Json.Serialization;
+using VNLib.Data.Caching.Extensions;
using VNLib.Plugins.Extensions.Loading;
namespace VNLib.Plugins.Extensions.VNCache
@@ -45,8 +46,8 @@ namespace VNLib.Plugins.Extensions.VNCache
/// <summary>
/// The broker server address
/// </summary>
- [JsonPropertyName("broker_address")]
- public string? BrokerAddress { get; set; }
+ [JsonPropertyName("use_tls")]
+ public bool UseTls { get; set; } = true;
/// <summary>
/// The time (in seconds) to randomly delay polling the broker server
@@ -77,6 +78,12 @@ namespace VNLib.Plugins.Extensions.VNCache
/// </summary>
internal TimeSpan RequestTimeout => TimeSpan.FromSeconds(RequestTimeoutSeconds!.Value);
+ /// <summary>
+ /// The initial peers to connect to
+ /// </summary>
+ [JsonPropertyName("initial_nodes")]
+ public InitialNode[]? InitialNodes { get; set; }
+
void IOnConfigValidation.Validate()
{
if (!MaxMessageSize.HasValue || MaxMessageSize.Value < 1)
@@ -95,9 +102,42 @@ namespace VNLib.Plugins.Extensions.VNCache
throw new ArgumentException("You must specify a positive integer FBM message timoeut", "request_timeout_sec");
}
- if(!Uri.TryCreate(BrokerAddress, UriKind.RelativeOrAbsolute, out _))
+ //Validate initial nodes
+ if (InitialNodes == null || InitialNodes.Length == 0)
+ {
+ throw new ArgumentException("You must specify at least one initial peer", "initial_peers");
+ }
+
+ foreach (InitialNode peer in InitialNodes)
+ {
+ _ = peer.ConnectEndpoint ?? throw new ArgumentException("You must specify a connect endpoint for each initial node", "initial_nodes");
+ _ = peer.NodeId ?? throw new ArgumentException("You must specify a node id for each initial node", "initial_nodes");
+ }
+ }
+
+ public sealed record class InitialNode : ICacheNodeAdvertisment
+ {
+ [JsonIgnore]
+ public Uri ConnectEndpoint { get; private set; }
+
+ [JsonIgnore]
+ public Uri? DiscoveryEndpoint { get; private set; }
+
+ [JsonPropertyName("node_id")]
+ public string? NodeId { get; set; }
+
+ [JsonPropertyName("connect_endpoint")]
+ public string? ConnectEndpointString
+ {
+ get => ConnectEndpoint.ToString();
+ set => ConnectEndpoint = new Uri(value!);
+ }
+
+ [JsonPropertyName("discovery_endpoint")]
+ public string? DiscoveryEndpointString
{
- throw new ArgumentException("You must specify a valid HTTP uri broker address", "broker_address");
+ get => DiscoveryEndpoint?.ToString();
+ set => DiscoveryEndpoint = value == null ? null : new Uri(value);
}
}
}