diff options
40 files changed, 1519 insertions, 807 deletions
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs new file mode 100644 index 0000000..99acfd5 --- /dev/null +++ b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs @@ -0,0 +1,65 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Extensions +* File: CacheSiteAdapter.cs +* +* CacheSiteAdapter.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Net; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +using RestSharp; + +using VNLib.Net.Rest.Client; +using VNLib.Net.Rest.Client.Construction; + +namespace VNLib.Data.Caching.Extensions.ApiModel +{ + /// <summary> + /// A site adapter for cache REST api requests + /// </summary> + internal sealed class CacheSiteAdapter : RestSiteAdapterBase + { + protected override RestClientPool Pool { get; } + + public CacheSiteAdapter(int maxClients) + { + //Configure connection pool + Pool = new(maxClients, new RestClientOptions() + { + MaxTimeout = 10 * 1000, + FollowRedirects = false, + Encoding = Encoding.UTF8, + AutomaticDecompression = DecompressionMethods.All, + ThrowOnAnyError = true + }); + } + + public override void OnResponse(RestResponse response) + { } + + public override Task WaitAsync(CancellationToken cancellation = default) + { + return Task.CompletedTask; + } + } +} diff --git a/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/DiscoveryRequest.cs index 3020376..22b11aa 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/DiscoveryRequest.cs @@ -3,9 +3,9 @@ * * Library: VNLib * Package: VNLib.Data.Caching.Extensions -* File: ActiveServer.cs +* File: DiscoveryRequest.cs * -* ActiveServer.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* DiscoveryRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger * VNLib collection of libraries and utilities. * * VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify @@ -23,29 +23,19 @@ */ using System; -using System.Text.Json.Serialization; +using VNLib.Data.Caching.Extensions.Clustering; -namespace VNLib.Data.Caching.Extensions +namespace VNLib.Data.Caching.Extensions.ApiModel { - public class ActiveServer : ICacheNodeAdvertisment + /// <summary> + /// A request message for a discovery request + /// </summary> + /// <param name="DiscoveryUrl">The discovery endpoint to connec to</param> + /// <param name="Config">The local client configuration</param> + internal record class DiscoveryRequest(Uri DiscoveryUrl, CacheClientConfiguration Config) + : ICacheConnectionRequest { - [JsonPropertyName("address")] - public string? HostName { get; set; } - - public string? ServerId { get; set; } - [JsonPropertyName("ip_address")] - public string? Ip { get; set; } - - public Uri ConnectEndpoint { get; } - - public Uri? DiscoveryEndpoint { get; } - - [JsonPropertyName("server_id")] - public string NodeId { get; } - - ///<inheritdoc/> - public override int GetHashCode() => ServerId!.GetHashCode(StringComparison.OrdinalIgnoreCase); ///<inheritdoc/> - public override bool Equals(object? obj) => obj is ActiveServer s && GetHashCode() == s.GetHashCode(); + public string? Challenge { get; set; } } } diff --git a/lib/VNLib.Data.Caching.Extensions/src/ApiModel/GetConfigRequest.cs b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/GetConfigRequest.cs new file mode 100644 index 0000000..43cef4c --- /dev/null +++ b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/GetConfigRequest.cs @@ -0,0 +1,42 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Extensions +* File: GetConfigRequest.cs +* +* GetConfigRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using VNLib.Data.Caching.Extensions.Clustering; + +namespace VNLib.Data.Caching.Extensions.ApiModel +{ + /// <summary> + /// A request to get the cache configuration from a cache server's + /// well-known configuration endpoint + /// </summary> + /// <param name="WellKnownEp">The well-known configuration endpoint url</param> + /// <param name="Config">The client cache configuration</param> + internal record class GetConfigRequest(Uri WellKnownEp, CacheClientConfiguration Config) + : ICacheConnectionRequest + { + ///<inheritdoc/> + public string? Challenge { get; set; } + } +} diff --git a/lib/VNLib.Data.Caching.Extensions/src/ICacheNodeAdvertisment.cs b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/ICacheConnectionRequest.cs index fc29955..5fb7ba7 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/ICacheNodeAdvertisment.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/ICacheConnectionRequest.cs @@ -3,9 +3,9 @@ * * Library: VNLib * Package: VNLib.Data.Caching.Extensions -* File: ICacheNodeAdvertisment.cs +* File: ICacheConnectionRequest.cs * -* ICacheNodeAdvertisment.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* ICacheConnectionRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger * VNLib collection of libraries and utilities. * * VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify @@ -22,29 +22,26 @@ * along with this program. If not, see https://www.gnu.org/licenses/. */ -using System; +using VNLib.Data.Caching.Extensions.Clustering; - -namespace VNLib.Data.Caching.Extensions +namespace VNLib.Data.Caching.Extensions.ApiModel { /// <summary> - /// Represents a node that can be advertised to clients + /// Represents a request to connect to a cache server. /// </summary> - public interface ICacheNodeAdvertisment + internal interface ICacheConnectionRequest { /// <summary> - /// The endpoint for clients to connect to to access the cache - /// </summary> - Uri ConnectEndpoint { get; } - - /// <summary> - /// Gets the address for clients to connect to to discover other discovertable nodes + /// The <see cref="CacheClientConfiguration"/> used to configure, authenticate, and + /// verify messages sent to and received from cache servers. /// </summary> - Uri? DiscoveryEndpoint { get; } + CacheClientConfiguration Config { get; } /// <summary> - /// Gets the unique identifier for this node + /// An optional challenge string to be used during the authentication + /// process. When set, is sent in the request JWT, and is expected to + /// be returned in the response JWT. /// </summary> - string NodeId { get; } + string? Challenge { get; set; } } } diff --git a/lib/VNLib.Data.Caching.Extensions/src/ApiModel/NegotationRequest.cs b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/NegotationRequest.cs new file mode 100644 index 0000000..5842586 --- /dev/null +++ b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/NegotationRequest.cs @@ -0,0 +1,41 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Extensions +* File: NegotationRequest.cs +* +* NegotationRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using VNLib.Data.Caching.Extensions.Clustering; + +namespace VNLib.Data.Caching.Extensions.ApiModel +{ + /// <summary> + /// A request to negotiate a new connection with a cache server + /// </summary> + /// <param name="ConnectUrl">The cache endpoint uri to connec to</param> + /// <param name="Config">The client cache configuration</param> + internal record class NegotationRequest(Uri ConnectUrl, CacheClientConfiguration Config) + : ICacheConnectionRequest + { + ///<inheritdoc/> + public string? Challenge { get; set; } + } +} diff --git a/lib/VNLib.Data.Caching.Extensions/src/ApiModel/ServiceEndpoints.cs b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/ServiceEndpoints.cs new file mode 100644 index 0000000..c0de4a3 --- /dev/null +++ b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/ServiceEndpoints.cs @@ -0,0 +1,176 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Extensions +* File: ServiceEndpoints.cs +* +* ServiceEndpoints.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Security; +using System.Text.Json; + +using RestSharp; + +using VNLib.Net.Http; +using VNLib.Hashing; +using VNLib.Hashing.IdentityUtility; +using VNLib.Net.Rest.Client.Construction; +using ContentType = VNLib.Net.Http.ContentType; +using VNLib.Data.Caching.Extensions.Clustering; + +namespace VNLib.Data.Caching.Extensions.ApiModel +{ + /* + * Defines the cache endpoints, builds and routes request messages to the + * server enpoints. In effect defines the client api for cache services. + * + * This class also define methods for authentication and message verification + */ + + internal static class ServiceEndpoints + { + private static readonly TimeSpan MaxTimeDisparity = TimeSpan.FromSeconds(10); + + /// <summary> + /// Get the endpoint definition for cache services for the site adapter + /// </summary> + internal static IRestEndpointDefinition Definition { get; } = new EndpointBuilder(); + + private class EndpointBuilder : IRestEndpointDefinition + { + ///<inheritdoc/> + public void BuildRequest(IRestSiteAdapter site, IRestEndpointBuilder builder) + { + //Define cache service endpoints/requests + + builder.WithEndpoint<DiscoveryRequest>() + .WithUrl(e => e.DiscoveryUrl) + .WithMethod(Method.Get) + //Accept text response (it should be a jwt) + .WithHeader("Accept", HttpHelpers.GetContentTypeString(ContentType.Text)) + .WithHeader("Authorization", BuildDiscoveryAuthToken) + //Verify jwt responses + .OnResponse(VerifyJwtResponse); + + builder.WithEndpoint<NegotationRequest>() + .WithUrl(e => e.ConnectUrl) + .WithMethod(Method.Get) + //Accept text response (its should be a jwt) + .WithHeader("Accept", HttpHelpers.GetContentTypeString(ContentType.Text)) + .WithHeader("Authorization", BuildDiscoveryAuthToken) + //Verify jwt responses + .OnResponse(VerifyJwtResponse); + + //Well known endpoint does not require authentication + builder.WithEndpoint<GetConfigRequest>() + .WithUrl(gc => gc.WellKnownEp) + .WithMethod(Method.Get) + //Responses should be a signed jwt + .WithHeader("Accept", HttpHelpers.GetContentTypeString(ContentType.Text)) + //Verify jwt responses + .OnResponse(VerifyJwtResponse); + } + } + + + private static string BuildDiscoveryAuthToken(ICacheConnectionRequest request) + { + request.Challenge = RandomHash.GetRandomBase32(24); + + //Build request jwt + using JsonWebToken jwt = new(); + jwt.WriteHeader(request.Config.AuthManager.GetJwtHeader()); + + //See if the supplied config is for a cache node + CacheNodeConfiguration? cnc = request.Config as CacheNodeConfiguration; + + //Init claim + JwtPayload claim = jwt.InitPayloadClaim(); + + claim.AddClaim("chl", request.Challenge) + .AddClaim("iat", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()); + + if (!string.IsNullOrWhiteSpace(cnc?.NodeId)) + { + /* + * The unique node id so the other nodes know to load the + * proper event queue for the current server + */ + claim.AddClaim("sub", cnc.NodeId); + } + + claim.CommitClaims(); + + //sign the jwt + request.Config.AuthManager.SignJwt(jwt); + + //Compile the jwt + return jwt.Compile(); + } + + private static void VerifyJwtResponse(ICacheConnectionRequest req, RestResponse response) + { + byte[] data = response.RawBytes ?? throw new ArgumentException("Server response was empty, cannot continue"); + + //If node config then set the is-node flag + bool isNode = req.Config is CacheNodeConfiguration; + + //Response is jwt + using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data); + + //Verify the jwt + if (!req.Config.AuthManager.VerifyJwt(responseJwt, isNode)) + { + throw new SecurityException("Failed to verify the discovery server's challenge, cannot continue"); + } + + //get payload as a document + using JsonDocument doc = responseJwt.GetPayload(); + + //Verify iat times + long iatSec = doc.RootElement.GetProperty("iat").GetInt64(); + + //Get dto + DateTimeOffset iat = DateTimeOffset.FromUnixTimeSeconds(iatSec); + + DateTimeOffset now = DateTimeOffset.UtcNow; + + //Verify iat is not before or after the current time with the disparity + if (iat.Add(MaxTimeDisparity) < now || iat.Subtract(MaxTimeDisparity) > now) + { + throw new SecurityException("Server returned a request that has expired. Please check your system clock"); + } + + //If a challenge is set, verify it + if (req.Challenge != null) + { + //Verify challenge + string challenge = doc.RootElement.GetProperty("chl").GetString() + ?? throw new SecurityException("Server did not return a challenge"); + + if (!challenge.Equals(req.Challenge, StringComparison.Ordinal)) + { + throw new SecurityException("Server returned an invalid challenge"); + } + } + } + + } +} diff --git a/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs index 9229c89..1c1997e 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs @@ -22,11 +22,14 @@ * along with this program. If not, see https://www.gnu.org/licenses/. */ +using System; using System.Linq; using System.Collections.Generic; -namespace VNLib.Data.Caching.Extensions + +namespace VNLib.Data.Caching.Extensions.Clustering { + /// <summary> /// A fluent api configuration object for configuring a <see cref="FBMClient"/> /// to connect to cache servers. @@ -53,7 +56,7 @@ namespace VNLib.Data.Caching.Extensions /// </summary> public bool UseTls { get; private set; } - internal ICacheNodeAdvertisment[]? InitialPeers { get; set; } + internal Uri[]? WellKnownNodes { get; set; } /// <summary> /// Specifies the JWT authentication manager to use for signing and verifying JWTs @@ -73,7 +76,7 @@ namespace VNLib.Data.Caching.Extensions public CacheClientConfiguration WithTls(bool useTls) { UseTls = useTls; - return this; + return this; } /// <summary> @@ -81,9 +84,20 @@ namespace VNLib.Data.Caching.Extensions /// </summary> /// <param name="peers">The collection of servers to discover peers from and connect to</param> /// <returns>Chainable fluent object</returns> - public CacheClientConfiguration WithInitialPeers(IEnumerable<ICacheNodeAdvertisment> peers) + public CacheClientConfiguration WithInitialPeers(IEnumerable<Uri> peers) { - InitialPeers = peers.ToArray(); + //Check null + _ = peers ?? throw new ArgumentNullException(nameof(peers)); + + //Store peer array + WellKnownNodes = peers.ToArray(); + + if (WellKnownNodes.Any(p => !p.IsAbsoluteUri)) + { + WellKnownNodes = null; + throw new ArgumentException("All discoverable node uris must be in absolute form"); + } + return this; } diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheDiscoveryFailureException.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheDiscoveryFailureException.cs new file mode 100644 index 0000000..84d611e --- /dev/null +++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheDiscoveryFailureException.cs @@ -0,0 +1,44 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Extensions +* File: CacheDiscoveryFailureException.cs +* +* CacheDiscoveryFailureException.cs is part of VNLib.Data.Caching.Extensions which is +* part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; + + +namespace VNLib.Data.Caching.Extensions.Clustering +{ + /// <summary> + /// Raised when an error occurs during cache discovery + /// </summary> + public class CacheDiscoveryFailureException : Exception + { + public CacheDiscoveryFailureException(string message) : base(message) + { } + + public CacheDiscoveryFailureException(string message, Exception innerException) : base(message, innerException) + { } + + public CacheDiscoveryFailureException() + { } + } +} diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeAdvertisment.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeAdvertisment.cs new file mode 100644 index 0000000..e81d506 --- /dev/null +++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeAdvertisment.cs @@ -0,0 +1,96 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Extensions +* File: CacheNodeAdvertisment.cs +* +* CacheNodeAdvertisment.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Text.Json.Serialization; + +namespace VNLib.Data.Caching.Extensions.Clustering +{ + /// <summary> + /// Represents a node that can be advertised to clients + /// </summary> + public class CacheNodeAdvertisment : IEquatable<CacheNodeAdvertisment> + { + /// <summary> + /// The endpoint for clients to connect to to access the cache + /// </summary> + [JsonIgnore] + public Uri? ConnectEndpoint { get; set; } + + /// <summary> + /// Gets the address for clients to connect to to discover other discovertable nodes + /// </summary> + [JsonIgnore] + public Uri? DiscoveryEndpoint { get; set; } + + /// <summary> + /// Gets the unique identifier for this node + /// </summary> + [JsonPropertyName("iss")] + public string NodeId { get; set; } + + [JsonPropertyName("url")] + public string? url + { + get => ConnectEndpoint?.ToString(); + set => ConnectEndpoint = value == null ? null : new Uri(value); + } + + [JsonPropertyName("dis")] + public string? dis + { + get => DiscoveryEndpoint?.ToString(); + set => DiscoveryEndpoint = value == null ? null : new Uri(value); + } + + /// <summary> + /// Determines if the given node is equal to this node, by comparing the node ids + /// </summary> + /// <param name="obj">The other node advertisment to compare</param> + /// <returns>True if the nodes are equal, false otherwise</returns> + public override bool Equals(object? obj) => obj is CacheNodeAdvertisment ad && Equals(ad); + + /// <summary> + /// Gets the hash code for this node, based on the node id + /// </summary> + /// <returns>The instance hash-code</returns> + public override int GetHashCode() => string.GetHashCode(NodeId, StringComparison.OrdinalIgnoreCase); + + /// <summary> + /// Determines if the given node is equal to this node, by comparing the node ids + /// </summary> + /// <param name="other">The other node advertisment to compare</param> + /// <returns>True if the nodes are equal, false otherwise</returns> + public bool Equals(CacheNodeAdvertisment? other) => string.Equals(NodeId, other?.NodeId, StringComparison.OrdinalIgnoreCase); + + /// <summary> + /// Formats a string representation of this node + /// </summary> + /// <returns>The formatted information string</returns> + public override string ToString() + { + return $"NodeId: {NodeId} Connect: {url}, Discover?: {dis}"; + } + } +} diff --git a/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs index 29a763c..6b7ab48 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs @@ -24,13 +24,13 @@ using System; -namespace VNLib.Data.Caching.Extensions +namespace VNLib.Data.Caching.Extensions.Clustering { /// <summary> /// A cache configuration for cache servers (nodes) /// </summary> - public class CacheNodeConfiguration: CacheClientConfiguration, ICacheNodeAdvertisment + public class CacheNodeConfiguration : CacheClientConfiguration { /// <summary> /// The address for clients to connect to @@ -49,6 +49,22 @@ namespace VNLib.Data.Caching.Extensions public Uri? DiscoveryEndpoint { get; private set; } /// <summary> + /// Gets the configuration for this node as an advertisment + /// </summary> + public CacheNodeAdvertisment Advertisment + { + get + { + return new CacheNodeAdvertisment() + { + DiscoveryEndpoint = DiscoveryEndpoint, + ConnectEndpoint = ConnectEndpoint, + NodeId = NodeId + }; + } + } + + /// <summary> /// Sets the full address of our cache endpoint for clients to connect to /// </summary> /// <param name="connectUri">The uri clients will attempt to connect to</param> @@ -80,9 +96,13 @@ namespace VNLib.Data.Caching.Extensions /// <param name="nodeId">The cluster node id of the current server</param> /// <returns>Chainable fluent object</returns> /// <exception cref="ArgumentNullException"></exception> - public CacheClientConfiguration WithNodeId(string nodeId) + public CacheNodeConfiguration WithNodeId(string nodeId) { NodeId = nodeId ?? throw new ArgumentNullException(nameof(nodeId)); + + //Update the node id in the node collection + (NodeCollection as NodeDiscoveryCollection)!.SetSelfId(nodeId); + return this; } diff --git a/lib/VNLib.Data.Caching.Extensions/src/ICacheDiscoveryErrorHandler.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/ICacheDiscoveryErrorHandler.cs index 3493d48..984ce3d 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/ICacheDiscoveryErrorHandler.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/ICacheDiscoveryErrorHandler.cs @@ -24,7 +24,7 @@ using System; -namespace VNLib.Data.Caching.Extensions +namespace VNLib.Data.Caching.Extensions.Clustering { /// <summary> /// Represents an type that will handle errors that occur during the discovery process @@ -36,6 +36,6 @@ namespace VNLib.Data.Caching.Extensions /// </summary> /// <param name="errorNode">The node that the error occured on</param> /// <param name="ex">The exception that caused the invocation</param> - void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex); + void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex); } } diff --git a/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryCollection.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryCollection.cs index 9adebdc..1f4d154 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryCollection.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryCollection.cs @@ -25,7 +25,7 @@ using System.Collections.Generic; -namespace VNLib.Data.Caching.Extensions +namespace VNLib.Data.Caching.Extensions.Clustering { /// <summary> /// Represents a collection of discovered nodes @@ -45,13 +45,13 @@ namespace VNLib.Data.Caching.Extensions /// </summary> /// <param name="initialPeers">An initial collection of peers to add to the enumeration</param> /// <returns>An enumerator that simplifies discovery of unique nodes</returns> - INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICacheNodeAdvertisment> initialPeers); + INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<CacheNodeAdvertisment> initialPeers); /// <summary> /// Gets a snapshot of all discovered nodes in the current collection. /// </summary> /// <returns>The current collection of notes</returns> - ICacheNodeAdvertisment[] GetAllNodes(); + CacheNodeAdvertisment[] GetAllNodes(); /// <summary> /// Completes a discovery process and updates the collection with the results diff --git a/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryEnumerator.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs index f6d5f40..677088a 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryEnumerator.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs @@ -25,18 +25,17 @@ using System.Collections.Generic; - -namespace VNLib.Data.Caching.Extensions +namespace VNLib.Data.Caching.Extensions.Clustering { /// <summary> /// A custom enumerator for the node discovery process /// </summary> - public interface INodeDiscoveryEnumerator : IEnumerator<ICacheNodeAdvertisment> + public interface INodeDiscoveryEnumerator : IEnumerator<CacheNodeAdvertisment> { /// <summary> /// Adds the specified peer to the collection of discovered peers /// </summary> /// <param name="discoveredPeers">The peer collection</param> - void OnPeerDiscoveryComplete(IEnumerable<ICacheNodeAdvertisment> discoveredPeers); + void OnPeerDiscoveryComplete(IEnumerable<CacheNodeAdvertisment> discoveredPeers); } } diff --git a/lib/VNLib.Data.Caching.Extensions/src/NodeDiscoveryCollection.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs index 305f5de..b0e53e1 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/NodeDiscoveryCollection.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs @@ -3,9 +3,9 @@ * * Library: VNLib * Package: VNLib.Data.Caching.Extensions -* File: INodeDiscoveryCollection.cs +* File: NodeDiscoveryCollection.cs * -* INodeDiscoveryCollection.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* NodeDiscoveryCollection.cs is part of VNLib.Data.Caching.Extensions which is part of the larger * VNLib collection of libraries and utilities. * * VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify @@ -24,17 +24,18 @@ using System; using System.Linq; -using System.Collections.Generic; using System.Collections; +using System.Collections.Generic; -namespace VNLib.Data.Caching.Extensions +namespace VNLib.Data.Caching.Extensions.Clustering { /// <summary> /// Represents a collection of available cache nodes from a discovery process /// </summary> public sealed class NodeDiscoveryCollection : INodeDiscoveryCollection { - private LinkedList<ICacheNodeAdvertisment> _peers; + private string? _selfId; + private LinkedList<CacheNodeAdvertisment> _peers; /// <summary> /// Initializes a new empty <see cref="NodeDiscoveryCollection"/> @@ -44,17 +45,39 @@ namespace VNLib.Data.Caching.Extensions _peers = new(); } + /// <summary> + /// Manually adds nodes to the collection that were not discovered through the discovery process + /// </summary> + /// <param name="nodes">The nodes to add</param> + public void AddManualNodes(IEnumerable<CacheNodeAdvertisment> nodes) + { + //Get only the nodes that are not already in the collection + IEnumerable<CacheNodeAdvertisment> newPeers = nodes.Except(_peers); + + //Add them to the end of the collection + foreach (CacheNodeAdvertisment peer in newPeers) + { + _peers.AddLast(peer); + } + } + + /// <summary> + /// Sets the id of the current node, so it can be excluded from discovery + /// </summary> + /// <param name="selfId">The id of the current node to exclude</param> + public void SetSelfId(string? selfId) => _selfId = selfId; + ///<inheritdoc/> public INodeDiscoveryEnumerator BeginDiscovery() { - return new NodeEnumerator(new()); + return new NodeEnumerator(new(), _selfId); } ///<inheritdoc/> - public INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICacheNodeAdvertisment> initialPeers) + public INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<CacheNodeAdvertisment> initialPeers) { //Init new enumerator with the initial peers - return new NodeEnumerator(new(initialPeers)); + return new NodeEnumerator(new(initialPeers), _selfId); } ///<inheritdoc/> @@ -67,38 +90,54 @@ namespace VNLib.Data.Caching.Extensions } ///<inheritdoc/> - public ICacheNodeAdvertisment[] GetAllNodes() + public CacheNodeAdvertisment[] GetAllNodes() { //Capture all current peers return _peers.ToArray(); } - private sealed record class NodeEnumerator(LinkedList<ICacheNodeAdvertisment> Peers) : INodeDiscoveryEnumerator + private sealed record class NodeEnumerator(LinkedList<CacheNodeAdvertisment> Peers, string? SelfNodeId) : INodeDiscoveryEnumerator { + private bool isInit; + //Keep track of the current node in the collection so we can move down the list - private LinkedListNode<ICacheNodeAdvertisment>? _currentNode = Peers.First; + private LinkedListNode<CacheNodeAdvertisment>? _currentNode; - public ICacheNodeAdvertisment Current => _currentNode?.Value; + public CacheNodeAdvertisment Current => _currentNode?.Value; object IEnumerator.Current => _currentNode?.Value; ///<inheritdoc/> public bool MoveNext() { - //Move to the next peer in the collection - _currentNode = _currentNode?.Next; + if (!isInit) + { + _currentNode = Peers.First; + isInit = true; + } + else + { + //Move to the next peer in the collection + _currentNode = _currentNode?.Next; + } return _currentNode?.Value != null; } ///<inheritdoc/> - public void OnPeerDiscoveryComplete(IEnumerable<ICacheNodeAdvertisment> discoveredPeers) + public void OnPeerDiscoveryComplete(IEnumerable<CacheNodeAdvertisment> discoveredPeers) { - //Get only the peers from the discovery that are not already in the collection - IEnumerable<ICacheNodeAdvertisment> newPeers = discoveredPeers.Except(Peers); + //Get only the peers from the discovery that are not already in the collection, or ourselves + IEnumerable<CacheNodeAdvertisment> newPeers = discoveredPeers.Except(Peers); + + if (!string.IsNullOrWhiteSpace(SelfNodeId)) + { + //remove ourselves from the list + newPeers = newPeers.Where(p => !SelfNodeId.Equals(p.NodeId, StringComparison.OrdinalIgnoreCase)); + } //Add them to the end of the collection - foreach (ICacheNodeAdvertisment ad in newPeers) + foreach (CacheNodeAdvertisment ad in newPeers) { Peers.AddLast(ad); } diff --git a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs index 634b6de..d53431a 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs @@ -24,7 +24,6 @@ using System; using System.Net; -using System.Text; using System.Linq; using System.Security; using System.Text.Json; @@ -32,26 +31,24 @@ using System.Threading; using System.Threading.Tasks; using System.Collections.Generic; using System.Security.Cryptography; -using System.Text.Json.Serialization; using System.Runtime.CompilerServices; using RestSharp; -using VNLib.Net.Http; using VNLib.Hashing; using VNLib.Hashing.IdentityUtility; using VNLib.Utils.Memory; using VNLib.Utils.Logging; using VNLib.Utils.Extensions; -using VNLib.Net.Rest.Client; using VNLib.Net.Messaging.FBM; using VNLib.Net.Messaging.FBM.Client; -using ContentType = VNLib.Net.Http.ContentType; - +using VNLib.Net.Rest.Client.Construction; +using VNLib.Data.Caching.Extensions.ApiModel; +using VNLib.Data.Caching.Extensions.Clustering; namespace VNLib.Data.Caching.Extensions { - + /// <summary> /// Provides extension methods for FBM data caching using /// cache servers and brokers @@ -78,14 +75,18 @@ namespace VNLib.Data.Caching.Extensions /// </summary> public const string X_NODE_DISCOVERY_HEADER = "X-Cache-Node-Discovery"; - private static readonly RestClientPool ClientPool = new(2,new RestClientOptions() + /* + * Lazy to defer errors for debuggong + */ + private static readonly Lazy<CacheSiteAdapter> SiteAdapter = new(() => ConfigureAdapter(2)); + + private static CacheSiteAdapter ConfigureAdapter(int maxClients) { - MaxTimeout = 10 * 1000, - FollowRedirects = false, - Encoding = Encoding.UTF8, - AutomaticDecompression = DecompressionMethods.All, - ThrowOnAnyError = true, - }); + CacheSiteAdapter adapter = new(maxClients); + //Configure the site endpoints + adapter.BuildEndpoints(ServiceEndpoints.Definition); + return adapter; + } private static readonly ConditionalWeakTable<FBMClient, CacheClientConfiguration> ClientCacheConfig = new(); @@ -138,29 +139,85 @@ namespace VNLib.Data.Caching.Extensions { client.Config.DebugLog?.Debug("{debug}: {data}", "[CACHE]", message); } - /// <summary> - /// Discovers ALL possible cache nodes itteritivley from the current collection of initial peers. + /// Discovers ALL possible cache nodes itteritivley, first by collecting the configuration + /// from the initial peers. /// This will make connections to all discoverable servers /// </summary> /// <param name="config"></param> /// <param name="cancellation">A token to cancel the operation</param> /// <returns></returns> /// <exception cref="ArgumentException"></exception> + /// <exception cref="CacheDiscoveryFailureException"></exception> public static async Task DiscoverNodesAsync(this CacheClientConfiguration config, CancellationToken cancellation) { //Make sure at least one node defined - if(config?.InitialPeers == null || config.InitialPeers.Length == 0) + if(config?.WellKnownNodes == null || config.WellKnownNodes.Length == 0) + { + throw new ArgumentException("There must be at least one cache node defined in the client configuration"); + } + + //Get the initial advertisments that arent null + CacheNodeAdvertisment[] initialPeers = await ResolveWellKnownAsync(config, cancellation); + + if (initialPeers.Length == 0) + { + throw new CacheDiscoveryFailureException("There must be at least one available cache node to continue discovery"); + } + + await DiscoverNodesAsync(config, initialPeers, cancellation); + } + + /// <summary> + /// Resolves the initial well-known cache nodes into their advertisments + /// </summary> + /// <param name="config"></param> + /// <param name="cancellation">A token to cancel the operation</param> + /// <returns>An array of resolved nodes</returns> + public static async Task<CacheNodeAdvertisment[]> ResolveWellKnownAsync(this CacheClientConfiguration config, CancellationToken cancellation) + { + _ = config?.WellKnownNodes ?? throw new ArgumentNullException(nameof(config)); + + Task<CacheNodeAdvertisment?>[] initialAdds = new Task<CacheNodeAdvertisment?>[config.WellKnownNodes.Length]; + + //Discover initial advertisments from well-known addresses + for (int i = 0; i < config.WellKnownNodes.Length; i++) { - throw new ArgumentException("There must be at least one cache server defined in the client configuration"); + initialAdds[i] = DiscoverNodeConfigAsync(config.WellKnownNodes[i], config, cancellation); } + //Wait for all initial adds to complete + await Task.WhenAll(initialAdds); + + //Get the initial advertisments that arent null + return initialAdds.Select(static x => x.Result!).Where(static s => s != null).ToArray(); + } + + /// <summary> + /// Discovers ALL possible cache nodes itteritivley from the current collection of initial peers. + /// This will make connections to all discoverable servers and update the client configuration, with all + /// discovered peers + /// </summary> + /// <param name="config"></param> + /// <param name="initialPeers">Accepts an array of initial peers to override the endpoint discovery process</param> + /// <param name="cancellation">A token to cancel the operation</param> + /// <returns>A task that completes when all nodes have been discovered</returns> + /// <exception cref="ArgumentException"></exception> + /// <exception cref="CacheDiscoveryFailureException"></exception> + public static async Task DiscoverNodesAsync(this CacheClientConfiguration config, CacheNodeAdvertisment[] initialPeers, CancellationToken cancellation) + { + //Make sure at least one node defined + if (initialPeers == null || initialPeers.Length == 0) + { + throw new ArgumentException("There must be at least one initial peer"); + } + //Get the discovery enumerator with the initial peers - INodeDiscoveryEnumerator enumerator = config.NodeCollection.BeginDiscovery(config.InitialPeers); + INodeDiscoveryEnumerator enumerator = config.NodeCollection.BeginDiscovery(initialPeers); //Start the discovery process - await DiscoverNodesAsync(enumerator, config.AuthManager, config.ErrorHandler, cancellation); + await DiscoverNodesAsync(enumerator, config, config.ErrorHandler, cancellation); //Commit nodes config.NodeCollection.CompleteDiscovery(enumerator); @@ -168,7 +225,7 @@ namespace VNLib.Data.Caching.Extensions private static async Task DiscoverNodesAsync( INodeDiscoveryEnumerator enumerator, - ICacheAuthManager auth, + CacheClientConfiguration config, ICacheDiscoveryErrorHandler? errHandler, CancellationToken cancellation ) @@ -184,17 +241,17 @@ namespace VNLib.Data.Caching.Extensions } /* - * We are allowed to save nodes that do not have a discovery endpoint, but we cannot discover nodes from them - * we can only use them as cache + * We are allowed to save nodes that do not have a discovery endpoint, but we cannot + * discover nodes from them we can only use them as cache */ - //add a random delay to avoid spamming the server - await Task.Delay((int)Random.Shared.NextInt64(50, 500), cancellation); + //add a random delay to avoid spamming servers + await Task.Delay((int)Random.Shared.NextInt64(100, 500), cancellation); try { //Discover nodes from the current node - ICacheNodeAdvertisment[]? nodes = await GetCacheNodesAsync(enumerator.Current, auth, cancellation); + CacheNodeAdvertisment[]? nodes = await GetCacheNodesAsync(enumerator.Current, config, cancellation); if (nodes != null) { @@ -208,78 +265,77 @@ namespace VNLib.Data.Caching.Extensions //Handle the error errHandler.OnDiscoveryError(enumerator.Current, ex); } + catch(Exception ex) + { + throw new CacheDiscoveryFailureException($"Failed to discovery peer node {enumerator.Current?.NodeId}, cannot continue", ex); + } + } + } + + private static async Task<CacheNodeAdvertisment?> DiscoverNodeConfigAsync(Uri serverUri, CacheClientConfiguration config, CancellationToken cancellation) + { + try + { + GetConfigRequest req = new (serverUri, config); + + //Site adapter verifies response messages so we dont need to check on the response + byte[] data = await SiteAdapter.Value.ExecuteAsync(req, cancellation).AsBytes() + ?? throw new CacheDiscoveryFailureException($"No data returned from desired cache node"); + + //Response is jwt + using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data); + + //The entire payload is just the single serialzed advertisment + using JsonDocument doc = responseJwt.GetPayload(); + + return doc.RootElement.GetProperty("sub").Deserialize<CacheNodeAdvertisment>(); + } + //Bypass cdfe when error handler is null + catch(CacheDiscoveryFailureException) when(config.ErrorHandler == null) + { + throw; + } + //Catch exceptions when an error handler is defined + catch (Exception ex) when (config.ErrorHandler != null) + { + //Handle the error + config.ErrorHandler.OnDiscoveryError(null!, ex); + return null; + } + catch (Exception ex) + { + throw new CacheDiscoveryFailureException("Failed to discover node configuration", ex); } } /// <summary> - /// Contacts the cache broker to get a list of active servers to connect to + /// Contacts the given server's discovery endpoint to discover a list of available + /// servers we can connect to /// </summary> /// <param name="advert">An advertisment of a server to discover other nodes from</param> /// <param name="cancellationToken">A token to cancel the operationS</param> - /// <param name="auth">The authentication manager</param> + /// <param name="config">The cache configuration object</param> /// <returns>The list of active servers</returns> /// <exception cref="SecurityException"></exception> /// <exception cref="ArgumentException"></exception> /// <exception cref="ArgumentNullException"></exception> - public static async Task<ICacheNodeAdvertisment[]?> GetCacheNodesAsync(ICacheNodeAdvertisment advert, ICacheAuthManager auth, CancellationToken cancellationToken = default) + public static async Task<CacheNodeAdvertisment[]?> GetCacheNodesAsync(CacheNodeAdvertisment advert, CacheClientConfiguration config, CancellationToken cancellationToken = default) { _ = advert ?? throw new ArgumentNullException(nameof(advert)); - _ = auth ?? throw new ArgumentNullException(nameof(auth)); + _ = config ?? throw new ArgumentNullException(nameof(config)); _ = advert.DiscoveryEndpoint ?? throw new ArgumentException("Advertisment does not expose an advertisment endpoint"); - string jwtBody; - - //Build request jwt - using (JsonWebToken requestJwt = new()) - { - requestJwt.WriteHeader(auth.GetJwtHeader()); - requestJwt.InitPayloadClaim() - .AddClaim("iat", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()) - .AddClaim("nonce", RandomHash.GetRandomBase32(16)) - .CommitClaims(); - - //sign the jwt - auth.SignJwt(requestJwt); - - //Compile the jwt - jwtBody = requestJwt.Compile(); - } - - //New list request - RestRequest listRequest = new(advert.DiscoveryEndpoint, Method.Post); - - //Add the jwt as a string to the request body - listRequest.AddStringBody(jwtBody, DataFormat.None); - listRequest.AddHeader("Accept", HttpHelpers.GetContentTypeString(ContentType.Text)); - listRequest.AddHeader("Content-Type", HttpHelpers.GetContentTypeString(ContentType.Text)); - - byte[] data; - - //Rent client - using (ClientContract client = ClientPool.Lease()) - { - //Exec list request - RestResponse response = await client.Resource.ExecuteAsync(listRequest, cancellationToken); - - if (!response.IsSuccessful) - { - throw response.ErrorException!; - } + DiscoveryRequest req = new (advert.DiscoveryEndpoint, config); - data = response.RawBytes ?? throw new InvalidOperationException("No data returned from broker"); - } + //Site adapter verifies response messages so we dont need to check on the response + byte[] data = await SiteAdapter.Value.ExecuteAsync(req, cancellationToken).AsBytes() + ?? throw new InvalidOperationException($"No data returned from node {advert.NodeId}"); //Response is jwt using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data); - //Verify the jwt - if (!auth.VerifyJwt(responseJwt)) - { - throw new SecurityException("Failed to verify the broker's challenge, cannot continue"); - } - using JsonDocument doc = responseJwt.GetPayload(); - return doc.RootElement.GetProperty("peers").Deserialize<Advertisment[]>(); + return doc.RootElement.GetProperty("peers").Deserialize<CacheNodeAdvertisment[]>(); } /// <summary> @@ -313,7 +369,6 @@ namespace VNLib.Data.Caching.Extensions ClientCacheConfig.AddOrUpdate(client, nodeConfig); return nodeConfig; } - /// <summary> /// Waits for the client to disconnect from the server while observing @@ -322,6 +377,8 @@ namespace VNLib.Data.Caching.Extensions /// </summary> /// <param name="client"></param> /// <param name="token">A token to cancel the connection to the server</param> + /// <exception cref="TaskCanceledException"></exception> + /// <exception cref="ObjectDisposedException"></exception> /// <returns>A task that complets when the connecion has been closed successfully</returns> public static async Task WaitForExitAsync(this FBMClient client, CancellationToken token = default) { @@ -341,8 +398,26 @@ namespace VNLib.Data.Caching.Extensions //Normal try to disconnect the socket await client.DisconnectAsync(CancellationToken.None); - //Notify if cancelled - token.ThrowIfCancellationRequested(); + //If the cancellation is completed, throw a task cancelled exception + if (cancellation.IsCompleted) + { + throw new TaskCanceledException("The client disconnected because the connection was cancelled"); + } + } + + /// <summary> + /// Discovers all available nodes for the current client config + /// </summary> + /// <param name="client"></param> + /// <param name="cancellation">A token to cancel the operation</param> + /// <returns>A task that completes when all nodes have been discovered</returns> + public static Task DiscoverAvailableNodesAsync(this FBMClient client, CancellationToken cancellation = default) + { + //Get stored config + CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); + + //Discover all nodes + return conf.DiscoverNodesAsync(cancellation); } /// <summary> @@ -357,16 +432,16 @@ namespace VNLib.Data.Caching.Extensions /// <exception cref="ArgumentNullException"></exception> /// <exception cref="SecurityException"></exception> /// <exception cref="ObjectDisposedException"></exception> - public static async Task<ICacheNodeAdvertisment> ConnectToRandomCacheAsync(this FBMClient client, CancellationToken cancellation = default) + public static async Task<CacheNodeAdvertisment> ConnectToRandomCacheAsync(this FBMClient client, CancellationToken cancellation = default) { //Get stored config CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); //Get all available nodes, or at least the initial peers - ICacheNodeAdvertisment[]? adverts = conf.NodeCollection.GetAllNodes() ?? conf.InitialPeers ?? throw new ArgumentException("No cache nodes discovered, cannot connect"); + CacheNodeAdvertisment[]? adverts = conf.NodeCollection.GetAllNodes() ?? throw new ArgumentException("No cache nodes discovered, cannot connect"); //Select random node from all available nodes - ICacheNodeAdvertisment randomServer = adverts.SelectRandom(); + CacheNodeAdvertisment randomServer = adverts.SelectRandom(); //Connect to the random server await ConnectToCacheAsync(client, randomServer, cancellation); @@ -388,7 +463,7 @@ namespace VNLib.Data.Caching.Extensions /// <exception cref="ArgumentNullException"></exception> /// <exception cref="SecurityException"></exception> /// <exception cref="ObjectDisposedException"></exception> - public static Task ConnectToCacheAsync(this FBMClient client, ICacheNodeAdvertisment server, CancellationToken token = default) + public static Task ConnectToCacheAsync(this FBMClient client, CacheNodeAdvertisment server, CancellationToken token = default) { _ = client ?? throw new ArgumentNullException(nameof(client)); _ = server ?? throw new ArgumentNullException(nameof(server)); @@ -414,7 +489,7 @@ namespace VNLib.Data.Caching.Extensions /// <exception cref="ArgumentNullException"></exception> /// <exception cref="SecurityException"></exception> /// <exception cref="ObjectDisposedException"></exception> - public static Task ConnectToCacheAsync(this FBMClient client, ICacheNodeAdvertisment server, CacheClientConfiguration explicitConfig, CancellationToken token = default) + public static Task ConnectToCacheAsync(this FBMClient client, CacheNodeAdvertisment server, CacheClientConfiguration explicitConfig, CancellationToken token = default) { _ = client ?? throw new ArgumentNullException(nameof(client)); _ = server ?? throw new ArgumentNullException(nameof(server)); @@ -427,50 +502,10 @@ namespace VNLib.Data.Caching.Extensions private static async Task ConnectToCacheAsync( FBMClient client, CacheClientConfiguration config, - ICacheNodeAdvertisment server, + CacheNodeAdvertisment server, CancellationToken token = default ) { - //build ws uri from the connect endpoint - UriBuilder uriBuilder = new(server.ConnectEndpoint) - { - Scheme = config.UseTls ? "wss://" : "ws://" - }; - - string challenge = RandomHash.GetRandomBase32(24); - - //See if the supplied config is for a cache node - CacheNodeConfiguration? cnc = config as CacheNodeConfiguration; - - string jwtMessage; - //Init jwt for connecting to server - using (JsonWebToken jwt = new()) - { - jwt.WriteHeader(config.AuthManager.GetJwtHeader()); - - //Init claim - JwtPayload claim = jwt.InitPayloadClaim(); - - claim.AddClaim("chl", challenge); - - if (!string.IsNullOrWhiteSpace(cnc?.NodeId)) - { - /* - * The unique node id so the other nodes know to load the - * proper event queue for the current server - */ - claim.AddClaim("sub", cnc.NodeId); - } - - claim.CommitClaims(); - - //Sign jwt - config.AuthManager.SignJwt(jwt); - - //Compile to string - jwtMessage = jwt.Compile(); - } - /* * During a server negiation, the client makes an intial get request to the cache endpoint * and passes some client negiation terms as a signed message to the server. The server then @@ -479,52 +514,34 @@ namespace VNLib.Data.Caching.Extensions * The response from the server is essentailly the 'access token' */ - RestRequest negotation = new(server.ConnectEndpoint, Method.Get); - //Set the jwt auth header for negotiation - negotation.AddHeader("Authorization", jwtMessage); - negotation.AddHeader("Accept", HttpHelpers.GetContentTypeString(ContentType.Text)); - client.LogDebug("Negotiating with cache server"); - - string authToken; - - //rent client - using (ClientContract clientContract = ClientPool.Lease()) - { - //Execute the request - RestResponse response = await clientContract.Resource.ExecuteGetAsync(negotation, token); - response.ThrowIfError(); - - if (response.Content == null) - { - throw new FBMServerNegiationException("Failed to negotiate with the server, no response"); - } - - //Raw content - authToken = response.Content; - } + //Create a new connection negotiation + NegotationRequest req = new(server.ConnectEndpoint, config); - //Parse the jwt - using (JsonWebToken jwt = JsonWebToken.Parse(authToken)) - { - //Verify the jwt - if (!config.AuthManager.VerifyJwt(jwt)) - { - throw new SecurityException("Failed to verify the cache server's negotiation message, cannot continue"); - } + //Exec negotiation + RestResponse response = await SiteAdapter.Value.ExecuteAsync(req, token); + /* + * JWT will already be veified by the endpoint adapter, so we + * just need to validate the server's buffer configuration + */ + using (JsonWebToken jwt = JsonWebToken.ParseRaw(response.RawBytes)) + { //Confirm the server's buffer configuration - ValidateServerNegotation(client, challenge, jwt); + ValidateServerNegotation(client, jwt); } client.LogDebug("Server negotiation validated, connecting to server"); //The client authorization header is the exact response - client.ClientSocket.Headers[HttpRequestHeader.Authorization] = authToken; + client.ClientSocket.Headers[HttpRequestHeader.Authorization] = response.Content!; + + //See if the supplied config is for a cache node + CacheNodeConfiguration? cnc = config as CacheNodeConfiguration; //Compute the signature of the upgrade token - client.ClientSocket.Headers[X_UPGRADE_SIG_HEADER] = config.AuthManager.GetBase64UpgradeSingature(authToken); + client.ClientSocket.Headers[X_UPGRADE_SIG_HEADER] = config.AuthManager.GetBase64UpgradeSignature(response.Content, cnc != null); //Check to see if adversize self is enabled if (cnc?.BroadcastAdverisment == true) @@ -533,11 +550,17 @@ namespace VNLib.Data.Caching.Extensions client.ClientSocket.Headers[X_NODE_DISCOVERY_HEADER] = GetAdvertismentHeader(cnc); } + //build ws uri from the connect endpoint + UriBuilder uriBuilder = new(server.ConnectEndpoint) + { + Scheme = config.UseTls ? "wss://" : "ws://" + }; + //Connect async await client.ConnectAsync(uriBuilder.Uri, token); } - private static void ValidateServerNegotation(FBMClient client, string challenge, JsonWebToken jwt) + private static void ValidateServerNegotation(FBMClient client, JsonWebToken jwt) { try { @@ -548,15 +571,6 @@ namespace VNLib.Data.Caching.Extensions .EnumerateObject() .ToDictionary(static k => k.Name, static v => v.Value); - //get the challenge response - string challengeResponse = args["chl"].GetString()!; - - //Check the challenge response - if (!challenge.Equals(challengeResponse, StringComparison.Ordinal)) - { - throw new FBMServerNegiationException("Failed to negotiate with the server, challenge response does not match"); - } - //Get the negiation values uint recvBufSize = args[FBMClient.REQ_RECV_BUF_QUERY_ARG].GetUInt32(); uint headerBufSize = args[FBMClient.REQ_HEAD_BUF_QUERY_ARG].GetUInt32(); @@ -593,7 +607,7 @@ namespace VNLib.Data.Caching.Extensions * compute a signature of the upgrade token and send it to the server to prove we hold the private key. */ - private static string GetBase64UpgradeSingature(this ICacheAuthManager man, string? token) + private static string GetBase64UpgradeSignature(this ICacheAuthManager man, string? token, bool isPeer) { //Compute hash of the token byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256); @@ -604,19 +618,7 @@ namespace VNLib.Data.Caching.Extensions //Return the base64 string return Convert.ToBase64String(sig); } - - /// <summary> - /// Verifies the signed auth token against the given verification key - /// </summary> - /// <param name="signature">The base64 signature of the token</param> - /// <param name="token">The raw token to compute the hash of</param> - /// <param name="nodeConfig">The node configuration</param> - /// <returns>True if the singature matches, false otherwise</returns> - /// <exception cref="CryptographicException"></exception> - public static bool VerifyUpgradeToken(this CacheClientConfiguration nodeConfig, string signature, string token) - { - return VerifyUpgradeToken(nodeConfig.AuthManager, signature, token); - } + /// <summary> /// Verifies the signed auth token against the given verification key @@ -624,10 +626,11 @@ namespace VNLib.Data.Caching.Extensions /// <param name="man"></param> /// <param name="signature">The base64 signature of the token</param> /// <param name="token">The raw token to compute the hash of</param> + /// <param name="isPeer">A value that indicates if the connection is from a peer node</param> /// <returns>True if the singature matches, false otherwise</returns> /// <exception cref="ArgumentNullException"></exception> /// <exception cref="CryptographicException"></exception> - public static bool VerifyUpgradeToken(this ICacheAuthManager man, string signature, string token) + public static bool VerifyUpgradeToken(this ICacheAuthManager man, string signature, string token, bool isPeer) { _ = man ?? throw new ArgumentNullException(nameof(man)); @@ -637,7 +640,7 @@ namespace VNLib.Data.Caching.Extensions //decode the signature byte[] sig = Convert.FromBase64String(signature); - return man.VerifyMessageHash(hash, HashAlg.SHA256, sig); + return man.VerifyMessageHash(hash, HashAlg.SHA256, sig, isPeer); } private static string GetAdvertismentHeader(CacheNodeConfiguration nodeConfiguration) @@ -675,58 +678,30 @@ namespace VNLib.Data.Caching.Extensions /// <param name="message">The advertisment message to verify</param> /// <returns>The advertisment message if successfully verified, or null otherwise</returns> /// <exception cref="FormatException"></exception> - public static ICacheNodeAdvertisment? VerifyPeerAdvertisment(this ICacheAuthManager config, string message) + public static CacheNodeAdvertisment? VerifyPeerAdvertisment(this ICacheAuthManager config, string message) { using JsonWebToken jwt = JsonWebToken.Parse(message); //Verify the signature - if (!config.VerifyJwt(jwt)) + if (!config.VerifyJwt(jwt, true)) { return null; } //Get the payload - return jwt.GetPayload<Advertisment>(); - } - + return jwt.GetPayload<CacheNodeAdvertisment>(); + } /// <summary> /// Selects a random server from a collection of active servers /// </summary> /// <param name="servers"></param> /// <returns>A server selected at random</returns> - public static ICacheNodeAdvertisment SelectRandom(this ICollection<ICacheNodeAdvertisment> servers) + public static CacheNodeAdvertisment SelectRandom(this ICollection<CacheNodeAdvertisment> servers) { //select random server int randServer = RandomNumberGenerator.GetInt32(0, servers.Count); return servers.ElementAt(randServer); } - - - private class Advertisment : ICacheNodeAdvertisment - { - [JsonIgnore] - public Uri? ConnectEndpoint { get; set; } - - [JsonIgnore] - public Uri? DiscoveryEndpoint { get; set; } - - [JsonPropertyName("iss")] - public string NodeId { get; set; } - - [JsonPropertyName("url")] - public string? url - { - get => ConnectEndpoint?.ToString(); - set => ConnectEndpoint = value == null ? null : new Uri(value); - } - - [JsonPropertyName("dis")] - public string? dis - { - get => DiscoveryEndpoint?.ToString(); - set => DiscoveryEndpoint = value == null ? null : new Uri(value); - } - } } } diff --git a/lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs b/lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs index e3ab868..32ae142 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs @@ -53,8 +53,9 @@ namespace VNLib.Data.Caching.Extensions /// Verifies the given JWT /// </summary> /// <param name="jwt">The message to verify authenticity</param> + /// <param name="isPeer">A value indicating if the message is from a known node</param> /// <returns>True of the JWT could be verified, false otherwise</returns> - bool VerifyJwt(JsonWebToken jwt); + bool VerifyJwt(JsonWebToken jwt, bool isPeer); /// <summary> /// Signs the given message hash @@ -70,7 +71,8 @@ namespace VNLib.Data.Caching.Extensions /// <param name="hash">The message hash to compare</param> /// <param name="alg">The algorithm used to produce the message hash</param> /// <param name="signature">The message signature to verify the message against</param> + /// <param name="isPeer">A value indicating if the message is from a known node</param> /// <returns>True of the signature could be verified</returns> - bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature); + bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature, bool isPeer); } } diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs index f77587b..5e28dde 100644 --- a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs +++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs @@ -187,9 +187,9 @@ namespace VNLib.Data.Caching.ObjectCache { //remove the entry and bypass the disposal bool result = base.Remove(objectId); -#if DEBUG + Debug.Assert(result == true); -#endif + return true; } diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs b/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs index a02605e..f650cc3 100644 --- a/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs +++ b/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs @@ -24,6 +24,7 @@ using System; using System.Buffers; +using System.Diagnostics; using System.Buffers.Binary; using System.Runtime.CompilerServices; @@ -204,10 +205,9 @@ namespace VNLib.Data.Caching //Get the data segment Span<byte> segment = GetDataSegment(); -#if DEBUG //Test segment length is equivalent to the requested data length - System.Diagnostics.Debug.Assert(segment.Length == data.Length); -#endif + Debug.Assert(segment.Length == data.Length); + //Copy data segment data.CopyTo(segment); } diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/RemoteCacheOperator.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/RemoteCacheOperator.cs index 1f0742d..f40f746 100644 --- a/lib/VNLib.Plugins.Extensions.VNCache/src/RemoteCacheOperator.cs +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/RemoteCacheOperator.cs @@ -26,9 +26,9 @@ using System; using System.Threading; using System.Threading.Tasks; +using VNLib.Utils.Logging; using VNLib.Data.Caching; using VNLib.Plugins.Extensions.Loading; -using VNLib.Utils.Logging; namespace VNLib.Plugins.Extensions.VNCache { @@ -41,7 +41,7 @@ namespace VNLib.Plugins.Extensions.VNCache /// The background work method must be sheduled for the cache client to be /// connected to the backing store /// </remarks> - public sealed class RemoteCacheOperator : IAsyncBackgroundWork, IAsyncConfigurable + public sealed class RemoteCacheOperator : IAsyncBackgroundWork { private readonly VnCacheClient _client; private CancellationTokenSource? _tokenSource; @@ -78,11 +78,6 @@ namespace VNLib.Plugins.Extensions.VNCache /// Cancels the background cache client listener /// </summary> public void CancelListener() => _tokenSource?.Cancel(); - - ///<inheritdoc/> - public Task ConfigureServiceAsync(PluginBase plugin) - { - return _client.ConfigureServiceAsync(plugin); - } + } }
\ No newline at end of file diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs index f4f059b..aa3d88f 100644 --- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs @@ -40,7 +40,7 @@ using VNLib.Data.Caching.Extensions; using VNLib.Data.Caching.ObjectCache; using VNLib.Net.Messaging.FBM.Client; using VNLib.Plugins.Extensions.Loading; - +using VNLib.Data.Caching.Extensions.Clustering; namespace VNLib.Plugins.Extensions.VNCache { @@ -56,7 +56,7 @@ namespace VNLib.Plugins.Extensions.VNCache /// A base class that manages /// </summary> [ConfigurationName(VNCacheExtensions.CACHE_CONFIG_KEY)] - internal class VnCacheClient : IGlobalCacheProvider, IAsyncBackgroundWork, IAsyncConfigurable + internal class VnCacheClient : IGlobalCacheProvider, IAsyncBackgroundWork { private readonly VnCacheClientConfig _config; @@ -70,12 +70,17 @@ namespace VNLib.Plugins.Extensions.VNCache /// </summary> public bool IsConnected { get; private set; } - public VnCacheClient(PluginBase pbase, IConfigScope config) + public VnCacheClient(PluginBase plugin, IConfigScope config) :this( - config.Deserialze<VnCacheClientConfig>(), - pbase.IsDebug() ? pbase.Log : null + config.Deserialze<VnCacheClientConfig>(), + plugin.IsDebug() ? plugin.Log : null ) - {} + { + //Set authenticator and error handler + Client.GetCacheConfiguration() + .WithAuthenticator(new AuthManager(plugin)) + .WithErrorHandler(new DiscoveryErrHAndler(plugin.Log)); + } public VnCacheClient(VnCacheClientConfig config, ILogProvider? debugLog) { @@ -92,17 +97,7 @@ namespace VNLib.Plugins.Extensions.VNCache //Add the configuration to the client Client.GetCacheConfiguration() .WithTls(config.UseTls) - .WithInitialPeers(config.InitialNodes!); - } - - public Task ConfigureServiceAsync(PluginBase plugin) - { - //Set authenticator - Client.GetCacheConfiguration() - .WithAuthenticator(new AuthManager(plugin)) - .WithErrorHandler(new DiscoveryErrHAndler(plugin.Log)); - - return Task.CompletedTask; + .WithInitialPeers(config.GetInitialNodeUris()); } /* @@ -115,21 +110,25 @@ namespace VNLib.Plugins.Extensions.VNCache { while (true) { - //Load the server list - ICacheNodeAdvertisment[]? servers; + //Discover nodes in the network + while (true) { try { - pluginLog.Debug("Discovering cluster nodes in broker"); + pluginLog.Debug("Discovering cluster nodes in network"); //Get server list - servers = await Client.DiscoverCacheNodesAsync(exitToken); + await Client.DiscoverAvailableNodesAsync(exitToken); break; } catch (HttpRequestException re) when (re.InnerException is SocketException) { pluginLog.Warn("Broker server is unreachable"); } + catch(CacheDiscoveryFailureException ce) + { + pluginLog.Warn("Failed to discover cache servers, reason {r}", ce); + } catch (Exception ex) { pluginLog.Warn("Failed to get server list from broker, reason {r}", ex.Message); @@ -140,19 +139,12 @@ namespace VNLib.Plugins.Extensions.VNCache await Task.Delay(randomMsDelay, exitToken); } - if (servers?.Length == 0) - { - pluginLog.Warn("No cluster nodes found, retrying"); - await Task.Delay(_config.RetryInterval, exitToken); - continue; - } - try { pluginLog.Debug("Connecting to random cache server"); //Connect to a random server - ICacheNodeAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken); + CacheNodeAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken); pluginLog.Debug("Connected to cache server {s}", selected.NodeId); //Set connection status flag @@ -304,13 +296,13 @@ namespace VNLib.Plugins.Extensions.VNCache } ///<inheritdoc/> - public bool VerifyJwt(JsonWebToken jwt) + public bool VerifyJwt(JsonWebToken jwt, bool isPeer) { return jwt.VerifyFromJwk(_verKey.Value); } ///<inheritdoc/> - public bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature) + public bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature, bool isPeer) { //try to get the rsa alg for the signing key using RSA? rsa = _verKey.Value.GetRSAPublicKey(); @@ -332,7 +324,7 @@ namespace VNLib.Plugins.Extensions.VNCache private sealed record class DiscoveryErrHAndler(ILogProvider Logger) : ICacheDiscoveryErrorHandler { - public void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex) + public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex) { Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", errorNode.NodeId, ex); } diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs index 64d3e07..73fb70f 100644 --- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs @@ -23,9 +23,9 @@ */ using System; +using System.Linq; using System.Text.Json.Serialization; -using VNLib.Data.Caching.Extensions; using VNLib.Plugins.Extensions.Loading; namespace VNLib.Plugins.Extensions.VNCache @@ -40,7 +40,7 @@ namespace VNLib.Plugins.Extensions.VNCache /// cache server. This value will be negotiated with the server /// during a connection upgrade /// </summary> - [JsonPropertyName("max_message_size")] + [JsonPropertyName("max_object_size")] public int? MaxMessageSize { get; set; } /// <summary> @@ -82,7 +82,18 @@ namespace VNLib.Plugins.Extensions.VNCache /// The initial peers to connect to /// </summary> [JsonPropertyName("initial_nodes")] - public InitialNode[]? InitialNodes { get; set; } + public string[]? InitialNodes { get; set; } + + /// <summary> + /// Gets the initial nodes as a collection of URIs + /// </summary> + /// <returns>The nodes as a collection of URIs</returns> + /// <exception cref="InvalidOperationException"></exception> + public Uri[] GetInitialNodeUris() + { + _ = InitialNodes ?? throw new InvalidOperationException("Initial nodes have not been set"); + return InitialNodes.Select(x => new Uri(x)).ToArray(); + } void IOnConfigValidation.Validate() { @@ -108,37 +119,21 @@ namespace VNLib.Plugins.Extensions.VNCache throw new ArgumentException("You must specify at least one initial peer", "initial_peers"); } - foreach (InitialNode peer in InitialNodes) - { - _ = peer.ConnectEndpoint ?? throw new ArgumentException("You must specify a connect endpoint for each initial node", "initial_nodes"); - _ = peer.NodeId ?? throw new ArgumentException("You must specify a node id for each initial node", "initial_nodes"); - } - } - - public sealed record class InitialNode : ICacheNodeAdvertisment - { - [JsonIgnore] - public Uri ConnectEndpoint { get; private set; } - - [JsonIgnore] - public Uri? DiscoveryEndpoint { get; private set; } - - [JsonPropertyName("node_id")] - public string? NodeId { get; set; } - - [JsonPropertyName("connect_endpoint")] - public string? ConnectEndpointString - { - get => ConnectEndpoint.ToString(); - set => ConnectEndpoint = new Uri(value!); - } - - [JsonPropertyName("discovery_endpoint")] - public string? DiscoveryEndpointString + //Validate initial nodes + foreach (Uri peer in GetInitialNodeUris()) { - get => DiscoveryEndpoint?.ToString(); - set => DiscoveryEndpoint = value == null ? null : new Uri(value); + if (!peer.IsAbsoluteUri) + { + throw new ArgumentException("You must specify an absolute URI for each initial node", "initial_nodes"); + } + + //Verify http connection + if(peer.Scheme != Uri.UriSchemeHttp || peer.Scheme != Uri.UriSchemeHttps) + { + throw new ArgumentException("You must specify an HTTP or HTTPS URI for each initial node", "initial_nodes"); + } } } + } }
\ No newline at end of file diff --git a/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs b/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs index 6725fbe..5fc700b 100644 --- a/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs +++ b/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs @@ -32,7 +32,6 @@ using VNLib.Hashing.IdentityUtility; using VNLib.Plugins.Extensions.Loading; using VNLib.Data.Caching.Extensions; - namespace VNLib.Data.Caching.ObjectCache.Server { sealed record class CacheAuthKeyStore : ICacheAuthManager @@ -59,9 +58,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server } ///<inheritdoc/> - public bool VerifyJwt(JsonWebToken jwt) + public bool VerifyJwt(JsonWebToken jwt, bool isPeer) { - return jwt.VerifyFromJwk(_clientPub.Value); + //Verify from peer server or client + return isPeer ? jwt.VerifyFromJwk(_cachePriv.Value) : jwt.VerifyFromJwk(_clientPub.Value); } /// <summary> @@ -85,7 +85,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server } //try to get the ecdsa alg for the signing key - using ECDsa? ecdsa = _cachePriv.Value.GetECDsaPublicKey(); + using ECDsa? ecdsa = _cachePriv.Value.GetECDsaPrivateKey(); if (ecdsa != null) { return ecdsa.SignHash(hash); @@ -95,17 +95,20 @@ namespace VNLib.Data.Caching.ObjectCache.Server } ///<inheritdoc/> - public bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature) + public bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature, bool isPeer) { + //Determine the key to verify against + ReadOnlyJsonWebKey jwk = isPeer ? _cachePriv.Value : _clientPub.Value; + //try to get the rsa alg for the signing key - using RSA? rsa = _clientPub.Value.GetRSAPublicKey(); + using RSA? rsa = jwk.GetRSAPublicKey(); if (rsa != null) { return rsa.VerifyHash(hash, signature, alg.GetAlgName(), RSASignaturePadding.Pkcs1); } //try to get the ecdsa alg for the signing key - using ECDsa? ecdsa = _clientPub.Value.GetECDsaPublicKey(); + using ECDsa? ecdsa = jwk.GetECDsaPublicKey(); if (ecdsa != null) { return ecdsa.VerifyHash(hash, signature); diff --git a/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs b/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs index 049069e..3827121 100644 --- a/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs +++ b/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs @@ -104,7 +104,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server } //Return the node's queue - return nq.Queue; + return nq; } ///<inheritdoc/> @@ -183,8 +183,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server //Interval to purge stale subscribers Task IIntervalScheduleable.OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken) { - //Purge + log.Debug("Purging stale peer event queues"); + PurgeStaleSubscribers(); + return Task.CompletedTask; } diff --git a/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs b/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs index 52b6abf..9c7388e 100644 --- a/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs +++ b/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs @@ -25,13 +25,13 @@ using System; using System.Threading; using System.Threading.Tasks; +using System.Threading.Channels; using VNLib.Utils.Async; using VNLib.Utils.Logging; using VNLib.Plugins; using VNLib.Plugins.Extensions.Loading; - namespace VNLib.Data.Caching.ObjectCache.Server { internal sealed class CacheListenerPubQueue : ICacheListenerEventQueue, IAsyncBackgroundWork @@ -44,10 +44,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server { _queueManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>(); _logProvider = plugin.Log; - _listenerQueue = new AsyncQueue<ChangeEvent>(false, true); - - //Register processing worker - _ = plugin.ObserveWork(this, 500); + _listenerQueue = new AsyncQueue<ChangeEvent>(new BoundedChannelOptions(10000) + { + AllowSynchronousContinuations = true, + FullMode = BoundedChannelFullMode.DropOldest, + SingleReader = true, + SingleWriter = false, + }); } ///<inheritdoc/> @@ -59,25 +62,25 @@ namespace VNLib.Data.Caching.ObjectCache.Server { //Accumulator for events ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize]; - int ptr = 0; + int index = 0; //Listen for changes while (true) { //Wait for next event - accumulator[ptr++] = await _listenerQueue.DequeueAsync(exitToken); + accumulator[index++] = await _listenerQueue.DequeueAsync(exitToken); //try to accumulate more events until we can't anymore - while (_listenerQueue.TryDequeue(out ChangeEvent? ev) && ptr < accumulatorSize) + while (_listenerQueue.TryDequeue(out ChangeEvent? ev) && index < accumulatorSize) { - accumulator[ptr++] = ev; + accumulator[index++] = ev; } //Publish all events to subscribers - _queueManager.PublishMultiple(accumulator.AsSpan(0, ptr)); + _queueManager.PublishMultiple(accumulator.AsSpan(0, index)); //Reset pointer - ptr = 0; + index = 0; } } catch (OperationCanceledException) diff --git a/plugins/ObjectCacheServer/src/CacheStore.cs b/plugins/ObjectCacheServer/src/CacheStore.cs index e7a7c63..c1d47f6 100644 --- a/plugins/ObjectCacheServer/src/CacheStore.cs +++ b/plugins/ObjectCacheServer/src/CacheStore.cs @@ -30,7 +30,6 @@ using VNLib.Utils.Logging; using VNLib.Plugins; using VNLib.Plugins.Extensions.Loading; - namespace VNLib.Data.Caching.ObjectCache.Server { [ConfigurationName("cache")] @@ -62,6 +61,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server private static BlobCacheListener InitializeCache(ObjectCacheServerEntry plugin, IConfigScope config) { + const string CacheConfigTemplate = +@" +Cache Configuration: + Max memory: {max} Mb + Buckets: {bc} + Entries per-bucket: {mc} +"; + //Deserialize the cache config CacheConfiguration cacheConf = config.Deserialze<CacheConfiguration>(); @@ -74,22 +81,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server if (cacheConf.MaxCacheEntries < 200) { plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache"); - } - - plugin.Log.Verbose("Creating cache store with {bc} buckets, with {mc} items/bucket", cacheConf.BucketCount, cacheConf.MaxCacheEntries); + } //calculate the max memory usage ulong maxByteSize = ((ulong)cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize); - //Log max memory usage - plugin.Log.Debug("Maxium memory consumption {mx}Mb", maxByteSize / (ulong)(1024 * 1000)); - - //Load the blob cache table system - IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, plugin.CacheHeap, cacheConf); + //Log the cache config + plugin.Log.Information(CacheConfigTemplate, + maxByteSize / (ulong)(1024 * 1000), + cacheConf.BucketCount, + cacheConf.MaxCacheEntries + ); //Get the event listener ICacheListenerEventQueue queue = plugin.GetOrCreateSingleton<CacheListenerPubQueue>(); + //Load the blob cache table system + IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, plugin.CacheHeap, cacheConf); + //Endpoint only allows for a single reader return new(bc, queue, plugin.Log, plugin.CacheHeap); } diff --git a/plugins/ObjectCacheServer/src/CacheSystemUtil.cs b/plugins/ObjectCacheServer/src/CacheSystemUtil.cs index 669b84f..f8aedae 100644 --- a/plugins/ObjectCacheServer/src/CacheSystemUtil.cs +++ b/plugins/ObjectCacheServer/src/CacheSystemUtil.cs @@ -70,7 +70,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server else { //Default type - table = GetInternalBlobCache(heap, cacheConf, pCManager); + table = GetInternalBlobCache(heap, cacheConf, pCManager); } //Initialize the subsystem from the cache table diff --git a/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs index a55e8e2..ffdd4f4 100644 --- a/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs +++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs @@ -3,10 +3,10 @@ * * Library: VNLib * Package: ObjectCacheServer -* File: ObjectCacheServerEntry.cs +* File: CacheNodeReplicationMaanger.cs * -* ObjectCacheServerEntry.cs is part of ObjectCacheServer which is part of the larger -* VNLib collection of libraries and utilities. +* CacheNodeReplicationMaanger.cs is part of ObjectCacheServer which is part +* of the larger VNLib collection of libraries and utilities. * * ObjectCacheServer is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as @@ -35,52 +35,91 @@ using static VNLib.Data.Caching.Constants; using VNLib.Net.Messaging.FBM; using VNLib.Net.Messaging.FBM.Client; using VNLib.Plugins.Extensions.Loading; +using VNLib.Data.Caching.Extensions.Clustering; -namespace VNLib.Data.Caching.ObjectCache.Server.Distribution +namespace VNLib.Data.Caching.ObjectCache.Server.Clustering { + + /* + * This class is responsible for replicating the cache with other nodes. + * + * It does this by connecting to other nodes and listening for change events. + * When a change event occurs, it takes action against the local cache store, + * to keep it consistent with the other nodes. + * + * Change events are only handled first-hand, meaning that events do not + * propagate to other nodes, they must be connected individually to each node + * and listen for changes. + */ + internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork { + private const string LOG_SCOPE_NAME = "REPL"; + private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10); + private const int MAX_MESSAGE_SIZE = 12 * 1024; - private readonly NodeConfig NodeConfig; - private readonly ICachePeerAdapter PeerAdapter; - private readonly ICacheStore CacheStore; - private readonly FBMClientConfig ClientConfig; - private readonly PluginBase Plugin; + private readonly PluginBase _plugin; + private readonly ILogProvider _log; + private readonly NodeConfig _nodeConfig; + private readonly ICacheStore _cacheStore; + private readonly ICachePeerAdapter _peerAdapter; + private readonly FBMClientConfig _replicationClientConfig; + + private readonly bool _isDebug; - private CacheNodeConfiguration CacheConfig => NodeConfig.Config; + private int _openConnections; public CacheNodeReplicationMaanger(PluginBase plugin) { //Load the node config - NodeConfig = plugin.GetOrCreateSingleton<NodeConfig>(); - - //Get peer adapter - PeerAdapter = plugin.GetOrCreateSingleton<PeerDiscoveryManager>(); - - CacheStore = plugin.GetOrCreateSingleton<CacheStore>(); + _nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>(); + _cacheStore = plugin.GetOrCreateSingleton<CacheStore>(); + _peerAdapter = plugin.GetOrCreateSingleton<PeerDiscoveryManager>(); + + //Init fbm config with fixed message size + _replicationClientConfig = FBMDataCacheExtensions.GetDefaultConfig( + (plugin as ObjectCacheServerEntry)!.CacheHeap, + MAX_MESSAGE_SIZE, + debugLog: plugin.IsDebug() ? plugin.Log : null + ); + + _plugin = plugin; + _isDebug = plugin.IsDebug(); + _log = plugin.Log.CreateScope(LOG_SCOPE_NAME); } public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) { - pluginLog.Information("[REPL] Initializing node replication worker"); + _log.Information("Initializing node replication worker"); try { while (true) { //Get all new peers - ICacheNodeAdvertisment[] peers = PeerAdapter.GetNewPeers(); + CacheNodeAdvertisment[] peers = _peerAdapter.GetNewPeers(); - if (peers.Length == 0) + if (peers.Length == 0 && _isDebug) { - pluginLog.Verbose("[REPL] No new peers to connect to"); + _log.Verbose("No new peers to connect to"); } - //Connect to each peer as a background task - foreach (ICacheNodeAdvertisment peer in peers) + //Make sure we don't exceed the max connections + if(_openConnections >= _nodeConfig.MaxPeerConnections) { - _ = Plugin.ObserveWork(() => OnNewPeerDoWorkAsync(peer, pluginLog, exitToken)); + if (_isDebug) + { + _log.Verbose("Max peer connections reached, waiting for a connection to close"); + } + } + else + { + //Connect to each peer as a background task + foreach (CacheNodeAdvertisment peer in peers) + { + _ = _plugin.ObserveWork(() => OnNewPeerDoWorkAsync(peer, _log, exitToken)); + } } //Wait for a new peers @@ -93,7 +132,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution } catch { - pluginLog.Error("[REPL] Node replication worker exited with an error"); + _log.Error("Node replication worker exited with an error"); throw; } finally @@ -101,25 +140,27 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution } - pluginLog.Information("[REPL] Node replication worker exited"); + _log.Information("Node replication worker exited"); } - private async Task OnNewPeerDoWorkAsync(ICacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken) + private async Task OnNewPeerDoWorkAsync(CacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken) { _ = newPeer ?? throw new ArgumentNullException(nameof(newPeer)); //Setup client - FBMClient client = new(ClientConfig); + FBMClient client = new(_replicationClientConfig); //Add peer to monitor - PeerAdapter.OnPeerListenerAttached(newPeer); + _peerAdapter.OnPeerListenerAttached(newPeer); + + Interlocked.Increment(ref _openConnections); try { log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId); //Connect to the server - await client.ConnectToCacheAsync(newPeer, CacheConfig, exitToken); + await client.ConnectToCacheAsync(newPeer, _nodeConfig.Config, exitToken); log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId); @@ -176,16 +217,21 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution } finally { + Interlocked.Decrement(ref _openConnections); + client.Dispose(); //Notify monitor of disconnect - PeerAdapter.OnPeerListenerDetatched(newPeer); + _peerAdapter.OnPeerListenerDetatched(newPeer); } } //Wroker task callback method private async Task ReplicationWorkerDoWorkAsync(FBMClient client, ILogProvider log, CancellationToken exitToken) { + //Reusable request message + using FBMRequest request = new(client.Config); + //Listen for changes while (true) { @@ -197,53 +243,47 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution switch (changedObject.Status) { case ResponseCodes.NotFound: - log.Warn("Server cache not properly configured, worker exiting"); + log.Error("Server cache not properly configured, worker exiting"); return; case "deleted": //Delete the object from the store - await CacheStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None); + await _cacheStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None); break; case "modified": //Reload the record from the store - await UpdateRecordAsync(client, log, changedObject.CurrentId, changedObject.NewId, exitToken); + await UpdateRecordAsync(client, request, log, changedObject.CurrentId, changedObject.NewId, exitToken); break; } + + //Reset request message + request.Reset(); } } - private async Task UpdateRecordAsync(FBMClient client, ILogProvider log, string objectId, string newId, CancellationToken cancellation) + private async Task UpdateRecordAsync(FBMClient client, FBMRequest modRequest, ILogProvider log, string objectId, string newId, CancellationToken cancellation) { - //Get request message - FBMRequest modRequest = client.RentRequest(); - try - { - //Set action as get/create - modRequest.WriteHeader(HeaderCommand.Action, Actions.Get); - //Set session-id header - modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId); + //Set action as get/create + modRequest.WriteHeader(HeaderCommand.Action, Actions.Get); + //Set session-id header + modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId); - //Make request - using FBMResponse response = await client.SendAsync(modRequest, GetItemTimeout, cancellation); + //Make request + using FBMResponse response = await client.SendAsync(modRequest, GetItemTimeout, cancellation); - response.ThrowIfNotSet(); + response.ThrowIfNotSet(); - //Check response code - string status = response.Headers.First(static s => s.Header == HeaderCommand.Status).Value.ToString(); + //Check response code + string status = response.Headers.First(static s => s.Header == HeaderCommand.Status).Value.ToString(); - if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal)) - { - //Update the record - await CacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation); - log.Debug("Updated object {id}", objectId); - } - else - { - log.Warn("Object {id} was missing on the remote server", objectId); - } + if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal)) + { + //Update the record + await _cacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation); + log.Debug("Updated object {id}", objectId); } - finally + else { - client.ReturnRequest(modRequest); + log.Warn("Object {id} was missing on the remote server", objectId); } } } diff --git a/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs index f191c9d..c49a54b 100644 --- a/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs +++ b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs @@ -23,21 +23,38 @@ */ using System.Linq; +using System.Threading; +using System.Threading.Tasks; using System.Collections.Generic; +using VNLib.Utils; +using VNLib.Utils.Extensions; using VNLib.Plugins; -namespace VNLib.Data.Caching.ObjectCache.Server.Distribution +namespace VNLib.Data.Caching.ObjectCache.Server.Clustering { - internal sealed class CachePeerMonitor : IPeerMonitor + internal sealed class CachePeerMonitor : VnDisposeable, IPeerMonitor { private readonly LinkedList<ICachePeer> peers = new(); + private readonly ManualResetEvent newPeerTrigger = new (false); public CachePeerMonitor(PluginBase plugin) { } + /// <summary> + /// Waits for new peers to connect to the server + /// </summary> + /// <returns>A task that complets when a new peer has connected</returns> + public async Task WaitForChangeAsync() + { + await newPeerTrigger.WaitAsync(); + + //Reset the trigger for next call + newPeerTrigger.Reset(); + } + ///<inheritdoc/> public IEnumerable<ICachePeer> GetAllPeers() { @@ -55,6 +72,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution { peers.AddLast(peer); } + + //Trigger monitor when change occurs + if(peer.Advertisment != null) + { + newPeerTrigger.Set(); + } } ///<inheritdoc/> @@ -66,5 +89,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution peers.Remove(peer); } } + + protected override void Free() + { + newPeerTrigger.Dispose(); + } } } diff --git a/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs b/plugins/ObjectCacheServer/src/Clustering/ICachePeerAdapter.cs index c3fb022..dd426f6 100644 --- a/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs +++ b/plugins/ObjectCacheServer/src/Clustering/ICachePeerAdapter.cs @@ -22,9 +22,9 @@ * along with this program. If not, see https://www.gnu.org/licenses/. */ -using VNLib.Data.Caching.Extensions; +using VNLib.Data.Caching.Extensions.Clustering; -namespace VNLib.Data.Caching.ObjectCache.Server.Distribution +namespace VNLib.Data.Caching.ObjectCache.Server.Clustering { internal interface ICachePeerAdapter { @@ -32,18 +32,18 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution /// Gets the peers that have been discovered but not yet connected to /// </summary> /// <returns>A collection of peers that have not been connected to yet</returns> - ICacheNodeAdvertisment[] GetNewPeers(); + CacheNodeAdvertisment[] GetNewPeers(); /// <summary> /// Called when a peer has been connected to /// </summary> /// <param name="peer">The peer that has been connected</param> - void OnPeerListenerAttached(ICacheNodeAdvertisment peer); + void OnPeerListenerAttached(CacheNodeAdvertisment peer); /// <summary> /// Called when a peer has been disconnected from /// </summary> /// <param name="peer">The disconnected peer</param> - void OnPeerListenerDetatched(ICacheNodeAdvertisment peer); + void OnPeerListenerDetatched(CacheNodeAdvertisment peer); } } diff --git a/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs b/plugins/ObjectCacheServer/src/Clustering/IPeerMonitor.cs index 028171f..d8f358f 100644 --- a/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs +++ b/plugins/ObjectCacheServer/src/Clustering/IPeerMonitor.cs @@ -24,7 +24,7 @@ using System.Collections.Generic; -namespace VNLib.Data.Caching.ObjectCache.Server.Distribution +namespace VNLib.Data.Caching.ObjectCache.Server.Clustering { /// <summary> /// Represents a monitor for peer cache servers to advertise their presence @@ -33,7 +33,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution internal interface IPeerMonitor { /// <summary> - /// Notifies the monitor that a peer has connected to the cluster + /// Notifies the monitor that a remote peer has connected to the current node for + /// replication /// </summary> /// <param name="peer">The peer that connected</param> void OnPeerConnected(ICachePeer peer); diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs new file mode 100644 index 0000000..f132cab --- /dev/null +++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs @@ -0,0 +1,268 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: PeerDiscoveryManager.cs +* +* PeerDiscoveryManager.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Net.Http; +using System.Threading; +using System.Net.Sockets; +using System.Threading.Tasks; +using System.Collections.Generic; + +using VNLib.Utils.Logging; +using VNLib.Data.Caching.Extensions; +using VNLib.Plugins; +using VNLib.Plugins.Extensions.Loading; +using VNLib.Data.Caching.Extensions.Clustering; + +namespace VNLib.Data.Caching.ObjectCache.Server.Clustering +{ + + /* + * This class is responsible for resolving and discovering peer nodes in the cluster network. + */ + + internal sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter + { + private const string LOG_SCOPE_NAME = "DISC"; + private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(15); + + private readonly List<CacheNodeAdvertisment> _connectedPeers; + private readonly NodeConfig _config; + private readonly CachePeerMonitor _monitor; + private readonly bool IsDebug; + private readonly ILogProvider _log; + + public PeerDiscoveryManager(PluginBase plugin) + { + //Get config + _config = plugin.GetOrCreateSingleton<NodeConfig>(); + + //Get the known peers array from config, its allowed to be null for master nodes + IConfigScope? config = plugin.TryGetConfig("known_peers"); + string[] kownPeers = config?.Deserialze<string[]>() ?? Array.Empty<string>(); + + //Add known peers to the monitor + _config.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s))); + + plugin.Log.Information("Inital peer nodes: {nodes}", kownPeers); + + //Get the peer monitor + _monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>(); + + _connectedPeers = new(); + + //Create scoped logger + _log = plugin.Log.CreateScope(LOG_SCOPE_NAME); + + //Setup discovery error handler + _config.Config.WithErrorHandler(new ErrorHandler(_log)); + + IsDebug = plugin.IsDebug(); + } + + async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) + { + /* + * This loop uses the peer monitor to keep track of all connected peers, then gets + * all the advertising peers (including known peers) and resolves all nodes across + * the network. + */ + + //Start the change listener + Task watcher = WatchForPeersAsync(exitToken); + + _log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", InitialDelay); + + try + { + //Wait for the initial delay + await Task.Delay(InitialDelay, exitToken); + + _log.Debug("Begining discovery loop"); + + /* + * To avoid connecting to ourself, we add ourselves to the connected list + * and it should never get removed. This is because the monitor will never + * report our own advertisment. + */ + _connectedPeers.Add(_config.Config.Advertisment); + + while (true) + { + try + { + if (IsDebug) + { + _log.Debug("Begining node discovery"); + } + + //Resolve all known peers + CacheNodeAdvertisment[] wellKnown = await _config.Config.ResolveWellKnownAsync(exitToken); + + //Use the monitor to get the initial peers + IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds(); + + //Combine well-known with new connected peers + CacheNodeAdvertisment[] allAds = ads.Union(wellKnown).ToArray(); + + if (allAds.Length > 0) + { + //Discover all kown nodes + await _config.Config.DiscoverNodesAsync(allAds, exitToken); + } + + //Log the discovered nodes if verbose logging is enabled + if (IsDebug) + { + CacheNodeAdvertisment[] found = _config.Config.NodeCollection.GetAllNodes(); + + _log.Debug("Discovered {count} nodes\n\t{nodes}", found.Length, found.Select(static s => s.NodeId)); + } + } + catch(OperationCanceledException) + { + throw; + } + catch (Exception ex) + { + _log.Error(ex, "Failed to discover new peer nodes"); + } + + //Delay the next discovery + await Task.Delay(_config.DiscoveryInterval, exitToken); + } + } + catch (OperationCanceledException) + { + //Normal exit + _log.Information("Node discovery worker exiting"); + } + finally + { + + } + + //Wait for the watcher to exit + await watcher.ConfigureAwait(false); + } + + private IEnumerable<CacheNodeAdvertisment> GetMonitorAds() + { + return _monitor.GetAllPeers() + .Where(static p => p.Advertisment != null) + //Without us + .Where(n => n.NodeId != _config.Config.NodeId) + .Select(static p => p.Advertisment!); + } + + //Wait for new peers and update the collection + private async Task WatchForPeersAsync(CancellationToken cancellation) + { + try + { + _log.Debug("Discovery worker waiting for new peers to connect"); + + while (true) + { + + //Wait for changes, then get new peers + await _monitor.WaitForChangeAsync().WaitAsync(cancellation); + + _log.Verbose("New peers connected"); + + //Use the monitor to get the initial peers + IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds(); + + ((NodeDiscoveryCollection)_config.Config.NodeCollection).AddManualNodes(ads); + } + } + catch (OperationCanceledException) + { + //Normal ext + _log.Debug("Connected peer listener exited"); + } + } + + + ///<inheritdoc/> + public CacheNodeAdvertisment[] GetNewPeers() + { + lock (_connectedPeers) + { + //Get all discovered peers + CacheNodeAdvertisment[] peers = _config.Config.NodeCollection.GetAllNodes(); + + //Get the difference between the discovered peers and the connected peers + return peers.Except(_connectedPeers).ToArray(); + } + } + + ///<inheritdoc/> + public void OnPeerListenerAttached(CacheNodeAdvertisment peer) + { + lock (_connectedPeers) + { + //Add to connected peers + _connectedPeers.Add(peer); + } + } + + ///<inheritdoc/> + public void OnPeerListenerDetatched(CacheNodeAdvertisment peer) + { + //remove from connected peers + lock (_connectedPeers) + { + _connectedPeers.Remove(peer); + } + } + + + private sealed record class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler + { + public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex) + { + + if (ex is HttpRequestException hre) + { + if (hre.InnerException is SocketException se) + { + //traisnport failed + Logger.Warn("Failed to connect to server {serv} because {err}", errorNode, se.Message); + } + else + { + Logger.Error("Failed to connect to node {n}\n{err}", errorNode, hre); + } + } + else + { + Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex); + } + + } + } + } +} diff --git a/plugins/ObjectCacheServer/src/Distribution/KnownPeerList.cs b/plugins/ObjectCacheServer/src/Distribution/KnownPeerList.cs deleted file mode 100644 index 74df81f..0000000 --- a/plugins/ObjectCacheServer/src/Distribution/KnownPeerList.cs +++ /dev/null @@ -1,100 +0,0 @@ -/* -* Copyright (c) 2023 Vaughn Nugent -* -* Library: VNLib -* Package: ObjectCacheServer -* File: KnownPeerList.cs -* -* KnownPeerList.cs is part of ObjectCacheServer which is part of the larger -* VNLib collection of libraries and utilities. -* -* ObjectCacheServer is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* ObjectCacheServer is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Linq; -using System.Collections.Generic; -using System.Text.Json.Serialization; - -using VNLib.Plugins; -using VNLib.Data.Caching.Extensions; -using VNLib.Plugins.Extensions.Loading; - - -namespace VNLib.Data.Caching.ObjectCache.Server.Distribution -{ - [ConfigurationName("known_peers")] - internal sealed class KnownPeerList - { - private readonly List<KnownPeer> _peers; - - public KnownPeerList(PluginBase plugin, IConfigScope config) - { - //Deserialze the known peers into an array - KnownPeer[] peers = config.Deserialze<KnownPeer[]>(); - - foreach (KnownPeer peer in peers) - { - //Validate the peer - peer.Validate(); - } - - _peers = peers?.ToList() ?? new(); - } - - public IEnumerable<ICacheNodeAdvertisment> GetPeers() - { - return _peers; - } - - private sealed class KnownPeer : ICacheNodeAdvertisment - { - public Uri? ConnectEndpoint { get; set; } - public Uri? DiscoveryEndpoint { get; set; } - - [JsonPropertyName("node_id")] - public string NodeId { get; set; } - - [JsonPropertyName("connect_url")] - public string? ConnectEpPath - { - get => ConnectEndpoint?.ToString() ?? string.Empty; - set => ConnectEndpoint = new Uri(value ?? string.Empty); - } - - [JsonPropertyName("discovery_url")] - public string? DiscoveryEpPath - { - get => DiscoveryEndpoint?.ToString() ?? string.Empty; - set => DiscoveryEndpoint = new Uri(value ?? string.Empty); - } - - public void Validate() - { - if (string.IsNullOrWhiteSpace(NodeId)) - { - throw new ArgumentException("Node ID cannot be null or whitespace"); - } - if (ConnectEndpoint is null) - { - throw new ArgumentException("Connect endpoint cannot be null"); - } - if (DiscoveryEndpoint is null) - { - throw new ArgumentException("Discovery endpoint cannot be null"); - } - } - } - } -} diff --git a/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs deleted file mode 100644 index 26ec565..0000000 --- a/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs +++ /dev/null @@ -1,161 +0,0 @@ -/* -* Copyright (c) 2023 Vaughn Nugent -* -* Library: VNLib -* Package: ObjectCacheServer -* File: PeerDiscoveryManager.cs -* -* PeerDiscoveryManager.cs is part of ObjectCacheServer which is part of the larger -* VNLib collection of libraries and utilities. -* -* ObjectCacheServer is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* ObjectCacheServer is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using System.Collections.Generic; - -using VNLib.Plugins; -using VNLib.Utils.Logging; -using VNLib.Data.Caching.Extensions; -using VNLib.Plugins.Extensions.Loading; - - -namespace VNLib.Data.Caching.ObjectCache.Server.Distribution -{ - - sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter - { - private readonly List<ICacheNodeAdvertisment> _connectedPeers; - private readonly NodeConfig _config; - private readonly IPeerMonitor _monitor; - private readonly KnownPeerList _knownPeers; - - public PeerDiscoveryManager(PluginBase plugin) - { - //Get config - _config = plugin.GetOrCreateSingleton<NodeConfig>(); - - //Get the peer monitor - _monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>(); - - //Get the known peer list - _knownPeers = plugin.GetOrCreateSingleton<KnownPeerList>(); - - _connectedPeers = new(); - - //Setup discovery error handler - _config.Config.WithErrorHandler(new ErrorHandler(plugin.Log)); - } - - async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) - { - /* - * This loop uses the peer monitor to keep track of all connected peers, then gets - * all the advertising peers (including known peers) and resolves all nodes across - * the network. - */ - - pluginLog.Information("Node discovery worker started"); - - try - { - while (true) - { - try - { - //Use the monitor to get the initial peers - IEnumerable<ICacheNodeAdvertisment> ads = _monitor.GetAllPeers() - .Where(static p => p.Advertisment != null) - .Select(static p => p.Advertisment!); - - //Add known peers to the initial list - ads = ads.Union(_knownPeers.GetPeers()); - - //Set initial peers - _config.Config.WithInitialPeers(ads); - - //Discover all nodes - await _config.Config.DiscoverNodesAsync(exitToken); - } - catch(OperationCanceledException) - { - throw; - } - catch (Exception ex) - { - pluginLog.Error(ex, "Failed to discover new peer nodes"); - } - - //Delay the next discovery - await Task.Delay(_config.DiscoveryInterval, exitToken); - } - } - catch (OperationCanceledException) - { - //Normal exit - pluginLog.Information("Node discovery worker exiting"); - } - finally - { - - } - } - - - ///<inheritdoc/> - public ICacheNodeAdvertisment[] GetNewPeers() - { - lock (_connectedPeers) - { - //Get all discovered peers - ICacheNodeAdvertisment[] peers = _config.Config.NodeCollection.GetAllNodes(); - - //Get the difference between the discovered peers and the connected peers - return peers.Except(_connectedPeers).ToArray(); - } - } - - ///<inheritdoc/> - public void OnPeerListenerAttached(ICacheNodeAdvertisment peer) - { - lock (_connectedPeers) - { - //Add to connected peers - _connectedPeers.Add(peer); - } - } - - ///<inheritdoc/> - public void OnPeerListenerDetatched(ICacheNodeAdvertisment peer) - { - //remove from connected peers - lock (_connectedPeers) - { - _connectedPeers.Remove(peer); - } - } - - - private sealed record class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler - { - public void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex) - { - Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex); - } - } - } -} diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs index 8352635..5e794f8 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs @@ -46,7 +46,7 @@ using VNLib.Plugins.Essentials.Endpoints; using VNLib.Plugins.Essentials.Extensions; using VNLib.Plugins.Extensions.Loading.Routing; using VNLib.Data.Caching.ObjectCache.Server.Distribution; - +using VNLib.Data.Caching.Extensions.Clustering; namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { @@ -54,6 +54,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints [ConfigurationName("connect_endpoint")] internal sealed class ConnectEndpoint : ResourceEndpointBase { + private const string LOG_SCOPE_NAME = "CONEP"; + private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30); @@ -90,7 +92,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { string? path = config["path"].GetString(); - InitPathAndLog(path, plugin.Log); + InitPathAndLog(path, plugin.Log.CreateScope(LOG_SCOPE_NAME)); //Check for ip-verification flag VerifyIp = config.TryGetValue("verify_ip", out JsonElement vIpEl) && vIpEl.GetBoolean(); @@ -152,28 +154,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints // Parse jwt using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth)) { - bool verified = false; - //verify signature for client - if (NodeConfiguration.KeyStore.VerifyJwt(jwt)) + if (NodeConfiguration.KeyStore.VerifyJwt(jwt, false)) { - verified = true; + //Validated } //May be signed by a cache server - else + else if(NodeConfiguration.KeyStore.VerifyJwt(jwt, true)) { //Set peer and verified flag since the another cache server signed the request - isPeer = verified = NodeConfiguration.KeyStore.VerifyCachePeer(jwt); + isPeer = true; } - - //Check flag - if (!verified) + else { Log.Information("Client signature verification failed"); entity.CloseResponse(HttpStatusCode.Unauthorized); return VfReturnType.VirtualSkip; } - + //Recover json body using JsonDocument doc = jwt.GetPayload(); @@ -196,6 +194,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints auth.WriteHeader(NodeConfiguration.KeyStore.GetJwtHeader()); auth.InitPayloadClaim() .AddClaim("aud", AudienceLocalServerId) + .AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds()) .AddClaim("exp", entity.RequestedTimeUtc.Add(AuthTokenExpiration).ToUnixTimeSeconds()) .AddClaim("nonce", RandomHash.GetRandomBase32(8)) .AddClaim("chl", challenge!) @@ -240,7 +239,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } string? nodeId = null; - ICacheNodeAdvertisment? discoveryAd = null; + CacheNodeAdvertisment? discoveryAd = null; //Parse jwt using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth)) @@ -299,15 +298,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints //Verify the signature the client included of the auth token - if (isPeer) + //Verify token signature against a fellow cache public key + if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth, isPeer)) { - //Verify token signature against a fellow cache public key - if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth)) - { - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; - } + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; + } + if (isPeer) + { //Try to get the node advertisement header string? discoveryHeader = entity.Server.Headers[FBMDataCacheExtensions.X_NODE_DISCOVERY_HEADER]; @@ -317,15 +316,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(discoveryHeader); } } - else - { - //Not a peer, so verify against the client's public key - if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth)) - { - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; - } - } } try @@ -389,12 +379,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints //Notify peers of new connection Peers.OnPeerConnected(state); - //Inc connected count - Interlocked.Increment(ref _connectedClients); - //Register plugin exit token to cancel the connected socket CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll); + //Inc connected count + Interlocked.Increment(ref _connectedClients); + try { //Init listener args from request @@ -442,14 +432,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { Log.Debug(ex); } - finally - { - //Dec connected count - Interlocked.Decrement(ref _connectedClients); - //Unregister the - reg.Unregister(); - } + //Dec connected count + Interlocked.Decrement(ref _connectedClients); + + //Unregister the token + reg.Unregister(); //Notify monitor of disconnect Peers.OnPeerDisconnected(state); @@ -465,12 +453,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints public int MaxMessageSize { get; init; } public int MaxResponseBufferSize { get; init; } public string? NodeId { get; init; } - public ICacheNodeAdvertisment? Advertisment { get; init; } + public CacheNodeAdvertisment? Advertisment { get; init; } public override string ToString() { return - $"{nameof(RecvBufferSize)}:{RecvBufferSize}, {nameof(MaxHeaderBufferSize)}: {MaxHeaderBufferSize}, {nameof(MaxMessageSize)}:{MaxMessageSize}, {nameof(MaxResponseBufferSize)}:{MaxResponseBufferSize}"; + $"{nameof(RecvBufferSize)}:{RecvBufferSize}, {nameof(MaxHeaderBufferSize)}: {MaxHeaderBufferSize}, " + + $"{nameof(MaxMessageSize)}:{MaxMessageSize}, {nameof(MaxResponseBufferSize)}:{MaxResponseBufferSize}"; } } } diff --git a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs index 90ffca0..77d59dd 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs @@ -26,7 +26,6 @@ using System; using System.Net; using System.Linq; using System.Text.Json; -using System.Threading.Tasks; using VNLib.Hashing; using VNLib.Hashing.IdentityUtility; @@ -35,17 +34,25 @@ using VNLib.Plugins.Essentials; using VNLib.Plugins.Essentials.Endpoints; using VNLib.Plugins.Essentials.Extensions; using VNLib.Plugins.Extensions.Loading; -using VNLib.Data.Caching.Extensions; using VNLib.Data.Caching.ObjectCache.Server.Distribution; +using VNLib.Data.Caching.Extensions.Clustering; namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { [ConfigurationName("discovery_endpoint")] - internal sealed class PeerDiscoveryEndpoint : UnprotectedWebEndpoint + internal sealed class PeerDiscoveryEndpoint : ResourceEndpointBase { private readonly IPeerMonitor PeerMonitor; private readonly NodeConfig Config; + //Loosen up protection settings + ///<inheritdoc/> + protected override ProtectionSettings EndpointProtectionSettings { get; } = new() + { + DisableBrowsersOnly = true, + DisableSessionsRequired = true + }; + public PeerDiscoveryEndpoint(PluginBase plugin, IConfigScope config) { string? path = config["path"].GetString(); @@ -59,7 +66,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints Config = plugin.GetOrCreateSingleton<NodeConfig>(); } - protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity) + protected override VfReturnType Get(HttpEntity entity) { //Get auth token string? authToken = entity.Server.Headers[HttpRequestHeader.Authorization]; @@ -71,17 +78,18 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } string subject = string.Empty; + string challenge = string.Empty; //Parse auth token using(JsonWebToken jwt = JsonWebToken.Parse(authToken)) { //try to verify against cache node first - if (!Config.KeyStore.VerifyCachePeer(jwt)) + if (!Config.KeyStore.VerifyJwt(jwt, true)) { //failed... //try to verify against client key - if (!Config.KeyStore.VerifyJwt(jwt)) + if (!Config.KeyStore.VerifyJwt(jwt, false)) { //invalid token entity.CloseResponse(HttpStatusCode.Unauthorized); @@ -90,12 +98,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } using JsonDocument payload = jwt.GetPayload(); - + + //Get client info to pass back subject = payload.RootElement.GetProperty("sub").GetString() ?? string.Empty; + challenge = payload.RootElement.GetProperty("chl").GetString() ?? string.Empty; } //Valid key, get peer list to send to client - ICacheNodeAdvertisment[] peers = PeerMonitor.GetAllPeers() + CacheNodeAdvertisment[] peers = PeerMonitor.GetAllPeers() .Where(static p => p.Advertisment != null) .Select(static p => p.Advertisment!) .ToArray(); @@ -113,7 +123,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints .AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds()) //Send all peers as a json array .AddClaim("peers", peers) - .AddClaim("nonce", RandomHash.GetRandomBase32(24)) + //Send the challenge back + .AddClaim("chl", challenge) .CommitClaims(); //Sign the response diff --git a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs new file mode 100644 index 0000000..99c7f19 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs @@ -0,0 +1,111 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: WellKnownEndpoint.cs +* +* WellKnownEndpoint.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Net; +using System.Text.Json; + +using VNLib.Data.Caching.Extensions; +using VNLib.Data.Caching.Extensions.Clustering; +using VNLib.Hashing; +using VNLib.Hashing.IdentityUtility; +using VNLib.Plugins; +using VNLib.Plugins.Essentials; +using VNLib.Plugins.Essentials.Endpoints; +using VNLib.Plugins.Essentials.Extensions; +using VNLib.Plugins.Extensions.Loading; + +namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints +{ + + /* + * The well-known endpoint is used to advertise the node's existence to + * the network. Clients need to know the endpoint layout to be able to + * connect and discover other nodes. + */ + + [ConfigurationName("well_known", Required = false)] + internal sealed class WellKnownEndpoint : ResourceEndpointBase + { + //Default path for the well known endpoint + const string DefaultPath = "/.well-known/vncache"; + + //Store serialized advertisment + private readonly CacheNodeAdvertisment _advertisment; + private readonly ICacheAuthManager _keyStore; + + //Loosen up security requirements + protected override ProtectionSettings EndpointProtectionSettings { get; } = new() + { + DisableBrowsersOnly = true, + DisableSessionsRequired = true, + }; + + public WellKnownEndpoint(PluginBase plugin):this(plugin, null) + { } + + public WellKnownEndpoint(PluginBase plugin, IConfigScope? config) + { + //Get the node config + NodeConfig nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>(); + + //serialize the config, discovery may not be enabled + _advertisment = nodeConfig.Config.Advertisment; + _keyStore = nodeConfig.KeyStore; + + //Default to the well known path + string path = DefaultPath; + + //See if the user configured a path + if(config != null && config.TryGetValue("path", out JsonElement pathEl)) + { + path = pathEl.GetString() ?? DefaultPath; + } + + InitPathAndLog(path, plugin.Log); + } + + protected override VfReturnType Get(HttpEntity entity) + { + string entropy = RandomHash.GetRandomBase32(16); + + //Create jwt signed for the client + using JsonWebToken jwt = new(); + + jwt.WriteHeader(_keyStore.GetJwtHeader()); + //Write the advertisment as the message body + jwt.InitPayloadClaim() + .AddClaim("sub", _advertisment) + .AddClaim("chl", entropy) + .AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds()) + .CommitClaims(); + + _keyStore.SignJwt(jwt); + + //Return the advertisment jwt + entity.CloseResponse(HttpStatusCode.OK, Net.Http.ContentType.Text, jwt.DataBuffer); + return VfReturnType.VirtualSkip; + } + } +} diff --git a/plugins/ObjectCacheServer/src/ICachePeer.cs b/plugins/ObjectCacheServer/src/ICachePeer.cs index 97b406f..897655a 100644 --- a/plugins/ObjectCacheServer/src/ICachePeer.cs +++ b/plugins/ObjectCacheServer/src/ICachePeer.cs @@ -22,7 +22,7 @@ * along with this program. If not, see https://www.gnu.org/licenses/. */ -using VNLib.Data.Caching.Extensions; +using VNLib.Data.Caching.Extensions.Clustering; namespace VNLib.Data.Caching.ObjectCache.Server { @@ -39,6 +39,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server /// <summary> /// An optional signed advertisment message for other peers /// </summary> - ICacheNodeAdvertisment? Advertisment { get; } + CacheNodeAdvertisment? Advertisment { get; } } } diff --git a/plugins/ObjectCacheServer/src/NodeConfig.cs b/plugins/ObjectCacheServer/src/NodeConfig.cs index 81b8a32..a6c5be9 100644 --- a/plugins/ObjectCacheServer/src/NodeConfig.cs +++ b/plugins/ObjectCacheServer/src/NodeConfig.cs @@ -26,36 +26,43 @@ using System; using System.Net; using System.Linq; using System.Text.Json; -using System.Threading.Tasks; using VNLib.Plugins; -using VNLib.Utils; using VNLib.Utils.Logging; -using VNLib.Data.Caching.Extensions; +using VNLib.Utils.Extensions; using VNLib.Plugins.Extensions.Loading; using VNLib.Data.Caching.ObjectCache.Server.Endpoints; +using VNLib.Data.Caching.Extensions.Clustering; namespace VNLib.Data.Caching.ObjectCache.Server { [ConfigurationName("cluster")] - internal sealed class NodeConfig : VnDisposeable + internal sealed class NodeConfig { const string CacheConfigTemplate = @" - Cluster Configuration: - Node Id: {id} - TlsEndabled: {tls}, - Cache Endpoint: {ep} +Cluster Configuration: + Node Id: {id} + TlsEndabled: {tls} + Cache Endpoint: {ep} + Discovery Endpoint: {dep} + Discovery Interval: {di} + Max Peers: {mpc} "; public CacheNodeConfiguration Config { get; } public CacheAuthKeyStore KeyStore { get; } + public TimeSpan DiscoveryInterval { get; } + + /// <summary> + /// The maximum number of peer connections to allow + /// </summary> + public uint MaxPeerConnections { get; } = 10; + public NodeConfig(PluginBase plugin, IConfigScope config) - { - //Server id is just dns name for now - string nodeId = Dns.GetHostName(); + { Config = new(); @@ -75,9 +82,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server //If the ssl element is present, ssl is enabled for the server usingTls = firstHost.TryGetProperty("ssl", out _); } + string hostname = Dns.GetHostName(); + + //Server id is just dns name for now + string nodeId = $"{hostname}:{port}"; //The endpoint to advertise to cache clients that allows cache connections - Uri cacheEndpoint = GetEndpointUri<ConnectEndpoint>(plugin, usingTls, port, nodeId); + Uri cacheEndpoint = GetEndpointUri<ConnectEndpoint>(plugin, usingTls, port, hostname); //Init key store KeyStore = new(plugin); @@ -89,21 +100,32 @@ namespace VNLib.Data.Caching.ObjectCache.Server .WithTls(usingTls); //Check if advertising is enabled - if(config.TryGetValue("advertise", out JsonElement adEl) && adEl.GetBoolean()) + if(plugin.HasConfigForType<PeerDiscoveryEndpoint>()) { //Get the the broadcast endpoint - Uri discoveryEndpoint = GetEndpointUri<PeerDiscoveryEndpoint>(plugin, usingTls, port, nodeId); + Uri discoveryEndpoint = GetEndpointUri<PeerDiscoveryEndpoint>(plugin, usingTls, port, hostname); //Enable advertising Config.EnableAdvertisment(discoveryEndpoint); } - - + + + DiscoveryInterval = config["discovery_interval_sec"].GetTimeSpan(TimeParseType.Seconds); + + //Get the max peer connections + if(config.TryGetValue("max_peers", out JsonElement maxPeerEl)) + { + MaxPeerConnections = maxPeerEl.GetUInt32(); + } + //log the config plugin.Log.Information(CacheConfigTemplate, nodeId, usingTls, - cacheEndpoint + cacheEndpoint, + Config.DiscoveryEndpoint, + DiscoveryInterval, + MaxPeerConnections ); } @@ -115,12 +137,5 @@ namespace VNLib.Data.Caching.ObjectCache.Server //The endpoint to advertise to cache clients that allows cache connections return new UriBuilder(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, hostName, port, cacheEpConfig["path"].GetString()).Uri; } - - - protected override void Free() - { - //cleanup keys - - } } } diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj index e8d6291..5273b64 100644 --- a/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj +++ b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj @@ -41,4 +41,9 @@ <ProjectReference Include="..\..\..\lib\VNLib.Data.Caching.Extensions\src\VNLib.Data.Caching.Extensions.csproj" /> <ProjectReference Include="..\..\..\lib\VNLib.Data.Caching.ObjectCache\src\VNLib.Data.Caching.ObjectCache.csproj" /> </ItemGroup> + + <Target Condition="'$(BuildingInsideVisualStudio)' == true" Name="PostBuild" AfterTargets="PostBuildEvent"> + <Exec Command="start xcopy "$(TargetDir)" "F:\downloads\cache-test\plugins\$(TargetName)" /E /Y /R" /> + </Target> + </Project> diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs index 5d9a50b..1ddf49b 100644 --- a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs +++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs @@ -33,7 +33,7 @@ using VNLib.Utils.Memory.Diagnostics; using VNLib.Plugins.Extensions.Loading; using VNLib.Plugins.Extensions.Loading.Routing; using VNLib.Data.Caching.ObjectCache.Server.Endpoints; - +using VNLib.Data.Caching.ObjectCache.Server.Distribution; namespace VNLib.Data.Caching.ObjectCache.Server { @@ -76,9 +76,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server { try { + //Route well-known endpoint + this.Route<WellKnownEndpoint>(); + //Init connect endpoint this.Route<ConnectEndpoint>(); + //We must initialize the replication manager + _ = this.GetOrCreateSingleton<CacheNodeReplicationMaanger>(); + //Setup discovery endpoint if(this.HasConfigForType<PeerDiscoveryEndpoint>()) { |