diff options
37 files changed, 1149 insertions, 1127 deletions
@@ -1,9 +1,11 @@ # DataCaching *A collection of data-caching libraries and server plugins for cache clients, servers, and clustering* -#### Builds -Debug build w/ symbols & xml docs, release builds, NuGet packages, and individually packaged source code are available on my [website](https://www.vaughnnugent.com/resources/software). All tar-gzip (.tgz) files will have an associated .sha384 appended checksum of the desired download file. +## Docs +You can find the documentation for this module and all child extension projects on my website [here](https://www.vaughnnugent.com/resources/software/articles?tags=docs,_VNLib.Data.Caching). +## Builds +Debug build w/ symbols & xml docs, release builds, NuGet packages, and individually packaged source code are available on my [website](https://www.vaughnnugent.com/resources/software/modules/VNLib.Data.Caching). All tar-gzip (.tgz) files will have an associated .sha384 appended checksum of the desired download file. ## Licensing Projects contained in this repository are individually licensed, either GNU GPL V2+ or GNU GPL GPL Aferro V3+. Builds contain the required license txt files in the archive.
\ No newline at end of file diff --git a/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs b/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs index 2d02491..3020376 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs @@ -27,7 +27,7 @@ using System.Text.Json.Serialization; namespace VNLib.Data.Caching.Extensions { - public class ActiveServer : ICachePeerAdvertisment + public class ActiveServer : ICacheNodeAdvertisment { [JsonPropertyName("address")] public string? HostName { get; set; } diff --git a/lib/VNLib.Data.Caching.Extensions/src/BrokerRegistrationRequest.cs b/lib/VNLib.Data.Caching.Extensions/src/BrokerRegistrationRequest.cs deleted file mode 100644 index 9ec559a..0000000 --- a/lib/VNLib.Data.Caching.Extensions/src/BrokerRegistrationRequest.cs +++ /dev/null @@ -1,116 +0,0 @@ -/* -* Copyright (c) 2023 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Data.Caching.Extensions -* File: BrokerRegistrationRequest.cs -* -* BrokerRegistrationRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Collections.Generic; - -using VNLib.Utils; -using VNLib.Hashing.IdentityUtility; - - -namespace VNLib.Data.Caching.Extensions -{ - /// <summary> - /// A broker registration request message in a fluent api - /// format. This message may be disposed when no longer in use - /// </summary> - public sealed class BrokerRegistrationRequest : VnDisposeable - { - private bool ownsKey; - private ReadOnlyJsonWebKey? SigningKey; - - /// <summary> - /// The cache server node id - /// </summary> - public string? NodeId { get; private set; } - /// <summary> - /// The broker server's address - /// </summary> - public Uri? BrokerAddress { get; private set; } - /// <summary> - /// The security token used by the broker server to - /// authenticate during heartbeat connections - /// </summary> - public string? HeartbeatToken { get; private set; } - /// <summary> - /// The address for remote clients to use to - /// connect to this server - /// </summary> - public string? RegistrationAddress { get; private set; } - - /// <summary> - /// Recovers the private key from the supplied certificate - /// </summary> - /// <param name="jwk">The private key used to sign messages</param> - /// <param name="ownsKey">A value that indicates if the current instance owns the key</param> - /// <returns></returns> - /// <exception cref="ArgumentException"></exception> - public BrokerRegistrationRequest WithSigningKey(ReadOnlyJsonWebKey jwk, bool ownsKey) - { - this.ownsKey = ownsKey; - SigningKey = jwk ?? throw new ArgumentNullException(nameof(jwk)); - return this; - } - - public BrokerRegistrationRequest WithBroker(Uri brokerUri) - { - BrokerAddress = brokerUri; - return this; - } - - public BrokerRegistrationRequest WithRegistrationAddress(string address) - { - RegistrationAddress = address; - return this; - } - - public BrokerRegistrationRequest WithHeartbeatToken(string token) - { - HeartbeatToken = token; - return this; - } - - public BrokerRegistrationRequest WithNodeId(string nodeId) - { - NodeId = nodeId; - return this; - } - - internal void SignJwt(JsonWebToken jwt) - { - jwt.SignFromJwk(SigningKey); - } - - internal IReadOnlyDictionary<string, string?> JsonHeader => SigningKey!.JwtHeader; - - ///<inheritdoc/> - protected override void Free() - { - if (ownsKey) - { - SigningKey?.Dispose(); - } - } - } -} diff --git a/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs index 05e4928..9229c89 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs @@ -3,9 +3,9 @@ * * Library: VNLib * Package: VNLib.Data.Caching.Extensions -* File: ClientCacheConfiguration.cs +* File: CacheClientConfiguration.cs * -* ClientCacheConfiguration.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* CacheClientConfiguration.cs is part of VNLib.Data.Caching.Extensions which is part of the larger * VNLib collection of libraries and utilities. * * VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify @@ -22,69 +22,47 @@ * along with this program. If not, see https://www.gnu.org/licenses/. */ -using System; +using System.Linq; using System.Collections.Generic; -using System.Security.Cryptography; - -using VNLib.Hashing; -using VNLib.Hashing.IdentityUtility; namespace VNLib.Data.Caching.Extensions { - public interface ICacheJwtManager - { - IReadOnlyDictionary<string, string?> GetJwtHeader(); - - void SignJwt(JsonWebToken jwt); - - bool VerifyCache(JsonWebToken jwt); - - bool VerifyBroker(JsonWebToken jwt); - } - /// <summary> /// A fluent api configuration object for configuring a <see cref="FBMClient"/> /// to connect to cache servers. /// </summary> - public class CacheClientConfiguration : ICacheJwtManager, ICacheListServerRequest + public class CacheClientConfiguration { - public ReadOnlyJsonWebKey? SigningKey { get; private set; } - public ReadOnlyJsonWebKey? VerificationKey { get; private set; } - public ReadOnlyJsonWebKey? BrokerVerificationKey { get; private set; } - - public Uri? DiscoveryEndpoint { get; private set; } - public bool UseTls { get; private set; } - internal ICachePeerAdvertisment[]? CacheServers { get; set; } + /// <summary> + /// Stores available cache servers to be used for discovery, and connections + /// </summary> + public INodeDiscoveryCollection NodeCollection { get; } = new NodeDiscoveryCollection(); /// <summary> - /// Imports the private key used to sign messages + /// The authentication manager to use for signing and verifying messages to and from the cache servers /// </summary> - /// <param name="jwk">The <see cref="ReadOnlyJsonWebKey"/> with a private key loaded</param> - /// <returns>Chainable fluent object</returns> - /// <exception cref="ArgumentException"></exception> - /// <exception cref="CryptographicException"></exception> - public CacheClientConfiguration WithSigningKey(ReadOnlyJsonWebKey jwk) - { - SigningKey = jwk ?? throw new ArgumentNullException(nameof(jwk)); - return this; - } + public ICacheAuthManager AuthManager { get; private set; } /// <summary> - /// Imports the public key used to verify messages from the remote server + /// The error handler to use for handling errors that occur during the discovery process /// </summary> - /// <param name="jwk">The <see cref="ReadOnlyJsonWebKey"/> public key only used for message verification</param> - /// <returns>Chainable fluent object</returns> - /// <exception cref="ArgumentException"></exception> - /// <exception cref="CryptographicException"></exception> - public CacheClientConfiguration WithVerificationKey(ReadOnlyJsonWebKey jwk) - { - VerificationKey = jwk ?? throw new ArgumentNullException(nameof(jwk)); - return this; - } + public ICacheDiscoveryErrorHandler? ErrorHandler { get; private set; } - public CacheClientConfiguration WithBrokerVerificationKey(ReadOnlyJsonWebKey jwk) + /// <summary> + /// Specifies if all connections should use TLS + /// </summary> + public bool UseTls { get; private set; } + + internal ICacheNodeAdvertisment[]? InitialPeers { get; set; } + + /// <summary> + /// Specifies the JWT authentication manager to use for signing and verifying JWTs + /// </summary> + /// <param name="manager">The authentication manager</param> + /// <returns>Chainable fluent object</returns> + public CacheClientConfiguration WithAuthenticator(ICacheAuthManager manager) { - BrokerVerificationKey = jwk ?? throw new ArgumentNullException(nameof(jwk)); + AuthManager = manager; return this; } @@ -92,7 +70,6 @@ namespace VNLib.Data.Caching.Extensions /// Specifies if all connections should be using TLS /// </summary> /// <param name="useTls">A value that indicates if connections should use TLS</param> - /// <returns>Chainable fluent object</returns> public CacheClientConfiguration WithTls(bool useTls) { UseTls = useTls; @@ -100,28 +77,25 @@ namespace VNLib.Data.Caching.Extensions } /// <summary> - /// Specifies the broker address to discover cache nodes from + /// Specifies the initial cache peers to connect to /// </summary> - /// <param name="brokerAddress">The address of the server broker</param> + /// <param name="peers">The collection of servers to discover peers from and connect to</param> /// <returns>Chainable fluent object</returns> - /// <exception cref="ArgumentNullException"></exception> - public CacheClientConfiguration WithBroker(Uri brokerAddress) + public CacheClientConfiguration WithInitialPeers(IEnumerable<ICacheNodeAdvertisment> peers) { - DiscoveryEndpoint = brokerAddress ?? throw new ArgumentNullException(nameof(brokerAddress)); + InitialPeers = peers.ToArray(); return this; } - - ///<inheritdoc/> - public void SignJwt(JsonWebToken jwt) => jwt.SignFromJwk(SigningKey!); - - ///<inheritdoc/> - public bool VerifyCache(JsonWebToken jwt) => jwt.VerifyFromJwk(VerificationKey!); - - ///<inheritdoc/> - public bool VerifyBroker(JsonWebToken jwt) => jwt.VerifyFromJwk(BrokerVerificationKey!); - - ///<inheritdoc/> - public IReadOnlyDictionary<string, string?> GetJwtHeader() => SigningKey!.JwtHeader; + /// <summary> + /// Specifies the error handler to use for handling errors that occur during the discovery process + /// </summary> + /// <param name="handler">The error handler to use during a discovery</param> + /// <returns>Chainable fluent object</returns> + public CacheClientConfiguration WithErrorHandler(ICacheDiscoveryErrorHandler handler) + { + ErrorHandler = handler; + return this; + } } } diff --git a/lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs b/lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs deleted file mode 100644 index 76d4ad8..0000000 --- a/lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs +++ /dev/null @@ -1,82 +0,0 @@ -/* -* Copyright (c) 2023 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Data.Caching.Extensions -* File: ListServerRequest.cs -* -* ListServerRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Collections.Generic; - -using VNLib.Utils; -using VNLib.Hashing.IdentityUtility; - -namespace VNLib.Data.Caching.Extensions -{ - public interface ICacheListServerRequest : ICacheJwtManager - { - Uri DiscoveryEndpoint { get; } - } - - /// <summary> - /// A request container for a ListServer request - /// </summary> - public sealed class CacheListServerRequest : ICacheListServerRequest - { - private readonly ICacheJwtManager _manager; - - - /// <summary> - /// The address of the broker server to connect to - /// </summary> - public Uri DiscoveryEndpoint { get; private set; } - - public CacheListServerRequest(ICacheJwtManager keyManager, Uri? brokerAddress = null) - { - _manager = keyManager; - DiscoveryEndpoint = brokerAddress!; - } - - - /// <summary> - /// Sets the broker address for the request - /// </summary> - /// <param name="brokerAddr">The broker server's address to connect to</param> - /// <returns>A fluent chainable value</returns> - /// <exception cref="ArgumentNullException"></exception> - public CacheListServerRequest WithDiscoveryEndpoint(Uri brokerAddr) - { - DiscoveryEndpoint = brokerAddr ?? throw new ArgumentNullException(nameof(brokerAddr)); - return this; - } - - /// <inheritdoc/> - public void SignJwt(JsonWebToken jwt) => _manager.SignJwt(jwt); - - /// <inheritdoc/> - public bool VerifyCache(JsonWebToken jwt) => _manager.VerifyCache(jwt); - - /// <inheritdoc/> - public bool VerifyBroker(JsonWebToken jwt) => _manager.VerifyBroker(jwt); - - /// <inheritdoc/> - public IReadOnlyDictionary<string, string?> GetJwtHeader() => _manager.GetJwtHeader(); - } -} diff --git a/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs index 21a99e1..29a763c 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs @@ -24,11 +24,13 @@ using System; - namespace VNLib.Data.Caching.Extensions { - public class CacheNodeConfiguration: CacheClientConfiguration, ICachePeerAdvertisment + /// <summary> + /// A cache configuration for cache servers (nodes) + /// </summary> + public class CacheNodeConfiguration: CacheClientConfiguration, ICacheNodeAdvertisment { /// <summary> /// The address for clients to connect to @@ -56,9 +58,13 @@ namespace VNLib.Data.Caching.Extensions return this; } - public CacheNodeConfiguration EnableAdvertisment(bool enable, Uri? discoveryEndpoint) + /// <summary> + /// Enables or disables the advertisement of this node to other nodes + /// </summary> + /// <param name="discoveryEndpoint">The absolute endpoint clients will use to connect to</param> + public CacheNodeConfiguration EnableAdvertisment(Uri? discoveryEndpoint) { - BroadcastAdverisment = enable; + BroadcastAdverisment = discoveryEndpoint != null; DiscoveryEndpoint = discoveryEndpoint; return this; } diff --git a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs index 9efe16a..634b6de 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs @@ -139,91 +139,114 @@ namespace VNLib.Data.Caching.Extensions client.Config.DebugLog?.Debug("{debug}: {data}", "[CACHE]", message); } + /// <summary> - /// Creats a new <see cref="CacheListServerRequest"/> from an existing <see cref="CacheClientConfiguration"/> + /// Discovers ALL possible cache nodes itteritivley from the current collection of initial peers. + /// This will make connections to all discoverable servers /// </summary> - /// <param name="conf">The prepared client configuration</param> - /// <returns>The new <see cref="CacheListServerRequest"/></returns> - public static CacheListServerRequest GetListMessage(this CacheClientConfiguration conf) + /// <param name="config"></param> + /// <param name="cancellation">A token to cancel the operation</param> + /// <returns></returns> + /// <exception cref="ArgumentException"></exception> + public static async Task DiscoverNodesAsync(this CacheClientConfiguration config, CancellationToken cancellation) { - return new(conf, conf.DiscoveryEndpoint); + //Make sure at least one node defined + if(config?.InitialPeers == null || config.InitialPeers.Length == 0) + { + throw new ArgumentException("There must be at least one cache server defined in the client configuration"); + } + + //Get the discovery enumerator with the initial peers + INodeDiscoveryEnumerator enumerator = config.NodeCollection.BeginDiscovery(config.InitialPeers); + + //Start the discovery process + await DiscoverNodesAsync(enumerator, config.AuthManager, config.ErrorHandler, cancellation); + + //Commit nodes + config.NodeCollection.CompleteDiscovery(enumerator); } - /// <summary> - /// Discovers peer nodes from a given initial peer and returns a list of discovered nodes. If the config - /// is for a cache peer node, the current peer is removed from the list of discovered nodes. - /// </summary> - /// <param name="cacheConfig"></param> - /// <param name="initialPeer">The initial peer to discover nodes from</param> - /// <param name="cancellation">A token to cancel the discovery operation</param> - /// <returns>The collection of discovered nodes</returns> - /// <exception cref="ArgumentNullException"></exception> - public static async Task<ICachePeerAdvertisment[]?> DiscoverClusterNodesAsync( - this CacheClientConfiguration cacheConfig, - ICachePeerAdvertisment initialPeer, - CancellationToken cancellation + private static async Task DiscoverNodesAsync( + INodeDiscoveryEnumerator enumerator, + ICacheAuthManager auth, + ICacheDiscoveryErrorHandler? errHandler, + CancellationToken cancellation ) { - _ = initialPeer?.DiscoveryEndpoint ?? throw new ArgumentException("Advertisment does not expose an advertisment endpoint"); - - //Create list request - CacheListServerRequest request = cacheConfig.GetListMessage(); + //Loop through servers + while (enumerator.MoveNext()) + { + //Make sure the node has a discovery endpoint + if (enumerator.Current.DiscoveryEndpoint == null) + { + //Skip this node + continue; + } - //Override with the initial peer's discovery endpoint - request.WithDiscoveryEndpoint(initialPeer.DiscoveryEndpoint); + /* + * We are allowed to save nodes that do not have a discovery endpoint, but we cannot discover nodes from them + * we can only use them as cache + */ - //Get the list of servers - ICachePeerAdvertisment[]? servers = await ListServersAsync(request, cancellation); + //add a random delay to avoid spamming the server + await Task.Delay((int)Random.Shared.NextInt64(50, 500), cancellation); - if (servers == null) - { - return null; - } - - if(cacheConfig is CacheNodeConfiguration cnc) - { - //Filter out the current node - return servers.Where(s => !cnc.NodeId.Equals(s.NodeId, StringComparison.OrdinalIgnoreCase)).ToArray(); + try + { + //Discover nodes from the current node + ICacheNodeAdvertisment[]? nodes = await GetCacheNodesAsync(enumerator.Current, auth, cancellation); + + if (nodes != null) + { + //Add nodes to the collection + enumerator.OnPeerDiscoveryComplete(nodes); + } + } + //Catch exceptions when an error handler is defined + catch(Exception ex) when (errHandler != null) + { + //Handle the error + errHandler.OnDiscoveryError(enumerator.Current, ex); + } } - else - { - //Do not filter - return servers; - } } /// <summary> /// Contacts the cache broker to get a list of active servers to connect to /// </summary> - /// <param name="request">The request message used to connecto the broker server</param> + /// <param name="advert">An advertisment of a server to discover other nodes from</param> /// <param name="cancellationToken">A token to cancel the operationS</param> + /// <param name="auth">The authentication manager</param> /// <returns>The list of active servers</returns> /// <exception cref="SecurityException"></exception> + /// <exception cref="ArgumentException"></exception> /// <exception cref="ArgumentNullException"></exception> - public static async Task<ICachePeerAdvertisment[]?> ListServersAsync(ICacheListServerRequest request, CancellationToken cancellationToken = default) + public static async Task<ICacheNodeAdvertisment[]?> GetCacheNodesAsync(ICacheNodeAdvertisment advert, ICacheAuthManager auth, CancellationToken cancellationToken = default) { - _ = request ?? throw new ArgumentNullException(nameof(request)); + _ = advert ?? throw new ArgumentNullException(nameof(advert)); + _ = auth ?? throw new ArgumentNullException(nameof(auth)); + _ = advert.DiscoveryEndpoint ?? throw new ArgumentException("Advertisment does not expose an advertisment endpoint"); string jwtBody; //Build request jwt using (JsonWebToken requestJwt = new()) { - requestJwt.WriteHeader(request.GetJwtHeader()); + requestJwt.WriteHeader(auth.GetJwtHeader()); requestJwt.InitPayloadClaim() .AddClaim("iat", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()) .AddClaim("nonce", RandomHash.GetRandomBase32(16)) .CommitClaims(); //sign the jwt - request.SignJwt(requestJwt); + auth.SignJwt(requestJwt); //Compile the jwt jwtBody = requestJwt.Compile(); } //New list request - RestRequest listRequest = new(request.DiscoveryEndpoint, Method.Post); + RestRequest listRequest = new(advert.DiscoveryEndpoint, Method.Post); //Add the jwt as a string to the request body listRequest.AddStringBody(jwtBody, DataFormat.None); @@ -248,87 +271,18 @@ namespace VNLib.Data.Caching.Extensions //Response is jwt using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data); - + //Verify the jwt - if (!request.VerifyBroker(responseJwt)) + if (!auth.VerifyJwt(responseJwt)) { throw new SecurityException("Failed to verify the broker's challenge, cannot continue"); } - + using JsonDocument doc = responseJwt.GetPayload(); return doc.RootElement.GetProperty("peers").Deserialize<Advertisment[]>(); } /// <summary> - /// Registers the current node with the broker - /// </summary> - /// <returns>A task that completes when the regitration has been made successfully</returns> - /// <exception cref="ArgumentException"></exception> - public static async Task RegisterWithBrokerAsync(this CacheNodeConfiguration config, string authToken) - { - //Recover the certificate - ReadOnlyJsonWebKey cacheCert = config?.SigningKey ?? throw new ArgumentException(nameof(config.SigningKey)); - - //init broker request - using BrokerRegistrationRequest request = new(); - - request.WithBroker(config.DiscoveryEndpoint!) - .WithRegistrationAddress(config.ConnectEndpoint!.ToString()) - .WithNodeId(config.NodeId!) - .WithSigningKey(cacheCert, true) - .WithHeartbeatToken(authToken); - - - //Send the request - await RegisterWithBrokerAsync(request); - } - - /// <summary> - /// Registers the current server as active with the specified broker - /// </summary> - /// <param name="registration">The registration request</param> - public static async Task RegisterWithBrokerAsync(BrokerRegistrationRequest registration) - { - _ = registration ?? throw new ArgumentNullException(nameof(registration)); - _ = registration.HeartbeatToken ?? throw new ArgumentException("Missing required heartbeat access token"); - _ = registration.NodeId ?? throw new ArgumentException("Missing required cache server NodeId"); - _ = registration.BrokerAddress ?? throw new ArgumentException("Broker server address has not been configured"); - _ = registration.RegistrationAddress ?? throw new ArgumentException("Missing required registration address", nameof(registration)); - - string requestData; - //Create the jwt for signed registration message - using (JsonWebToken jwt = new()) - { - //Shared jwt header - jwt.WriteHeader(registration.JsonHeader); - //build jwt claim - jwt.InitPayloadClaim() - .AddClaim("address", registration.RegistrationAddress) - .AddClaim("sub", registration.NodeId) - .AddClaim("token", registration.HeartbeatToken) - .CommitClaims(); - - //Sign the jwt - registration.SignJwt(jwt); - //Compile and save - requestData = jwt.Compile(); - } - - //Create reg request message - RestRequest regRequest = new(registration.BrokerAddress); - regRequest.AddStringBody(requestData, DataFormat.None); - regRequest.AddHeader("Content-Type", "text/plain"); - - //Rent client - using ClientContract cc = ClientPool.Lease(); - - //Exec the regitration request - RestResponse response = await cc.Resource.ExecutePutAsync(regRequest); - response.ThrowIfError(); - } - - - /// <summary> /// Allows for configuration of an <see cref="FBMClient"/> /// for a connection to a cache server /// </summary> @@ -359,32 +313,7 @@ namespace VNLib.Data.Caching.Extensions ClientCacheConfig.AddOrUpdate(client, nodeConfig); return nodeConfig; } - - /// <summary> - /// Discovers cache nodes in the broker configured for the current client. - /// </summary> - /// <param name="client"></param> - /// <param name="token">A token to cancel the discovery</param> - /// <returns>A task the resolves the list of active servers on the broker server</returns> - public static Task<ICachePeerAdvertisment[]?> DiscoverCacheNodesAsync(this FBMClientWorkerBase client, CancellationToken token = default) - { - return client.Client.DiscoverCacheNodesAsync(token); - } - - /// <summary> - /// Discovers cache nodes in the broker configured for the current client. - /// </summary> - /// <param name="client"></param> - /// <param name="token">A token to cancel the discovery </param> - /// <returns>A task the resolves the list of active servers on the broker server</returns> - public static async Task<ICachePeerAdvertisment[]?> DiscoverCacheNodesAsync(this FBMClient client, CancellationToken token = default) - { - //Get the stored client config - CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); - - //List servers async - return conf.CacheServers = await ListServersAsync(conf, token); - } + /// <summary> /// Waits for the client to disconnect from the server while observing @@ -428,15 +357,18 @@ namespace VNLib.Data.Caching.Extensions /// <exception cref="ArgumentNullException"></exception> /// <exception cref="SecurityException"></exception> /// <exception cref="ObjectDisposedException"></exception> - public static async Task<ICachePeerAdvertisment> ConnectToRandomCacheAsync(this FBMClient client, CancellationToken cancellation = default) + public static async Task<ICacheNodeAdvertisment> ConnectToRandomCacheAsync(this FBMClient client, CancellationToken cancellation = default) { //Get stored config CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); - //Select random - ICachePeerAdvertisment? randomServer = conf.CacheServers?.SelectRandom() - ?? throw new ArgumentException("No servers detected, cannot connect"); + //Get all available nodes, or at least the initial peers + ICacheNodeAdvertisment[]? adverts = conf.NodeCollection.GetAllNodes() ?? conf.InitialPeers ?? throw new ArgumentException("No cache nodes discovered, cannot connect"); + //Select random node from all available nodes + ICacheNodeAdvertisment randomServer = adverts.SelectRandom(); + + //Connect to the random server await ConnectToCacheAsync(client, randomServer, cancellation); //Return the random server we connected to @@ -456,13 +388,14 @@ namespace VNLib.Data.Caching.Extensions /// <exception cref="ArgumentNullException"></exception> /// <exception cref="SecurityException"></exception> /// <exception cref="ObjectDisposedException"></exception> - public static Task ConnectToCacheAsync(this FBMClient client, ICachePeerAdvertisment server, CancellationToken token = default) + public static Task ConnectToCacheAsync(this FBMClient client, ICacheNodeAdvertisment server, CancellationToken token = default) { _ = client ?? throw new ArgumentNullException(nameof(client)); _ = server ?? throw new ArgumentNullException(nameof(server)); //Get stored config CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); + //Connect to server (no server id because client not replication server) return ConnectToCacheAsync(client, conf, server, token); } @@ -481,7 +414,7 @@ namespace VNLib.Data.Caching.Extensions /// <exception cref="ArgumentNullException"></exception> /// <exception cref="SecurityException"></exception> /// <exception cref="ObjectDisposedException"></exception> - public static Task ConnectToCacheAsync(this FBMClient client, ICachePeerAdvertisment server, CacheClientConfiguration explicitConfig, CancellationToken token = default) + public static Task ConnectToCacheAsync(this FBMClient client, ICacheNodeAdvertisment server, CacheClientConfiguration explicitConfig, CancellationToken token = default) { _ = client ?? throw new ArgumentNullException(nameof(client)); _ = server ?? throw new ArgumentNullException(nameof(server)); @@ -494,7 +427,7 @@ namespace VNLib.Data.Caching.Extensions private static async Task ConnectToCacheAsync( FBMClient client, CacheClientConfiguration config, - ICachePeerAdvertisment server, + ICacheNodeAdvertisment server, CancellationToken token = default ) { @@ -513,7 +446,7 @@ namespace VNLib.Data.Caching.Extensions //Init jwt for connecting to server using (JsonWebToken jwt = new()) { - jwt.WriteHeader(config.GetJwtHeader()); + jwt.WriteHeader(config.AuthManager.GetJwtHeader()); //Init claim JwtPayload claim = jwt.InitPayloadClaim(); @@ -532,7 +465,7 @@ namespace VNLib.Data.Caching.Extensions claim.CommitClaims(); //Sign jwt - config.SignJwt(jwt); + config.AuthManager.SignJwt(jwt); //Compile to string jwtMessage = jwt.Compile(); @@ -576,7 +509,7 @@ namespace VNLib.Data.Caching.Extensions using (JsonWebToken jwt = JsonWebToken.Parse(authToken)) { //Verify the jwt - if (!config.VerifyCache(jwt)) + if (!config.AuthManager.VerifyJwt(jwt)) { throw new SecurityException("Failed to verify the cache server's negotiation message, cannot continue"); } @@ -591,7 +524,7 @@ namespace VNLib.Data.Caching.Extensions client.ClientSocket.Headers[HttpRequestHeader.Authorization] = authToken; //Compute the signature of the upgrade token - client.ClientSocket.Headers[X_UPGRADE_SIG_HEADER] = GetBase64UpgradeSingature(authToken, config.SigningKey!); + client.ClientSocket.Headers[X_UPGRADE_SIG_HEADER] = config.AuthManager.GetBase64UpgradeSingature(authToken); //Check to see if adversize self is enabled if (cnc?.BroadcastAdverisment == true) @@ -660,38 +593,16 @@ namespace VNLib.Data.Caching.Extensions * compute a signature of the upgrade token and send it to the server to prove we hold the private key. */ - private static string GetBase64UpgradeSingature(string? token, ReadOnlyJsonWebKey key) + private static string GetBase64UpgradeSingature(this ICacheAuthManager man, string? token) { - //try to get the ecdsa key first - using ECDsa? ec = key.GetECDsaPrivateKey(); - - if(ec != null) - { - //Compute hash of the token - byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256); - - //Sign the hash - byte[] sig = ec.SignHash(hash, DSASignatureFormat.IeeeP1363FixedFieldConcatenation); - - //Return the base64 string - return Convert.ToBase64String(sig); - } - - //Check rsa next - using RSA? rsa = key.GetRSAPrivateKey(); - if(rsa != null) - { - //Compute hash of the token - byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256); + //Compute hash of the token + byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256); - //Sign the hash - byte[] sig = rsa.SignHash(hash, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1); + //Sign the hash + byte[] sig = man.SignMessageHash(hash, HashAlg.SHA256); - //Return the base64 string - return Convert.ToBase64String(sig); - } - - throw new CryptographicException("Cache JKW does not export a supported private key for upgrade challenges"); + //Return the base64 string + return Convert.ToBase64String(sig); } /// <summary> @@ -704,21 +615,21 @@ namespace VNLib.Data.Caching.Extensions /// <exception cref="CryptographicException"></exception> public static bool VerifyUpgradeToken(this CacheClientConfiguration nodeConfig, string signature, string token) { - return VerifyUpgradeToken(signature, token, nodeConfig.VerificationKey); + return VerifyUpgradeToken(nodeConfig.AuthManager, signature, token); } /// <summary> /// Verifies the signed auth token against the given verification key /// </summary> + /// <param name="man"></param> /// <param name="signature">The base64 signature of the token</param> /// <param name="token">The raw token to compute the hash of</param> - /// <param name="verifcationKey">The key used to verify the singature with</param> /// <returns>True if the singature matches, false otherwise</returns> /// <exception cref="ArgumentNullException"></exception> /// <exception cref="CryptographicException"></exception> - public static bool VerifyUpgradeToken(string signature, string token, ReadOnlyJsonWebKey verifcationKey) + public static bool VerifyUpgradeToken(this ICacheAuthManager man, string signature, string token) { - _ = verifcationKey ?? throw new ArgumentNullException(nameof(verifcationKey)); + _ = man ?? throw new ArgumentNullException(nameof(man)); //get the hash of the token byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256); @@ -726,23 +637,7 @@ namespace VNLib.Data.Caching.Extensions //decode the signature byte[] sig = Convert.FromBase64String(signature); - //try to get the ecdsa key first - using ECDsa? ec = verifcationKey.GetECDsaPublicKey(); - if(ec != null) - { - //Verify the signature - return ec.VerifyHash(hash, sig, DSASignatureFormat.IeeeP1363FixedFieldConcatenation); - } - - //Check rsa next - using RSA? rsa = verifcationKey.GetRSAPublicKey(); - if(rsa != null) - { - //Verify the signature - return rsa.VerifyHash(hash, sig, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1); - } - - throw new CryptographicException("Cache JKW does not export a supported public key for upgrade challenges"); + return man.VerifyMessageHash(hash, HashAlg.SHA256, sig); } private static string GetAdvertismentHeader(CacheNodeConfiguration nodeConfiguration) @@ -756,7 +651,7 @@ namespace VNLib.Data.Caching.Extensions using JsonWebToken jwt = new(); //Get the jwt header - jwt.WriteHeader(nodeConfiguration.GetJwtHeader()); + jwt.WriteHeader(nodeConfiguration.AuthManager.GetJwtHeader()); jwt.InitPayloadClaim() .AddClaim("nonce", RandomHash.GetRandomBase32(16)) @@ -768,7 +663,7 @@ namespace VNLib.Data.Caching.Extensions .CommitClaims(); //Sign message - nodeConfiguration.SignJwt(jwt); + nodeConfiguration.AuthManager.SignJwt(jwt); return jwt.Compile(); } @@ -780,12 +675,12 @@ namespace VNLib.Data.Caching.Extensions /// <param name="message">The advertisment message to verify</param> /// <returns>The advertisment message if successfully verified, or null otherwise</returns> /// <exception cref="FormatException"></exception> - public static ICachePeerAdvertisment? VerifyPeerAdvertisment(this ICacheJwtManager config, string message) + public static ICacheNodeAdvertisment? VerifyPeerAdvertisment(this ICacheAuthManager config, string message) { using JsonWebToken jwt = JsonWebToken.Parse(message); //Verify the signature - if (!config.VerifyCache(jwt)) + if (!config.VerifyJwt(jwt)) { return null; } @@ -800,7 +695,7 @@ namespace VNLib.Data.Caching.Extensions /// </summary> /// <param name="servers"></param> /// <returns>A server selected at random</returns> - public static ICachePeerAdvertisment SelectRandom(this ICollection<ICachePeerAdvertisment> servers) + public static ICacheNodeAdvertisment SelectRandom(this ICollection<ICacheNodeAdvertisment> servers) { //select random server int randServer = RandomNumberGenerator.GetInt32(0, servers.Count); @@ -808,7 +703,7 @@ namespace VNLib.Data.Caching.Extensions } - private class Advertisment : ICachePeerAdvertisment + private class Advertisment : ICacheNodeAdvertisment { [JsonIgnore] public Uri? ConnectEndpoint { get; set; } diff --git a/lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs b/lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs new file mode 100644 index 0000000..e3ab868 --- /dev/null +++ b/lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs @@ -0,0 +1,76 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Extensions +* File: ClientCacheConfiguration.cs +* +* ClientCacheConfiguration.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Collections.Generic; + +using VNLib.Hashing; +using VNLib.Hashing.IdentityUtility; + +namespace VNLib.Data.Caching.Extensions +{ + /// <summary> + /// Provides authentication services for cache clients and + /// servers. + /// </summary> + public interface ICacheAuthManager + { + /// <summary> + /// Gets the JWT header to use for signing messages with the + /// given key + /// </summary> + /// <returns>The JWT header collection</returns> + IReadOnlyDictionary<string, string?> GetJwtHeader(); + + /// <summary> + /// Signs the given JWT + /// </summary> + /// <param name="jwt">The message to sign</param> + void SignJwt(JsonWebToken jwt); + + /// <summary> + /// Verifies the given JWT + /// </summary> + /// <param name="jwt">The message to verify authenticity</param> + /// <returns>True of the JWT could be verified, false otherwise</returns> + bool VerifyJwt(JsonWebToken jwt); + + /// <summary> + /// Signs the given message hash + /// </summary> + /// <param name="hash">The message hash to sign</param> + /// <param name="alg">The algorithm used to sign the message hash</param> + /// <returns>The signature of the hash</returns> + byte[] SignMessageHash(byte[] hash, HashAlg alg); + + /// <summary> + /// Verifies the given message hash against the signature. + /// </summary> + /// <param name="hash">The message hash to compare</param> + /// <param name="alg">The algorithm used to produce the message hash</param> + /// <param name="signature">The message signature to verify the message against</param> + /// <returns>True of the signature could be verified</returns> + bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature); + } +} diff --git a/lib/VNLib.Data.Caching.Extensions/src/ICacheDiscoveryErrorHandler.cs b/lib/VNLib.Data.Caching.Extensions/src/ICacheDiscoveryErrorHandler.cs new file mode 100644 index 0000000..3493d48 --- /dev/null +++ b/lib/VNLib.Data.Caching.Extensions/src/ICacheDiscoveryErrorHandler.cs @@ -0,0 +1,41 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Extensions +* File: ClientCacheConfiguration.cs +* +* ClientCacheConfiguration.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; + +namespace VNLib.Data.Caching.Extensions +{ + /// <summary> + /// Represents an type that will handle errors that occur during the discovery process + /// </summary> + public interface ICacheDiscoveryErrorHandler + { + /// <summary> + /// Invoked when an error occurs during the discovery process + /// </summary> + /// <param name="errorNode">The node that the error occured on</param> + /// <param name="ex">The exception that caused the invocation</param> + void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex); + } +} diff --git a/lib/VNLib.Data.Caching.Extensions/src/ICachePeerAdvertisment.cs b/lib/VNLib.Data.Caching.Extensions/src/ICacheNodeAdvertisment.cs index acf883e..fc29955 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/ICachePeerAdvertisment.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/ICacheNodeAdvertisment.cs @@ -3,9 +3,9 @@ * * Library: VNLib * Package: VNLib.Data.Caching.Extensions -* File: ICachePeerAdvertisment.cs +* File: ICacheNodeAdvertisment.cs * -* ICachePeerAdvertisment.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* ICacheNodeAdvertisment.cs is part of VNLib.Data.Caching.Extensions which is part of the larger * VNLib collection of libraries and utilities. * * VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify @@ -30,7 +30,7 @@ namespace VNLib.Data.Caching.Extensions /// <summary> /// Represents a node that can be advertised to clients /// </summary> - public interface ICachePeerAdvertisment + public interface ICacheNodeAdvertisment { /// <summary> /// The endpoint for clients to connect to to access the cache diff --git a/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryCollection.cs b/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryCollection.cs index d69da40..9adebdc 100644 --- a/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryCollection.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryCollection.cs @@ -2,18 +2,18 @@ * Copyright (c) 2023 Vaughn Nugent * * Library: VNLib -* Package: ObjectCacheServer +* Package: VNLib.Data.Caching.Extensions * File: INodeDiscoveryCollection.cs * -* INodeDiscoveryCollection.cs is part of ObjectCacheServer which is -* part of the larger VNLib collection of libraries and utilities. +* INodeDiscoveryCollection.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* VNLib collection of libraries and utilities. * -* ObjectCacheServer is free software: you can redistribute it and/or modify +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. * -* ObjectCacheServer is distributed in the hope that it will be useful, +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. @@ -22,13 +22,17 @@ * along with this program. If not, see https://www.gnu.org/licenses/. */ -using System.Collections.Generic; -using VNLib.Data.Caching.Extensions; +using System.Collections.Generic; -namespace VNLib.Data.Caching.ObjectCache.Server.Distribution +namespace VNLib.Data.Caching.Extensions { - internal interface INodeDiscoveryCollection + /// <summary> + /// Represents a collection of discovered nodes + /// </summary> +#pragma warning disable CA1711 // Identifiers should not have incorrect suffix + public interface INodeDiscoveryCollection +#pragma warning restore CA1711 // Identifiers should not have incorrect suffix { /// <summary> /// Begins a new discovery and gets an enumerator for the discovery process @@ -41,13 +45,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution /// </summary> /// <param name="initialPeers">An initial collection of peers to add to the enumeration</param> /// <returns>An enumerator that simplifies discovery of unique nodes</returns> - INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICachePeerAdvertisment> initialPeers); + INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICacheNodeAdvertisment> initialPeers); /// <summary> /// Gets a snapshot of all discovered nodes in the current collection. /// </summary> /// <returns>The current collection of notes</returns> - ICachePeerAdvertisment[] GetAllNodes(); + ICacheNodeAdvertisment[] GetAllNodes(); /// <summary> /// Completes a discovery process and updates the collection with the results diff --git a/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryEnumerator.cs b/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryEnumerator.cs index 5cddf9c..f6d5f40 100644 --- a/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryEnumerator.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryEnumerator.cs @@ -2,18 +2,18 @@ * Copyright (c) 2023 Vaughn Nugent * * Library: VNLib -* Package: ObjectCacheServer +* Package: VNLib.Data.Caching.Extensions * File: INodeDiscoveryEnumerator.cs * -* INodeDiscoveryEnumerator.cs is part of ObjectCacheServer which is part -* of the larger VNLib collection of libraries and utilities. +* INodeDiscoveryEnumerator.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* VNLib collection of libraries and utilities. * -* ObjectCacheServer is free software: you can redistribute it and/or modify +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. * -* ObjectCacheServer is distributed in the hope that it will be useful, +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. @@ -22,24 +22,21 @@ * along with this program. If not, see https://www.gnu.org/licenses/. */ + using System.Collections.Generic; -using VNLib.Data.Caching.Extensions; -namespace VNLib.Data.Caching.ObjectCache.Server.Distribution +namespace VNLib.Data.Caching.Extensions { - internal interface INodeDiscoveryEnumerator + /// <summary> + /// A custom enumerator for the node discovery process + /// </summary> + public interface INodeDiscoveryEnumerator : IEnumerator<ICacheNodeAdvertisment> { /// <summary> - /// Moves the enumerator to the next peer in the discovery process and returns the result - /// </summary> - /// <returns>The next peer advertisment in the enumeration</returns> - ICachePeerAdvertisment? GetNextPeer(); - - /// <summary> /// Adds the specified peer to the collection of discovered peers /// </summary> /// <param name="discoveredPeers">The peer collection</param> - void OnPeerDiscoveryComplete(IEnumerable<ICachePeerAdvertisment> discoveredPeers); + void OnPeerDiscoveryComplete(IEnumerable<ICacheNodeAdvertisment> discoveredPeers); } } diff --git a/plugins/ObjectCacheServer/src/Distribution/NodeDiscoveryCollection.cs b/lib/VNLib.Data.Caching.Extensions/src/NodeDiscoveryCollection.cs index f773a2e..305f5de 100644 --- a/plugins/ObjectCacheServer/src/Distribution/NodeDiscoveryCollection.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/NodeDiscoveryCollection.cs @@ -2,18 +2,18 @@ * Copyright (c) 2023 Vaughn Nugent * * Library: VNLib -* Package: ObjectCacheServer -* File: PeerDiscoveryManager.cs +* Package: VNLib.Data.Caching.Extensions +* File: INodeDiscoveryCollection.cs * -* PeerDiscoveryManager.cs is part of ObjectCacheServer which is part of the larger +* INodeDiscoveryCollection.cs is part of VNLib.Data.Caching.Extensions which is part of the larger * VNLib collection of libraries and utilities. * -* ObjectCacheServer is free software: you can redistribute it and/or modify +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. * -* ObjectCacheServer is distributed in the hope that it will be useful, +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. @@ -25,21 +25,24 @@ using System; using System.Linq; using System.Collections.Generic; +using System.Collections; -using VNLib.Plugins; -using VNLib.Data.Caching.Extensions; - -namespace VNLib.Data.Caching.ObjectCache.Server.Distribution +namespace VNLib.Data.Caching.Extensions { - sealed class NodeDiscoveryCollection : INodeDiscoveryCollection + /// <summary> + /// Represents a collection of available cache nodes from a discovery process + /// </summary> + public sealed class NodeDiscoveryCollection : INodeDiscoveryCollection { - private LinkedList<ICachePeerAdvertisment> _peers; - + private LinkedList<ICacheNodeAdvertisment> _peers; - public NodeDiscoveryCollection(PluginBase plugin) + /// <summary> + /// Initializes a new empty <see cref="NodeDiscoveryCollection"/> + /// </summary> + public NodeDiscoveryCollection() { - _peers = new(); - } + _peers = new(); + } ///<inheritdoc/> public INodeDiscoveryEnumerator BeginDiscovery() @@ -48,7 +51,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution } ///<inheritdoc/> - public INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICachePeerAdvertisment> initialPeers) + public INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICacheNodeAdvertisment> initialPeers) { //Init new enumerator with the initial peers return new NodeEnumerator(new(initialPeers)); @@ -64,36 +67,51 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution } ///<inheritdoc/> - public ICachePeerAdvertisment[] GetAllNodes() + public ICacheNodeAdvertisment[] GetAllNodes() { //Capture all current peers return _peers.ToArray(); } - private sealed record class NodeEnumerator(LinkedList<ICachePeerAdvertisment> Peers) : INodeDiscoveryEnumerator + private sealed record class NodeEnumerator(LinkedList<ICacheNodeAdvertisment> Peers) : INodeDiscoveryEnumerator { //Keep track of the current node in the collection so we can move down the list - private LinkedListNode<ICachePeerAdvertisment>? _currentNode = Peers.First; + private LinkedListNode<ICacheNodeAdvertisment>? _currentNode = Peers.First; + + public ICacheNodeAdvertisment Current => _currentNode?.Value; + object IEnumerator.Current => _currentNode?.Value; + - public ICachePeerAdvertisment? GetNextPeer() + ///<inheritdoc/> + public bool MoveNext() { //Move to the next peer in the collection _currentNode = _currentNode?.Next; - return _currentNode?.Value; + return _currentNode?.Value != null; } - public void OnPeerDiscoveryComplete(IEnumerable<ICachePeerAdvertisment> discoveredPeers) + ///<inheritdoc/> + public void OnPeerDiscoveryComplete(IEnumerable<ICacheNodeAdvertisment> discoveredPeers) { //Get only the peers from the discovery that are not already in the collection - IEnumerable<ICachePeerAdvertisment> newPeers = discoveredPeers.Except(Peers); - + IEnumerable<ICacheNodeAdvertisment> newPeers = discoveredPeers.Except(Peers); + //Add them to the end of the collection - foreach(ICachePeerAdvertisment ad in newPeers) + foreach (ICacheNodeAdvertisment ad in newPeers) { Peers.AddLast(ad); } } + + public void Reset() + { + //Go to the first node + _currentNode = Peers.First; + } + + public void Dispose() + { } } } } diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs index f69c2a4..0fe0663 100644 --- a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs +++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs @@ -42,7 +42,6 @@ using System; using System.Threading; using System.Threading.Tasks; -using VNLib.Utils.Async; using VNLib.Utils.Memory; using VNLib.Utils.Logging; using VNLib.Net.Messaging.FBM.Server; @@ -65,7 +64,7 @@ namespace VNLib.Data.Caching.ObjectCache /// <summary> /// A queue that stores update and delete events /// </summary> - public AsyncQueue<ChangeEvent> EventQueue { get; } + public ICacheListenerEventQueue EventQueue { get; } /// <summary> /// The Cache store to access data blobs @@ -77,18 +76,17 @@ namespace VNLib.Data.Caching.ObjectCache /// Initialzies a new <see cref="BlobCacheListener"/> /// </summary> /// <param name="cache">The cache table to work from</param> + /// <param name="queue">The event queue to publish changes to</param> /// <param name="log">Writes error and debug logging information</param> /// <param name="heap">The heap to alloc FBM buffers and <see cref="CacheEntry"/> cache buffers from</param> - /// <param name="singleReader">A value that indicates if a single thread is processing events</param> /// <exception cref="ArgumentNullException"></exception> - public BlobCacheListener(IBlobCacheTable cache, ILogProvider log, IUnmangedHeap heap, bool singleReader) + public BlobCacheListener(IBlobCacheTable cache, ICacheListenerEventQueue queue, ILogProvider log, IUnmangedHeap heap) { Log = log; Cache = cache ?? throw new ArgumentNullException(nameof(cache)); - //Writes may happen from multple threads with bucket design and no lock - EventQueue = new(false, singleReader); + EventQueue = queue ?? throw new ArgumentNullException(nameof(queue)); InitListener(heap); } @@ -161,32 +159,25 @@ namespace VNLib.Data.Caching.ObjectCache } } - static async Task DequeAsync(AsyncQueue<ChangeEvent> queue, FBMContext context, CancellationToken exitToken) - { - //Wait for a new message to process - ChangeEvent ev = await queue.DequeueAsync(exitToken); - - //Set the response - SetResponse(ev, context); - } - - //If no event bus is registered, then this is not a legal command - if (userState is not AsyncQueue<ChangeEvent> eventBus) + //Determine if the queue is enabled for the user + if(!EventQueue.IsEnabled(userState!)) { context.CloseResponse(ResponseCodes.NotFound); - return; } //try to deq without awaiting - if (eventBus.TryDequeue(out ChangeEvent? change)) + if (EventQueue.TryDequeue(userState!, out ChangeEvent? change)) { SetResponse(change, context); } else { - //Process async - await DequeAsync(eventBus, context, exitToken); + //Wait for a new message to process + ChangeEvent ev = await EventQueue.DequeueAsync(userState!, exitToken); + + //Set the response + SetResponse(ev, context); } return; @@ -257,10 +248,7 @@ namespace VNLib.Data.Caching.ObjectCache private void EnqueEvent(ChangeEvent change) { - if (!EventQueue.TryEnque(change)) - { - Log.Warn("Change event {ev} was not enqued because the event queue is overflowing!", change.CurrentId); - } + EventQueue.PublishEvent(change); } diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs b/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs new file mode 100644 index 0000000..06be4fa --- /dev/null +++ b/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs @@ -0,0 +1,65 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.ObjectCache +* File: ICacheListenerEventQueue.cs +* +* ICacheListenerEventQueue.cs is part of VNLib.Data.Caching.ObjectCache which +* is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + + +using System.Threading; +using System.Threading.Tasks; + +namespace VNLib.Data.Caching.ObjectCache +{ + /// <summary> + /// Represents a single client's event queue + /// </summary> + public interface ICacheListenerEventQueue + { + /// <summary> + /// Determines if the queue is enabled for the given user state + /// </summary> + /// <param name="userState">The unique state of the connection</param> + /// <returns>True if event queuing is enabled</returns> + bool IsEnabled(object userState); + + /// <summary> + /// Attempts to dequeue a single event from the queue without blocking + /// </summary> + /// <param name="userState">A user state object to associate with the wait operation</param> + /// <param name="changeEvent">The dequeued event if successfully dequeued</param> + /// <returns>True if an event was waiting and could be dequeued, false otherwise</returns> + bool TryDequeue(object userState, out ChangeEvent changeEvent); + + /// <summary> + /// Waits asynchronously for an event to be dequeued + /// </summary> + /// <param name="userState">A user state object to associate with the wait operation</param> + /// <param name="cancellation">A token to cancel the wait operation</param> + /// <returns>The <see cref="ChangeEvent"/> that as a result of the dequeue operation</returns> + ValueTask<ChangeEvent> DequeueAsync(object userState, CancellationToken cancellation); + + /// <summary> + /// Publishes an event to the queue + /// </summary> + /// <param name="changeEvent">The change event to publish</param> + void PublishEvent(ChangeEvent changeEvent); + } +} diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs index 9f5ccfe..f4f059b 100644 --- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs @@ -31,15 +31,17 @@ using System.Net.WebSockets; using System.Collections.Generic; using System.Security.Cryptography; +using VNLib.Hashing; +using VNLib.Hashing.IdentityUtility; using VNLib.Utils.Memory; using VNLib.Utils.Logging; -using VNLib.Hashing.IdentityUtility; using VNLib.Data.Caching; using VNLib.Data.Caching.Extensions; using VNLib.Data.Caching.ObjectCache; using VNLib.Net.Messaging.FBM.Client; using VNLib.Plugins.Extensions.Loading; + namespace VNLib.Plugins.Extensions.VNCache { public interface ICacheRefreshPolicy @@ -49,6 +51,7 @@ namespace VNLib.Plugins.Extensions.VNCache TimeSpan RefreshInterval { get; } } + /// <summary> /// A base class that manages /// </summary> @@ -81,8 +84,6 @@ namespace VNLib.Plugins.Extensions.VNCache _config = config; - Uri brokerUri = new(config.BrokerAddress!); - //Init the client with default settings FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(MemoryUtil.Shared, config.MaxMessageSize!.Value, config.RequestTimeout, debugLog); @@ -90,30 +91,18 @@ namespace VNLib.Plugins.Extensions.VNCache //Add the configuration to the client Client.GetCacheConfiguration() - .WithBroker(brokerUri) - .WithTls(brokerUri.Scheme == Uri.UriSchemeHttps); + .WithTls(config.UseTls) + .WithInitialPeers(config.InitialNodes!); } - - public virtual async Task ConfigureServiceAsync(PluginBase plugin) + public Task ConfigureServiceAsync(PluginBase plugin) { - //Get keys async - Task<ReadOnlyJsonWebKey?> clientPrivTask = plugin.TryGetSecretAsync("client_private_key").ToJsonWebKey(); - Task<ReadOnlyJsonWebKey?> brokerPubTask = plugin.TryGetSecretAsync("broker_public_key").ToJsonWebKey(); - Task<ReadOnlyJsonWebKey?> cachePubTask = plugin.TryGetSecretAsync("cache_public_key").ToJsonWebKey(); - - //Wait for all tasks to complete - _ = await Task.WhenAll(clientPrivTask, brokerPubTask, cachePubTask); - - ReadOnlyJsonWebKey clientPriv = await clientPrivTask ?? throw new KeyNotFoundException("Missing required secret client_private_key"); - ReadOnlyJsonWebKey brokerPub = await brokerPubTask ?? throw new KeyNotFoundException("Missing required secret broker_public_key"); - ReadOnlyJsonWebKey cachePub = await cachePubTask ?? throw new KeyNotFoundException("Missing required secret cache_public_key"); - - //Connection authentication methods + //Set authenticator Client.GetCacheConfiguration() - .WithVerificationKey(cachePub) - .WithSigningKey(clientPriv) - .WithBrokerVerificationKey(brokerPub); + .WithAuthenticator(new AuthManager(plugin)) + .WithErrorHandler(new DiscoveryErrHAndler(plugin.Log)); + + return Task.CompletedTask; } /* @@ -127,7 +116,7 @@ namespace VNLib.Plugins.Extensions.VNCache while (true) { //Load the server list - ICachePeerAdvertisment[]? servers; + ICacheNodeAdvertisment[]? servers; while (true) { try @@ -163,7 +152,7 @@ namespace VNLib.Plugins.Extensions.VNCache pluginLog.Debug("Connecting to random cache server"); //Connect to a random server - ICachePeerAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken); + ICacheNodeAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken); pluginLog.Debug("Connected to cache server {s}", selected.NodeId); //Set connection status flag @@ -255,5 +244,98 @@ namespace VNLib.Plugins.Extensions.VNCache ? throw new InvalidOperationException("The underlying client is not connected to a cache node") : Client!.AddOrUpdateObjectAsync(key, newKey, value, serialzer, cancellation); } + + + private sealed class AuthManager : ICacheAuthManager + { + + private IAsyncLazy<ReadOnlyJsonWebKey> _sigKey; + private IAsyncLazy<ReadOnlyJsonWebKey> _verKey; + + public AuthManager(PluginBase plugin) + { + //Lazy load keys + + //Get the signing key + _sigKey = plugin.GetSecretAsync("client_private_key").ToLazy(static r => r.GetJsonWebKey()); + + //Lazy load cache public key + _verKey = plugin.GetSecretAsync("cache_public_key").ToLazy(static r => r.GetJsonWebKey()); + } + + public async Task AwaitLazyKeyLoad() + { + await _sigKey; + await _verKey; + } + + ///<inheritdoc/> + public IReadOnlyDictionary<string, string?> GetJwtHeader() + { + //Get the signing key jwt header + return _sigKey.Value.JwtHeader; + } + + ///<inheritdoc/> + public void SignJwt(JsonWebToken jwt) + { + //Sign the jwt with signing key + jwt.SignFromJwk(_sigKey.Value); + } + + ///<inheritdoc/> + public byte[] SignMessageHash(byte[] hash, HashAlg alg) + { + //try to get the rsa alg for the signing key + using RSA? rsa = _sigKey.Value.GetRSAPublicKey(); + if(rsa != null) + { + return rsa.SignHash(hash, alg.GetAlgName(), RSASignaturePadding.Pkcs1); + } + + //try to get the ecdsa alg for the signing key + using ECDsa? ecdsa = _sigKey.Value.GetECDsaPublicKey(); + if(ecdsa != null) + { + return ecdsa.SignHash(hash); + } + + throw new NotSupportedException("The signing key is not a valid RSA or ECDSA key"); + } + + ///<inheritdoc/> + public bool VerifyJwt(JsonWebToken jwt) + { + return jwt.VerifyFromJwk(_verKey.Value); + } + + ///<inheritdoc/> + public bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature) + { + //try to get the rsa alg for the signing key + using RSA? rsa = _verKey.Value.GetRSAPublicKey(); + if (rsa != null) + { + return rsa.VerifyHash(hash, signature, alg.GetAlgName(), RSASignaturePadding.Pkcs1); + } + + //try to get the ecdsa alg for the signing key + using ECDsa? ecdsa = _verKey.Value.GetECDsaPublicKey(); + if (ecdsa != null) + { + return ecdsa.VerifyHash(hash, signature); + } + + throw new NotSupportedException("The current key is not an RSA or ECDSA key and is not supported"); + } + } + + private sealed record class DiscoveryErrHAndler(ILogProvider Logger) : ICacheDiscoveryErrorHandler + { + public void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex) + { + Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", errorNode.NodeId, ex); + } + } } }
\ No newline at end of file diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs index 1d888ec..64d3e07 100644 --- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs @@ -25,6 +25,7 @@ using System; using System.Text.Json.Serialization; +using VNLib.Data.Caching.Extensions; using VNLib.Plugins.Extensions.Loading; namespace VNLib.Plugins.Extensions.VNCache @@ -45,8 +46,8 @@ namespace VNLib.Plugins.Extensions.VNCache /// <summary> /// The broker server address /// </summary> - [JsonPropertyName("broker_address")] - public string? BrokerAddress { get; set; } + [JsonPropertyName("use_tls")] + public bool UseTls { get; set; } = true; /// <summary> /// The time (in seconds) to randomly delay polling the broker server @@ -77,6 +78,12 @@ namespace VNLib.Plugins.Extensions.VNCache /// </summary> internal TimeSpan RequestTimeout => TimeSpan.FromSeconds(RequestTimeoutSeconds!.Value); + /// <summary> + /// The initial peers to connect to + /// </summary> + [JsonPropertyName("initial_nodes")] + public InitialNode[]? InitialNodes { get; set; } + void IOnConfigValidation.Validate() { if (!MaxMessageSize.HasValue || MaxMessageSize.Value < 1) @@ -95,9 +102,42 @@ namespace VNLib.Plugins.Extensions.VNCache throw new ArgumentException("You must specify a positive integer FBM message timoeut", "request_timeout_sec"); } - if(!Uri.TryCreate(BrokerAddress, UriKind.RelativeOrAbsolute, out _)) + //Validate initial nodes + if (InitialNodes == null || InitialNodes.Length == 0) + { + throw new ArgumentException("You must specify at least one initial peer", "initial_peers"); + } + + foreach (InitialNode peer in InitialNodes) + { + _ = peer.ConnectEndpoint ?? throw new ArgumentException("You must specify a connect endpoint for each initial node", "initial_nodes"); + _ = peer.NodeId ?? throw new ArgumentException("You must specify a node id for each initial node", "initial_nodes"); + } + } + + public sealed record class InitialNode : ICacheNodeAdvertisment + { + [JsonIgnore] + public Uri ConnectEndpoint { get; private set; } + + [JsonIgnore] + public Uri? DiscoveryEndpoint { get; private set; } + + [JsonPropertyName("node_id")] + public string? NodeId { get; set; } + + [JsonPropertyName("connect_endpoint")] + public string? ConnectEndpointString + { + get => ConnectEndpoint.ToString(); + set => ConnectEndpoint = new Uri(value!); + } + + [JsonPropertyName("discovery_endpoint")] + public string? DiscoveryEndpointString { - throw new ArgumentException("You must specify a valid HTTP uri broker address", "broker_address"); + get => DiscoveryEndpoint?.ToString(); + set => DiscoveryEndpoint = value == null ? null : new Uri(value); } } } diff --git a/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs b/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs new file mode 100644 index 0000000..6725fbe --- /dev/null +++ b/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs @@ -0,0 +1,117 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: ObjectCacheServerEntry.cs +* +* ObjectCacheServerEntry.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Collections.Generic; +using System.Security.Cryptography; + +using VNLib.Hashing; +using VNLib.Plugins; +using VNLib.Hashing.IdentityUtility; +using VNLib.Plugins.Extensions.Loading; +using VNLib.Data.Caching.Extensions; + + +namespace VNLib.Data.Caching.ObjectCache.Server +{ + sealed record class CacheAuthKeyStore : ICacheAuthManager + { + private readonly IAsyncLazy<ReadOnlyJsonWebKey> _clientPub; + private readonly IAsyncLazy<ReadOnlyJsonWebKey> _cachePriv; + + public CacheAuthKeyStore(PluginBase plugin) + { + _clientPub = plugin.GetSecretAsync("client_public_key").ToLazy(r => r.GetJsonWebKey()); + _cachePriv = plugin.GetSecretAsync("cache_private_key").ToLazy(r => r.GetJsonWebKey()); + } + + ///<inheritdoc/> + public IReadOnlyDictionary<string, string?> GetJwtHeader() + { + return _cachePriv.Value.JwtHeader; + } + + ///<inheritdoc/> + public void SignJwt(JsonWebToken jwt) + { + jwt.SignFromJwk(_cachePriv.Value); + } + + ///<inheritdoc/> + public bool VerifyJwt(JsonWebToken jwt) + { + return jwt.VerifyFromJwk(_clientPub.Value); + } + + /// <summary> + /// Verifies the message against the stored cache key + /// </summary> + /// <param name="jwt">The token to verify</param> + /// <returns>True if the token was verified, false otherwise</returns> + public bool VerifyCachePeer(JsonWebToken jwt) + { + return jwt.VerifyFromJwk(_cachePriv.Value); + } + + ///<inheritdoc/> + public byte[] SignMessageHash(byte[] hash, HashAlg alg) + { + //try to get the rsa alg for the signing key + using RSA? rsa = _cachePriv.Value.GetRSAPublicKey(); + if (rsa != null) + { + return rsa.SignHash(hash, alg.GetAlgName(), RSASignaturePadding.Pkcs1); + } + + //try to get the ecdsa alg for the signing key + using ECDsa? ecdsa = _cachePriv.Value.GetECDsaPublicKey(); + if (ecdsa != null) + { + return ecdsa.SignHash(hash); + } + + throw new NotSupportedException("The signing key is not a valid RSA or ECDSA key"); + } + + ///<inheritdoc/> + public bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature) + { + //try to get the rsa alg for the signing key + using RSA? rsa = _clientPub.Value.GetRSAPublicKey(); + if (rsa != null) + { + return rsa.VerifyHash(hash, signature, alg.GetAlgName(), RSASignaturePadding.Pkcs1); + } + + //try to get the ecdsa alg for the signing key + using ECDsa? ecdsa = _clientPub.Value.GetECDsaPublicKey(); + if (ecdsa != null) + { + return ecdsa.VerifyHash(hash, signature); + } + + throw new NotSupportedException("The current key is not an RSA or ECDSA key and is not supported"); + } + } +} diff --git a/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs b/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs index 5fb6d2a..049069e 100644 --- a/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs +++ b/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs @@ -39,6 +39,7 @@ using VNLib.Plugins.Extensions.Loading.Events; namespace VNLib.Data.Caching.ObjectCache.Server { + [ConfigurationName("event_manager")] internal sealed class CacheEventQueueManager : ICacheEventQueueManager, IDisposable, IIntervalScheduleable { @@ -70,7 +71,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server } ///<inheritdoc/> - public AsyncQueue<ChangeEvent> Subscribe(ICachePeer peer) + public IPeerEventQueue Subscribe(ICachePeer peer) { NodeQueue? nq; @@ -198,7 +199,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server * attached to the queue */ - private sealed class NodeQueue + private sealed class NodeQueue : IPeerEventQueue { public int Listeners; @@ -243,6 +244,18 @@ namespace VNLib.Data.Caching.ObjectCache.Server Queue.TryEnque(changes[i]); } } + + ///<inheritdoc/> + public ValueTask<ChangeEvent> DequeueAsync(CancellationToken cancellation) + { + return Queue.DequeueAsync(cancellation); + } + + ///<inheritdoc/> + public bool TryDequeue(out ChangeEvent change) + { + return Queue.TryDequeue(out change); + } } } } diff --git a/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs b/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs new file mode 100644 index 0000000..52b6abf --- /dev/null +++ b/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs @@ -0,0 +1,117 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: CacheListenerPubQueue.cs +* +* CacheListenerPubQueue.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Utils.Async; +using VNLib.Utils.Logging; +using VNLib.Plugins; +using VNLib.Plugins.Extensions.Loading; + + +namespace VNLib.Data.Caching.ObjectCache.Server +{ + internal sealed class CacheListenerPubQueue : ICacheListenerEventQueue, IAsyncBackgroundWork + { + private readonly AsyncQueue<ChangeEvent> _listenerQueue; + private readonly ILogProvider _logProvider; + private readonly ICacheEventQueueManager _queueManager; + + public CacheListenerPubQueue(PluginBase plugin) + { + _queueManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>(); + _logProvider = plugin.Log; + _listenerQueue = new AsyncQueue<ChangeEvent>(false, true); + + //Register processing worker + _ = plugin.ObserveWork(this, 500); + } + + ///<inheritdoc/> + public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) + { + const int accumulatorSize = 64; + + try + { + //Accumulator for events + ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize]; + int ptr = 0; + + //Listen for changes + while (true) + { + //Wait for next event + accumulator[ptr++] = await _listenerQueue.DequeueAsync(exitToken); + + //try to accumulate more events until we can't anymore + while (_listenerQueue.TryDequeue(out ChangeEvent? ev) && ptr < accumulatorSize) + { + accumulator[ptr++] = ev; + } + + //Publish all events to subscribers + _queueManager.PublishMultiple(accumulator.AsSpan(0, ptr)); + + //Reset pointer + ptr = 0; + } + } + catch (OperationCanceledException) + { + //Normal exit + pluginLog.Debug("Change queue listener worker exited"); + } + } + + ///<inheritdoc/> + public bool IsEnabled(object userState) + { + return userState is IPeerEventQueue; + } + + ///<inheritdoc/> + public void PublishEvent(ChangeEvent changeEvent) + { + if (!_listenerQueue.TryEnque(changeEvent)) + { + _logProvider.Warn("Cache listener event queue is overflowing"); + } + } + + ///<inheritdoc/> + public bool TryDequeue(object userState, out ChangeEvent changeEvent) + { + return (userState as IPeerEventQueue)!.TryDequeue(out changeEvent); + } + + ///<inheritdoc/> + public ValueTask<ChangeEvent> DequeueAsync(object userState, CancellationToken cancellation) + { + return (userState as IPeerEventQueue)!.DequeueAsync(cancellation); + } + } +} diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs b/plugins/ObjectCacheServer/src/CacheStore.cs index 67db433..e7a7c63 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs +++ b/plugins/ObjectCacheServer/src/CacheStore.cs @@ -3,9 +3,9 @@ * * Library: VNLib * Package: ObjectCacheServer -* File: ConnectEndpoint.cs +* File: CacheStore.cs * -* ConnectEndpoint.cs is part of ObjectCacheServer which is part of the larger +* CacheStore.cs is part of ObjectCacheServer which is part of the larger * VNLib collection of libraries and utilities. * * ObjectCacheServer is free software: you can redistribute it and/or modify @@ -25,14 +25,16 @@ using System; using System.Threading; using System.Threading.Tasks; + using VNLib.Utils.Logging; using VNLib.Plugins; using VNLib.Plugins.Extensions.Loading; + namespace VNLib.Data.Caching.ObjectCache.Server { [ConfigurationName("cache")] - sealed class CacheStore : ICacheStore, IDisposable + internal sealed class CacheStore : ICacheStore, IDisposable { public BlobCacheListener Listener { get; } @@ -76,11 +78,20 @@ namespace VNLib.Data.Caching.ObjectCache.Server plugin.Log.Verbose("Creating cache store with {bc} buckets, with {mc} items/bucket", cacheConf.BucketCount, cacheConf.MaxCacheEntries); + //calculate the max memory usage + ulong maxByteSize = ((ulong)cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize); + + //Log max memory usage + plugin.Log.Debug("Maxium memory consumption {mx}Mb", maxByteSize / (ulong)(1024 * 1000)); + //Load the blob cache table system IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, plugin.CacheHeap, cacheConf); + //Get the event listener + ICacheListenerEventQueue queue = plugin.GetOrCreateSingleton<CacheListenerPubQueue>(); + //Endpoint only allows for a single reader - return new(bc, plugin.Log, plugin.CacheHeap, true); + return new(bc, queue, plugin.Log, plugin.CacheHeap); } public void Dispose() diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheSystemUtil.cs b/plugins/ObjectCacheServer/src/CacheSystemUtil.cs index 669b84f..669b84f 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/CacheSystemUtil.cs +++ b/plugins/ObjectCacheServer/src/CacheSystemUtil.cs diff --git a/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs index b453dcc..a55e8e2 100644 --- a/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs +++ b/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs @@ -58,7 +58,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution //Get peer adapter PeerAdapter = plugin.GetOrCreateSingleton<PeerDiscoveryManager>(); - + CacheStore = plugin.GetOrCreateSingleton<CacheStore>(); } public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) @@ -70,15 +70,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution while (true) { //Get all new peers - ICachePeerAdvertisment[] peers = PeerAdapter.GetNewPeers(); + ICacheNodeAdvertisment[] peers = PeerAdapter.GetNewPeers(); if (peers.Length == 0) { pluginLog.Verbose("[REPL] No new peers to connect to"); } - //Connect to each peer - foreach (ICachePeerAdvertisment peer in peers) + //Connect to each peer as a background task + foreach (ICacheNodeAdvertisment peer in peers) { _ = Plugin.ObserveWork(() => OnNewPeerDoWorkAsync(peer, pluginLog, exitToken)); } @@ -104,13 +104,16 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution pluginLog.Information("[REPL] Node replication worker exited"); } - private async Task OnNewPeerDoWorkAsync(ICachePeerAdvertisment newPeer, ILogProvider log, CancellationToken exitToken) + private async Task OnNewPeerDoWorkAsync(ICacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken) { _ = newPeer ?? throw new ArgumentNullException(nameof(newPeer)); //Setup client FBMClient client = new(ClientConfig); + //Add peer to monitor + PeerAdapter.OnPeerListenerAttached(newPeer); + try { log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId); @@ -198,7 +201,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution return; case "deleted": //Delete the object from the store - await CacheStore.DeleteItemAsync(changedObject.CurrentId); + await CacheStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None); break; case "modified": //Reload the record from the store @@ -226,6 +229,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution //Check response code string status = response.Headers.First(static s => s.Header == HeaderCommand.Status).Value.ToString(); + if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal)) { //Update the record diff --git a/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs b/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs index 82b280c..f191c9d 100644 --- a/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs +++ b/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs @@ -22,7 +22,7 @@ * along with this program. If not, see https://www.gnu.org/licenses/. */ -using System; +using System.Linq; using System.Collections.Generic; using VNLib.Plugins; @@ -32,24 +32,39 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution internal sealed class CachePeerMonitor : IPeerMonitor { - public CachePeerMonitor(PluginBase plugin) - { - } + private readonly LinkedList<ICachePeer> peers = new(); + + public CachePeerMonitor(PluginBase plugin) + { } + ///<inheritdoc/> public IEnumerable<ICachePeer> GetAllPeers() { - throw new NotImplementedException(); + lock(peers) + { + return peers.ToArray(); + } } + ///<inheritdoc/> public void OnPeerConnected(ICachePeer peer) { - throw new NotImplementedException(); + //When a peer is connected we can add it to the list so the replication manager can see it + lock(peers) + { + peers.AddLast(peer); + } } + ///<inheritdoc/> public void OnPeerDisconnected(ICachePeer peer) { - throw new NotImplementedException(); + //When a peer is disconnected we can remove it from the list + lock(peers) + { + peers.Remove(peer); + } } } } diff --git a/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs b/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs index d029f10..c3fb022 100644 --- a/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs +++ b/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs @@ -3,9 +3,9 @@ * * Library: VNLib * Package: ObjectCacheServer -* File: ObjectCacheServerEntry.cs +* File: ICachePeerAdapter.cs * -* ObjectCacheServerEntry.cs is part of ObjectCacheServer which is part of the larger +* ICachePeerAdapter.cs is part of ObjectCacheServer which is part of the larger * VNLib collection of libraries and utilities. * * ObjectCacheServer is free software: you can redistribute it and/or modify @@ -32,18 +32,18 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution /// Gets the peers that have been discovered but not yet connected to /// </summary> /// <returns>A collection of peers that have not been connected to yet</returns> - ICachePeerAdvertisment[] GetNewPeers(); + ICacheNodeAdvertisment[] GetNewPeers(); /// <summary> /// Called when a peer has been connected to /// </summary> /// <param name="peer">The peer that has been connected</param> - void OnPeerListenerAttached(ICachePeerAdvertisment peer); + void OnPeerListenerAttached(ICacheNodeAdvertisment peer); /// <summary> /// Called when a peer has been disconnected from /// </summary> /// <param name="peer">The disconnected peer</param> - void OnPeerListenerDetatched(ICachePeerAdvertisment peer); + void OnPeerListenerDetatched(ICacheNodeAdvertisment peer); } } diff --git a/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs b/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs index b4cb840..028171f 100644 --- a/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs +++ b/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs @@ -45,9 +45,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution void OnPeerDisconnected(ICachePeer peer); /// <summary> - /// Gets an enumerable of all peers currently active in the current peer + /// Gets an enumerable of all peers currently connected to this node /// </summary> - /// <returns></returns> + /// <returns>The collection of all connected peers</returns> IEnumerable<ICachePeer> GetAllPeers(); } } diff --git a/plugins/ObjectCacheServer/src/Distribution/KnownPeerList.cs b/plugins/ObjectCacheServer/src/Distribution/KnownPeerList.cs new file mode 100644 index 0000000..74df81f --- /dev/null +++ b/plugins/ObjectCacheServer/src/Distribution/KnownPeerList.cs @@ -0,0 +1,100 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: KnownPeerList.cs +* +* KnownPeerList.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Collections.Generic; +using System.Text.Json.Serialization; + +using VNLib.Plugins; +using VNLib.Data.Caching.Extensions; +using VNLib.Plugins.Extensions.Loading; + + +namespace VNLib.Data.Caching.ObjectCache.Server.Distribution +{ + [ConfigurationName("known_peers")] + internal sealed class KnownPeerList + { + private readonly List<KnownPeer> _peers; + + public KnownPeerList(PluginBase plugin, IConfigScope config) + { + //Deserialze the known peers into an array + KnownPeer[] peers = config.Deserialze<KnownPeer[]>(); + + foreach (KnownPeer peer in peers) + { + //Validate the peer + peer.Validate(); + } + + _peers = peers?.ToList() ?? new(); + } + + public IEnumerable<ICacheNodeAdvertisment> GetPeers() + { + return _peers; + } + + private sealed class KnownPeer : ICacheNodeAdvertisment + { + public Uri? ConnectEndpoint { get; set; } + public Uri? DiscoveryEndpoint { get; set; } + + [JsonPropertyName("node_id")] + public string NodeId { get; set; } + + [JsonPropertyName("connect_url")] + public string? ConnectEpPath + { + get => ConnectEndpoint?.ToString() ?? string.Empty; + set => ConnectEndpoint = new Uri(value ?? string.Empty); + } + + [JsonPropertyName("discovery_url")] + public string? DiscoveryEpPath + { + get => DiscoveryEndpoint?.ToString() ?? string.Empty; + set => DiscoveryEndpoint = new Uri(value ?? string.Empty); + } + + public void Validate() + { + if (string.IsNullOrWhiteSpace(NodeId)) + { + throw new ArgumentException("Node ID cannot be null or whitespace"); + } + if (ConnectEndpoint is null) + { + throw new ArgumentException("Connect endpoint cannot be null"); + } + if (DiscoveryEndpoint is null) + { + throw new ArgumentException("Discovery endpoint cannot be null"); + } + } + } + } +} diff --git a/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs index 54e4258..26ec565 100644 --- a/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs +++ b/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs @@ -33,14 +33,16 @@ using VNLib.Utils.Logging; using VNLib.Data.Caching.Extensions; using VNLib.Plugins.Extensions.Loading; + namespace VNLib.Data.Caching.ObjectCache.Server.Distribution { sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter { + private readonly List<ICacheNodeAdvertisment> _connectedPeers; private readonly NodeConfig _config; - private readonly IPeerMonitor _monitor; - private readonly INodeDiscoveryCollection _peers; + private readonly IPeerMonitor _monitor; + private readonly KnownPeerList _knownPeers; public PeerDiscoveryManager(PluginBase plugin) { @@ -50,14 +52,23 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution //Get the peer monitor _monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>(); - //Get the node collection - _peers = plugin.GetOrCreateSingleton<NodeDiscoveryCollection>(); + //Get the known peer list + _knownPeers = plugin.GetOrCreateSingleton<KnownPeerList>(); _connectedPeers = new(); + + //Setup discovery error handler + _config.Config.WithErrorHandler(new ErrorHandler(plugin.Log)); } async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) { + /* + * This loop uses the peer monitor to keep track of all connected peers, then gets + * all the advertising peers (including known peers) and resolves all nodes across + * the network. + */ + pluginLog.Information("Node discovery worker started"); try @@ -66,7 +77,19 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution { try { - await DiscoverAllNodesAsync(pluginLog, exitToken); + //Use the monitor to get the initial peers + IEnumerable<ICacheNodeAdvertisment> ads = _monitor.GetAllPeers() + .Where(static p => p.Advertisment != null) + .Select(static p => p.Advertisment!); + + //Add known peers to the initial list + ads = ads.Union(_knownPeers.GetPeers()); + + //Set initial peers + _config.Config.WithInitialPeers(ads); + + //Discover all nodes + await _config.Config.DiscoverNodesAsync(exitToken); } catch(OperationCanceledException) { @@ -91,54 +114,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution } } - - async Task DiscoverAllNodesAsync(ILogProvider log, CancellationToken cancellation) - { - //Use the monitor to get the initial peers - IEnumerable<ICachePeerAdvertisment> ads = _monitor.GetAllPeers() - .Where(static p => p.Advertisment != null) - .Select(static p => p.Advertisment!); - - //Init enumerator with initial peers - INodeDiscoveryEnumerator enumerator = _peers.BeginDiscovery(ads); - - do - { - //Load the initial peer - ICachePeerAdvertisment? peer = enumerator.GetNextPeer(); - - if (peer == null) - { - break; - } - - log.Verbose("Discovering peer nodes from {Peer}", peer.NodeId); - - //Discover nodes from this peer - ICachePeerAdvertisment[]? newNodes = await _config.Config.DiscoverClusterNodesAsync(peer, cancellation); - - //Add nodes to the enumerator - if (newNodes != null) - { - enumerator.OnPeerDiscoveryComplete(newNodes); - } - - } while (true); - - //Commit peer updates - _peers.CompleteDiscovery(enumerator); - } - - - private readonly List<ICachePeerAdvertisment> _connectedPeers; + ///<inheritdoc/> - public ICachePeerAdvertisment[] GetNewPeers() + public ICacheNodeAdvertisment[] GetNewPeers() { lock (_connectedPeers) { //Get all discovered peers - ICachePeerAdvertisment[] peers = _peers.GetAllNodes(); + ICacheNodeAdvertisment[] peers = _config.Config.NodeCollection.GetAllNodes(); //Get the difference between the discovered peers and the connected peers return peers.Except(_connectedPeers).ToArray(); @@ -146,7 +130,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution } ///<inheritdoc/> - public void OnPeerListenerAttached(ICachePeerAdvertisment peer) + public void OnPeerListenerAttached(ICacheNodeAdvertisment peer) { lock (_connectedPeers) { @@ -156,7 +140,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution } ///<inheritdoc/> - public void OnPeerListenerDetatched(ICachePeerAdvertisment peer) + public void OnPeerListenerDetatched(ICacheNodeAdvertisment peer) { //remove from connected peers lock (_connectedPeers) @@ -164,5 +148,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution _connectedPeers.Remove(peer); } } + + + private sealed record class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler + { + public void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex) + { + Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex); + } + } } } diff --git a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs deleted file mode 100644 index b9c00e6..0000000 --- a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs +++ /dev/null @@ -1,149 +0,0 @@ -/* -* Copyright (c) 2023 Vaughn Nugent -* -* Library: VNLib -* Package: ObjectCacheServer -* File: BrokerHeartBeatEndpoint.cs -* -* BrokerHeartBeatEndpoint.cs is part of ObjectCacheServer which is part of the larger -* VNLib collection of libraries and utilities. -* -* ObjectCacheServer is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* ObjectCacheServer is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Net; -using System.Linq; -using System.Text.Json; -using System.Threading.Tasks; - - -using VNLib.Plugins; -using VNLib.Utils.Logging; -using VNLib.Plugins.Essentials; -using VNLib.Hashing.IdentityUtility; -using VNLib.Plugins.Essentials.Endpoints; -using VNLib.Plugins.Essentials.Extensions; -using VNLib.Plugins.Extensions.Loading; - -namespace VNLib.Data.Caching.ObjectCache.Server -{ - internal sealed class BrokerHeartBeatEndpoint : ResourceEndpointBase - { - private readonly IBrokerHeartbeatNotifier _heartBeat; - private readonly Task<IPAddress[]> BrokerIpList; - private readonly bool DebugMode; - - ///<inheritdoc/> - protected override ProtectionSettings EndpointProtectionSettings { get; } = new() - { - DisableBrowsersOnly = true, - DisableSessionsRequired = true - }; - - public BrokerHeartBeatEndpoint(PluginBase plugin) - { - //Get debug flag - DebugMode = plugin.IsDebug(); - - //Get or create the current node config - _heartBeat = plugin.GetOrCreateSingleton<NodeConfig>(); - - /* - * Resolve the ip address of the broker and store it to verify connections - * later - */ - BrokerIpList = Dns.GetHostAddressesAsync(_heartBeat.GetBrokerAddress().DnsSafeHost); - - //Setup endpoint - InitPathAndLog("/heartbeat", plugin.Log); - } - - - protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity) - { - //If-not loopback then verify server address - if (!entity.Server.IsLoopBack()) - { - //Load and verify the broker's ip address matches with an address we have stored - IPAddress[] addresses = await BrokerIpList; - - if (!addresses.Contains(entity.TrustedRemoteIp)) - { - if (DebugMode) - { - Log.Debug("Received connection {ip} that was not a DNS safe address for the broker server, access denied"); - } - - //Token invalid - entity.CloseResponse(HttpStatusCode.Forbidden); - return VfReturnType.VirtualSkip; - } - } - - //Get the authorization jwt - string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; - - if (string.IsNullOrWhiteSpace(jwtAuth)) - { - //Token invalid - entity.CloseResponse(HttpStatusCode.Forbidden); - return VfReturnType.VirtualSkip; - } - - //Parse the jwt - using JsonWebToken jwt = JsonWebToken.Parse(jwtAuth); - - //Verify the jwt using the broker's public key certificate - using (ReadOnlyJsonWebKey cert = _heartBeat.GetBrokerPublicKey()) - { - //Verify the jwt - if (!jwt.VerifyFromJwk(cert)) - { - //Token invalid - entity.CloseResponse(HttpStatusCode.Forbidden); - return VfReturnType.VirtualSkip; - } - } - - string? auth; - //Recover the auth token from the jwt - using (JsonDocument doc = jwt.GetPayload()) - { - auth = doc.RootElement.GetProperty("token").GetString(); - } - - //Get our stored token used for registration - string? selfToken = _heartBeat.GetAuthToken(); - - //Verify token - if (selfToken != null && selfToken.Equals(auth, StringComparison.Ordinal)) - { - //Signal keepalive - _heartBeat.HearbeatReceived(); - entity.CloseResponse(HttpStatusCode.OK); - return VfReturnType.VirtualSkip; - } - - if (DebugMode) - { - Log.Debug("Invalid auth token recieved from broker sever, access denied"); - } - - //Token invalid - entity.CloseResponse(HttpStatusCode.Forbidden); - return VfReturnType.VirtualSkip; - } - } -} diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs index 167a7e9..8352635 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs @@ -31,10 +31,8 @@ using System.Collections.Generic; using VNLib.Hashing; using VNLib.Net.Http; -using VNLib.Utils.Async; using VNLib.Utils.Memory; using VNLib.Utils.Logging; -using VNLib.Utils.Extensions; using VNLib.Data.Caching; using VNLib.Data.Caching.Extensions; using VNLib.Hashing.IdentityUtility; @@ -49,21 +47,21 @@ using VNLib.Plugins.Essentials.Extensions; using VNLib.Plugins.Extensions.Loading.Routing; using VNLib.Data.Caching.ObjectCache.Server.Distribution; -namespace VNLib.Data.Caching.ObjectCache.Server + +namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { [ConfigurationName("connect_endpoint")] - internal sealed class ConnectEndpoint : ResourceEndpointBase, IAsyncBackgroundWork + internal sealed class ConnectEndpoint : ResourceEndpointBase { private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30); - private readonly CacheNodeConfiguration NodeConfiguration; + private readonly NodeConfig NodeConfiguration; private readonly ICacheEventQueueManager PubSubManager; private readonly IPeerMonitor Peers; private readonly BlobCacheListener Store; - private readonly CacheAuthKeyStore KeyStore; private readonly bool VerifyIp; private readonly string AudienceLocalServerId; @@ -94,8 +92,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server InitPathAndLog(path, plugin.Log); - KeyStore = new(plugin); - //Check for ip-verification flag VerifyIp = config.TryGetValue("verify_ip", out JsonElement vIpEl) && vIpEl.GetBoolean(); @@ -103,7 +99,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server PubSubManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>(); //Get node configuration - NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>().Config; + NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>(); //Get peer monitor Peers = plugin.GetOrCreateSingleton<CachePeerMonitor>(); @@ -119,9 +115,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server * know client tokens belong to us when singed by the same key */ AudienceLocalServerId = Guid.NewGuid().ToString("N"); - - //Schedule the queue worker to be run - _ = plugin.ObserveWork(this, 100); } @@ -142,7 +135,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server * received the messages properly */ - protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity) + protected override VfReturnType Get(HttpEntity entity) { //Parse jwt from authoriation string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; @@ -161,22 +154,18 @@ namespace VNLib.Data.Caching.ObjectCache.Server { bool verified = false; - //Get the client public key certificate to verify the client's message - using(ReadOnlyJsonWebKey cert = await KeyStore.GetClientPublicKeyAsync()) + //verify signature for client + if (NodeConfiguration.KeyStore.VerifyJwt(jwt)) { - //verify signature for client - if (jwt.VerifyFromJwk(cert)) - { - verified = true; - } - //May be signed by a cache server - else - { - //Set peer and verified flag since the another cache server signed the request - isPeer = verified = NodeConfiguration.VerifyCache(jwt); - } + verified = true; + } + //May be signed by a cache server + else + { + //Set peer and verified flag since the another cache server signed the request + isPeer = verified = NodeConfiguration.KeyStore.VerifyCachePeer(jwt); } - + //Check flag if (!verified) { @@ -204,7 +193,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server //Verified, now we can create an auth message with a short expiration using JsonWebToken auth = new(); - auth.WriteHeader(NodeConfiguration.GetJwtHeader()); + auth.WriteHeader(NodeConfiguration.KeyStore.GetJwtHeader()); auth.InitPayloadClaim() .AddClaim("aud", AudienceLocalServerId) .AddClaim("exp", entity.RequestedTimeUtc.Add(AuthTokenExpiration).ToUnixTimeSeconds()) @@ -223,14 +212,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server .CommitClaims(); //Sign the auth message from our private key - NodeConfiguration.SignJwt(auth); + NodeConfiguration.KeyStore.SignJwt(auth); //Close response entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer); return VfReturnType.VirtualSkip; } - protected override async ValueTask<VfReturnType> WebsocketRequestedAsync(HttpEntity entity) + protected override VfReturnType WebsocketRequested(HttpEntity entity) { //Parse jwt from authorization string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; @@ -251,13 +240,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server } string? nodeId = null; - ICachePeerAdvertisment? discoveryAd = null; + ICacheNodeAdvertisment? discoveryAd = null; //Parse jwt using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth)) { //verify signature against the cache public key, since this server must have signed it - if (!NodeConfiguration.VerifyCache(jwt)) + if (!NodeConfiguration.KeyStore.VerifyCachePeer(jwt)) { entity.CloseResponse(HttpStatusCode.Unauthorized); return VfReturnType.VirtualSkip; @@ -313,7 +302,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server if (isPeer) { //Verify token signature against a fellow cache public key - if (!NodeConfiguration.VerifyUpgradeToken(clientSignature, jwtAuth)) + if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth)) { entity.CloseResponse(HttpStatusCode.Unauthorized); return VfReturnType.VirtualSkip; @@ -325,16 +314,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server //Verify the node advertisement header and publish it if (!string.IsNullOrWhiteSpace(discoveryHeader)) { - discoveryAd = NodeConfiguration.VerifyPeerAdvertisment(discoveryHeader); + discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(discoveryHeader); } } else { //Not a peer, so verify against the client's public key - using ReadOnlyJsonWebKey clientPub = await KeyStore.GetClientPublicKeyAsync(); - - //Verify token signature - if (!FBMDataCacheExtensions.VerifyUpgradeToken(clientSignature, jwtAuth, clientPub)) + if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth)) { entity.CloseResponse(HttpStatusCode.Unauthorized); return VfReturnType.VirtualSkip; @@ -426,7 +412,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server if (!string.IsNullOrWhiteSpace(state.NodeId)) { //Get the event queue for the current node - AsyncQueue<ChangeEvent> queue = PubSubManager.Subscribe(state); + IPeerEventQueue queue = PubSubManager.Subscribe(state); try { @@ -470,44 +456,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server Log.Debug("Server websocket exited"); } - - - //Background worker to process event queue items - async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) - { - const int accumulatorSize = 64; - - try - { - //Accumulator for events - ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize]; - int ptr = 0; - - //Listen for changes - while (true) - { - //Wait for next event - accumulator[ptr++] = await Store.EventQueue.DequeueAsync(exitToken); - - //try to accumulate more events until we can't anymore - while (Store.EventQueue.TryDequeue(out ChangeEvent? ev) && ptr < accumulatorSize) - { - accumulator[ptr++] = ev; - } - - //Publish all events to subscribers - PubSubManager.PublishMultiple(accumulator.AsSpan(0, ptr)); - - //Reset pointer - ptr = 0; - } - } - catch (OperationCanceledException) - { - //Normal exit - pluginLog.Debug("Change queue listener worker exited"); - } - } + private class WsUserState : ICachePeer { @@ -516,7 +465,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server public int MaxMessageSize { get; init; } public int MaxResponseBufferSize { get; init; } public string? NodeId { get; init; } - public ICachePeerAdvertisment? Advertisment { get; init; } + public ICacheNodeAdvertisment? Advertisment { get; init; } public override string ToString() { diff --git a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs index 670d624..90ffca0 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs @@ -23,8 +23,8 @@ */ using System; -using System.Linq; using System.Net; +using System.Linq; using System.Text.Json; using System.Threading.Tasks; @@ -76,14 +76,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints using(JsonWebToken jwt = JsonWebToken.Parse(authToken)) { //try to verify against cache node first - if (!Config.Config.VerifyCache(jwt)) + if (!Config.KeyStore.VerifyCachePeer(jwt)) { //failed... //try to verify against client key - using ReadOnlyJsonWebKey clientPub = await Config.KeyStore.GetClientPublicKeyAsync(); - - if (!jwt.VerifyFromJwk(clientPub)) + if (!Config.KeyStore.VerifyJwt(jwt)) { //invalid token entity.CloseResponse(HttpStatusCode.Unauthorized); @@ -97,7 +95,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } //Valid key, get peer list to send to client - ICachePeerAdvertisment[] peers = PeerMonitor.GetAllPeers() + ICacheNodeAdvertisment[] peers = PeerMonitor.GetAllPeers() .Where(static p => p.Advertisment != null) .Select(static p => p.Advertisment!) .ToArray(); @@ -106,7 +104,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints using JsonWebToken response = new(); //set header from cache config - response.WriteHeader(Config.Config.GetJwtHeader()); + response.WriteHeader(Config.KeyStore.GetJwtHeader()); response.InitPayloadClaim() .AddClaim("iss", Config.Config.NodeId) @@ -119,7 +117,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints .CommitClaims(); //Sign the response - Config.Config.SignJwt(response); + Config.KeyStore.SignJwt(response); //Send response to client entity.CloseResponse(HttpStatusCode.OK, Net.Http.ContentType.Text, response.DataBuffer); diff --git a/plugins/ObjectCacheServer/src/ICacheEventQueueManager.cs b/plugins/ObjectCacheServer/src/ICacheEventQueueManager.cs index 6b07000..20edf0b 100644 --- a/plugins/ObjectCacheServer/src/ICacheEventQueueManager.cs +++ b/plugins/ObjectCacheServer/src/ICacheEventQueueManager.cs @@ -24,9 +24,6 @@ using System; -using VNLib.Utils.Async; - - namespace VNLib.Data.Caching.ObjectCache.Server { /// <summary> @@ -49,14 +46,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server /// <summary> /// Attatches a subscriber that will receive all published changes /// </summary> - /// <param name="nodeId">The id of the node to get the queue for</param> + /// <param name="peer">The peer node that wishes to subscribe for events</param> /// <returns>The initilaizes event queue for the single subscriber</returns> - AsyncQueue<ChangeEvent> Subscribe(ICachePeer peer); + IPeerEventQueue Subscribe(ICachePeer peer); /// <summary> /// Detatches a subscriber from the event queue /// </summary> - /// <param name="nodeId">The id of the nede to detach</param> + /// <param name="peer">The peer to unsubscribe from events</param> void Unsubscribe(ICachePeer peer); /// <summary> diff --git a/plugins/ObjectCacheServer/src/ICachePeer.cs b/plugins/ObjectCacheServer/src/ICachePeer.cs index d374400..97b406f 100644 --- a/plugins/ObjectCacheServer/src/ICachePeer.cs +++ b/plugins/ObjectCacheServer/src/ICachePeer.cs @@ -39,6 +39,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server /// <summary> /// An optional signed advertisment message for other peers /// </summary> - ICachePeerAdvertisment? Advertisment { get; } + ICacheNodeAdvertisment? Advertisment { get; } } } diff --git a/plugins/ObjectCacheServer/src/IPeerEventQueue.cs b/plugins/ObjectCacheServer/src/IPeerEventQueue.cs new file mode 100644 index 0000000..63dbd3f --- /dev/null +++ b/plugins/ObjectCacheServer/src/IPeerEventQueue.cs @@ -0,0 +1,50 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: CacheEventQueueManager.cs +* +* CacheEventQueueManager.cs is part of ObjectCacheServer which is +* part of the larger VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Threading; +using System.Threading.Tasks; + + +namespace VNLib.Data.Caching.ObjectCache.Server +{ + /// <summary> + /// Represents a queue of events for a specific peer node + /// </summary> + internal interface IPeerEventQueue + { + /// <summary> + /// Dequeues an event from the queue asynchronously + /// </summary> + /// <param name="cancellation">A token to cancel the operation</param> + /// <returns>The value task that represents the wait</returns> + ValueTask<ChangeEvent> DequeueAsync(CancellationToken cancellation); + + /// <summary> + /// Attemts to dequeue an event from the queue + /// </summary> + /// <param name="change">The change event that was dequeued if possible</param> + /// <returns>True if the event was dequeued</returns> + bool TryDequeue(out ChangeEvent change); + } +} diff --git a/plugins/ObjectCacheServer/src/NodeConfig.cs b/plugins/ObjectCacheServer/src/NodeConfig.cs index 614f0d6..81b8a32 100644 --- a/plugins/ObjectCacheServer/src/NodeConfig.cs +++ b/plugins/ObjectCacheServer/src/NodeConfig.cs @@ -3,9 +3,9 @@ * * Library: VNLib * Package: ObjectCacheServer -* File: ObjectCacheServerEntry.cs +* File: NodeConfig.cs * -* ObjectCacheServerEntry.cs is part of ObjectCacheServer which is part of the larger +* NodeConfig.cs is part of ObjectCacheServer which is part of the larger * VNLib collection of libraries and utilities. * * ObjectCacheServer is free software: you can redistribute it and/or modify @@ -25,63 +25,48 @@ using System; using System.Net; using System.Linq; -using System.Net.Http; using System.Text.Json; -using System.Threading; -using System.Net.Sockets; using System.Threading.Tasks; -using System.Collections.Generic; -using System.Security.Cryptography; using VNLib.Plugins; using VNLib.Utils; using VNLib.Utils.Logging; -using VNLib.Utils.Extensions; -using VNLib.Hashing; -using VNLib.Hashing.IdentityUtility; using VNLib.Data.Caching.Extensions; using VNLib.Plugins.Extensions.Loading; - +using VNLib.Data.Caching.ObjectCache.Server.Endpoints; namespace VNLib.Data.Caching.ObjectCache.Server { [ConfigurationName("cluster")] - internal sealed class NodeConfig : VnDisposeable, IAsyncConfigurable, IAsyncBackgroundWork, IBrokerHeartbeatNotifier + internal sealed class NodeConfig : VnDisposeable { const string CacheConfigTemplate = @" Cluster Configuration: - Broker Address: {ba} - Heartbeat Timeout: {hb} Node Id: {id} TlsEndabled: {tls}, Cache Endpoint: {ep} "; public CacheNodeConfiguration Config { get; } - public CacheAuthKeyStore KeyStore { get; } - - private readonly ManualResetEventSlim hearbeatHandle; - private readonly TimeSpan _hearbeatTimeout; - private string? _authToken; + public CacheAuthKeyStore KeyStore { get; } public NodeConfig(PluginBase plugin, IConfigScope config) { //Server id is just dns name for now string nodeId = Dns.GetHostName(); - Config = new(); - //Get the heartbeat interval - TimeSpan heartBeatDelayMs = config["heartbeat_timeout_sec"].GetTimeSpan(TimeParseType.Seconds); - - string brokerAddr = config["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'"); + Config = new(); //Get the port of the primary webserver int port; bool usingTls; { - JsonElement firstHost = plugin.HostConfig.GetProperty("virtual_hosts").EnumerateArray().First(); + //Get the port number of the first virtual host + JsonElement firstHost = plugin.HostConfig.GetProperty("virtual_hosts") + .EnumerateArray() + .First(); port = firstHost.GetProperty("interface") .GetProperty("port") @@ -91,147 +76,51 @@ namespace VNLib.Data.Caching.ObjectCache.Server usingTls = firstHost.TryGetProperty("ssl", out _); } - //Get the cache endpoint config - IConfigScope cacheEpConfig = plugin.GetConfigForType<ConnectEndpoint>(); - //The endpoint to advertise to cache clients that allows cache connections - UriBuilder endpoint = new(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, nodeId, port, cacheEpConfig["path"].GetString()); + Uri cacheEndpoint = GetEndpointUri<ConnectEndpoint>(plugin, usingTls, port, nodeId); + + //Init key store + KeyStore = new(plugin); //Setup cache node config - Config.WithCacheEndpoint(endpoint.Uri) + Config.WithCacheEndpoint(cacheEndpoint) .WithNodeId(nodeId) - .WithTls(usingTls) - .WithBroker(new(brokerAddr)); + .WithAuthenticator(KeyStore) + .WithTls(usingTls); //Check if advertising is enabled if(config.TryGetValue("advertise", out JsonElement adEl) && adEl.GetBoolean()) { - Config.EnableAdvertisment(true, ""); - } - - //Init key store - KeyStore = new(plugin); - - //Init heartbeat handle unsiganled waiting for first heartbeat - hearbeatHandle = new(false); - - //Schedule heartbeat - _ = plugin.ObserveWork(this, 500); + //Get the the broadcast endpoint + Uri discoveryEndpoint = GetEndpointUri<PeerDiscoveryEndpoint>(plugin, usingTls, port, nodeId); + //Enable advertising + Config.EnableAdvertisment(discoveryEndpoint); + } + + //log the config plugin.Log.Information(CacheConfigTemplate, - brokerAddr, - heartBeatDelayMs, nodeId, usingTls, - endpoint.Uri); + cacheEndpoint + ); } - async Task IAsyncConfigurable.ConfigureServiceAsync(PluginBase plugin) + private static Uri GetEndpointUri<T>(PluginBase plugin, bool usingTls, int port, string hostName) where T: IEndpoint { - //Get cache private key for signing from the key store - ReadOnlyJsonWebKey signingKey = await KeyStore.GetCachePrivateAsync(); - - Config.WithSigningKey(signingKey); - - //Get broker public key for verifying from the key store - ReadOnlyJsonWebKey brokerKey = await KeyStore.GetBrokerPublicAsync(); + //Get the cache endpoint config + IConfigScope cacheEpConfig = plugin.GetConfigForType<T>(); - Config.WithBrokerVerificationKey(brokerKey); + //The endpoint to advertise to cache clients that allows cache connections + return new UriBuilder(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, hostName, port, cacheEpConfig["path"].GetString()).Uri; } + protected override void Free() { - //Dispose the heartbeat handle - hearbeatHandle.Dispose(); - //cleanup keys - Config.SigningKey?.Dispose(); - Config.VerificationKey?.Dispose(); - Config.BrokerVerificationKey?.Dispose(); - } - - ///<inheritdoc/> - public void HearbeatReceived() - { - //Set the heartbeat handle as received - hearbeatHandle.Set(); + } - - ///<inheritdoc/> - public string? GetAuthToken() => _authToken; - - ///<inheritdoc/> - public Uri GetBrokerAddress() => Config.DiscoveryEndpoint!; - - ///<inheritdoc/> - public ReadOnlyJsonWebKey GetBrokerPublicKey() => Config.BrokerVerificationKey!; - - - /* - * Worker loop for registering with the broker and monitoring hearbeat requests - */ - async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) - { - //Listen in loop - while (true) - { - try - { - //Regen the auth token before registering - _authToken = RandomHash.GetRandomBase32(32); - - pluginLog.Information("Registering with cache broker server with id {id}", Config.NodeId); - - //Register with the broker and pass the current auth token - await Config.RegisterWithBrokerAsync(_authToken); - - //Enter heartbeat loop - while (true) - { - //Wait for the heartbeat timeout - await Task.Delay(_hearbeatTimeout, exitToken); - - //Confrim the hearbeat was received within the timeout period - if (!hearbeatHandle.IsSet) - { - //If the heartbeat handle is not set, the heartbeat was not received, reg-register - pluginLog.Information("Broker missed hearbeat request"); - - //not received, break out of the heartbeat loop to re-register - break; - } - - //Reset the handle and continue the heartbeat loop - hearbeatHandle.Reset(); - } - - //Add random delay to prevent all nodes from re-registering at the same time - await Task.Delay(RandomNumberGenerator.GetInt32(1000, 5000), exitToken); - } - catch (OperationCanceledException) - { - pluginLog.Debug("Registration loop exited on unload"); - break; - } - catch (TimeoutException) - { - pluginLog.Warn("Failed to connect to cache broker server within the specified timeout period"); - } - catch (HttpRequestException re) when (re.InnerException is SocketException) - { - pluginLog.Warn("Cache broker is unavailable or network is unavailable"); - } - catch(HttpRequestException re) when (re.StatusCode.HasValue) - { - pluginLog.Warn("Failed to register with cache broker server, received status code {code}", re.StatusCode); - } - catch (Exception ex) - { - pluginLog.Warn("Exception occured in registraion loop: {ex}", ex!.Message); - } - } - } - } } diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj index 0c53095..e8d6291 100644 --- a/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj +++ b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj @@ -37,8 +37,8 @@ </ItemGroup> <ItemGroup> <ProjectReference Include="..\..\..\..\..\core\lib\Plugins.PluginBase\src\VNLib.Plugins.PluginBase.csproj" /> + <ProjectReference Include="..\..\..\..\Extensions\lib\VNLib.Plugins.Extensions.Loading\src\VNLib.Plugins.Extensions.Loading.csproj" /> <ProjectReference Include="..\..\..\lib\VNLib.Data.Caching.Extensions\src\VNLib.Data.Caching.Extensions.csproj" /> <ProjectReference Include="..\..\..\lib\VNLib.Data.Caching.ObjectCache\src\VNLib.Data.Caching.ObjectCache.csproj" /> - <ProjectReference Include="..\..\CacheBroker\src\CacheBroker.csproj" /> </ItemGroup> </Project> diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs index c1a6ad2..5d9a50b 100644 --- a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs +++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs @@ -23,80 +23,20 @@ */ using System; -using System.IO; -using System.Net; -using System.Linq; -using System.Net.Http; using System.Threading; -using System.Net.Sockets; -using System.Threading.Tasks; using System.Collections.Generic; -using System.Security.Cryptography; using VNLib.Plugins; using VNLib.Utils.Memory; using VNLib.Utils.Logging; using VNLib.Utils.Memory.Diagnostics; -using VNLib.Hashing.IdentityUtility; -using VNLib.Data.Caching.Extensions; -using static VNLib.Data.Caching.Constants; -using VNLib.Net.Messaging.FBM; -using VNLib.Net.Messaging.FBM.Client; -using VNLib.Plugins.Cache.Broker.Endpoints; using VNLib.Plugins.Extensions.Loading; using VNLib.Plugins.Extensions.Loading.Routing; using VNLib.Data.Caching.ObjectCache.Server.Endpoints; + namespace VNLib.Data.Caching.ObjectCache.Server { - sealed record class CacheAuthKeyStore(PluginBase Plugin) - { - public Task<ReadOnlyJsonWebKey> GetCachePublicAsync() - { - return Plugin.TryGetSecretAsync("cache_private_key").ToJsonWebKey(true); - } - - public Task<ReadOnlyJsonWebKey> GetCachePrivateAsync() - { - return Plugin.TryGetSecretAsync("cache_private_key").ToJsonWebKey(true); - } - - public Task<ReadOnlyJsonWebKey> GetBrokerPublicAsync() - { - return Plugin.TryGetSecretAsync("broker_public_key").ToJsonWebKey(true); - } - - public Task<ReadOnlyJsonWebKey> GetClientPublicKeyAsync() - { - return Plugin.TryGetSecretAsync("client_public_key").ToJsonWebKey(true); - } - } - - internal interface IBrokerHeartbeatNotifier - { - /// <summary> - /// Called when the heartbeat endpoint receives a heartbeat from the broker - /// </summary> - void HearbeatReceived(); - - /// <summary> - /// Gets the current auth token sent to the broker, which is expected to be sent back in the heartbeat - /// </summary> - /// <returns>The heartbeat auth token if set</returns> - string? GetAuthToken(); - - /// <summary> - /// Gets the address of the broker server - /// </summary> - /// <returns>The full address of the broker server to connect to</returns> - Uri GetBrokerAddress(); - - /// <summary> - /// Gets the public key of the broker server - /// </summary> - /// <returns>The broker's public key</returns> - ReadOnlyJsonWebKey GetBrokerPublicKey(); - } public sealed class ObjectCacheServerEntry : PluginBase { @@ -136,26 +76,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server { try { - //Setup Node config - NodeConfig nodeConf = this.GetOrCreateSingleton<NodeConfig>(); - //Init connect endpoint - ConnectEndpoint endpoint = this.Route<ConnectEndpoint>(); - - //Route the broker endpoint - this.Route<BrokerHeartBeatEndpoint>(); + this.Route<ConnectEndpoint>(); //Setup discovery endpoint if(this.HasConfigForType<PeerDiscoveryEndpoint>()) { this.Route<PeerDiscoveryEndpoint>(); - } - - ulong maxByteSize = ((ulong)endpoint.CacheConfig.MaxCacheEntries * (ulong)endpoint.CacheConfig.BucketCount * (ulong)endpoint.CacheConfig.MaxMessageSize); - - //Log max memory usage - Log.Debug("Maxium memory consumption {mx}Mb", maxByteSize / (ulong)(1024 * 1000)); - + } Log.Information("Plugin loaded"); } |