aboutsummaryrefslogtreecommitdiff
path: root/lib/VNLib.Data.Caching.Extensions/src
diff options
context:
space:
mode:
Diffstat (limited to 'lib/VNLib.Data.Caching.Extensions/src')
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs65
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ApiModel/DiscoveryRequest.cs (renamed from lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs)34
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ApiModel/GetConfigRequest.cs42
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ApiModel/ICacheConnectionRequest.cs (renamed from lib/VNLib.Data.Caching.Extensions/src/ICacheNodeAdvertisment.cs)29
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ApiModel/NegotationRequest.cs41
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ApiModel/ServiceEndpoints.cs176
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs (renamed from lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs)24
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheDiscoveryFailureException.cs44
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeAdvertisment.cs96
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs (renamed from lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs)26
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/Clustering/ICacheDiscoveryErrorHandler.cs (renamed from lib/VNLib.Data.Caching.Extensions/src/ICacheDiscoveryErrorHandler.cs)4
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryCollection.cs (renamed from lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryCollection.cs)6
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs (renamed from lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryEnumerator.cs)7
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs (renamed from lib/VNLib.Data.Caching.Extensions/src/NodeDiscoveryCollection.cs)75
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs413
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs6
16 files changed, 794 insertions, 294 deletions
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs
new file mode 100644
index 0000000..99acfd5
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs
@@ -0,0 +1,65 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: CacheSiteAdapter.cs
+*
+* CacheSiteAdapter.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Net;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+using RestSharp;
+
+using VNLib.Net.Rest.Client;
+using VNLib.Net.Rest.Client.Construction;
+
+namespace VNLib.Data.Caching.Extensions.ApiModel
+{
+ /// <summary>
+ /// A site adapter for cache REST api requests
+ /// </summary>
+ internal sealed class CacheSiteAdapter : RestSiteAdapterBase
+ {
+ protected override RestClientPool Pool { get; }
+
+ public CacheSiteAdapter(int maxClients)
+ {
+ //Configure connection pool
+ Pool = new(maxClients, new RestClientOptions()
+ {
+ MaxTimeout = 10 * 1000,
+ FollowRedirects = false,
+ Encoding = Encoding.UTF8,
+ AutomaticDecompression = DecompressionMethods.All,
+ ThrowOnAnyError = true
+ });
+ }
+
+ public override void OnResponse(RestResponse response)
+ { }
+
+ public override Task WaitAsync(CancellationToken cancellation = default)
+ {
+ return Task.CompletedTask;
+ }
+ }
+}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/DiscoveryRequest.cs
index 3020376..22b11aa 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/DiscoveryRequest.cs
@@ -3,9 +3,9 @@
*
* Library: VNLib
* Package: VNLib.Data.Caching.Extensions
-* File: ActiveServer.cs
+* File: DiscoveryRequest.cs
*
-* ActiveServer.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* DiscoveryRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
* VNLib collection of libraries and utilities.
*
* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
@@ -23,29 +23,19 @@
*/
using System;
-using System.Text.Json.Serialization;
+using VNLib.Data.Caching.Extensions.Clustering;
-namespace VNLib.Data.Caching.Extensions
+namespace VNLib.Data.Caching.Extensions.ApiModel
{
- public class ActiveServer : ICacheNodeAdvertisment
+ /// <summary>
+ /// A request message for a discovery request
+ /// </summary>
+ /// <param name="DiscoveryUrl">The discovery endpoint to connec to</param>
+ /// <param name="Config">The local client configuration</param>
+ internal record class DiscoveryRequest(Uri DiscoveryUrl, CacheClientConfiguration Config)
+ : ICacheConnectionRequest
{
- [JsonPropertyName("address")]
- public string? HostName { get; set; }
-
- public string? ServerId { get; set; }
- [JsonPropertyName("ip_address")]
- public string? Ip { get; set; }
-
- public Uri ConnectEndpoint { get; }
-
- public Uri? DiscoveryEndpoint { get; }
-
- [JsonPropertyName("server_id")]
- public string NodeId { get; }
-
- ///<inheritdoc/>
- public override int GetHashCode() => ServerId!.GetHashCode(StringComparison.OrdinalIgnoreCase);
///<inheritdoc/>
- public override bool Equals(object? obj) => obj is ActiveServer s && GetHashCode() == s.GetHashCode();
+ public string? Challenge { get; set; }
}
}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ApiModel/GetConfigRequest.cs b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/GetConfigRequest.cs
new file mode 100644
index 0000000..43cef4c
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/GetConfigRequest.cs
@@ -0,0 +1,42 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: GetConfigRequest.cs
+*
+* GetConfigRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using VNLib.Data.Caching.Extensions.Clustering;
+
+namespace VNLib.Data.Caching.Extensions.ApiModel
+{
+ /// <summary>
+ /// A request to get the cache configuration from a cache server's
+ /// well-known configuration endpoint
+ /// </summary>
+ /// <param name="WellKnownEp">The well-known configuration endpoint url</param>
+ /// <param name="Config">The client cache configuration</param>
+ internal record class GetConfigRequest(Uri WellKnownEp, CacheClientConfiguration Config)
+ : ICacheConnectionRequest
+ {
+ ///<inheritdoc/>
+ public string? Challenge { get; set; }
+ }
+}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ICacheNodeAdvertisment.cs b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/ICacheConnectionRequest.cs
index fc29955..5fb7ba7 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/ICacheNodeAdvertisment.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/ICacheConnectionRequest.cs
@@ -3,9 +3,9 @@
*
* Library: VNLib
* Package: VNLib.Data.Caching.Extensions
-* File: ICacheNodeAdvertisment.cs
+* File: ICacheConnectionRequest.cs
*
-* ICacheNodeAdvertisment.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* ICacheConnectionRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
* VNLib collection of libraries and utilities.
*
* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
@@ -22,29 +22,26 @@
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
-using System;
+using VNLib.Data.Caching.Extensions.Clustering;
-
-namespace VNLib.Data.Caching.Extensions
+namespace VNLib.Data.Caching.Extensions.ApiModel
{
/// <summary>
- /// Represents a node that can be advertised to clients
+ /// Represents a request to connect to a cache server.
/// </summary>
- public interface ICacheNodeAdvertisment
+ internal interface ICacheConnectionRequest
{
/// <summary>
- /// The endpoint for clients to connect to to access the cache
- /// </summary>
- Uri ConnectEndpoint { get; }
-
- /// <summary>
- /// Gets the address for clients to connect to to discover other discovertable nodes
+ /// The <see cref="CacheClientConfiguration"/> used to configure, authenticate, and
+ /// verify messages sent to and received from cache servers.
/// </summary>
- Uri? DiscoveryEndpoint { get; }
+ CacheClientConfiguration Config { get; }
/// <summary>
- /// Gets the unique identifier for this node
+ /// An optional challenge string to be used during the authentication
+ /// process. When set, is sent in the request JWT, and is expected to
+ /// be returned in the response JWT.
/// </summary>
- string NodeId { get; }
+ string? Challenge { get; set; }
}
}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ApiModel/NegotationRequest.cs b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/NegotationRequest.cs
new file mode 100644
index 0000000..5842586
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/NegotationRequest.cs
@@ -0,0 +1,41 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: NegotationRequest.cs
+*
+* NegotationRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using VNLib.Data.Caching.Extensions.Clustering;
+
+namespace VNLib.Data.Caching.Extensions.ApiModel
+{
+ /// <summary>
+ /// A request to negotiate a new connection with a cache server
+ /// </summary>
+ /// <param name="ConnectUrl">The cache endpoint uri to connec to</param>
+ /// <param name="Config">The client cache configuration</param>
+ internal record class NegotationRequest(Uri ConnectUrl, CacheClientConfiguration Config)
+ : ICacheConnectionRequest
+ {
+ ///<inheritdoc/>
+ public string? Challenge { get; set; }
+ }
+}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ApiModel/ServiceEndpoints.cs b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/ServiceEndpoints.cs
new file mode 100644
index 0000000..c0de4a3
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/ServiceEndpoints.cs
@@ -0,0 +1,176 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: ServiceEndpoints.cs
+*
+* ServiceEndpoints.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Security;
+using System.Text.Json;
+
+using RestSharp;
+
+using VNLib.Net.Http;
+using VNLib.Hashing;
+using VNLib.Hashing.IdentityUtility;
+using VNLib.Net.Rest.Client.Construction;
+using ContentType = VNLib.Net.Http.ContentType;
+using VNLib.Data.Caching.Extensions.Clustering;
+
+namespace VNLib.Data.Caching.Extensions.ApiModel
+{
+ /*
+ * Defines the cache endpoints, builds and routes request messages to the
+ * server enpoints. In effect defines the client api for cache services.
+ *
+ * This class also define methods for authentication and message verification
+ */
+
+ internal static class ServiceEndpoints
+ {
+ private static readonly TimeSpan MaxTimeDisparity = TimeSpan.FromSeconds(10);
+
+ /// <summary>
+ /// Get the endpoint definition for cache services for the site adapter
+ /// </summary>
+ internal static IRestEndpointDefinition Definition { get; } = new EndpointBuilder();
+
+ private class EndpointBuilder : IRestEndpointDefinition
+ {
+ ///<inheritdoc/>
+ public void BuildRequest(IRestSiteAdapter site, IRestEndpointBuilder builder)
+ {
+ //Define cache service endpoints/requests
+
+ builder.WithEndpoint<DiscoveryRequest>()
+ .WithUrl(e => e.DiscoveryUrl)
+ .WithMethod(Method.Get)
+ //Accept text response (it should be a jwt)
+ .WithHeader("Accept", HttpHelpers.GetContentTypeString(ContentType.Text))
+ .WithHeader("Authorization", BuildDiscoveryAuthToken)
+ //Verify jwt responses
+ .OnResponse(VerifyJwtResponse);
+
+ builder.WithEndpoint<NegotationRequest>()
+ .WithUrl(e => e.ConnectUrl)
+ .WithMethod(Method.Get)
+ //Accept text response (its should be a jwt)
+ .WithHeader("Accept", HttpHelpers.GetContentTypeString(ContentType.Text))
+ .WithHeader("Authorization", BuildDiscoveryAuthToken)
+ //Verify jwt responses
+ .OnResponse(VerifyJwtResponse);
+
+ //Well known endpoint does not require authentication
+ builder.WithEndpoint<GetConfigRequest>()
+ .WithUrl(gc => gc.WellKnownEp)
+ .WithMethod(Method.Get)
+ //Responses should be a signed jwt
+ .WithHeader("Accept", HttpHelpers.GetContentTypeString(ContentType.Text))
+ //Verify jwt responses
+ .OnResponse(VerifyJwtResponse);
+ }
+ }
+
+
+ private static string BuildDiscoveryAuthToken(ICacheConnectionRequest request)
+ {
+ request.Challenge = RandomHash.GetRandomBase32(24);
+
+ //Build request jwt
+ using JsonWebToken jwt = new();
+ jwt.WriteHeader(request.Config.AuthManager.GetJwtHeader());
+
+ //See if the supplied config is for a cache node
+ CacheNodeConfiguration? cnc = request.Config as CacheNodeConfiguration;
+
+ //Init claim
+ JwtPayload claim = jwt.InitPayloadClaim();
+
+ claim.AddClaim("chl", request.Challenge)
+ .AddClaim("iat", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
+
+ if (!string.IsNullOrWhiteSpace(cnc?.NodeId))
+ {
+ /*
+ * The unique node id so the other nodes know to load the
+ * proper event queue for the current server
+ */
+ claim.AddClaim("sub", cnc.NodeId);
+ }
+
+ claim.CommitClaims();
+
+ //sign the jwt
+ request.Config.AuthManager.SignJwt(jwt);
+
+ //Compile the jwt
+ return jwt.Compile();
+ }
+
+ private static void VerifyJwtResponse(ICacheConnectionRequest req, RestResponse response)
+ {
+ byte[] data = response.RawBytes ?? throw new ArgumentException("Server response was empty, cannot continue");
+
+ //If node config then set the is-node flag
+ bool isNode = req.Config is CacheNodeConfiguration;
+
+ //Response is jwt
+ using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data);
+
+ //Verify the jwt
+ if (!req.Config.AuthManager.VerifyJwt(responseJwt, isNode))
+ {
+ throw new SecurityException("Failed to verify the discovery server's challenge, cannot continue");
+ }
+
+ //get payload as a document
+ using JsonDocument doc = responseJwt.GetPayload();
+
+ //Verify iat times
+ long iatSec = doc.RootElement.GetProperty("iat").GetInt64();
+
+ //Get dto
+ DateTimeOffset iat = DateTimeOffset.FromUnixTimeSeconds(iatSec);
+
+ DateTimeOffset now = DateTimeOffset.UtcNow;
+
+ //Verify iat is not before or after the current time with the disparity
+ if (iat.Add(MaxTimeDisparity) < now || iat.Subtract(MaxTimeDisparity) > now)
+ {
+ throw new SecurityException("Server returned a request that has expired. Please check your system clock");
+ }
+
+ //If a challenge is set, verify it
+ if (req.Challenge != null)
+ {
+ //Verify challenge
+ string challenge = doc.RootElement.GetProperty("chl").GetString()
+ ?? throw new SecurityException("Server did not return a challenge");
+
+ if (!challenge.Equals(req.Challenge, StringComparison.Ordinal))
+ {
+ throw new SecurityException("Server returned an invalid challenge");
+ }
+ }
+ }
+
+ }
+}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs
index 9229c89..1c1997e 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs
@@ -22,11 +22,14 @@
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
+using System;
using System.Linq;
using System.Collections.Generic;
-namespace VNLib.Data.Caching.Extensions
+
+namespace VNLib.Data.Caching.Extensions.Clustering
{
+
/// <summary>
/// A fluent api configuration object for configuring a <see cref="FBMClient"/>
/// to connect to cache servers.
@@ -53,7 +56,7 @@ namespace VNLib.Data.Caching.Extensions
/// </summary>
public bool UseTls { get; private set; }
- internal ICacheNodeAdvertisment[]? InitialPeers { get; set; }
+ internal Uri[]? WellKnownNodes { get; set; }
/// <summary>
/// Specifies the JWT authentication manager to use for signing and verifying JWTs
@@ -73,7 +76,7 @@ namespace VNLib.Data.Caching.Extensions
public CacheClientConfiguration WithTls(bool useTls)
{
UseTls = useTls;
- return this;
+ return this;
}
/// <summary>
@@ -81,9 +84,20 @@ namespace VNLib.Data.Caching.Extensions
/// </summary>
/// <param name="peers">The collection of servers to discover peers from and connect to</param>
/// <returns>Chainable fluent object</returns>
- public CacheClientConfiguration WithInitialPeers(IEnumerable<ICacheNodeAdvertisment> peers)
+ public CacheClientConfiguration WithInitialPeers(IEnumerable<Uri> peers)
{
- InitialPeers = peers.ToArray();
+ //Check null
+ _ = peers ?? throw new ArgumentNullException(nameof(peers));
+
+ //Store peer array
+ WellKnownNodes = peers.ToArray();
+
+ if (WellKnownNodes.Any(p => !p.IsAbsoluteUri))
+ {
+ WellKnownNodes = null;
+ throw new ArgumentException("All discoverable node uris must be in absolute form");
+ }
+
return this;
}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheDiscoveryFailureException.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheDiscoveryFailureException.cs
new file mode 100644
index 0000000..84d611e
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheDiscoveryFailureException.cs
@@ -0,0 +1,44 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: CacheDiscoveryFailureException.cs
+*
+* CacheDiscoveryFailureException.cs is part of VNLib.Data.Caching.Extensions which is
+* part of the larger VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+
+
+namespace VNLib.Data.Caching.Extensions.Clustering
+{
+ /// <summary>
+ /// Raised when an error occurs during cache discovery
+ /// </summary>
+ public class CacheDiscoveryFailureException : Exception
+ {
+ public CacheDiscoveryFailureException(string message) : base(message)
+ { }
+
+ public CacheDiscoveryFailureException(string message, Exception innerException) : base(message, innerException)
+ { }
+
+ public CacheDiscoveryFailureException()
+ { }
+ }
+}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeAdvertisment.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeAdvertisment.cs
new file mode 100644
index 0000000..e81d506
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeAdvertisment.cs
@@ -0,0 +1,96 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: CacheNodeAdvertisment.cs
+*
+* CacheNodeAdvertisment.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Text.Json.Serialization;
+
+namespace VNLib.Data.Caching.Extensions.Clustering
+{
+ /// <summary>
+ /// Represents a node that can be advertised to clients
+ /// </summary>
+ public class CacheNodeAdvertisment : IEquatable<CacheNodeAdvertisment>
+ {
+ /// <summary>
+ /// The endpoint for clients to connect to to access the cache
+ /// </summary>
+ [JsonIgnore]
+ public Uri? ConnectEndpoint { get; set; }
+
+ /// <summary>
+ /// Gets the address for clients to connect to to discover other discovertable nodes
+ /// </summary>
+ [JsonIgnore]
+ public Uri? DiscoveryEndpoint { get; set; }
+
+ /// <summary>
+ /// Gets the unique identifier for this node
+ /// </summary>
+ [JsonPropertyName("iss")]
+ public string NodeId { get; set; }
+
+ [JsonPropertyName("url")]
+ public string? url
+ {
+ get => ConnectEndpoint?.ToString();
+ set => ConnectEndpoint = value == null ? null : new Uri(value);
+ }
+
+ [JsonPropertyName("dis")]
+ public string? dis
+ {
+ get => DiscoveryEndpoint?.ToString();
+ set => DiscoveryEndpoint = value == null ? null : new Uri(value);
+ }
+
+ /// <summary>
+ /// Determines if the given node is equal to this node, by comparing the node ids
+ /// </summary>
+ /// <param name="obj">The other node advertisment to compare</param>
+ /// <returns>True if the nodes are equal, false otherwise</returns>
+ public override bool Equals(object? obj) => obj is CacheNodeAdvertisment ad && Equals(ad);
+
+ /// <summary>
+ /// Gets the hash code for this node, based on the node id
+ /// </summary>
+ /// <returns>The instance hash-code</returns>
+ public override int GetHashCode() => string.GetHashCode(NodeId, StringComparison.OrdinalIgnoreCase);
+
+ /// <summary>
+ /// Determines if the given node is equal to this node, by comparing the node ids
+ /// </summary>
+ /// <param name="other">The other node advertisment to compare</param>
+ /// <returns>True if the nodes are equal, false otherwise</returns>
+ public bool Equals(CacheNodeAdvertisment? other) => string.Equals(NodeId, other?.NodeId, StringComparison.OrdinalIgnoreCase);
+
+ /// <summary>
+ /// Formats a string representation of this node
+ /// </summary>
+ /// <returns>The formatted information string</returns>
+ public override string ToString()
+ {
+ return $"NodeId: {NodeId} Connect: {url}, Discover?: {dis}";
+ }
+ }
+}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs
index 29a763c..6b7ab48 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs
@@ -24,13 +24,13 @@
using System;
-namespace VNLib.Data.Caching.Extensions
+namespace VNLib.Data.Caching.Extensions.Clustering
{
/// <summary>
/// A cache configuration for cache servers (nodes)
/// </summary>
- public class CacheNodeConfiguration: CacheClientConfiguration, ICacheNodeAdvertisment
+ public class CacheNodeConfiguration : CacheClientConfiguration
{
/// <summary>
/// The address for clients to connect to
@@ -49,6 +49,22 @@ namespace VNLib.Data.Caching.Extensions
public Uri? DiscoveryEndpoint { get; private set; }
/// <summary>
+ /// Gets the configuration for this node as an advertisment
+ /// </summary>
+ public CacheNodeAdvertisment Advertisment
+ {
+ get
+ {
+ return new CacheNodeAdvertisment()
+ {
+ DiscoveryEndpoint = DiscoveryEndpoint,
+ ConnectEndpoint = ConnectEndpoint,
+ NodeId = NodeId
+ };
+ }
+ }
+
+ /// <summary>
/// Sets the full address of our cache endpoint for clients to connect to
/// </summary>
/// <param name="connectUri">The uri clients will attempt to connect to</param>
@@ -80,9 +96,13 @@ namespace VNLib.Data.Caching.Extensions
/// <param name="nodeId">The cluster node id of the current server</param>
/// <returns>Chainable fluent object</returns>
/// <exception cref="ArgumentNullException"></exception>
- public CacheClientConfiguration WithNodeId(string nodeId)
+ public CacheNodeConfiguration WithNodeId(string nodeId)
{
NodeId = nodeId ?? throw new ArgumentNullException(nameof(nodeId));
+
+ //Update the node id in the node collection
+ (NodeCollection as NodeDiscoveryCollection)!.SetSelfId(nodeId);
+
return this;
}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ICacheDiscoveryErrorHandler.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/ICacheDiscoveryErrorHandler.cs
index 3493d48..984ce3d 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/ICacheDiscoveryErrorHandler.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/ICacheDiscoveryErrorHandler.cs
@@ -24,7 +24,7 @@
using System;
-namespace VNLib.Data.Caching.Extensions
+namespace VNLib.Data.Caching.Extensions.Clustering
{
/// <summary>
/// Represents an type that will handle errors that occur during the discovery process
@@ -36,6 +36,6 @@ namespace VNLib.Data.Caching.Extensions
/// </summary>
/// <param name="errorNode">The node that the error occured on</param>
/// <param name="ex">The exception that caused the invocation</param>
- void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex);
+ void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex);
}
}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryCollection.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryCollection.cs
index 9adebdc..1f4d154 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryCollection.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryCollection.cs
@@ -25,7 +25,7 @@
using System.Collections.Generic;
-namespace VNLib.Data.Caching.Extensions
+namespace VNLib.Data.Caching.Extensions.Clustering
{
/// <summary>
/// Represents a collection of discovered nodes
@@ -45,13 +45,13 @@ namespace VNLib.Data.Caching.Extensions
/// </summary>
/// <param name="initialPeers">An initial collection of peers to add to the enumeration</param>
/// <returns>An enumerator that simplifies discovery of unique nodes</returns>
- INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICacheNodeAdvertisment> initialPeers);
+ INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<CacheNodeAdvertisment> initialPeers);
/// <summary>
/// Gets a snapshot of all discovered nodes in the current collection.
/// </summary>
/// <returns>The current collection of notes</returns>
- ICacheNodeAdvertisment[] GetAllNodes();
+ CacheNodeAdvertisment[] GetAllNodes();
/// <summary>
/// Completes a discovery process and updates the collection with the results
diff --git a/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryEnumerator.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs
index f6d5f40..677088a 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryEnumerator.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs
@@ -25,18 +25,17 @@
using System.Collections.Generic;
-
-namespace VNLib.Data.Caching.Extensions
+namespace VNLib.Data.Caching.Extensions.Clustering
{
/// <summary>
/// A custom enumerator for the node discovery process
/// </summary>
- public interface INodeDiscoveryEnumerator : IEnumerator<ICacheNodeAdvertisment>
+ public interface INodeDiscoveryEnumerator : IEnumerator<CacheNodeAdvertisment>
{
/// <summary>
/// Adds the specified peer to the collection of discovered peers
/// </summary>
/// <param name="discoveredPeers">The peer collection</param>
- void OnPeerDiscoveryComplete(IEnumerable<ICacheNodeAdvertisment> discoveredPeers);
+ void OnPeerDiscoveryComplete(IEnumerable<CacheNodeAdvertisment> discoveredPeers);
}
}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/NodeDiscoveryCollection.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs
index 305f5de..b0e53e1 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/NodeDiscoveryCollection.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs
@@ -3,9 +3,9 @@
*
* Library: VNLib
* Package: VNLib.Data.Caching.Extensions
-* File: INodeDiscoveryCollection.cs
+* File: NodeDiscoveryCollection.cs
*
-* INodeDiscoveryCollection.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* NodeDiscoveryCollection.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
* VNLib collection of libraries and utilities.
*
* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
@@ -24,17 +24,18 @@
using System;
using System.Linq;
-using System.Collections.Generic;
using System.Collections;
+using System.Collections.Generic;
-namespace VNLib.Data.Caching.Extensions
+namespace VNLib.Data.Caching.Extensions.Clustering
{
/// <summary>
/// Represents a collection of available cache nodes from a discovery process
/// </summary>
public sealed class NodeDiscoveryCollection : INodeDiscoveryCollection
{
- private LinkedList<ICacheNodeAdvertisment> _peers;
+ private string? _selfId;
+ private LinkedList<CacheNodeAdvertisment> _peers;
/// <summary>
/// Initializes a new empty <see cref="NodeDiscoveryCollection"/>
@@ -44,17 +45,39 @@ namespace VNLib.Data.Caching.Extensions
_peers = new();
}
+ /// <summary>
+ /// Manually adds nodes to the collection that were not discovered through the discovery process
+ /// </summary>
+ /// <param name="nodes">The nodes to add</param>
+ public void AddManualNodes(IEnumerable<CacheNodeAdvertisment> nodes)
+ {
+ //Get only the nodes that are not already in the collection
+ IEnumerable<CacheNodeAdvertisment> newPeers = nodes.Except(_peers);
+
+ //Add them to the end of the collection
+ foreach (CacheNodeAdvertisment peer in newPeers)
+ {
+ _peers.AddLast(peer);
+ }
+ }
+
+ /// <summary>
+ /// Sets the id of the current node, so it can be excluded from discovery
+ /// </summary>
+ /// <param name="selfId">The id of the current node to exclude</param>
+ public void SetSelfId(string? selfId) => _selfId = selfId;
+
///<inheritdoc/>
public INodeDiscoveryEnumerator BeginDiscovery()
{
- return new NodeEnumerator(new());
+ return new NodeEnumerator(new(), _selfId);
}
///<inheritdoc/>
- public INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICacheNodeAdvertisment> initialPeers)
+ public INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<CacheNodeAdvertisment> initialPeers)
{
//Init new enumerator with the initial peers
- return new NodeEnumerator(new(initialPeers));
+ return new NodeEnumerator(new(initialPeers), _selfId);
}
///<inheritdoc/>
@@ -67,38 +90,54 @@ namespace VNLib.Data.Caching.Extensions
}
///<inheritdoc/>
- public ICacheNodeAdvertisment[] GetAllNodes()
+ public CacheNodeAdvertisment[] GetAllNodes()
{
//Capture all current peers
return _peers.ToArray();
}
- private sealed record class NodeEnumerator(LinkedList<ICacheNodeAdvertisment> Peers) : INodeDiscoveryEnumerator
+ private sealed record class NodeEnumerator(LinkedList<CacheNodeAdvertisment> Peers, string? SelfNodeId) : INodeDiscoveryEnumerator
{
+ private bool isInit;
+
//Keep track of the current node in the collection so we can move down the list
- private LinkedListNode<ICacheNodeAdvertisment>? _currentNode = Peers.First;
+ private LinkedListNode<CacheNodeAdvertisment>? _currentNode;
- public ICacheNodeAdvertisment Current => _currentNode?.Value;
+ public CacheNodeAdvertisment Current => _currentNode?.Value;
object IEnumerator.Current => _currentNode?.Value;
///<inheritdoc/>
public bool MoveNext()
{
- //Move to the next peer in the collection
- _currentNode = _currentNode?.Next;
+ if (!isInit)
+ {
+ _currentNode = Peers.First;
+ isInit = true;
+ }
+ else
+ {
+ //Move to the next peer in the collection
+ _currentNode = _currentNode?.Next;
+ }
return _currentNode?.Value != null;
}
///<inheritdoc/>
- public void OnPeerDiscoveryComplete(IEnumerable<ICacheNodeAdvertisment> discoveredPeers)
+ public void OnPeerDiscoveryComplete(IEnumerable<CacheNodeAdvertisment> discoveredPeers)
{
- //Get only the peers from the discovery that are not already in the collection
- IEnumerable<ICacheNodeAdvertisment> newPeers = discoveredPeers.Except(Peers);
+ //Get only the peers from the discovery that are not already in the collection, or ourselves
+ IEnumerable<CacheNodeAdvertisment> newPeers = discoveredPeers.Except(Peers);
+
+ if (!string.IsNullOrWhiteSpace(SelfNodeId))
+ {
+ //remove ourselves from the list
+ newPeers = newPeers.Where(p => !SelfNodeId.Equals(p.NodeId, StringComparison.OrdinalIgnoreCase));
+ }
//Add them to the end of the collection
- foreach (ICacheNodeAdvertisment ad in newPeers)
+ foreach (CacheNodeAdvertisment ad in newPeers)
{
Peers.AddLast(ad);
}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
index 634b6de..d53431a 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
@@ -24,7 +24,6 @@
using System;
using System.Net;
-using System.Text;
using System.Linq;
using System.Security;
using System.Text.Json;
@@ -32,26 +31,24 @@ using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Security.Cryptography;
-using System.Text.Json.Serialization;
using System.Runtime.CompilerServices;
using RestSharp;
-using VNLib.Net.Http;
using VNLib.Hashing;
using VNLib.Hashing.IdentityUtility;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
using VNLib.Utils.Extensions;
-using VNLib.Net.Rest.Client;
using VNLib.Net.Messaging.FBM;
using VNLib.Net.Messaging.FBM.Client;
-using ContentType = VNLib.Net.Http.ContentType;
-
+using VNLib.Net.Rest.Client.Construction;
+using VNLib.Data.Caching.Extensions.ApiModel;
+using VNLib.Data.Caching.Extensions.Clustering;
namespace VNLib.Data.Caching.Extensions
{
-
+
/// <summary>
/// Provides extension methods for FBM data caching using
/// cache servers and brokers
@@ -78,14 +75,18 @@ namespace VNLib.Data.Caching.Extensions
/// </summary>
public const string X_NODE_DISCOVERY_HEADER = "X-Cache-Node-Discovery";
- private static readonly RestClientPool ClientPool = new(2,new RestClientOptions()
+ /*
+ * Lazy to defer errors for debuggong
+ */
+ private static readonly Lazy<CacheSiteAdapter> SiteAdapter = new(() => ConfigureAdapter(2));
+
+ private static CacheSiteAdapter ConfigureAdapter(int maxClients)
{
- MaxTimeout = 10 * 1000,
- FollowRedirects = false,
- Encoding = Encoding.UTF8,
- AutomaticDecompression = DecompressionMethods.All,
- ThrowOnAnyError = true,
- });
+ CacheSiteAdapter adapter = new(maxClients);
+ //Configure the site endpoints
+ adapter.BuildEndpoints(ServiceEndpoints.Definition);
+ return adapter;
+ }
private static readonly ConditionalWeakTable<FBMClient, CacheClientConfiguration> ClientCacheConfig = new();
@@ -138,29 +139,85 @@ namespace VNLib.Data.Caching.Extensions
{
client.Config.DebugLog?.Debug("{debug}: {data}", "[CACHE]", message);
}
-
/// <summary>
- /// Discovers ALL possible cache nodes itteritivley from the current collection of initial peers.
+ /// Discovers ALL possible cache nodes itteritivley, first by collecting the configuration
+ /// from the initial peers.
/// This will make connections to all discoverable servers
/// </summary>
/// <param name="config"></param>
/// <param name="cancellation">A token to cancel the operation</param>
/// <returns></returns>
/// <exception cref="ArgumentException"></exception>
+ /// <exception cref="CacheDiscoveryFailureException"></exception>
public static async Task DiscoverNodesAsync(this CacheClientConfiguration config, CancellationToken cancellation)
{
//Make sure at least one node defined
- if(config?.InitialPeers == null || config.InitialPeers.Length == 0)
+ if(config?.WellKnownNodes == null || config.WellKnownNodes.Length == 0)
+ {
+ throw new ArgumentException("There must be at least one cache node defined in the client configuration");
+ }
+
+ //Get the initial advertisments that arent null
+ CacheNodeAdvertisment[] initialPeers = await ResolveWellKnownAsync(config, cancellation);
+
+ if (initialPeers.Length == 0)
+ {
+ throw new CacheDiscoveryFailureException("There must be at least one available cache node to continue discovery");
+ }
+
+ await DiscoverNodesAsync(config, initialPeers, cancellation);
+ }
+
+ /// <summary>
+ /// Resolves the initial well-known cache nodes into their advertisments
+ /// </summary>
+ /// <param name="config"></param>
+ /// <param name="cancellation">A token to cancel the operation</param>
+ /// <returns>An array of resolved nodes</returns>
+ public static async Task<CacheNodeAdvertisment[]> ResolveWellKnownAsync(this CacheClientConfiguration config, CancellationToken cancellation)
+ {
+ _ = config?.WellKnownNodes ?? throw new ArgumentNullException(nameof(config));
+
+ Task<CacheNodeAdvertisment?>[] initialAdds = new Task<CacheNodeAdvertisment?>[config.WellKnownNodes.Length];
+
+ //Discover initial advertisments from well-known addresses
+ for (int i = 0; i < config.WellKnownNodes.Length; i++)
{
- throw new ArgumentException("There must be at least one cache server defined in the client configuration");
+ initialAdds[i] = DiscoverNodeConfigAsync(config.WellKnownNodes[i], config, cancellation);
}
+ //Wait for all initial adds to complete
+ await Task.WhenAll(initialAdds);
+
+ //Get the initial advertisments that arent null
+ return initialAdds.Select(static x => x.Result!).Where(static s => s != null).ToArray();
+ }
+
+ /// <summary>
+ /// Discovers ALL possible cache nodes itteritivley from the current collection of initial peers.
+ /// This will make connections to all discoverable servers and update the client configuration, with all
+ /// discovered peers
+ /// </summary>
+ /// <param name="config"></param>
+ /// <param name="initialPeers">Accepts an array of initial peers to override the endpoint discovery process</param>
+ /// <param name="cancellation">A token to cancel the operation</param>
+ /// <returns>A task that completes when all nodes have been discovered</returns>
+ /// <exception cref="ArgumentException"></exception>
+ /// <exception cref="CacheDiscoveryFailureException"></exception>
+ public static async Task DiscoverNodesAsync(this CacheClientConfiguration config, CacheNodeAdvertisment[] initialPeers, CancellationToken cancellation)
+ {
+ //Make sure at least one node defined
+ if (initialPeers == null || initialPeers.Length == 0)
+ {
+ throw new ArgumentException("There must be at least one initial peer");
+ }
+
//Get the discovery enumerator with the initial peers
- INodeDiscoveryEnumerator enumerator = config.NodeCollection.BeginDiscovery(config.InitialPeers);
+ INodeDiscoveryEnumerator enumerator = config.NodeCollection.BeginDiscovery(initialPeers);
//Start the discovery process
- await DiscoverNodesAsync(enumerator, config.AuthManager, config.ErrorHandler, cancellation);
+ await DiscoverNodesAsync(enumerator, config, config.ErrorHandler, cancellation);
//Commit nodes
config.NodeCollection.CompleteDiscovery(enumerator);
@@ -168,7 +225,7 @@ namespace VNLib.Data.Caching.Extensions
private static async Task DiscoverNodesAsync(
INodeDiscoveryEnumerator enumerator,
- ICacheAuthManager auth,
+ CacheClientConfiguration config,
ICacheDiscoveryErrorHandler? errHandler,
CancellationToken cancellation
)
@@ -184,17 +241,17 @@ namespace VNLib.Data.Caching.Extensions
}
/*
- * We are allowed to save nodes that do not have a discovery endpoint, but we cannot discover nodes from them
- * we can only use them as cache
+ * We are allowed to save nodes that do not have a discovery endpoint, but we cannot
+ * discover nodes from them we can only use them as cache
*/
- //add a random delay to avoid spamming the server
- await Task.Delay((int)Random.Shared.NextInt64(50, 500), cancellation);
+ //add a random delay to avoid spamming servers
+ await Task.Delay((int)Random.Shared.NextInt64(100, 500), cancellation);
try
{
//Discover nodes from the current node
- ICacheNodeAdvertisment[]? nodes = await GetCacheNodesAsync(enumerator.Current, auth, cancellation);
+ CacheNodeAdvertisment[]? nodes = await GetCacheNodesAsync(enumerator.Current, config, cancellation);
if (nodes != null)
{
@@ -208,78 +265,77 @@ namespace VNLib.Data.Caching.Extensions
//Handle the error
errHandler.OnDiscoveryError(enumerator.Current, ex);
}
+ catch(Exception ex)
+ {
+ throw new CacheDiscoveryFailureException($"Failed to discovery peer node {enumerator.Current?.NodeId}, cannot continue", ex);
+ }
+ }
+ }
+
+ private static async Task<CacheNodeAdvertisment?> DiscoverNodeConfigAsync(Uri serverUri, CacheClientConfiguration config, CancellationToken cancellation)
+ {
+ try
+ {
+ GetConfigRequest req = new (serverUri, config);
+
+ //Site adapter verifies response messages so we dont need to check on the response
+ byte[] data = await SiteAdapter.Value.ExecuteAsync(req, cancellation).AsBytes()
+ ?? throw new CacheDiscoveryFailureException($"No data returned from desired cache node");
+
+ //Response is jwt
+ using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data);
+
+ //The entire payload is just the single serialzed advertisment
+ using JsonDocument doc = responseJwt.GetPayload();
+
+ return doc.RootElement.GetProperty("sub").Deserialize<CacheNodeAdvertisment>();
+ }
+ //Bypass cdfe when error handler is null
+ catch(CacheDiscoveryFailureException) when(config.ErrorHandler == null)
+ {
+ throw;
+ }
+ //Catch exceptions when an error handler is defined
+ catch (Exception ex) when (config.ErrorHandler != null)
+ {
+ //Handle the error
+ config.ErrorHandler.OnDiscoveryError(null!, ex);
+ return null;
+ }
+ catch (Exception ex)
+ {
+ throw new CacheDiscoveryFailureException("Failed to discover node configuration", ex);
}
}
/// <summary>
- /// Contacts the cache broker to get a list of active servers to connect to
+ /// Contacts the given server's discovery endpoint to discover a list of available
+ /// servers we can connect to
/// </summary>
/// <param name="advert">An advertisment of a server to discover other nodes from</param>
/// <param name="cancellationToken">A token to cancel the operationS</param>
- /// <param name="auth">The authentication manager</param>
+ /// <param name="config">The cache configuration object</param>
/// <returns>The list of active servers</returns>
/// <exception cref="SecurityException"></exception>
/// <exception cref="ArgumentException"></exception>
/// <exception cref="ArgumentNullException"></exception>
- public static async Task<ICacheNodeAdvertisment[]?> GetCacheNodesAsync(ICacheNodeAdvertisment advert, ICacheAuthManager auth, CancellationToken cancellationToken = default)
+ public static async Task<CacheNodeAdvertisment[]?> GetCacheNodesAsync(CacheNodeAdvertisment advert, CacheClientConfiguration config, CancellationToken cancellationToken = default)
{
_ = advert ?? throw new ArgumentNullException(nameof(advert));
- _ = auth ?? throw new ArgumentNullException(nameof(auth));
+ _ = config ?? throw new ArgumentNullException(nameof(config));
_ = advert.DiscoveryEndpoint ?? throw new ArgumentException("Advertisment does not expose an advertisment endpoint");
- string jwtBody;
-
- //Build request jwt
- using (JsonWebToken requestJwt = new())
- {
- requestJwt.WriteHeader(auth.GetJwtHeader());
- requestJwt.InitPayloadClaim()
- .AddClaim("iat", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds())
- .AddClaim("nonce", RandomHash.GetRandomBase32(16))
- .CommitClaims();
-
- //sign the jwt
- auth.SignJwt(requestJwt);
-
- //Compile the jwt
- jwtBody = requestJwt.Compile();
- }
-
- //New list request
- RestRequest listRequest = new(advert.DiscoveryEndpoint, Method.Post);
-
- //Add the jwt as a string to the request body
- listRequest.AddStringBody(jwtBody, DataFormat.None);
- listRequest.AddHeader("Accept", HttpHelpers.GetContentTypeString(ContentType.Text));
- listRequest.AddHeader("Content-Type", HttpHelpers.GetContentTypeString(ContentType.Text));
-
- byte[] data;
-
- //Rent client
- using (ClientContract client = ClientPool.Lease())
- {
- //Exec list request
- RestResponse response = await client.Resource.ExecuteAsync(listRequest, cancellationToken);
-
- if (!response.IsSuccessful)
- {
- throw response.ErrorException!;
- }
+ DiscoveryRequest req = new (advert.DiscoveryEndpoint, config);
- data = response.RawBytes ?? throw new InvalidOperationException("No data returned from broker");
- }
+ //Site adapter verifies response messages so we dont need to check on the response
+ byte[] data = await SiteAdapter.Value.ExecuteAsync(req, cancellationToken).AsBytes()
+ ?? throw new InvalidOperationException($"No data returned from node {advert.NodeId}");
//Response is jwt
using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data);
- //Verify the jwt
- if (!auth.VerifyJwt(responseJwt))
- {
- throw new SecurityException("Failed to verify the broker's challenge, cannot continue");
- }
-
using JsonDocument doc = responseJwt.GetPayload();
- return doc.RootElement.GetProperty("peers").Deserialize<Advertisment[]>();
+ return doc.RootElement.GetProperty("peers").Deserialize<CacheNodeAdvertisment[]>();
}
/// <summary>
@@ -313,7 +369,6 @@ namespace VNLib.Data.Caching.Extensions
ClientCacheConfig.AddOrUpdate(client, nodeConfig);
return nodeConfig;
}
-
/// <summary>
/// Waits for the client to disconnect from the server while observing
@@ -322,6 +377,8 @@ namespace VNLib.Data.Caching.Extensions
/// </summary>
/// <param name="client"></param>
/// <param name="token">A token to cancel the connection to the server</param>
+ /// <exception cref="TaskCanceledException"></exception>
+ /// <exception cref="ObjectDisposedException"></exception>
/// <returns>A task that complets when the connecion has been closed successfully</returns>
public static async Task WaitForExitAsync(this FBMClient client, CancellationToken token = default)
{
@@ -341,8 +398,26 @@ namespace VNLib.Data.Caching.Extensions
//Normal try to disconnect the socket
await client.DisconnectAsync(CancellationToken.None);
- //Notify if cancelled
- token.ThrowIfCancellationRequested();
+ //If the cancellation is completed, throw a task cancelled exception
+ if (cancellation.IsCompleted)
+ {
+ throw new TaskCanceledException("The client disconnected because the connection was cancelled");
+ }
+ }
+
+ /// <summary>
+ /// Discovers all available nodes for the current client config
+ /// </summary>
+ /// <param name="client"></param>
+ /// <param name="cancellation">A token to cancel the operation</param>
+ /// <returns>A task that completes when all nodes have been discovered</returns>
+ public static Task DiscoverAvailableNodesAsync(this FBMClient client, CancellationToken cancellation = default)
+ {
+ //Get stored config
+ CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
+
+ //Discover all nodes
+ return conf.DiscoverNodesAsync(cancellation);
}
/// <summary>
@@ -357,16 +432,16 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="SecurityException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
- public static async Task<ICacheNodeAdvertisment> ConnectToRandomCacheAsync(this FBMClient client, CancellationToken cancellation = default)
+ public static async Task<CacheNodeAdvertisment> ConnectToRandomCacheAsync(this FBMClient client, CancellationToken cancellation = default)
{
//Get stored config
CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
//Get all available nodes, or at least the initial peers
- ICacheNodeAdvertisment[]? adverts = conf.NodeCollection.GetAllNodes() ?? conf.InitialPeers ?? throw new ArgumentException("No cache nodes discovered, cannot connect");
+ CacheNodeAdvertisment[]? adverts = conf.NodeCollection.GetAllNodes() ?? throw new ArgumentException("No cache nodes discovered, cannot connect");
//Select random node from all available nodes
- ICacheNodeAdvertisment randomServer = adverts.SelectRandom();
+ CacheNodeAdvertisment randomServer = adverts.SelectRandom();
//Connect to the random server
await ConnectToCacheAsync(client, randomServer, cancellation);
@@ -388,7 +463,7 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="SecurityException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
- public static Task ConnectToCacheAsync(this FBMClient client, ICacheNodeAdvertisment server, CancellationToken token = default)
+ public static Task ConnectToCacheAsync(this FBMClient client, CacheNodeAdvertisment server, CancellationToken token = default)
{
_ = client ?? throw new ArgumentNullException(nameof(client));
_ = server ?? throw new ArgumentNullException(nameof(server));
@@ -414,7 +489,7 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="SecurityException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
- public static Task ConnectToCacheAsync(this FBMClient client, ICacheNodeAdvertisment server, CacheClientConfiguration explicitConfig, CancellationToken token = default)
+ public static Task ConnectToCacheAsync(this FBMClient client, CacheNodeAdvertisment server, CacheClientConfiguration explicitConfig, CancellationToken token = default)
{
_ = client ?? throw new ArgumentNullException(nameof(client));
_ = server ?? throw new ArgumentNullException(nameof(server));
@@ -427,50 +502,10 @@ namespace VNLib.Data.Caching.Extensions
private static async Task ConnectToCacheAsync(
FBMClient client,
CacheClientConfiguration config,
- ICacheNodeAdvertisment server,
+ CacheNodeAdvertisment server,
CancellationToken token = default
)
{
- //build ws uri from the connect endpoint
- UriBuilder uriBuilder = new(server.ConnectEndpoint)
- {
- Scheme = config.UseTls ? "wss://" : "ws://"
- };
-
- string challenge = RandomHash.GetRandomBase32(24);
-
- //See if the supplied config is for a cache node
- CacheNodeConfiguration? cnc = config as CacheNodeConfiguration;
-
- string jwtMessage;
- //Init jwt for connecting to server
- using (JsonWebToken jwt = new())
- {
- jwt.WriteHeader(config.AuthManager.GetJwtHeader());
-
- //Init claim
- JwtPayload claim = jwt.InitPayloadClaim();
-
- claim.AddClaim("chl", challenge);
-
- if (!string.IsNullOrWhiteSpace(cnc?.NodeId))
- {
- /*
- * The unique node id so the other nodes know to load the
- * proper event queue for the current server
- */
- claim.AddClaim("sub", cnc.NodeId);
- }
-
- claim.CommitClaims();
-
- //Sign jwt
- config.AuthManager.SignJwt(jwt);
-
- //Compile to string
- jwtMessage = jwt.Compile();
- }
-
/*
* During a server negiation, the client makes an intial get request to the cache endpoint
* and passes some client negiation terms as a signed message to the server. The server then
@@ -479,52 +514,34 @@ namespace VNLib.Data.Caching.Extensions
* The response from the server is essentailly the 'access token'
*/
- RestRequest negotation = new(server.ConnectEndpoint, Method.Get);
- //Set the jwt auth header for negotiation
- negotation.AddHeader("Authorization", jwtMessage);
- negotation.AddHeader("Accept", HttpHelpers.GetContentTypeString(ContentType.Text));
-
client.LogDebug("Negotiating with cache server");
-
- string authToken;
-
- //rent client
- using (ClientContract clientContract = ClientPool.Lease())
- {
- //Execute the request
- RestResponse response = await clientContract.Resource.ExecuteGetAsync(negotation, token);
- response.ThrowIfError();
-
- if (response.Content == null)
- {
- throw new FBMServerNegiationException("Failed to negotiate with the server, no response");
- }
-
- //Raw content
- authToken = response.Content;
- }
+ //Create a new connection negotiation
+ NegotationRequest req = new(server.ConnectEndpoint, config);
- //Parse the jwt
- using (JsonWebToken jwt = JsonWebToken.Parse(authToken))
- {
- //Verify the jwt
- if (!config.AuthManager.VerifyJwt(jwt))
- {
- throw new SecurityException("Failed to verify the cache server's negotiation message, cannot continue");
- }
+ //Exec negotiation
+ RestResponse response = await SiteAdapter.Value.ExecuteAsync(req, token);
+ /*
+ * JWT will already be veified by the endpoint adapter, so we
+ * just need to validate the server's buffer configuration
+ */
+ using (JsonWebToken jwt = JsonWebToken.ParseRaw(response.RawBytes))
+ {
//Confirm the server's buffer configuration
- ValidateServerNegotation(client, challenge, jwt);
+ ValidateServerNegotation(client, jwt);
}
client.LogDebug("Server negotiation validated, connecting to server");
//The client authorization header is the exact response
- client.ClientSocket.Headers[HttpRequestHeader.Authorization] = authToken;
+ client.ClientSocket.Headers[HttpRequestHeader.Authorization] = response.Content!;
+
+ //See if the supplied config is for a cache node
+ CacheNodeConfiguration? cnc = config as CacheNodeConfiguration;
//Compute the signature of the upgrade token
- client.ClientSocket.Headers[X_UPGRADE_SIG_HEADER] = config.AuthManager.GetBase64UpgradeSingature(authToken);
+ client.ClientSocket.Headers[X_UPGRADE_SIG_HEADER] = config.AuthManager.GetBase64UpgradeSignature(response.Content, cnc != null);
//Check to see if adversize self is enabled
if (cnc?.BroadcastAdverisment == true)
@@ -533,11 +550,17 @@ namespace VNLib.Data.Caching.Extensions
client.ClientSocket.Headers[X_NODE_DISCOVERY_HEADER] = GetAdvertismentHeader(cnc);
}
+ //build ws uri from the connect endpoint
+ UriBuilder uriBuilder = new(server.ConnectEndpoint)
+ {
+ Scheme = config.UseTls ? "wss://" : "ws://"
+ };
+
//Connect async
await client.ConnectAsync(uriBuilder.Uri, token);
}
- private static void ValidateServerNegotation(FBMClient client, string challenge, JsonWebToken jwt)
+ private static void ValidateServerNegotation(FBMClient client, JsonWebToken jwt)
{
try
{
@@ -548,15 +571,6 @@ namespace VNLib.Data.Caching.Extensions
.EnumerateObject()
.ToDictionary(static k => k.Name, static v => v.Value);
- //get the challenge response
- string challengeResponse = args["chl"].GetString()!;
-
- //Check the challenge response
- if (!challenge.Equals(challengeResponse, StringComparison.Ordinal))
- {
- throw new FBMServerNegiationException("Failed to negotiate with the server, challenge response does not match");
- }
-
//Get the negiation values
uint recvBufSize = args[FBMClient.REQ_RECV_BUF_QUERY_ARG].GetUInt32();
uint headerBufSize = args[FBMClient.REQ_HEAD_BUF_QUERY_ARG].GetUInt32();
@@ -593,7 +607,7 @@ namespace VNLib.Data.Caching.Extensions
* compute a signature of the upgrade token and send it to the server to prove we hold the private key.
*/
- private static string GetBase64UpgradeSingature(this ICacheAuthManager man, string? token)
+ private static string GetBase64UpgradeSignature(this ICacheAuthManager man, string? token, bool isPeer)
{
//Compute hash of the token
byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256);
@@ -604,19 +618,7 @@ namespace VNLib.Data.Caching.Extensions
//Return the base64 string
return Convert.ToBase64String(sig);
}
-
- /// <summary>
- /// Verifies the signed auth token against the given verification key
- /// </summary>
- /// <param name="signature">The base64 signature of the token</param>
- /// <param name="token">The raw token to compute the hash of</param>
- /// <param name="nodeConfig">The node configuration</param>
- /// <returns>True if the singature matches, false otherwise</returns>
- /// <exception cref="CryptographicException"></exception>
- public static bool VerifyUpgradeToken(this CacheClientConfiguration nodeConfig, string signature, string token)
- {
- return VerifyUpgradeToken(nodeConfig.AuthManager, signature, token);
- }
+
/// <summary>
/// Verifies the signed auth token against the given verification key
@@ -624,10 +626,11 @@ namespace VNLib.Data.Caching.Extensions
/// <param name="man"></param>
/// <param name="signature">The base64 signature of the token</param>
/// <param name="token">The raw token to compute the hash of</param>
+ /// <param name="isPeer">A value that indicates if the connection is from a peer node</param>
/// <returns>True if the singature matches, false otherwise</returns>
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="CryptographicException"></exception>
- public static bool VerifyUpgradeToken(this ICacheAuthManager man, string signature, string token)
+ public static bool VerifyUpgradeToken(this ICacheAuthManager man, string signature, string token, bool isPeer)
{
_ = man ?? throw new ArgumentNullException(nameof(man));
@@ -637,7 +640,7 @@ namespace VNLib.Data.Caching.Extensions
//decode the signature
byte[] sig = Convert.FromBase64String(signature);
- return man.VerifyMessageHash(hash, HashAlg.SHA256, sig);
+ return man.VerifyMessageHash(hash, HashAlg.SHA256, sig, isPeer);
}
private static string GetAdvertismentHeader(CacheNodeConfiguration nodeConfiguration)
@@ -675,58 +678,30 @@ namespace VNLib.Data.Caching.Extensions
/// <param name="message">The advertisment message to verify</param>
/// <returns>The advertisment message if successfully verified, or null otherwise</returns>
/// <exception cref="FormatException"></exception>
- public static ICacheNodeAdvertisment? VerifyPeerAdvertisment(this ICacheAuthManager config, string message)
+ public static CacheNodeAdvertisment? VerifyPeerAdvertisment(this ICacheAuthManager config, string message)
{
using JsonWebToken jwt = JsonWebToken.Parse(message);
//Verify the signature
- if (!config.VerifyJwt(jwt))
+ if (!config.VerifyJwt(jwt, true))
{
return null;
}
//Get the payload
- return jwt.GetPayload<Advertisment>();
- }
-
+ return jwt.GetPayload<CacheNodeAdvertisment>();
+ }
/// <summary>
/// Selects a random server from a collection of active servers
/// </summary>
/// <param name="servers"></param>
/// <returns>A server selected at random</returns>
- public static ICacheNodeAdvertisment SelectRandom(this ICollection<ICacheNodeAdvertisment> servers)
+ public static CacheNodeAdvertisment SelectRandom(this ICollection<CacheNodeAdvertisment> servers)
{
//select random server
int randServer = RandomNumberGenerator.GetInt32(0, servers.Count);
return servers.ElementAt(randServer);
}
-
-
- private class Advertisment : ICacheNodeAdvertisment
- {
- [JsonIgnore]
- public Uri? ConnectEndpoint { get; set; }
-
- [JsonIgnore]
- public Uri? DiscoveryEndpoint { get; set; }
-
- [JsonPropertyName("iss")]
- public string NodeId { get; set; }
-
- [JsonPropertyName("url")]
- public string? url
- {
- get => ConnectEndpoint?.ToString();
- set => ConnectEndpoint = value == null ? null : new Uri(value);
- }
-
- [JsonPropertyName("dis")]
- public string? dis
- {
- get => DiscoveryEndpoint?.ToString();
- set => DiscoveryEndpoint = value == null ? null : new Uri(value);
- }
- }
}
}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs b/lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs
index e3ab868..32ae142 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs
@@ -53,8 +53,9 @@ namespace VNLib.Data.Caching.Extensions
/// Verifies the given JWT
/// </summary>
/// <param name="jwt">The message to verify authenticity</param>
+ /// <param name="isPeer">A value indicating if the message is from a known node</param>
/// <returns>True of the JWT could be verified, false otherwise</returns>
- bool VerifyJwt(JsonWebToken jwt);
+ bool VerifyJwt(JsonWebToken jwt, bool isPeer);
/// <summary>
/// Signs the given message hash
@@ -70,7 +71,8 @@ namespace VNLib.Data.Caching.Extensions
/// <param name="hash">The message hash to compare</param>
/// <param name="alg">The algorithm used to produce the message hash</param>
/// <param name="signature">The message signature to verify the message against</param>
+ /// <param name="isPeer">A value indicating if the message is from a known node</param>
/// <returns>True of the signature could be verified</returns>
- bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature);
+ bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature, bool isPeer);
}
}