diff options
29 files changed, 2572 insertions, 931 deletions
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs b/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs index d9c463b..2d02491 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs @@ -27,14 +27,22 @@ using System.Text.Json.Serialization; namespace VNLib.Data.Caching.Extensions { - public class ActiveServer + public class ActiveServer : ICachePeerAdvertisment { [JsonPropertyName("address")] public string? HostName { get; set; } - [JsonPropertyName("server_id")] + public string? ServerId { get; set; } [JsonPropertyName("ip_address")] public string? Ip { get; set; } + + public Uri ConnectEndpoint { get; } + + public Uri? DiscoveryEndpoint { get; } + + [JsonPropertyName("server_id")] + public string NodeId { get; } + ///<inheritdoc/> public override int GetHashCode() => ServerId!.GetHashCode(StringComparison.OrdinalIgnoreCase); ///<inheritdoc/> diff --git a/lib/VNLib.Data.Caching.Extensions/src/ClientCacheConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs index ef44a29..05e4928 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/ClientCacheConfiguration.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs @@ -31,23 +31,30 @@ using VNLib.Hashing.IdentityUtility; namespace VNLib.Data.Caching.Extensions { + public interface ICacheJwtManager + { + IReadOnlyDictionary<string, string?> GetJwtHeader(); + + void SignJwt(JsonWebToken jwt); + + bool VerifyCache(JsonWebToken jwt); + + bool VerifyBroker(JsonWebToken jwt); + } + /// <summary> /// A fluent api configuration object for configuring a <see cref="FBMClient"/> /// to connect to cache servers. /// </summary> - public sealed class ClientCacheConfiguration + public class CacheClientConfiguration : ICacheJwtManager, ICacheListServerRequest { - internal ReadOnlyJsonWebKey? SigningKey { get; private set; } - internal ReadOnlyJsonWebKey? VerificationKey { get; private set; } - internal ReadOnlyJsonWebKey? BrokerVerificationKey { get; private set; } + public ReadOnlyJsonWebKey? SigningKey { get; private set; } + public ReadOnlyJsonWebKey? VerificationKey { get; private set; } + public ReadOnlyJsonWebKey? BrokerVerificationKey { get; private set; } - internal string ServerChallenge { get; } = RandomHash.GetRandomBase32(24); - internal string? NodeId { get; set; } - internal Uri? BrokerAddress { get; set; } - internal bool UseTls { get; set; } - internal ActiveServer[]? CacheServers { get; set; } - - internal IReadOnlyDictionary<string, string?> JwtHeader => SigningKey!.JwtHeader; + public Uri? DiscoveryEndpoint { get; private set; } + public bool UseTls { get; private set; } + internal ICachePeerAdvertisment[]? CacheServers { get; set; } /// <summary> /// Imports the private key used to sign messages @@ -56,7 +63,7 @@ namespace VNLib.Data.Caching.Extensions /// <returns>Chainable fluent object</returns> /// <exception cref="ArgumentException"></exception> /// <exception cref="CryptographicException"></exception> - public ClientCacheConfiguration WithSigningCertificate(ReadOnlyJsonWebKey jwk) + public CacheClientConfiguration WithSigningKey(ReadOnlyJsonWebKey jwk) { SigningKey = jwk ?? throw new ArgumentNullException(nameof(jwk)); return this; @@ -69,13 +76,13 @@ namespace VNLib.Data.Caching.Extensions /// <returns>Chainable fluent object</returns> /// <exception cref="ArgumentException"></exception> /// <exception cref="CryptographicException"></exception> - public ClientCacheConfiguration WithVerificationKey(ReadOnlyJsonWebKey jwk) + public CacheClientConfiguration WithVerificationKey(ReadOnlyJsonWebKey jwk) { VerificationKey = jwk ?? throw new ArgumentNullException(nameof(jwk)); return this; } - public ClientCacheConfiguration WithBrokerVerificationKey(ReadOnlyJsonWebKey jwk) + public CacheClientConfiguration WithBrokerVerificationKey(ReadOnlyJsonWebKey jwk) { BrokerVerificationKey = jwk ?? throw new ArgumentNullException(nameof(jwk)); return this; @@ -86,50 +93,35 @@ namespace VNLib.Data.Caching.Extensions /// </summary> /// <param name="useTls">A value that indicates if connections should use TLS</param> /// <returns>Chainable fluent object</returns> - public ClientCacheConfiguration WithTls(bool useTls) + public CacheClientConfiguration WithTls(bool useTls) { UseTls = useTls; return this; } + /// <summary> /// Specifies the broker address to discover cache nodes from /// </summary> /// <param name="brokerAddress">The address of the server broker</param> /// <returns>Chainable fluent object</returns> /// <exception cref="ArgumentNullException"></exception> - public ClientCacheConfiguration WithBroker(Uri brokerAddress) + public CacheClientConfiguration WithBroker(Uri brokerAddress) { - this.BrokerAddress = brokerAddress ?? throw new ArgumentNullException(nameof(brokerAddress)); + DiscoveryEndpoint = brokerAddress ?? throw new ArgumentNullException(nameof(brokerAddress)); return this; } + + ///<inheritdoc/> + public void SignJwt(JsonWebToken jwt) => jwt.SignFromJwk(SigningKey!); - /// <summary> - /// Specifies the current server's cluster node id. If this - /// is a server connection attempting to listen for changes on the - /// remote server, this id must be set and unique - /// </summary> - /// <param name="nodeId">The cluster node id of the current server</param> - /// <returns>Chainable fluent object</returns> - /// <exception cref="ArgumentNullException"></exception> - public ClientCacheConfiguration WithNodeId(string nodeId) - { - this.NodeId = nodeId ?? throw new ArgumentNullException(nameof(nodeId)); - return this; - } + ///<inheritdoc/> + public bool VerifyCache(JsonWebToken jwt) => jwt.VerifyFromJwk(VerificationKey!); - internal void SignJwt(JsonWebToken jwt) - { - jwt.SignFromJwk(SigningKey); - } + ///<inheritdoc/> + public bool VerifyBroker(JsonWebToken jwt) => jwt.VerifyFromJwk(BrokerVerificationKey!); - internal bool VerifyCache(JsonWebToken jwt) - { - return jwt.VerifyFromJwk(VerificationKey); - } + ///<inheritdoc/> + public IReadOnlyDictionary<string, string?> GetJwtHeader() => SigningKey!.JwtHeader; - internal bool VerifyBroker(JsonWebToken jwt) - { - return jwt.VerifyFromJwk(BrokerVerificationKey); - } } } diff --git a/lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs b/lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs new file mode 100644 index 0000000..76d4ad8 --- /dev/null +++ b/lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs @@ -0,0 +1,82 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Extensions +* File: ListServerRequest.cs +* +* ListServerRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Collections.Generic; + +using VNLib.Utils; +using VNLib.Hashing.IdentityUtility; + +namespace VNLib.Data.Caching.Extensions +{ + public interface ICacheListServerRequest : ICacheJwtManager + { + Uri DiscoveryEndpoint { get; } + } + + /// <summary> + /// A request container for a ListServer request + /// </summary> + public sealed class CacheListServerRequest : ICacheListServerRequest + { + private readonly ICacheJwtManager _manager; + + + /// <summary> + /// The address of the broker server to connect to + /// </summary> + public Uri DiscoveryEndpoint { get; private set; } + + public CacheListServerRequest(ICacheJwtManager keyManager, Uri? brokerAddress = null) + { + _manager = keyManager; + DiscoveryEndpoint = brokerAddress!; + } + + + /// <summary> + /// Sets the broker address for the request + /// </summary> + /// <param name="brokerAddr">The broker server's address to connect to</param> + /// <returns>A fluent chainable value</returns> + /// <exception cref="ArgumentNullException"></exception> + public CacheListServerRequest WithDiscoveryEndpoint(Uri brokerAddr) + { + DiscoveryEndpoint = brokerAddr ?? throw new ArgumentNullException(nameof(brokerAddr)); + return this; + } + + /// <inheritdoc/> + public void SignJwt(JsonWebToken jwt) => _manager.SignJwt(jwt); + + /// <inheritdoc/> + public bool VerifyCache(JsonWebToken jwt) => _manager.VerifyCache(jwt); + + /// <inheritdoc/> + public bool VerifyBroker(JsonWebToken jwt) => _manager.VerifyBroker(jwt); + + /// <inheritdoc/> + public IReadOnlyDictionary<string, string?> GetJwtHeader() => _manager.GetJwtHeader(); + } +} diff --git a/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs new file mode 100644 index 0000000..21a99e1 --- /dev/null +++ b/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs @@ -0,0 +1,84 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Extensions +* File: CacheNodeConfiguration.cs +* +* CacheNodeConfiguration.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; + + +namespace VNLib.Data.Caching.Extensions +{ + + public class CacheNodeConfiguration: CacheClientConfiguration, ICachePeerAdvertisment + { + /// <summary> + /// The address for clients to connect to + /// </summary> + public Uri? ConnectEndpoint { get; private set; } + + /// <summary> + /// Whether or not to advertise ourself to peer nodes + /// </summary> + public bool BroadcastAdverisment { get; private set; } + + /// <summary> + /// Define the endpoint for clients to connect to to discover + /// other discovertable nodes + /// </summary> + public Uri? DiscoveryEndpoint { get; private set; } + + /// <summary> + /// Sets the full address of our cache endpoint for clients to connect to + /// </summary> + /// <param name="connectUri">The uri clients will attempt to connect to</param> + public CacheNodeConfiguration WithCacheEndpoint(Uri connectUri) + { + ConnectEndpoint = connectUri; + return this; + } + + public CacheNodeConfiguration EnableAdvertisment(bool enable, Uri? discoveryEndpoint) + { + BroadcastAdverisment = enable; + DiscoveryEndpoint = discoveryEndpoint; + return this; + } + + ///<inheritdoc/> + public string NodeId { get; private set; } = null!; + + /// <summary> + /// Specifies the current server's cluster node id. If this + /// is a server connection attempting to listen for changes on the + /// remote server, this id must be set and unique + /// </summary> + /// <param name="nodeId">The cluster node id of the current server</param> + /// <returns>Chainable fluent object</returns> + /// <exception cref="ArgumentNullException"></exception> + public CacheClientConfiguration WithNodeId(string nodeId) + { + NodeId = nodeId ?? throw new ArgumentNullException(nameof(nodeId)); + return this; + } + + } +} diff --git a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs index 8ee02f7..9efe16a 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs @@ -32,6 +32,7 @@ using System.Threading; using System.Threading.Tasks; using System.Collections.Generic; using System.Security.Cryptography; +using System.Text.Json.Serialization; using System.Runtime.CompilerServices; using RestSharp; @@ -45,12 +46,12 @@ using VNLib.Utils.Extensions; using VNLib.Net.Rest.Client; using VNLib.Net.Messaging.FBM; using VNLib.Net.Messaging.FBM.Client; - using ContentType = VNLib.Net.Http.ContentType; + namespace VNLib.Data.Caching.Extensions { - + /// <summary> /// Provides extension methods for FBM data caching using /// cache servers and brokers @@ -61,11 +62,22 @@ namespace VNLib.Data.Caching.Extensions /// The websocket sub-protocol to use when connecting to cache servers /// </summary> public const string CACHE_WS_SUB_PROCOL = "object-cache"; + /// <summary> /// The default cache message header size /// </summary> public const int MAX_FBM_MESSAGE_HEADER_SIZE = 1024; + /// <summary> + /// The client nonce signature http header name + /// </summary> + public const string X_UPGRADE_SIG_HEADER = "X-Cache-Upgrade-Sig"; + + /// <summary> + /// The advertisment header for cache node discovery + /// </summary> + public const string X_NODE_DISCOVERY_HEADER = "X-Cache-Node-Discovery"; + private static readonly RestClientPool ClientPool = new(2,new RestClientOptions() { MaxTimeout = 10 * 1000, @@ -75,7 +87,7 @@ namespace VNLib.Data.Caching.Extensions ThrowOnAnyError = true, }); - private static readonly ConditionalWeakTable<FBMClient, ClientCacheConfiguration> ClientCacheConfig = new(); + private static readonly ConditionalWeakTable<FBMClient, CacheClientConfiguration> ClientCacheConfig = new(); /// <summary> /// Gets a <see cref="FBMClientConfig"/> preconfigured object caching @@ -128,6 +140,59 @@ namespace VNLib.Data.Caching.Extensions } /// <summary> + /// Creats a new <see cref="CacheListServerRequest"/> from an existing <see cref="CacheClientConfiguration"/> + /// </summary> + /// <param name="conf">The prepared client configuration</param> + /// <returns>The new <see cref="CacheListServerRequest"/></returns> + public static CacheListServerRequest GetListMessage(this CacheClientConfiguration conf) + { + return new(conf, conf.DiscoveryEndpoint); + } + + /// <summary> + /// Discovers peer nodes from a given initial peer and returns a list of discovered nodes. If the config + /// is for a cache peer node, the current peer is removed from the list of discovered nodes. + /// </summary> + /// <param name="cacheConfig"></param> + /// <param name="initialPeer">The initial peer to discover nodes from</param> + /// <param name="cancellation">A token to cancel the discovery operation</param> + /// <returns>The collection of discovered nodes</returns> + /// <exception cref="ArgumentNullException"></exception> + public static async Task<ICachePeerAdvertisment[]?> DiscoverClusterNodesAsync( + this CacheClientConfiguration cacheConfig, + ICachePeerAdvertisment initialPeer, + CancellationToken cancellation + ) + { + _ = initialPeer?.DiscoveryEndpoint ?? throw new ArgumentException("Advertisment does not expose an advertisment endpoint"); + + //Create list request + CacheListServerRequest request = cacheConfig.GetListMessage(); + + //Override with the initial peer's discovery endpoint + request.WithDiscoveryEndpoint(initialPeer.DiscoveryEndpoint); + + //Get the list of servers + ICachePeerAdvertisment[]? servers = await ListServersAsync(request, cancellation); + + if (servers == null) + { + return null; + } + + if(cacheConfig is CacheNodeConfiguration cnc) + { + //Filter out the current node + return servers.Where(s => !cnc.NodeId.Equals(s.NodeId, StringComparison.OrdinalIgnoreCase)).ToArray(); + } + else + { + //Do not filter + return servers; + } + } + + /// <summary> /// Contacts the cache broker to get a list of active servers to connect to /// </summary> /// <param name="request">The request message used to connecto the broker server</param> @@ -135,27 +200,30 @@ namespace VNLib.Data.Caching.Extensions /// <returns>The list of active servers</returns> /// <exception cref="SecurityException"></exception> /// <exception cref="ArgumentNullException"></exception> - public static async Task<ActiveServer[]?> ListServersAsync(ListServerRequest request, CancellationToken cancellationToken = default) + public static async Task<ICachePeerAdvertisment[]?> ListServersAsync(ICacheListServerRequest request, CancellationToken cancellationToken = default) { _ = request ?? throw new ArgumentNullException(nameof(request)); string jwtBody; + //Build request jwt using (JsonWebToken requestJwt = new()) { - requestJwt.WriteHeader(request.JwtHeader); + requestJwt.WriteHeader(request.GetJwtHeader()); requestJwt.InitPayloadClaim() .AddClaim("iat", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()) .AddClaim("nonce", RandomHash.GetRandomBase32(16)) .CommitClaims(); + //sign the jwt request.SignJwt(requestJwt); + //Compile the jwt jwtBody = requestJwt.Compile(); } //New list request - RestRequest listRequest = new(request.BrokerAddress, Method.Post); + RestRequest listRequest = new(request.DiscoveryEndpoint, Method.Post); //Add the jwt as a string to the request body listRequest.AddStringBody(jwtBody, DataFormat.None); @@ -177,24 +245,49 @@ namespace VNLib.Data.Caching.Extensions data = response.RawBytes ?? throw new InvalidOperationException("No data returned from broker"); } + //Response is jwt using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data); //Verify the jwt - if (!request.VerifyJwt(responseJwt)) + if (!request.VerifyBroker(responseJwt)) { throw new SecurityException("Failed to verify the broker's challenge, cannot continue"); } using JsonDocument doc = responseJwt.GetPayload(); - return doc.RootElement.GetProperty("servers").Deserialize<ActiveServer[]>(); + return doc.RootElement.GetProperty("peers").Deserialize<Advertisment[]>(); + } + + /// <summary> + /// Registers the current node with the broker + /// </summary> + /// <returns>A task that completes when the regitration has been made successfully</returns> + /// <exception cref="ArgumentException"></exception> + public static async Task RegisterWithBrokerAsync(this CacheNodeConfiguration config, string authToken) + { + //Recover the certificate + ReadOnlyJsonWebKey cacheCert = config?.SigningKey ?? throw new ArgumentException(nameof(config.SigningKey)); + + //init broker request + using BrokerRegistrationRequest request = new(); + + request.WithBroker(config.DiscoveryEndpoint!) + .WithRegistrationAddress(config.ConnectEndpoint!.ToString()) + .WithNodeId(config.NodeId!) + .WithSigningKey(cacheCert, true) + .WithHeartbeatToken(authToken); + + + //Send the request + await RegisterWithBrokerAsync(request); } /// <summary> /// Registers the current server as active with the specified broker /// </summary> /// <param name="registration">The registration request</param> - public static async Task ResgisterWithBrokerAsync(BrokerRegistrationRequest registration) + public static async Task RegisterWithBrokerAsync(BrokerRegistrationRequest registration) { _ = registration ?? throw new ArgumentNullException(nameof(registration)); _ = registration.HeartbeatToken ?? throw new ArgumentException("Missing required heartbeat access token"); @@ -220,17 +313,20 @@ namespace VNLib.Data.Caching.Extensions //Compile and save requestData = jwt.Compile(); } + //Create reg request message RestRequest regRequest = new(registration.BrokerAddress); regRequest.AddStringBody(requestData, DataFormat.None); regRequest.AddHeader("Content-Type", "text/plain"); + //Rent client using ClientContract cc = ClientPool.Lease(); + //Exec the regitration request RestResponse response = await cc.Resource.ExecutePutAsync(regRequest); response.ThrowIfError(); - } - + } + /// <summary> /// Allows for configuration of an <see cref="FBMClient"/> @@ -238,7 +334,31 @@ namespace VNLib.Data.Caching.Extensions /// </summary> /// <param name="client"></param> /// <returns>A fluent api configuration builder for the current client</returns> - public static ClientCacheConfiguration GetCacheConfiguration(this FBMClient client) => ClientCacheConfig.GetOrCreateValue(client); + public static CacheClientConfiguration GetCacheConfiguration(this FBMClient client) => ClientCacheConfig.GetOrCreateValue(client); + + /// <summary> + /// Explicitly set the client cache configuration for the current client + /// </summary> + /// <param name="client"></param> + /// <param name="config">The cache node configuration</param> + /// <returns>The config instance</returns> + public static CacheClientConfiguration SetCacheConfiguration(this FBMClient client, CacheClientConfiguration config) + { + ClientCacheConfig.AddOrUpdate(client, config); + return config; + } + + /// <summary> + /// Explicitly set the cache node configuration for the current client + /// </summary> + /// <param name="client"></param> + /// <param name="nodeConfig">The cache node configuration</param> + /// <returns>The config instance</returns> + public static CacheNodeConfiguration SetCacheConfiguration(this FBMClient client, CacheNodeConfiguration nodeConfig) + { + ClientCacheConfig.AddOrUpdate(client, nodeConfig); + return nodeConfig; + } /// <summary> /// Discovers cache nodes in the broker configured for the current client. @@ -246,7 +366,10 @@ namespace VNLib.Data.Caching.Extensions /// <param name="client"></param> /// <param name="token">A token to cancel the discovery</param> /// <returns>A task the resolves the list of active servers on the broker server</returns> - public static Task<ActiveServer[]?> DiscoverCacheNodesAsync(this FBMClientWorkerBase client, CancellationToken token = default) => client.Client.DiscoverCacheNodesAsync(token); + public static Task<ICachePeerAdvertisment[]?> DiscoverCacheNodesAsync(this FBMClientWorkerBase client, CancellationToken token = default) + { + return client.Client.DiscoverCacheNodesAsync(token); + } /// <summary> /// Discovers cache nodes in the broker configured for the current client. @@ -254,13 +377,13 @@ namespace VNLib.Data.Caching.Extensions /// <param name="client"></param> /// <param name="token">A token to cancel the discovery </param> /// <returns>A task the resolves the list of active servers on the broker server</returns> - public static async Task<ActiveServer[]?> DiscoverCacheNodesAsync(this FBMClient client, CancellationToken token = default) + public static async Task<ICachePeerAdvertisment[]?> DiscoverCacheNodesAsync(this FBMClient client, CancellationToken token = default) { - ClientCacheConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); - //Request from config - using ListServerRequest req = ListServerRequest.FromConfig(conf); + //Get the stored client config + CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); + //List servers async - return conf.CacheServers = await ListServersAsync(req, token); + return conf.CacheServers = await ListServersAsync(conf, token); } /// <summary> @@ -274,16 +397,21 @@ namespace VNLib.Data.Caching.Extensions public static async Task WaitForExitAsync(this FBMClient client, CancellationToken token = default) { client.LogDebug("Waiting for cache client to exit"); + //Get task for cancellation Task cancellation = token.WaitHandle.WaitAsync(); + //Task for status handle Task run = client.ConnectionStatusHandle.WaitAsync(); + //Wait for cancellation or _ = await Task.WhenAny(cancellation, run); client.LogDebug("Disconnecting the cache client"); + //Normal try to disconnect the socket await client.DisconnectAsync(CancellationToken.None); + //Notify if cancelled token.ThrowIfCancellationRequested(); } @@ -300,14 +428,18 @@ namespace VNLib.Data.Caching.Extensions /// <exception cref="ArgumentNullException"></exception> /// <exception cref="SecurityException"></exception> /// <exception cref="ObjectDisposedException"></exception> - public static async Task<ActiveServer> ConnectToRandomCacheAsync(this FBMClient client, CancellationToken cancellation = default) + public static async Task<ICachePeerAdvertisment> ConnectToRandomCacheAsync(this FBMClient client, CancellationToken cancellation = default) { //Get stored config - ClientCacheConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); + CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); + //Select random - ActiveServer? randomServer = conf.CacheServers?.SelectRandom(); - _ = randomServer ?? throw new ArgumentException("No servers detected, cannot connect"); + ICachePeerAdvertisment? randomServer = conf.CacheServers?.SelectRandom() + ?? throw new ArgumentException("No servers detected, cannot connect"); + await ConnectToCacheAsync(client, randomServer, cancellation); + + //Return the random server we connected to return randomServer; } @@ -324,59 +456,97 @@ namespace VNLib.Data.Caching.Extensions /// <exception cref="ArgumentNullException"></exception> /// <exception cref="SecurityException"></exception> /// <exception cref="ObjectDisposedException"></exception> - public static Task ConnectToCacheAsync(this FBMClient client, ActiveServer server, CancellationToken token = default) + public static Task ConnectToCacheAsync(this FBMClient client, ICachePeerAdvertisment server, CancellationToken token = default) { _ = client ?? throw new ArgumentNullException(nameof(client)); _ = server ?? throw new ArgumentNullException(nameof(server)); //Get stored config - ClientCacheConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); + CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); //Connect to server (no server id because client not replication server) return ConnectToCacheAsync(client, conf, server, token); } - - private static async Task ConnectToCacheAsync(FBMClient client, ClientCacheConfiguration request, ActiveServer server, CancellationToken token = default) + /// <summary> + /// Connects to the specified server on the configured cache client + /// </summary> + /// <param name="client"></param> + /// <param name="server">The server to connect to</param> + /// <param name="token">A token to cancel the operation</param> + /// <param name="explicitConfig">Explicit cache configuration to use</param> + /// <returns>A task that resolves when the client is connected to the cache server</returns> + /// <exception cref="FBMException"></exception> + /// <exception cref="FBMServerNegiationException"></exception> + /// <exception cref="ArgumentException"></exception> + /// <exception cref="ArgumentNullException"></exception> + /// <exception cref="SecurityException"></exception> + /// <exception cref="ObjectDisposedException"></exception> + public static Task ConnectToCacheAsync(this FBMClient client, ICachePeerAdvertisment server, CacheClientConfiguration explicitConfig, CancellationToken token = default) { - //Construct server uri - Uri serverUri = new(server.HostName!); - - //build ws uri - UriBuilder uriBuilder = new(serverUri) + _ = client ?? throw new ArgumentNullException(nameof(client)); + _ = server ?? throw new ArgumentNullException(nameof(server)); + + //Connect to server (no server id because client not replication server) + return ConnectToCacheAsync(client, explicitConfig, server, token); + } + + + private static async Task ConnectToCacheAsync( + FBMClient client, + CacheClientConfiguration config, + ICachePeerAdvertisment server, + CancellationToken token = default + ) + { + //build ws uri from the connect endpoint + UriBuilder uriBuilder = new(server.ConnectEndpoint) { - Scheme = request.UseTls ? "wss://" : "ws://" + Scheme = config.UseTls ? "wss://" : "ws://" }; - + + string challenge = RandomHash.GetRandomBase32(24); + + //See if the supplied config is for a cache node + CacheNodeConfiguration? cnc = config as CacheNodeConfiguration; + string jwtMessage; //Init jwt for connecting to server using (JsonWebToken jwt = new()) { - jwt.WriteHeader(request.JwtHeader); + jwt.WriteHeader(config.GetJwtHeader()); //Init claim JwtPayload claim = jwt.InitPayloadClaim(); - claim.AddClaim("chl", request.ServerChallenge); + claim.AddClaim("chl", challenge); - if (!string.IsNullOrWhiteSpace(request.NodeId)) + if (!string.IsNullOrWhiteSpace(cnc?.NodeId)) { /* * The unique node id so the other nodes know to load the * proper event queue for the current server */ - claim.AddClaim("sub", request.NodeId); + claim.AddClaim("sub", cnc.NodeId); } claim.CommitClaims(); //Sign jwt - request.SignJwt(jwt); + config.SignJwt(jwt); //Compile to string jwtMessage = jwt.Compile(); } - RestRequest negotation = new(serverUri, Method.Get); + /* + * During a server negiation, the client makes an intial get request to the cache endpoint + * and passes some client negiation terms as a signed message to the server. The server then + * validates these values and returns a signed jwt with the server negiation terms. + * + * The response from the server is essentailly the 'access token' + */ + + RestRequest negotation = new(server.ConnectEndpoint, Method.Get); //Set the jwt auth header for negotiation negotation.AddHeader("Authorization", jwtMessage); negotation.AddHeader("Accept", HttpHelpers.GetContentTypeString(ContentType.Text)); @@ -406,13 +576,13 @@ namespace VNLib.Data.Caching.Extensions using (JsonWebToken jwt = JsonWebToken.Parse(authToken)) { //Verify the jwt - if (!request.VerifyCache(jwt)) + if (!config.VerifyCache(jwt)) { throw new SecurityException("Failed to verify the cache server's negotiation message, cannot continue"); } //Confirm the server's buffer configuration - ValidateServerNegotation(client, request.ServerChallenge, jwt); + ValidateServerNegotation(client, challenge, jwt); } client.LogDebug("Server negotiation validated, connecting to server"); @@ -420,6 +590,16 @@ namespace VNLib.Data.Caching.Extensions //The client authorization header is the exact response client.ClientSocket.Headers[HttpRequestHeader.Authorization] = authToken; + //Compute the signature of the upgrade token + client.ClientSocket.Headers[X_UPGRADE_SIG_HEADER] = GetBase64UpgradeSingature(authToken, config.SigningKey!); + + //Check to see if adversize self is enabled + if (cnc?.BroadcastAdverisment == true) + { + //Set advertisment header + client.ClientSocket.Headers[X_NODE_DISCOVERY_HEADER] = GetAdvertismentHeader(cnc); + } + //Connect async await client.ConnectAsync(uriBuilder.Uri, token); } @@ -475,16 +655,183 @@ namespace VNLib.Data.Caching.Extensions } } + /* + * Added layer to confirm that client that requested the negotation holds the private key + * compute a signature of the upgrade token and send it to the server to prove we hold the private key. + */ + + private static string GetBase64UpgradeSingature(string? token, ReadOnlyJsonWebKey key) + { + //try to get the ecdsa key first + using ECDsa? ec = key.GetECDsaPrivateKey(); + + if(ec != null) + { + //Compute hash of the token + byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256); + + //Sign the hash + byte[] sig = ec.SignHash(hash, DSASignatureFormat.IeeeP1363FixedFieldConcatenation); + + //Return the base64 string + return Convert.ToBase64String(sig); + } + + //Check rsa next + using RSA? rsa = key.GetRSAPrivateKey(); + if(rsa != null) + { + //Compute hash of the token + byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256); + + //Sign the hash + byte[] sig = rsa.SignHash(hash, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1); + + //Return the base64 string + return Convert.ToBase64String(sig); + } + + throw new CryptographicException("Cache JKW does not export a supported private key for upgrade challenges"); + } + + /// <summary> + /// Verifies the signed auth token against the given verification key + /// </summary> + /// <param name="signature">The base64 signature of the token</param> + /// <param name="token">The raw token to compute the hash of</param> + /// <param name="nodeConfig">The node configuration</param> + /// <returns>True if the singature matches, false otherwise</returns> + /// <exception cref="CryptographicException"></exception> + public static bool VerifyUpgradeToken(this CacheClientConfiguration nodeConfig, string signature, string token) + { + return VerifyUpgradeToken(signature, token, nodeConfig.VerificationKey); + } + + /// <summary> + /// Verifies the signed auth token against the given verification key + /// </summary> + /// <param name="signature">The base64 signature of the token</param> + /// <param name="token">The raw token to compute the hash of</param> + /// <param name="verifcationKey">The key used to verify the singature with</param> + /// <returns>True if the singature matches, false otherwise</returns> + /// <exception cref="ArgumentNullException"></exception> + /// <exception cref="CryptographicException"></exception> + public static bool VerifyUpgradeToken(string signature, string token, ReadOnlyJsonWebKey verifcationKey) + { + _ = verifcationKey ?? throw new ArgumentNullException(nameof(verifcationKey)); + + //get the hash of the token + byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256); + + //decode the signature + byte[] sig = Convert.FromBase64String(signature); + + //try to get the ecdsa key first + using ECDsa? ec = verifcationKey.GetECDsaPublicKey(); + if(ec != null) + { + //Verify the signature + return ec.VerifyHash(hash, sig, DSASignatureFormat.IeeeP1363FixedFieldConcatenation); + } + + //Check rsa next + using RSA? rsa = verifcationKey.GetRSAPublicKey(); + if(rsa != null) + { + //Verify the signature + return rsa.VerifyHash(hash, sig, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1); + } + + throw new CryptographicException("Cache JKW does not export a supported public key for upgrade challenges"); + } + + private static string GetAdvertismentHeader(CacheNodeConfiguration nodeConfiguration) + { + /* + * Create node advertisment message to publish to peer nodes + * + * these messages will allow other clients and peers to discover us + */ + + using JsonWebToken jwt = new(); + + //Get the jwt header + jwt.WriteHeader(nodeConfiguration.GetJwtHeader()); + + jwt.InitPayloadClaim() + .AddClaim("nonce", RandomHash.GetRandomBase32(16)) + .AddClaim("iat", DateTimeOffset.UtcNow.ToUnixTimeSeconds()) + .AddClaim("iss", nodeConfiguration.NodeId!) + .AddClaim("url", nodeConfiguration.ConnectEndpoint!.ToString()) + //Optional discovery endpoint + .AddClaim("dis", nodeConfiguration.DiscoveryEndpoint?.ToString() ?? string.Empty) + .CommitClaims(); + + //Sign message + nodeConfiguration.SignJwt(jwt); + + return jwt.Compile(); + } + + /// <summary> + /// Verifies the peer advertisment message + /// </summary> + /// <param name="config"></param> + /// <param name="message">The advertisment message to verify</param> + /// <returns>The advertisment message if successfully verified, or null otherwise</returns> + /// <exception cref="FormatException"></exception> + public static ICachePeerAdvertisment? VerifyPeerAdvertisment(this ICacheJwtManager config, string message) + { + using JsonWebToken jwt = JsonWebToken.Parse(message); + + //Verify the signature + if (!config.VerifyCache(jwt)) + { + return null; + } + + //Get the payload + return jwt.GetPayload<Advertisment>(); + } + + /// <summary> /// Selects a random server from a collection of active servers /// </summary> /// <param name="servers"></param> /// <returns>A server selected at random</returns> - public static ActiveServer SelectRandom(this ICollection<ActiveServer> servers) + public static ICachePeerAdvertisment SelectRandom(this ICollection<ICachePeerAdvertisment> servers) { //select random server int randServer = RandomNumberGenerator.GetInt32(0, servers.Count); return servers.ElementAt(randServer); } + + + private class Advertisment : ICachePeerAdvertisment + { + [JsonIgnore] + public Uri? ConnectEndpoint { get; set; } + + [JsonIgnore] + public Uri? DiscoveryEndpoint { get; set; } + + [JsonPropertyName("iss")] + public string NodeId { get; set; } + + [JsonPropertyName("url")] + public string? url + { + get => ConnectEndpoint?.ToString(); + set => ConnectEndpoint = value == null ? null : new Uri(value); + } + + [JsonPropertyName("dis")] + public string? dis + { + get => DiscoveryEndpoint?.ToString(); + set => DiscoveryEndpoint = value == null ? null : new Uri(value); + } + } } } diff --git a/lib/VNLib.Data.Caching.Extensions/src/ICachePeerAdvertisment.cs b/lib/VNLib.Data.Caching.Extensions/src/ICachePeerAdvertisment.cs new file mode 100644 index 0000000..acf883e --- /dev/null +++ b/lib/VNLib.Data.Caching.Extensions/src/ICachePeerAdvertisment.cs @@ -0,0 +1,50 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Extensions +* File: ICachePeerAdvertisment.cs +* +* ICachePeerAdvertisment.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; + + +namespace VNLib.Data.Caching.Extensions +{ + /// <summary> + /// Represents a node that can be advertised to clients + /// </summary> + public interface ICachePeerAdvertisment + { + /// <summary> + /// The endpoint for clients to connect to to access the cache + /// </summary> + Uri ConnectEndpoint { get; } + + /// <summary> + /// Gets the address for clients to connect to to discover other discovertable nodes + /// </summary> + Uri? DiscoveryEndpoint { get; } + + /// <summary> + /// Gets the unique identifier for this node + /// </summary> + string NodeId { get; } + } +} diff --git a/lib/VNLib.Data.Caching.Extensions/src/ListServerRequest.cs b/lib/VNLib.Data.Caching.Extensions/src/ListServerRequest.cs deleted file mode 100644 index fd25925..0000000 --- a/lib/VNLib.Data.Caching.Extensions/src/ListServerRequest.cs +++ /dev/null @@ -1,116 +0,0 @@ -/* -* Copyright (c) 2023 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Data.Caching.Extensions -* File: ListServerRequest.cs -* -* ListServerRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Collections.Generic; - -using VNLib.Utils; -using VNLib.Hashing.IdentityUtility; - -namespace VNLib.Data.Caching.Extensions -{ - /// <summary> - /// A request container for a ListServer request - /// </summary> - public sealed class ListServerRequest : VnDisposeable - { - private readonly bool _ownsKeys; - - private ReadOnlyJsonWebKey? VerificationKey; - private ReadOnlyJsonWebKey? SigningAlg; - - /// <summary> - /// The address of the broker server to connect to - /// </summary> - public Uri BrokerAddress { get; } - - public ListServerRequest(Uri brokerAddress) - { - BrokerAddress = brokerAddress; - _ownsKeys = true; - } - - private ListServerRequest(ClientCacheConfiguration conf) - { - //Broker verification key is required - VerificationKey = conf.BrokerVerificationKey; - SigningAlg = conf.SigningKey; - BrokerAddress = conf.BrokerAddress ?? throw new ArgumentException("Broker address must be specified"); - _ownsKeys = false; - } - - internal static ListServerRequest FromConfig(ClientCacheConfiguration conf) => new (conf); - - /// <summary> - /// Sets the public key used to verify the signature of the response. - /// </summary> - /// <param name="jwk">The key used to verify messages </param> - public ListServerRequest WithVerificationKey(ReadOnlyJsonWebKey jwk) - { - VerificationKey = jwk ?? throw new ArgumentNullException(nameof(jwk)); - return this; - } - /// <summary> - /// Sets the private key used to sign the request. - /// </summary> - /// <param name="jwk">The <see cref="ReadOnlyJsonWebKey"/> containing the private key used to sign the message</param> - /// <exception cref="ArgumentNullException"></exception> - public ListServerRequest WithSigningKey(ReadOnlyJsonWebKey jwk) - { - SigningAlg = jwk ?? throw new ArgumentNullException(nameof(jwk)); - return this; - } - - /// <summary> - /// Signs the <see cref="JsonWebToken"/> using the private key. - /// </summary> - /// <param name="jwt">The message to sign</param> - internal void SignJwt(JsonWebToken jwt) - { - jwt.SignFromJwk(SigningAlg); - } - - /// <summary> - /// Verifies the signature of the <see cref="JsonWebToken"/> - /// </summary> - /// <param name="jwt"></param> - /// <returns>A value that indicates if the signature is verified</returns> - internal bool VerifyJwt(JsonWebToken jwt) - { - return jwt.VerifyFromJwk(VerificationKey); - } - - internal IReadOnlyDictionary<string, string?> JwtHeader => SigningAlg!.JwtHeader; - - ///<inheritdoc/> - protected override void Free() - { - if (_ownsKeys) - { - VerificationKey?.Dispose(); - SigningAlg?.Dispose(); - } - } - } -} diff --git a/lib/VNLib.Data.Caching/src/ClientExtensions.cs b/lib/VNLib.Data.Caching/src/ClientExtensions.cs index 2ac8b40..eced7fe 100644 --- a/lib/VNLib.Data.Caching/src/ClientExtensions.cs +++ b/lib/VNLib.Data.Caching/src/ClientExtensions.cs @@ -28,7 +28,6 @@ using System.Buffers; using System.Text.Json; using System.Threading; using System.Threading.Tasks; -using System.Collections.Generic; using System.Runtime.CompilerServices; using VNLib.Utils.Logging; @@ -417,7 +416,7 @@ namespace VNLib.Data.Caching /// </summary> /// <param name="client"></param> /// <param name="cancellationToken">A token to cancel the deuque operation</param> - /// <returns>A <see cref="KeyValuePair{TKey, TValue}"/> that contains the modified object id and optionally its new id</returns> + /// <returns>A <see cref="WaitForChangeResult"/> that contains information about the modified element</returns> public static async Task<WaitForChangeResult> WaitForChangeAsync(this FBMClient client, CancellationToken cancellationToken = default) { //Rent a new request diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs index ae6ca59..9f5ccfe 100644 --- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs @@ -112,7 +112,7 @@ namespace VNLib.Plugins.Extensions.VNCache //Connection authentication methods Client.GetCacheConfiguration() .WithVerificationKey(cachePub) - .WithSigningCertificate(clientPriv) + .WithSigningKey(clientPriv) .WithBrokerVerificationKey(brokerPub); } @@ -127,7 +127,7 @@ namespace VNLib.Plugins.Extensions.VNCache while (true) { //Load the server list - ActiveServer[]? servers; + ICachePeerAdvertisment[]? servers; while (true) { try @@ -163,8 +163,8 @@ namespace VNLib.Plugins.Extensions.VNCache pluginLog.Debug("Connecting to random cache server"); //Connect to a random server - ActiveServer selected = await Client.ConnectToRandomCacheAsync(exitToken); - pluginLog.Debug("Connected to cache server {s}", selected.ServerId); + ICachePeerAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken); + pluginLog.Debug("Connected to cache server {s}", selected.NodeId); //Set connection status flag IsConnected = true; diff --git a/plugins/CacheBroker/src/Endpoints/BrokerRegistrationEndpoint.cs b/plugins/CacheBroker/src/Endpoints/BrokerRegistrationEndpoint.cs index db82887..cb22176 100644 --- a/plugins/CacheBroker/src/Endpoints/BrokerRegistrationEndpoint.cs +++ b/plugins/CacheBroker/src/Endpoints/BrokerRegistrationEndpoint.cs @@ -109,18 +109,18 @@ namespace VNLib.Plugins.Cache.Broker.Endpoints private async Task<ReadOnlyJsonWebKey> GetClientPublic() { - return await this.GetPlugin().TryGetSecretAsync("client_public_key").ToJsonWebKey() ?? throw new InvalidOperationException("Client public key not found in vault"); + return await this.GetPlugin().GetSecretAsync("client_public_key").ToJsonWebKey(); } private async Task<ReadOnlyJsonWebKey> GetCachePublic() { - using SecretResult secret = await this.GetPlugin().TryGetSecretAsync("cache_public_key") ?? throw new InvalidOperationException("Cache public key not found in vault"); + using ISecretResult secret = await this.GetPlugin().GetSecretAsync("cache_public_key"); return secret.GetJsonWebKey(); } private async Task<ReadOnlyJsonWebKey> GetBrokerCertificate() { - using SecretResult secret = await this.GetPlugin().TryGetSecretAsync("broker_private_key") ?? throw new InvalidOperationException("Broker private key not found in vault"); + using ISecretResult secret = await this.GetPlugin().TryGetSecretAsync("broker_private_key"); return secret.GetJsonWebKey(); } diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs b/plugins/ObjectCacheServer/src/CacheConfiguration.cs index f7adeb3..f7adeb3 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs +++ b/plugins/ObjectCacheServer/src/CacheConfiguration.cs diff --git a/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs b/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs new file mode 100644 index 0000000..5fb6d2a --- /dev/null +++ b/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs @@ -0,0 +1,248 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: CacheEventQueueManager.cs +* +* CacheEventQueueManager.cs is part of ObjectCacheServer which is +* part of the larger VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Channels; +using System.Collections.Generic; + +using VNLib.Plugins; +using VNLib.Utils.Async; +using VNLib.Utils.Logging; +using VNLib.Utils.Extensions; +using VNLib.Plugins.Extensions.Loading; +using VNLib.Plugins.Extensions.Loading.Events; + + +namespace VNLib.Data.Caching.ObjectCache.Server +{ + [ConfigurationName("event_manager")] + internal sealed class CacheEventQueueManager : ICacheEventQueueManager, IDisposable, IIntervalScheduleable + { + private readonly int MaxQueueDepth; + + private readonly object SubLock; + private readonly LinkedList<NodeQueue> Subscribers; + + private readonly object StoreLock; + private readonly Dictionary<string, NodeQueue> QueueStore; + + + public CacheEventQueueManager(PluginBase plugin, IConfigScope config) + { + //Get purge interval + TimeSpan purgeInterval = config["purge_interval_sec"].GetTimeSpan(TimeParseType.Seconds); + + //Get max queue depth + MaxQueueDepth = (int)config["max_depth"].GetUInt32(); + + //Create purge interval + plugin.ScheduleInterval(this, purgeInterval); + + SubLock = new(); + Subscribers = new(); + + StoreLock = new(); + QueueStore = new(StringComparer.OrdinalIgnoreCase); + } + + ///<inheritdoc/> + public AsyncQueue<ChangeEvent> Subscribe(ICachePeer peer) + { + NodeQueue? nq; + + bool isNew = false; + + //Enter sync lock + lock (StoreLock) + { + //Try to recover the queue for the node + if(!QueueStore.TryGetValue(peer.NodeId, out nq)) + { + //Create new queue + nq = new(peer.NodeId, MaxQueueDepth); + QueueStore.Add(peer.NodeId, nq); + isNew = true; + } + + //Increment listener count + nq.Listeners++; + } + + //Publish new peer to subscribers list + if (isNew) + { + lock (SubLock) + { + //Add peer to subscribers list + Subscribers.AddLast(nq); + } + } + + //Return the node's queue + return nq.Queue; + } + + ///<inheritdoc/> + public void Unsubscribe(ICachePeer peer) + { + //Detach a listener for a node + lock (StoreLock) + { + //Get the queue and decrement the listener count + NodeQueue nq = QueueStore[peer.NodeId]; + nq.Listeners--; + } + } + + ///<inheritdoc/> + public void PublishSingle(ChangeEvent change) + { + //Wait to enter the sub lock + lock (SubLock) + { + //Loop through ll the fast way + LinkedListNode<NodeQueue>? q = Subscribers.First; + + while (q != null) + { + //Pub single event node + q.Value.PublishChange(change); + + //Get next queue + q = q.Next; + } + } + } + + ///<inheritdoc/> + public void PublishMultiple(Span<ChangeEvent> changes) + { + //Wait to enter the sub lock + lock (SubLock) + { + //Loop through ll the fast way + LinkedListNode<NodeQueue>? q = Subscribers.First; + + while (q != null) + { + //Publish multiple + q.Value.PublishChanges(changes); + + //Get next queue + q = q.Next; + } + } + } + + ///<inheritdoc/> + public void PurgeStaleSubscribers() + { + //Enter locks + lock(SubLock) + lock(StoreLock) + { + //Get all stale queues (queues without listeners) + NodeQueue[] staleQueues = QueueStore.Values.Where(static nq => nq.Listeners == 0).ToArray(); + + foreach (NodeQueue nq in staleQueues) + { + //Remove from store + QueueStore.Remove(nq.NodeId); + + //remove from subscribers + Subscribers.Remove(nq); + } + } + } + + //Interval to purge stale subscribers + Task IIntervalScheduleable.OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken) + { + //Purge + PurgeStaleSubscribers(); + return Task.CompletedTask; + } + + void IDisposable.Dispose() + { + QueueStore.Clear(); + Subscribers.Clear(); + } + + /* + * Holds queues for each node and keeps track of the number of listeners + * attached to the queue + */ + + private sealed class NodeQueue + { + public int Listeners; + + public string NodeId { get; } + + public AsyncQueue<ChangeEvent> Queue { get; } + + public NodeQueue(string nodeId, int maxDepth) + { + NodeId = nodeId; + + /* + * Create a bounded channel that acts as a lru and evicts + * the oldest item when the queue is full + * + * There will also only ever be a single thread writing events + * to the queue + */ + + BoundedChannelOptions queueOptions = new(maxDepth) + { + AllowSynchronousContinuations = true, + SingleReader = false, + SingleWriter = true, + //Drop oldest item in queue if full + FullMode = BoundedChannelFullMode.DropOldest, + }; + + //Init queue/channel + Queue = new(queueOptions); + } + + public void PublishChange(ChangeEvent change) + { + Queue.TryEnque(change); + } + + public void PublishChanges(Span<ChangeEvent> changes) + { + for(int i = 0; i < changes.Length; i++) + { + Queue.TryEnque(changes[i]); + } + } + } + } +} diff --git a/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs new file mode 100644 index 0000000..b453dcc --- /dev/null +++ b/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs @@ -0,0 +1,246 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: ObjectCacheServerEntry.cs +* +* ObjectCacheServerEntry.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using System.Collections.Generic; + +using VNLib.Plugins; +using VNLib.Utils.Logging; +using VNLib.Data.Caching.Extensions; +using static VNLib.Data.Caching.Constants; +using VNLib.Net.Messaging.FBM; +using VNLib.Net.Messaging.FBM.Client; +using VNLib.Plugins.Extensions.Loading; + +namespace VNLib.Data.Caching.ObjectCache.Server.Distribution +{ + internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork + { + private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10); + + private readonly NodeConfig NodeConfig; + private readonly ICachePeerAdapter PeerAdapter; + private readonly ICacheStore CacheStore; + private readonly FBMClientConfig ClientConfig; + private readonly PluginBase Plugin; + + private CacheNodeConfiguration CacheConfig => NodeConfig.Config; + + public CacheNodeReplicationMaanger(PluginBase plugin) + { + //Load the node config + NodeConfig = plugin.GetOrCreateSingleton<NodeConfig>(); + + //Get peer adapter + PeerAdapter = plugin.GetOrCreateSingleton<PeerDiscoveryManager>(); + + + } + + public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) + { + pluginLog.Information("[REPL] Initializing node replication worker"); + + try + { + while (true) + { + //Get all new peers + ICachePeerAdvertisment[] peers = PeerAdapter.GetNewPeers(); + + if (peers.Length == 0) + { + pluginLog.Verbose("[REPL] No new peers to connect to"); + } + + //Connect to each peer + foreach (ICachePeerAdvertisment peer in peers) + { + _ = Plugin.ObserveWork(() => OnNewPeerDoWorkAsync(peer, pluginLog, exitToken)); + } + + //Wait for a new peers + await Task.Delay(10000, exitToken); + } + } + catch (OperationCanceledException) + { + //Normal exit + } + catch + { + pluginLog.Error("[REPL] Node replication worker exited with an error"); + throw; + } + finally + { + + } + + pluginLog.Information("[REPL] Node replication worker exited"); + } + + private async Task OnNewPeerDoWorkAsync(ICachePeerAdvertisment newPeer, ILogProvider log, CancellationToken exitToken) + { + _ = newPeer ?? throw new ArgumentNullException(nameof(newPeer)); + + //Setup client + FBMClient client = new(ClientConfig); + + try + { + log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId); + + //Connect to the server + await client.ConnectToCacheAsync(newPeer, CacheConfig, exitToken); + + log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId); + + //Start worker tasks + List<Task> workerTasks = new(); + + for (int i = 0; i < Environment.ProcessorCount; i++) + { + Task workerTask = Task.Run(() => ReplicationWorkerDoWorkAsync(client, log, exitToken), exitToken); + + workerTasks.Add(workerTask); + } + + //Wait for sync workers to exit + await Task.WhenAll(workerTasks); + + log.Debug("All cache worker tasks exited successfully, disconnecting from {server}", newPeer.NodeId); + + //Disconnect client gracefully + await client.DisconnectAsync(CancellationToken.None); + } + catch (InvalidResponseException ie) + { + //See if the plugin is unloading + if (!exitToken.IsCancellationRequested) + { + log.Debug("Peer {p} responded with invalid response packet, disconnected. reason\n {reason}", newPeer.NodeId, ie); + } + //Disconnect client gracefully + try + { + await client.DisconnectAsync(CancellationToken.None); + } + catch (Exception ex) + { + log.Error(ex); + } + } + catch (OperationCanceledException) + { + //Plugin unloading, Try to disconnect + try + { + await client.DisconnectAsync(CancellationToken.None); + } + catch (Exception ex) + { + log.Error(ex); + } + } + catch (Exception ex) + { + log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex); + } + finally + { + client.Dispose(); + + //Notify monitor of disconnect + PeerAdapter.OnPeerListenerDetatched(newPeer); + } + } + + //Wroker task callback method + private async Task ReplicationWorkerDoWorkAsync(FBMClient client, ILogProvider log, CancellationToken exitToken) + { + //Listen for changes + while (true) + { + //Wait for changes + WaitForChangeResult changedObject = await client.WaitForChangeAsync(exitToken); + + log.Debug("Object changed {typ} {obj}", changedObject.Status, changedObject.CurrentId); + + switch (changedObject.Status) + { + case ResponseCodes.NotFound: + log.Warn("Server cache not properly configured, worker exiting"); + return; + case "deleted": + //Delete the object from the store + await CacheStore.DeleteItemAsync(changedObject.CurrentId); + break; + case "modified": + //Reload the record from the store + await UpdateRecordAsync(client, log, changedObject.CurrentId, changedObject.NewId, exitToken); + break; + } + } + } + + private async Task UpdateRecordAsync(FBMClient client, ILogProvider log, string objectId, string newId, CancellationToken cancellation) + { + //Get request message + FBMRequest modRequest = client.RentRequest(); + try + { + //Set action as get/create + modRequest.WriteHeader(HeaderCommand.Action, Actions.Get); + //Set session-id header + modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId); + + //Make request + using FBMResponse response = await client.SendAsync(modRequest, GetItemTimeout, cancellation); + + response.ThrowIfNotSet(); + + //Check response code + string status = response.Headers.First(static s => s.Header == HeaderCommand.Status).Value.ToString(); + if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal)) + { + //Update the record + await CacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation); + log.Debug("Updated object {id}", objectId); + } + else + { + log.Warn("Object {id} was missing on the remote server", objectId); + } + } + finally + { + client.ReturnRequest(modRequest); + } + } + } +} diff --git a/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs b/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs new file mode 100644 index 0000000..82b280c --- /dev/null +++ b/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs @@ -0,0 +1,55 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: CachePeerMonitor.cs +* +* CachePeerMonitor.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Collections.Generic; + +using VNLib.Plugins; + +namespace VNLib.Data.Caching.ObjectCache.Server.Distribution +{ + + internal sealed class CachePeerMonitor : IPeerMonitor + { + public CachePeerMonitor(PluginBase plugin) + { + + } + + public IEnumerable<ICachePeer> GetAllPeers() + { + throw new NotImplementedException(); + } + + public void OnPeerConnected(ICachePeer peer) + { + throw new NotImplementedException(); + } + + public void OnPeerDisconnected(ICachePeer peer) + { + throw new NotImplementedException(); + } + } +} diff --git a/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs b/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs new file mode 100644 index 0000000..d029f10 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs @@ -0,0 +1,49 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: ObjectCacheServerEntry.cs +* +* ObjectCacheServerEntry.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using VNLib.Data.Caching.Extensions; + +namespace VNLib.Data.Caching.ObjectCache.Server.Distribution +{ + internal interface ICachePeerAdapter + { + /// <summary> + /// Gets the peers that have been discovered but not yet connected to + /// </summary> + /// <returns>A collection of peers that have not been connected to yet</returns> + ICachePeerAdvertisment[] GetNewPeers(); + + /// <summary> + /// Called when a peer has been connected to + /// </summary> + /// <param name="peer">The peer that has been connected</param> + void OnPeerListenerAttached(ICachePeerAdvertisment peer); + + /// <summary> + /// Called when a peer has been disconnected from + /// </summary> + /// <param name="peer">The disconnected peer</param> + void OnPeerListenerDetatched(ICachePeerAdvertisment peer); + } +} diff --git a/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryCollection.cs b/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryCollection.cs new file mode 100644 index 0000000..d69da40 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryCollection.cs @@ -0,0 +1,58 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: INodeDiscoveryCollection.cs +* +* INodeDiscoveryCollection.cs is part of ObjectCacheServer which is +* part of the larger VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Collections.Generic; + +using VNLib.Data.Caching.Extensions; + +namespace VNLib.Data.Caching.ObjectCache.Server.Distribution +{ + internal interface INodeDiscoveryCollection + { + /// <summary> + /// Begins a new discovery and gets an enumerator for the discovery process + /// </summary> + /// <returns>An enumerator that simplifies discovery of unique nodes</returns> + INodeDiscoveryEnumerator BeginDiscovery(); + + /// <summary> + /// Begins a new discovery and gets an enumerator for the discovery process + /// </summary> + /// <param name="initialPeers">An initial collection of peers to add to the enumeration</param> + /// <returns>An enumerator that simplifies discovery of unique nodes</returns> + INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICachePeerAdvertisment> initialPeers); + + /// <summary> + /// Gets a snapshot of all discovered nodes in the current collection. + /// </summary> + /// <returns>The current collection of notes</returns> + ICachePeerAdvertisment[] GetAllNodes(); + + /// <summary> + /// Completes a discovery process and updates the collection with the results + /// </summary> + /// <param name="enumerator">The enumerator used to collect discovered nodes</param> + void CompleteDiscovery(INodeDiscoveryEnumerator enumerator); + } +} diff --git a/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryEnumerator.cs b/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryEnumerator.cs new file mode 100644 index 0000000..5cddf9c --- /dev/null +++ b/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryEnumerator.cs @@ -0,0 +1,45 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: INodeDiscoveryEnumerator.cs +* +* INodeDiscoveryEnumerator.cs is part of ObjectCacheServer which is part +* of the larger VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Collections.Generic; + +using VNLib.Data.Caching.Extensions; + +namespace VNLib.Data.Caching.ObjectCache.Server.Distribution +{ + internal interface INodeDiscoveryEnumerator + { + /// <summary> + /// Moves the enumerator to the next peer in the discovery process and returns the result + /// </summary> + /// <returns>The next peer advertisment in the enumeration</returns> + ICachePeerAdvertisment? GetNextPeer(); + + /// <summary> + /// Adds the specified peer to the collection of discovered peers + /// </summary> + /// <param name="discoveredPeers">The peer collection</param> + void OnPeerDiscoveryComplete(IEnumerable<ICachePeerAdvertisment> discoveredPeers); + } +} diff --git a/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs b/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs new file mode 100644 index 0000000..b4cb840 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs @@ -0,0 +1,53 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: IPeerMonitor.cs +* +* IPeerMonitor.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Collections.Generic; + +namespace VNLib.Data.Caching.ObjectCache.Server.Distribution +{ + /// <summary> + /// Represents a monitor for peer cache servers to advertise their presence + /// in the cluster + /// </summary> + internal interface IPeerMonitor + { + /// <summary> + /// Notifies the monitor that a peer has connected to the cluster + /// </summary> + /// <param name="peer">The peer that connected</param> + void OnPeerConnected(ICachePeer peer); + + /// <summary> + /// Notifies the monitor that a peer has disconnected + /// </summary> + /// <param name="peer">The peer that has disconnected</param> + void OnPeerDisconnected(ICachePeer peer); + + /// <summary> + /// Gets an enumerable of all peers currently active in the current peer + /// </summary> + /// <returns></returns> + IEnumerable<ICachePeer> GetAllPeers(); + } +} diff --git a/plugins/ObjectCacheServer/src/Distribution/NodeDiscoveryCollection.cs b/plugins/ObjectCacheServer/src/Distribution/NodeDiscoveryCollection.cs new file mode 100644 index 0000000..f773a2e --- /dev/null +++ b/plugins/ObjectCacheServer/src/Distribution/NodeDiscoveryCollection.cs @@ -0,0 +1,99 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: PeerDiscoveryManager.cs +* +* PeerDiscoveryManager.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Collections.Generic; + +using VNLib.Plugins; +using VNLib.Data.Caching.Extensions; + +namespace VNLib.Data.Caching.ObjectCache.Server.Distribution +{ + sealed class NodeDiscoveryCollection : INodeDiscoveryCollection + { + private LinkedList<ICachePeerAdvertisment> _peers; + + + public NodeDiscoveryCollection(PluginBase plugin) + { + _peers = new(); + } + + ///<inheritdoc/> + public INodeDiscoveryEnumerator BeginDiscovery() + { + return new NodeEnumerator(new()); + } + + ///<inheritdoc/> + public INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICachePeerAdvertisment> initialPeers) + { + //Init new enumerator with the initial peers + return new NodeEnumerator(new(initialPeers)); + } + + ///<inheritdoc/> + public void CompleteDiscovery(INodeDiscoveryEnumerator enumerator) + { + _ = enumerator ?? throw new ArgumentNullException(nameof(enumerator)); + + //Capture all nodes from the enumerator and store them as our current peers + _peers = (enumerator as NodeEnumerator)!.Peers; + } + + ///<inheritdoc/> + public ICachePeerAdvertisment[] GetAllNodes() + { + //Capture all current peers + return _peers.ToArray(); + } + + private sealed record class NodeEnumerator(LinkedList<ICachePeerAdvertisment> Peers) : INodeDiscoveryEnumerator + { + //Keep track of the current node in the collection so we can move down the list + private LinkedListNode<ICachePeerAdvertisment>? _currentNode = Peers.First; + + public ICachePeerAdvertisment? GetNextPeer() + { + //Move to the next peer in the collection + _currentNode = _currentNode?.Next; + + return _currentNode?.Value; + } + + public void OnPeerDiscoveryComplete(IEnumerable<ICachePeerAdvertisment> discoveredPeers) + { + //Get only the peers from the discovery that are not already in the collection + IEnumerable<ICachePeerAdvertisment> newPeers = discoveredPeers.Except(Peers); + + //Add them to the end of the collection + foreach(ICachePeerAdvertisment ad in newPeers) + { + Peers.AddLast(ad); + } + } + } + } +} diff --git a/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs new file mode 100644 index 0000000..54e4258 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs @@ -0,0 +1,168 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: PeerDiscoveryManager.cs +* +* PeerDiscoveryManager.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using System.Collections.Generic; + +using VNLib.Plugins; +using VNLib.Utils.Logging; +using VNLib.Data.Caching.Extensions; +using VNLib.Plugins.Extensions.Loading; + +namespace VNLib.Data.Caching.ObjectCache.Server.Distribution +{ + + sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter + { + private readonly NodeConfig _config; + private readonly IPeerMonitor _monitor; + private readonly INodeDiscoveryCollection _peers; + + public PeerDiscoveryManager(PluginBase plugin) + { + //Get config + _config = plugin.GetOrCreateSingleton<NodeConfig>(); + + //Get the peer monitor + _monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>(); + + //Get the node collection + _peers = plugin.GetOrCreateSingleton<NodeDiscoveryCollection>(); + + _connectedPeers = new(); + } + + async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) + { + pluginLog.Information("Node discovery worker started"); + + try + { + while (true) + { + try + { + await DiscoverAllNodesAsync(pluginLog, exitToken); + } + catch(OperationCanceledException) + { + throw; + } + catch (Exception ex) + { + pluginLog.Error(ex, "Failed to discover new peer nodes"); + } + + //Delay the next discovery + await Task.Delay(_config.DiscoveryInterval, exitToken); + } + } + catch (OperationCanceledException) + { + //Normal exit + pluginLog.Information("Node discovery worker exiting"); + } + finally + { + + } + } + + async Task DiscoverAllNodesAsync(ILogProvider log, CancellationToken cancellation) + { + //Use the monitor to get the initial peers + IEnumerable<ICachePeerAdvertisment> ads = _monitor.GetAllPeers() + .Where(static p => p.Advertisment != null) + .Select(static p => p.Advertisment!); + + //Init enumerator with initial peers + INodeDiscoveryEnumerator enumerator = _peers.BeginDiscovery(ads); + + do + { + //Load the initial peer + ICachePeerAdvertisment? peer = enumerator.GetNextPeer(); + + if (peer == null) + { + break; + } + + log.Verbose("Discovering peer nodes from {Peer}", peer.NodeId); + + //Discover nodes from this peer + ICachePeerAdvertisment[]? newNodes = await _config.Config.DiscoverClusterNodesAsync(peer, cancellation); + + //Add nodes to the enumerator + if (newNodes != null) + { + enumerator.OnPeerDiscoveryComplete(newNodes); + } + + } while (true); + + //Commit peer updates + _peers.CompleteDiscovery(enumerator); + } + + + private readonly List<ICachePeerAdvertisment> _connectedPeers; + + ///<inheritdoc/> + public ICachePeerAdvertisment[] GetNewPeers() + { + lock (_connectedPeers) + { + //Get all discovered peers + ICachePeerAdvertisment[] peers = _peers.GetAllNodes(); + + //Get the difference between the discovered peers and the connected peers + return peers.Except(_connectedPeers).ToArray(); + } + } + + ///<inheritdoc/> + public void OnPeerListenerAttached(ICachePeerAdvertisment peer) + { + lock (_connectedPeers) + { + //Add to connected peers + _connectedPeers.Add(peer); + } + } + + ///<inheritdoc/> + public void OnPeerListenerDetatched(ICachePeerAdvertisment peer) + { + //remove from connected peers + lock (_connectedPeers) + { + _connectedPeers.Remove(peer); + } + } + } +} diff --git a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs b/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs index 97061b3..b9c00e6 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs @@ -1,11 +1,11 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer -* File: BrokerHeartBeat.cs +* File: BrokerHeartBeatEndpoint.cs * -* BrokerHeartBeat.cs is part of ObjectCacheServer which is part of the larger +* BrokerHeartBeatEndpoint.cs is part of ObjectCacheServer which is part of the larger * VNLib collection of libraries and utilities. * * ObjectCacheServer is free software: you can redistribute it and/or modify @@ -26,11 +26,11 @@ using System; using System.Net; using System.Linq; using System.Text.Json; -using System.Threading; using System.Threading.Tasks; -using System.Collections.Generic; + using VNLib.Plugins; +using VNLib.Utils.Logging; using VNLib.Plugins.Essentials; using VNLib.Hashing.IdentityUtility; using VNLib.Plugins.Essentials.Endpoints; @@ -39,14 +39,11 @@ using VNLib.Plugins.Extensions.Loading; namespace VNLib.Data.Caching.ObjectCache.Server { - internal sealed class BrokerHeartBeat : ResourceEndpointBase + internal sealed class BrokerHeartBeatEndpoint : ResourceEndpointBase { - public override string Path => "/heartbeat"; - - private readonly Func<string> Token; - private readonly ManualResetEvent KeepaliveSet; + private readonly IBrokerHeartbeatNotifier _heartBeat; private readonly Task<IPAddress[]> BrokerIpList; - private readonly PluginBase Pbase; + private readonly bool DebugMode; ///<inheritdoc/> protected override ProtectionSettings EndpointProtectionSettings { get; } = new() @@ -55,19 +52,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server DisableSessionsRequired = true }; - public BrokerHeartBeat(Func<string> token, ManualResetEvent keepaliveSet, Uri brokerUri, PluginBase pbase) + public BrokerHeartBeatEndpoint(PluginBase plugin) { - Token = token; - KeepaliveSet = keepaliveSet; - BrokerIpList = Dns.GetHostAddressesAsync(brokerUri.DnsSafeHost); - - this.Pbase = pbase; - } + //Get debug flag + DebugMode = plugin.IsDebug(); - private async Task<ReadOnlyJsonWebKey> GetBrokerPubAsync() - { - return await Pbase.TryGetSecretAsync("broker_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : broker_public_key"); + //Get or create the current node config + _heartBeat = plugin.GetOrCreateSingleton<NodeConfig>(); + + /* + * Resolve the ip address of the broker and store it to verify connections + * later + */ + BrokerIpList = Dns.GetHostAddressesAsync(_heartBeat.GetBrokerAddress().DnsSafeHost); + + //Setup endpoint + InitPathAndLog("/heartbeat", plugin.Log); } + protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity) { @@ -76,13 +78,20 @@ namespace VNLib.Data.Caching.ObjectCache.Server { //Load and verify the broker's ip address matches with an address we have stored IPAddress[] addresses = await BrokerIpList; + if (!addresses.Contains(entity.TrustedRemoteIp)) { + if (DebugMode) + { + Log.Debug("Received connection {ip} that was not a DNS safe address for the broker server, access denied"); + } + //Token invalid entity.CloseResponse(HttpStatusCode.Forbidden); return VfReturnType.VirtualSkip; } } + //Get the authorization jwt string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; @@ -97,7 +106,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server using JsonWebToken jwt = JsonWebToken.Parse(jwtAuth); //Verify the jwt using the broker's public key certificate - using (ReadOnlyJsonWebKey cert = await GetBrokerPubAsync()) + using (ReadOnlyJsonWebKey cert = _heartBeat.GetBrokerPublicKey()) { //Verify the jwt if (!jwt.VerifyFromJwk(cert)) @@ -114,16 +123,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server { auth = doc.RootElement.GetProperty("token").GetString(); } - + + //Get our stored token used for registration + string? selfToken = _heartBeat.GetAuthToken(); + //Verify token - if(Token().Equals(auth, StringComparison.Ordinal)) + if (selfToken != null && selfToken.Equals(auth, StringComparison.Ordinal)) { //Signal keepalive - KeepaliveSet.Set(); + _heartBeat.HearbeatReceived(); entity.CloseResponse(HttpStatusCode.OK); return VfReturnType.VirtualSkip; } - + + if (DebugMode) + { + Log.Debug("Invalid auth token recieved from broker sever, access denied"); + } + //Token invalid entity.CloseResponse(HttpStatusCode.Forbidden); return VfReturnType.VirtualSkip; diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs new file mode 100644 index 0000000..67db433 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs @@ -0,0 +1,91 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: ConnectEndpoint.cs +* +* ConnectEndpoint.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Threading; +using System.Threading.Tasks; +using VNLib.Utils.Logging; +using VNLib.Plugins; +using VNLib.Plugins.Extensions.Loading; + +namespace VNLib.Data.Caching.ObjectCache.Server +{ + [ConfigurationName("cache")] + sealed class CacheStore : ICacheStore, IDisposable + { + public BlobCacheListener Listener { get; } + + + public CacheStore(PluginBase plugin, IConfigScope config) + { + //Init cache + Listener = InitializeCache((ObjectCacheServerEntry)plugin, config); + } + + ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token) + { + return Listener.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token); + } + + void ICacheStore.Clear() + { + throw new NotImplementedException(); + } + + ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token) + { + return Listener.Cache.DeleteObjectAsync(id, token); + } + + private static BlobCacheListener InitializeCache(ObjectCacheServerEntry plugin, IConfigScope config) + { + //Deserialize the cache config + CacheConfiguration cacheConf = config.Deserialze<CacheConfiguration>(); + + if (cacheConf.MaxCacheEntries < 2) + { + throw new ArgumentException("You must configure a 'max_cache' size larger than 1 item"); + } + + //Suggestion + if (cacheConf.MaxCacheEntries < 200) + { + plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache"); + } + + plugin.Log.Verbose("Creating cache store with {bc} buckets, with {mc} items/bucket", cacheConf.BucketCount, cacheConf.MaxCacheEntries); + + //Load the blob cache table system + IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, plugin.CacheHeap, cacheConf); + + //Endpoint only allows for a single reader + return new(bc, plugin.Log, plugin.CacheHeap, true); + } + + public void Dispose() + { + Listener.Dispose(); + } + } +} diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs index 2f896bc..167a7e9 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs @@ -27,40 +27,46 @@ using System.Net; using System.Text.Json; using System.Threading; using System.Threading.Tasks; -using System.Threading.Channels; using System.Collections.Generic; -using System.Collections.Concurrent; -using VNLib.Plugins; using VNLib.Hashing; using VNLib.Net.Http; using VNLib.Utils.Async; using VNLib.Utils.Memory; using VNLib.Utils.Logging; +using VNLib.Utils.Extensions; using VNLib.Data.Caching; +using VNLib.Data.Caching.Extensions; using VNLib.Hashing.IdentityUtility; using VNLib.Net.Messaging.FBM; using VNLib.Net.Messaging.FBM.Client; using VNLib.Net.Messaging.FBM.Server; +using VNLib.Plugins; using VNLib.Plugins.Essentials; using VNLib.Plugins.Extensions.Loading; using VNLib.Plugins.Essentials.Endpoints; using VNLib.Plugins.Essentials.Extensions; - +using VNLib.Plugins.Extensions.Loading.Routing; +using VNLib.Data.Caching.ObjectCache.Server.Distribution; namespace VNLib.Data.Caching.ObjectCache.Server { - [ConfigurationName("store")] - internal sealed class ConnectEndpoint : ResourceEndpointBase, IDisposable, IAsyncBackgroundWork + [ConfigurationName("connect_endpoint")] + internal sealed class ConnectEndpoint : ResourceEndpointBase, IAsyncBackgroundWork { - private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30); + private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30); + + + private readonly CacheNodeConfiguration NodeConfiguration; + private readonly ICacheEventQueueManager PubSubManager; + private readonly IPeerMonitor Peers; - private readonly string AudienceLocalServerId; private readonly BlobCacheListener Store; - private readonly PluginBase Pbase; + private readonly CacheAuthKeyStore KeyStore; - private readonly ConcurrentDictionary<string, AsyncQueue<ChangeEvent>> StatefulEventQueue; + private readonly bool VerifyIp; + private readonly string AudienceLocalServerId; private uint _connectedClients; @@ -87,27 +93,26 @@ namespace VNLib.Data.Caching.ObjectCache.Server string? path = config["path"].GetString(); InitPathAndLog(path, plugin.Log); - - Pbase = plugin; - //Parse cache config or use default - if(config.TryGetValue("cache", out JsonElement confEl)) - { - CacheConfig = confEl.Deserialize<CacheConfiguration>()!; - } - else - { - //Init default config if not fount - CacheConfig = new(); + KeyStore = new(plugin); - Log.Verbose("Loading default cache buffer configuration"); - } + //Check for ip-verification flag + VerifyIp = config.TryGetValue("verify_ip", out JsonElement vIpEl) && vIpEl.GetBoolean(); - //Create event queue client lookup table - StatefulEventQueue = new(StringComparer.OrdinalIgnoreCase); + //Setup pub/sub manager + PubSubManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>(); + + //Get node configuration + NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>().Config; + + //Get peer monitor + Peers = plugin.GetOrCreateSingleton<CachePeerMonitor>(); //Init the cache store - Store = InitializeCache((ObjectCacheServerEntry)plugin, CacheConfig, config); + Store = plugin.GetOrCreateSingleton<CacheStore>().Listener; + + //Get the cache store configuration + CacheConfig = plugin.GetConfigForType<CacheStore>().Deserialze<CacheConfiguration>(); /* * Generate a random guid for the current server when created so we @@ -118,60 +123,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server //Schedule the queue worker to be run _ = plugin.ObserveWork(this, 100); } - - - private static BlobCacheListener InitializeCache(ObjectCacheServerEntry plugin, CacheConfiguration cacheConf, IConfigScope config) - { - if(cacheConf.MaxCacheEntries < 2) - { - throw new ArgumentException("You must configure a 'max_cache' size larger than 1 item"); - } - - //Suggestion - if(cacheConf.MaxCacheEntries < 200) - { - plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache"); - } - - plugin.Log.Verbose("Creating cache store with {bc} buckets, with {mc} items/bucket", cacheConf.BucketCount, cacheConf.MaxCacheEntries); - - //Load the blob cache table system - IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, plugin.CacheHeap, cacheConf); - - //Endpoint only allows for a single reader - return new (bc, plugin.Log, plugin.CacheHeap, true); - } - - - /// <summary> - /// Gets the configured cache store - /// </summary> - /// <returns></returns> - public ICacheStore GetCacheStore() => new CacheStore(Store); - //Dispose will be called by the host plugin on unload - void IDisposable.Dispose() - { - //Dispose the store on cleanup - Store.Dispose(); - } - - - private async Task<ReadOnlyJsonWebKey> GetClientPubAsync() - { - return await Pbase.TryGetSecretAsync("client_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); - } - private async Task<ReadOnlyJsonWebKey> GetCachePubAsync() - { - return await Pbase.TryGetSecretAsync("cache_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); - } - private async Task<ReadOnlyJsonWebKey> GetCachePrivateKeyAsync() - { - return await Pbase.TryGetSecretAsync("cache_private_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); - } - - /* * Used as a client negotiation and verification request * @@ -183,6 +136,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server * * The tokens are very short lived as requests are intended to be made * directly after verification + * + * Clients must also sign the entire token with their private key and + * set the signature in the x-upgrade-sig header so we can verify they + * received the messages properly */ protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity) @@ -205,7 +162,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server bool verified = false; //Get the client public key certificate to verify the client's message - using(ReadOnlyJsonWebKey cert = await GetClientPubAsync()) + using(ReadOnlyJsonWebKey cert = await KeyStore.GetClientPublicKeyAsync()) { //verify signature for client if (jwt.VerifyFromJwk(cert)) @@ -215,10 +172,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server //May be signed by a cache server else { - using ReadOnlyJsonWebKey cacheCert = await GetCachePubAsync(); - //Set peer and verified flag since the another cache server signed the request - isPeer = verified = jwt.VerifyFromJwk(cacheCert); + isPeer = verified = NodeConfiguration.VerifyCache(jwt); } } @@ -232,10 +187,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server //Recover json body using JsonDocument doc = jwt.GetPayload(); + if (doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl)) { nodeId = servIdEl.GetString(); } + if (doc.RootElement.TryGetProperty("chl", out JsonElement challengeEl)) { challenge = challengeEl.GetString(); @@ -246,133 +203,147 @@ namespace VNLib.Data.Caching.ObjectCache.Server //Verified, now we can create an auth message with a short expiration using JsonWebToken auth = new(); + + auth.WriteHeader(NodeConfiguration.GetJwtHeader()); + auth.InitPayloadClaim() + .AddClaim("aud", AudienceLocalServerId) + .AddClaim("exp", entity.RequestedTimeUtc.Add(AuthTokenExpiration).ToUnixTimeSeconds()) + .AddClaim("nonce", RandomHash.GetRandomBase32(8)) + .AddClaim("chl", challenge!) + //Set the ispeer flag if the request was signed by a cache server + .AddClaim("isPeer", isPeer) + //Specify the server's node id if set + .AddClaim("sub", nodeId!) + //Set ip address + .AddClaim("ip", entity.TrustedRemoteIp.ToString()) + //Add negotiaion args + .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, CacheConfig.MaxHeaderBufferSize) + .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, CacheConfig.MaxRecvBufferSize) + .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, CacheConfig.MaxMessageSize) + .CommitClaims(); + + //Sign the auth message from our private key + NodeConfiguration.SignJwt(auth); - //Sign the auth message from the cache certificate's private key - using (ReadOnlyJsonWebKey cert = await GetCachePrivateKeyAsync()) - { - auth.WriteHeader(cert.JwtHeader); - auth.InitPayloadClaim() - .AddClaim("aud", AudienceLocalServerId) - .AddClaim("exp", entity.RequestedTimeUtc.Add(AuthTokenExpiration).ToUnixTimeSeconds()) - .AddClaim("nonce", RandomHash.GetRandomBase32(8)) - .AddClaim("chl", challenge!) - //Set the ispeer flag if the request was signed by a cache server - .AddClaim("isPeer", isPeer) - //Specify the server's node id if set - .AddClaim("sub", nodeId!) - //Add negotiaion args - .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, CacheConfig.MaxHeaderBufferSize) - .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, CacheConfig.MaxRecvBufferSize) - .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, CacheConfig.MaxMessageSize) - .CommitClaims(); - - auth.SignFromJwk(cert); - } - //Close response entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer); return VfReturnType.VirtualSkip; } - - //Background worker to process event queue items - async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) + protected override async ValueTask<VfReturnType> WebsocketRequestedAsync(HttpEntity entity) { - try - { - //Listen for changes - while (true) - { - ChangeEvent ev = await Store.EventQueue.DequeueAsync(exitToken); + //Parse jwt from authorization + string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; - //Add event to queues - foreach (AsyncQueue<ChangeEvent> queue in StatefulEventQueue.Values) - { - if (!queue.TryEnque(ev)) - { - Log.Debug("Listener queue has exeeded capacity, change events will be lost"); - } - } - } - } - catch (OperationCanceledException) + if (string.IsNullOrWhiteSpace(jwtAuth)) { - //Normal exit + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; } - } - private class WsUserState - { - public int RecvBufferSize { get; init; } - public int MaxHeaderBufferSize { get; init; } - public int MaxMessageSize { get; init; } - public int MaxResponseBufferSize { get; init; } - public AsyncQueue<ChangeEvent>? SyncQueue { get; init; } + //Get the upgrade signature header + string? clientSignature = entity.Server.Headers[FBMDataCacheExtensions.X_UPGRADE_SIG_HEADER]; - public override string ToString() + if (string.IsNullOrWhiteSpace(clientSignature)) { - return - $"{nameof(RecvBufferSize)}:{RecvBufferSize}, {nameof(MaxHeaderBufferSize)}: {MaxHeaderBufferSize}, {nameof(MaxMessageSize)}:{MaxMessageSize}, {nameof(MaxResponseBufferSize)}:{MaxResponseBufferSize}"; + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; } - } - protected override async ValueTask<VfReturnType> WebsocketRequestedAsync(HttpEntity entity) - { - try + string? nodeId = null; + ICachePeerAdvertisment? discoveryAd = null; + + //Parse jwt + using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth)) { - //Parse jwt from authorization - string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; + //verify signature against the cache public key, since this server must have signed it + if (!NodeConfiguration.VerifyCache(jwt)) + { + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; + } + + //Recover json body + using JsonDocument doc = jwt.GetPayload(); + + //Verify audience, expiration - if (string.IsNullOrWhiteSpace(jwtAuth)) + if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl) || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase)) + { + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; + } + + if (!doc.RootElement.TryGetProperty("exp", out JsonElement expEl) + || DateTimeOffset.FromUnixTimeSeconds(expEl.GetInt64()) < entity.RequestedTimeUtc) { entity.CloseResponse(HttpStatusCode.Unauthorized); return VfReturnType.VirtualSkip; } - - string? nodeId = null; - //Parse jwt - using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth)) + //Check node ip address matches if required + if (VerifyIp) { - //Get the client public key certificate to verify the client's message - using (ReadOnlyJsonWebKey cert = await GetCachePubAsync()) + if (!doc.RootElement.TryGetProperty("ip", out JsonElement ipEl)) { - //verify signature against the cache public key, since this server must have signed it - if (!jwt.VerifyFromJwk(cert)) - { - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; - } + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; } - - //Recover json body - using JsonDocument doc = jwt.GetPayload(); - - //Verify audience, expiration - if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl) || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase)) + string? clientIp = ipEl.GetString(); + //Verify the client ip address matches the one in the token + if (clientIp == null || !IPAddress.TryParse(clientIp, out IPAddress? clientIpAddr) || !clientIpAddr.Equals(entity.TrustedRemoteIp)) { entity.CloseResponse(HttpStatusCode.Unauthorized); return VfReturnType.VirtualSkip; } + } + + //Check if the client is a peer + bool isPeer = doc.RootElement.TryGetProperty("isPeer", out JsonElement isPeerEl) && isPeerEl.GetBoolean(); - if (!doc.RootElement.TryGetProperty("exp", out JsonElement expEl) - || DateTimeOffset.FromUnixTimeSeconds(expEl.GetInt64()) < entity.RequestedTimeUtc) + //The node id is optional and stored in the 'sub' field, ignore if the client is not a peer + if (isPeer && doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl)) + { + nodeId = servIdEl.GetString(); + } + + //Verify the signature the client included of the auth token + + if (isPeer) + { + //Verify token signature against a fellow cache public key + if (!NodeConfiguration.VerifyUpgradeToken(clientSignature, jwtAuth)) { entity.CloseResponse(HttpStatusCode.Unauthorized); return VfReturnType.VirtualSkip; } - //Check if the client is a peer - bool isPeer = doc.RootElement.TryGetProperty("isPeer", out JsonElement isPeerEl) && isPeerEl.GetBoolean(); + //Try to get the node advertisement header + string? discoveryHeader = entity.Server.Headers[FBMDataCacheExtensions.X_NODE_DISCOVERY_HEADER]; - //The node id is optional and stored in the 'sub' field, ignore if the client is not a peer - if (isPeer && doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl)) + //Verify the node advertisement header and publish it + if (!string.IsNullOrWhiteSpace(discoveryHeader)) { - nodeId = servIdEl.GetString(); + discoveryAd = NodeConfiguration.VerifyPeerAdvertisment(discoveryHeader); } } - + else + { + //Not a peer, so verify against the client's public key + using ReadOnlyJsonWebKey clientPub = await KeyStore.GetClientPublicKeyAsync(); + + //Verify token signature + if (!FBMDataCacheExtensions.VerifyUpgradeToken(clientSignature, jwtAuth, clientPub)) + { + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; + } + } + } + + try + { //Get query config suggestions from the client string recvBufCmd = entity.QueryArgs[FBMClient.REQ_RECV_BUF_QUERY_ARG]; string maxHeaderCharCmd = entity.QueryArgs[FBMClient.REQ_HEAD_BUF_QUERY_ARG]; @@ -382,34 +353,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : CacheConfig.MinRecvBufferSize; int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : CacheConfig.MinHeaderBufferSize; int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : CacheConfig.MaxMessageSize; - - AsyncQueue<ChangeEvent>? nodeQueue = null; - - //The connection may be a caching server node, so get its node-id - if (!string.IsNullOrWhiteSpace(nodeId)) - { - /* - * Store a new async queue, or get an old queue for the current node - * - * We should use a bounded queue and disacard LRU items, we also know - * only a single writer is needed as the queue is processed on a single thread - * and change events may be processed on mutliple threads. - */ - - BoundedChannelOptions queueOptions = new(CacheConfig.MaxEventQueueDepth) - { - AllowSynchronousContinuations = true, - SingleReader = false, - SingleWriter = true, - //Drop oldest item in queue if full - FullMode = BoundedChannelFullMode.DropOldest, - }; - - _ = StatefulEventQueue.TryAdd(nodeId, new(queueOptions)); - - //Get the queue - nodeQueue = StatefulEventQueue[nodeId]; - } /* * Buffer sizing can get messy as the response/resquest sizes can vary @@ -434,7 +377,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server */ MaxResponseBufferSize = (int)MemoryUtil.NearestPage(maxMessageSizeClamp), - SyncQueue = nodeQueue + NodeId = nodeId, + Advertisment = discoveryAd }; Log.Debug("Client recv buffer suggestion {recv}, header buffer size {head}, response buffer size {r}", recvBufCmd, maxHeaderCharCmd, state.MaxResponseBufferSize); @@ -454,14 +398,19 @@ namespace VNLib.Data.Caching.ObjectCache.Server private async Task WebsocketAcceptedAsync(WebSocketSession wss) { + WsUserState state = (WsUserState)wss.UserState!; + + //Notify peers of new connection + Peers.OnPeerConnected(state); + //Inc connected count Interlocked.Increment(ref _connectedClients); + //Register plugin exit token to cancel the connected socket - CancellationTokenRegistration reg = Pbase.UnloadToken.Register(wss.CancelAll); + CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll); + try { - WsUserState state = (wss.UserState as WsUserState)!; - //Init listener args from request FBMListenerSessionParams args = new() { @@ -473,12 +422,33 @@ namespace VNLib.Data.Caching.ObjectCache.Server HeaderEncoding = Helpers.DefaultEncoding, }; - //Listen for requests - await Store.ListenAsync(wss, args, state.SyncQueue); + //Check if the client is a peer node, if it is, subscribe to change events + if (!string.IsNullOrWhiteSpace(state.NodeId)) + { + //Get the event queue for the current node + AsyncQueue<ChangeEvent> queue = PubSubManager.Subscribe(state); + + try + { + //Begin listening for messages with a queue + await Store.ListenAsync(wss, args, queue); + } + finally + { + //ALAWYS Detatch listener + PubSubManager.Unsubscribe(state); + } + } + else + { + //Begin listening for messages without a queue + await Store.ListenAsync(wss, args, null); + } } catch (OperationCanceledException) { Log.Debug("Websocket connection was canceled"); + //Disconnect the socket await wss.CloseSocketOutputAsync(System.Net.WebSockets.WebSocketCloseStatus.NormalClosure, "unload", CancellationToken.None); } @@ -490,35 +460,68 @@ namespace VNLib.Data.Caching.ObjectCache.Server { //Dec connected count Interlocked.Decrement(ref _connectedClients); + //Unregister the reg.Unregister(); } + + //Notify monitor of disconnect + Peers.OnPeerDisconnected(state); + Log.Debug("Server websocket exited"); } - - private sealed class CacheStore : ICacheStore + + //Background worker to process event queue items + async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) { - private readonly BlobCacheListener _cache; + const int accumulatorSize = 64; - public CacheStore(BlobCacheListener cache) + try { - _cache = cache; - } + //Accumulator for events + ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize]; + int ptr = 0; - ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token) - { - return _cache.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token); - } + //Listen for changes + while (true) + { + //Wait for next event + accumulator[ptr++] = await Store.EventQueue.DequeueAsync(exitToken); - void ICacheStore.Clear() + //try to accumulate more events until we can't anymore + while (Store.EventQueue.TryDequeue(out ChangeEvent? ev) && ptr < accumulatorSize) + { + accumulator[ptr++] = ev; + } + + //Publish all events to subscribers + PubSubManager.PublishMultiple(accumulator.AsSpan(0, ptr)); + + //Reset pointer + ptr = 0; + } + } + catch (OperationCanceledException) { - throw new NotImplementedException(); + //Normal exit + pluginLog.Debug("Change queue listener worker exited"); } + } + + private class WsUserState : ICachePeer + { + public int RecvBufferSize { get; init; } + public int MaxHeaderBufferSize { get; init; } + public int MaxMessageSize { get; init; } + public int MaxResponseBufferSize { get; init; } + public string? NodeId { get; init; } + public ICachePeerAdvertisment? Advertisment { get; init; } - ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token) + public override string ToString() { - return _cache.Cache.DeleteObjectAsync(id, token); + return + $"{nameof(RecvBufferSize)}:{RecvBufferSize}, {nameof(MaxHeaderBufferSize)}: {MaxHeaderBufferSize}, {nameof(MaxMessageSize)}:{MaxMessageSize}, {nameof(MaxResponseBufferSize)}:{MaxResponseBufferSize}"; } } } diff --git a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs new file mode 100644 index 0000000..670d624 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs @@ -0,0 +1,129 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: PeerDiscoveryEndpoint.cs +* +* PeerDiscoveryEndpoint.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Net; +using System.Text.Json; +using System.Threading.Tasks; + +using VNLib.Hashing; +using VNLib.Hashing.IdentityUtility; +using VNLib.Plugins; +using VNLib.Plugins.Essentials; +using VNLib.Plugins.Essentials.Endpoints; +using VNLib.Plugins.Essentials.Extensions; +using VNLib.Plugins.Extensions.Loading; +using VNLib.Data.Caching.Extensions; +using VNLib.Data.Caching.ObjectCache.Server.Distribution; + +namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints +{ + [ConfigurationName("discovery_endpoint")] + internal sealed class PeerDiscoveryEndpoint : UnprotectedWebEndpoint + { + private readonly IPeerMonitor PeerMonitor; + private readonly NodeConfig Config; + + public PeerDiscoveryEndpoint(PluginBase plugin, IConfigScope config) + { + string? path = config["path"].GetString(); + + InitPathAndLog(path, plugin.Log); + + //Get the peer monitor + PeerMonitor = plugin.GetOrCreateSingleton<CachePeerMonitor>(); + + //Get the node config + Config = plugin.GetOrCreateSingleton<NodeConfig>(); + } + + protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity) + { + //Get auth token + string? authToken = entity.Server.Headers[HttpRequestHeader.Authorization]; + + if(string.IsNullOrWhiteSpace(authToken)) + { + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; + } + + string subject = string.Empty; + + //Parse auth token + using(JsonWebToken jwt = JsonWebToken.Parse(authToken)) + { + //try to verify against cache node first + if (!Config.Config.VerifyCache(jwt)) + { + //failed... + + //try to verify against client key + using ReadOnlyJsonWebKey clientPub = await Config.KeyStore.GetClientPublicKeyAsync(); + + if (!jwt.VerifyFromJwk(clientPub)) + { + //invalid token + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; + } + } + + using JsonDocument payload = jwt.GetPayload(); + + subject = payload.RootElement.GetProperty("sub").GetString() ?? string.Empty; + } + + //Valid key, get peer list to send to client + ICachePeerAdvertisment[] peers = PeerMonitor.GetAllPeers() + .Where(static p => p.Advertisment != null) + .Select(static p => p.Advertisment!) + .ToArray(); + + //Build response jwt + using JsonWebToken response = new(); + + //set header from cache config + response.WriteHeader(Config.Config.GetJwtHeader()); + + response.InitPayloadClaim() + .AddClaim("iss", Config.Config.NodeId) + //Audience is the requestor id + .AddClaim("sub", subject) + .AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds()) + //Send all peers as a json array + .AddClaim("peers", peers) + .AddClaim("nonce", RandomHash.GetRandomBase32(24)) + .CommitClaims(); + + //Sign the response + Config.Config.SignJwt(response); + + //Send response to client + entity.CloseResponse(HttpStatusCode.OK, Net.Http.ContentType.Text, response.DataBuffer); + return VfReturnType.VirtualSkip; + } + } +} diff --git a/plugins/ObjectCacheServer/src/ICacheEventQueueManager.cs b/plugins/ObjectCacheServer/src/ICacheEventQueueManager.cs new file mode 100644 index 0000000..6b07000 --- /dev/null +++ b/plugins/ObjectCacheServer/src/ICacheEventQueueManager.cs @@ -0,0 +1,67 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: ICacheEventQueueManager.cs +* +* ICacheEventQueueManager.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; + +using VNLib.Utils.Async; + + +namespace VNLib.Data.Caching.ObjectCache.Server +{ + /// <summary> + /// Represents a managment system for publishing and subscribing to cache change events + /// </summary> + internal interface ICacheEventQueueManager + { + /// <summary> + /// Publishes a change event to all subscribers + /// </summary> + /// <param name="change">The change event to publish</param> + void PublishSingle(ChangeEvent change); + + /// <summary> + /// Publishes multiple change events to all subscribers + /// </summary> + /// <param name="changes">The span of changes to publish to all subscribers</param> + void PublishMultiple(Span<ChangeEvent> changes); + + /// <summary> + /// Attatches a subscriber that will receive all published changes + /// </summary> + /// <param name="nodeId">The id of the node to get the queue for</param> + /// <returns>The initilaizes event queue for the single subscriber</returns> + AsyncQueue<ChangeEvent> Subscribe(ICachePeer peer); + + /// <summary> + /// Detatches a subscriber from the event queue + /// </summary> + /// <param name="nodeId">The id of the nede to detach</param> + void Unsubscribe(ICachePeer peer); + + /// <summary> + /// Purges all stale subcriber nodes + /// </summary> + void PurgeStaleSubscribers(); + } +} diff --git a/plugins/ObjectCacheServer/src/ICachePeer.cs b/plugins/ObjectCacheServer/src/ICachePeer.cs new file mode 100644 index 0000000..d374400 --- /dev/null +++ b/plugins/ObjectCacheServer/src/ICachePeer.cs @@ -0,0 +1,44 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: ICachePeer.cs +* +* ICachePeer.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using VNLib.Data.Caching.Extensions; + +namespace VNLib.Data.Caching.ObjectCache.Server +{ + /// <summary> + /// Represents a fellow cache peer in the cluster + /// </summary> + internal interface ICachePeer + { + /// <summary> + /// The unique identifier of the node + /// </summary> + string NodeId { get; } + + /// <summary> + /// An optional signed advertisment message for other peers + /// </summary> + ICachePeerAdvertisment? Advertisment { get; } + } +} diff --git a/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs b/plugins/ObjectCacheServer/src/ICacheStore.cs index f911af9..f911af9 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs +++ b/plugins/ObjectCacheServer/src/ICacheStore.cs diff --git a/plugins/ObjectCacheServer/src/NodeConfig.cs b/plugins/ObjectCacheServer/src/NodeConfig.cs new file mode 100644 index 0000000..614f0d6 --- /dev/null +++ b/plugins/ObjectCacheServer/src/NodeConfig.cs @@ -0,0 +1,237 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: ObjectCacheServerEntry.cs +* +* ObjectCacheServerEntry.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Net; +using System.Linq; +using System.Net.Http; +using System.Text.Json; +using System.Threading; +using System.Net.Sockets; +using System.Threading.Tasks; +using System.Collections.Generic; +using System.Security.Cryptography; + +using VNLib.Plugins; +using VNLib.Utils; +using VNLib.Utils.Logging; +using VNLib.Utils.Extensions; +using VNLib.Hashing; +using VNLib.Hashing.IdentityUtility; +using VNLib.Data.Caching.Extensions; +using VNLib.Plugins.Extensions.Loading; + + +namespace VNLib.Data.Caching.ObjectCache.Server +{ + [ConfigurationName("cluster")] + internal sealed class NodeConfig : VnDisposeable, IAsyncConfigurable, IAsyncBackgroundWork, IBrokerHeartbeatNotifier + { + const string CacheConfigTemplate = +@" + Cluster Configuration: + Broker Address: {ba} + Heartbeat Timeout: {hb} + Node Id: {id} + TlsEndabled: {tls}, + Cache Endpoint: {ep} +"; + + public CacheNodeConfiguration Config { get; } + public CacheAuthKeyStore KeyStore { get; } + + private readonly ManualResetEventSlim hearbeatHandle; + private readonly TimeSpan _hearbeatTimeout; + + private string? _authToken; + + public NodeConfig(PluginBase plugin, IConfigScope config) + { + //Server id is just dns name for now + string nodeId = Dns.GetHostName(); + Config = new(); + + //Get the heartbeat interval + TimeSpan heartBeatDelayMs = config["heartbeat_timeout_sec"].GetTimeSpan(TimeParseType.Seconds); + + string brokerAddr = config["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'"); + + //Get the port of the primary webserver + int port; + bool usingTls; + { + JsonElement firstHost = plugin.HostConfig.GetProperty("virtual_hosts").EnumerateArray().First(); + + port = firstHost.GetProperty("interface") + .GetProperty("port") + .GetInt32(); + + //If the ssl element is present, ssl is enabled for the server + usingTls = firstHost.TryGetProperty("ssl", out _); + } + + //Get the cache endpoint config + IConfigScope cacheEpConfig = plugin.GetConfigForType<ConnectEndpoint>(); + + //The endpoint to advertise to cache clients that allows cache connections + UriBuilder endpoint = new(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, nodeId, port, cacheEpConfig["path"].GetString()); + + //Setup cache node config + Config.WithCacheEndpoint(endpoint.Uri) + .WithNodeId(nodeId) + .WithTls(usingTls) + .WithBroker(new(brokerAddr)); + + //Check if advertising is enabled + if(config.TryGetValue("advertise", out JsonElement adEl) && adEl.GetBoolean()) + { + Config.EnableAdvertisment(true, ""); + } + + //Init key store + KeyStore = new(plugin); + + //Init heartbeat handle unsiganled waiting for first heartbeat + hearbeatHandle = new(false); + + //Schedule heartbeat + _ = plugin.ObserveWork(this, 500); + + //log the config + plugin.Log.Information(CacheConfigTemplate, + brokerAddr, + heartBeatDelayMs, + nodeId, + usingTls, + endpoint.Uri); + } + + async Task IAsyncConfigurable.ConfigureServiceAsync(PluginBase plugin) + { + //Get cache private key for signing from the key store + ReadOnlyJsonWebKey signingKey = await KeyStore.GetCachePrivateAsync(); + + Config.WithSigningKey(signingKey); + + //Get broker public key for verifying from the key store + ReadOnlyJsonWebKey brokerKey = await KeyStore.GetBrokerPublicAsync(); + + Config.WithBrokerVerificationKey(brokerKey); + } + + protected override void Free() + { + //Dispose the heartbeat handle + hearbeatHandle.Dispose(); + + //cleanup keys + Config.SigningKey?.Dispose(); + Config.VerificationKey?.Dispose(); + Config.BrokerVerificationKey?.Dispose(); + } + + ///<inheritdoc/> + public void HearbeatReceived() + { + //Set the heartbeat handle as received + hearbeatHandle.Set(); + } + + ///<inheritdoc/> + public string? GetAuthToken() => _authToken; + + ///<inheritdoc/> + public Uri GetBrokerAddress() => Config.DiscoveryEndpoint!; + + ///<inheritdoc/> + public ReadOnlyJsonWebKey GetBrokerPublicKey() => Config.BrokerVerificationKey!; + + + /* + * Worker loop for registering with the broker and monitoring hearbeat requests + */ + async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) + { + //Listen in loop + while (true) + { + try + { + //Regen the auth token before registering + _authToken = RandomHash.GetRandomBase32(32); + + pluginLog.Information("Registering with cache broker server with id {id}", Config.NodeId); + + //Register with the broker and pass the current auth token + await Config.RegisterWithBrokerAsync(_authToken); + + //Enter heartbeat loop + while (true) + { + //Wait for the heartbeat timeout + await Task.Delay(_hearbeatTimeout, exitToken); + + //Confrim the hearbeat was received within the timeout period + if (!hearbeatHandle.IsSet) + { + //If the heartbeat handle is not set, the heartbeat was not received, reg-register + pluginLog.Information("Broker missed hearbeat request"); + + //not received, break out of the heartbeat loop to re-register + break; + } + + //Reset the handle and continue the heartbeat loop + hearbeatHandle.Reset(); + } + + //Add random delay to prevent all nodes from re-registering at the same time + await Task.Delay(RandomNumberGenerator.GetInt32(1000, 5000), exitToken); + } + catch (OperationCanceledException) + { + pluginLog.Debug("Registration loop exited on unload"); + break; + } + catch (TimeoutException) + { + pluginLog.Warn("Failed to connect to cache broker server within the specified timeout period"); + } + catch (HttpRequestException re) when (re.InnerException is SocketException) + { + pluginLog.Warn("Cache broker is unavailable or network is unavailable"); + } + catch(HttpRequestException re) when (re.StatusCode.HasValue) + { + pluginLog.Warn("Failed to register with cache broker server, received status code {code}", re.StatusCode); + } + catch (Exception ex) + { + pluginLog.Warn("Exception occured in registraion loop: {ex}", ex!.Message); + } + } + } + + } +} diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs index 2fa6220..c1a6ad2 100644 --- a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs +++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs @@ -27,7 +27,6 @@ using System.IO; using System.Net; using System.Linq; using System.Net.Http; -using System.Text.Json; using System.Threading; using System.Net.Sockets; using System.Threading.Tasks; @@ -38,7 +37,6 @@ using VNLib.Plugins; using VNLib.Utils.Memory; using VNLib.Utils.Logging; using VNLib.Utils.Memory.Diagnostics; -using VNLib.Hashing; using VNLib.Hashing.IdentityUtility; using VNLib.Data.Caching.Extensions; using static VNLib.Data.Caching.Constants; @@ -47,18 +45,64 @@ using VNLib.Net.Messaging.FBM.Client; using VNLib.Plugins.Cache.Broker.Endpoints; using VNLib.Plugins.Extensions.Loading; using VNLib.Plugins.Extensions.Loading.Routing; - +using VNLib.Data.Caching.ObjectCache.Server.Endpoints; namespace VNLib.Data.Caching.ObjectCache.Server { + sealed record class CacheAuthKeyStore(PluginBase Plugin) + { + public Task<ReadOnlyJsonWebKey> GetCachePublicAsync() + { + return Plugin.TryGetSecretAsync("cache_private_key").ToJsonWebKey(true); + } + + public Task<ReadOnlyJsonWebKey> GetCachePrivateAsync() + { + return Plugin.TryGetSecretAsync("cache_private_key").ToJsonWebKey(true); + } + + public Task<ReadOnlyJsonWebKey> GetBrokerPublicAsync() + { + return Plugin.TryGetSecretAsync("broker_public_key").ToJsonWebKey(true); + } + + public Task<ReadOnlyJsonWebKey> GetClientPublicKeyAsync() + { + return Plugin.TryGetSecretAsync("client_public_key").ToJsonWebKey(true); + } + } + + internal interface IBrokerHeartbeatNotifier + { + /// <summary> + /// Called when the heartbeat endpoint receives a heartbeat from the broker + /// </summary> + void HearbeatReceived(); + + /// <summary> + /// Gets the current auth token sent to the broker, which is expected to be sent back in the heartbeat + /// </summary> + /// <returns>The heartbeat auth token if set</returns> + string? GetAuthToken(); + + /// <summary> + /// Gets the address of the broker server + /// </summary> + /// <returns>The full address of the broker server to connect to</returns> + Uri GetBrokerAddress(); + + /// <summary> + /// Gets the public key of the broker server + /// </summary> + /// <returns>The broker's public key</returns> + ReadOnlyJsonWebKey GetBrokerPublicKey(); + } + public sealed class ObjectCacheServerEntry : PluginBase { public override string PluginName => "ObjectCache.Service"; - private readonly Lazy<IUnmangedHeap> _cacheHeap; - private readonly object ServerLock; - private readonly HashSet<ActiveServer> ListeningServers; - private readonly ManualResetEvent BrokerSyncHandle; + private readonly Lazy<IUnmangedHeap> _cacheHeap; /// <summary> /// Gets the shared heap for the plugin @@ -69,12 +113,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server { //Init heap _cacheHeap = new Lazy<IUnmangedHeap>(InitializeHeap, LazyThreadSafetyMode.PublicationOnly); - - ServerLock = new(); - ListeningServers = new(); - - //Set sync handle - BrokerSyncHandle = new(false); } private IUnmangedHeap InitializeHeap() @@ -94,64 +132,30 @@ namespace VNLib.Data.Caching.ObjectCache.Server } - private string? BrokerHeartBeatToken; - - private void RemoveServer(ActiveServer server) - { - lock (ServerLock) - { - ListeningServers.Remove(server); - } - } - - private FBMClientConfig ClientConfig; - - protected override void OnLoad() { try { - IConfigScope clusterConf = this.GetConfig("cluster"); - - Uri brokerAddress = new(clusterConf["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'")); + //Setup Node config + NodeConfig nodeConf = this.GetOrCreateSingleton<NodeConfig>(); //Init connect endpoint ConnectEndpoint endpoint = this.Route<ConnectEndpoint>(); - //Get the cache store from the connection endpoint - ICacheStore store = endpoint.GetCacheStore(); + //Route the broker endpoint + this.Route<BrokerHeartBeatEndpoint>(); + + //Setup discovery endpoint + if(this.HasConfigForType<PeerDiscoveryEndpoint>()) + { + this.Route<PeerDiscoveryEndpoint>(); + } ulong maxByteSize = ((ulong)endpoint.CacheConfig.MaxCacheEntries * (ulong)endpoint.CacheConfig.BucketCount * (ulong)endpoint.CacheConfig.MaxMessageSize); //Log max memory usage Log.Debug("Maxium memory consumption {mx}Mb", maxByteSize / (ulong)(1024 * 1000)); - - //Setup broker and regitration - { - //Route the broker endpoint - BrokerHeartBeat brokerEp = new(() => BrokerHeartBeatToken!, BrokerSyncHandle, brokerAddress, this); - Route(brokerEp); - - //start registration - _ = this.ObserveWork(() => RegisterServerAsync(endpoint.Path), 200); - } - - //Setup cluster worker - { - //Get pre-configured fbm client config for caching - ClientConfig = FBMDataCacheExtensions.GetDefaultConfig(CacheHeap, endpoint.CacheConfig.MaxMessageSize / 2, default, this.IsDebug() ? Log : null); - - //Start Client runner - _ = this.ObserveWork(() => RunClientAsync(store, brokerAddress), 300); - } - - //Load a cache broker to the current server if the config is defined - { - if(this.HasConfigForType<BrokerRegistrationEndpoint>()) - { - this.Route<BrokerRegistrationEndpoint>(); - } - } + Log.Information("Plugin loaded"); } @@ -169,430 +173,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server _cacheHeap.Value.Dispose(); } - //Dispose mre sync handle - BrokerSyncHandle.Dispose(); - Log.Information("Plugin unloaded"); } - #region Registration - - private async Task RegisterServerAsync(string connectPath) - { - try - { - //Get the broker config element - IConfigScope clusterConfig = this.GetConfig("cluster"); - - //Server id is just dns name for now - string serverId = Dns.GetHostName(); - - int heartBeatDelayMs = clusterConfig["heartbeat_timeout_sec"].GetInt32() * 1000; - - - //Get the port of the primary webserver - int port; - bool usingTls; - { - JsonElement firstHost = HostConfig.GetProperty("virtual_hosts").EnumerateArray().First(); - - port = firstHost.GetProperty("interface") - .GetProperty("port") - .GetInt32(); - - //If a certificate is specified, tls is enabled on the port - usingTls = firstHost.TryGetProperty("cert", out _); - } - - using BrokerRegistrationRequest request = new(); - { - string addr = clusterConfig["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'"); - - //Recover the certificate - ReadOnlyJsonWebKey cacheCert = await GetCachePrivate(); - - //Init url builder for payload, see if tls is enabled - Uri connectAddress = new UriBuilder(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, Dns.GetHostName(), port, connectPath).Uri; - - request.WithBroker(new(addr)) - .WithRegistrationAddress(connectAddress.ToString()) - .WithNodeId(serverId) - .WithSigningKey(cacheCert, true); - } - - while (true) - { - try - { - //Gen a random reg token before registering - BrokerHeartBeatToken = RandomHash.GetRandomHex(32); - - //Assign new hb token - request.WithHeartbeatToken(BrokerHeartBeatToken); - - Log.Information("Registering with cache broker {addr}, with node-id {id}", request.BrokerAddress, serverId); - - //Register with the broker - await FBMDataCacheExtensions.ResgisterWithBrokerAsync(request); - - Log.Debug("Successfully registered with cache broker"); - - /* - * Wait in a loop for the broker to send a keepalive - * request with the specified token. When the event - * is signaled the task will be completed - */ - while (true) - { - await Task.Delay(heartBeatDelayMs, UnloadToken); - - //Set the timeout to 0 to it will just check the status without blocking - if (!BrokerSyncHandle.WaitOne(0)) - { - //server miseed a keepalive event, time to break the loop and retry - Log.Debug("Broker missed a heartbeat request, attempting to re-register"); - break; - } - - //Reset the msr - BrokerSyncHandle.Reset(); - } - } - catch (TaskCanceledException) - { - throw; - } - catch (TimeoutException) - { - Log.Warn("Failed to connect to cache broker server within the specified timeout period"); - } - catch (HttpRequestException re) when (re.InnerException is SocketException) - { - Log.Warn("Cache broker is unavailable or network is unavailable"); - } - catch (Exception ex) - { - Log.Warn(ex, "Failed to update broker registration"); - } - - //Gen random ms delay - int randomMsDelay = RandomNumberGenerator.GetInt32(500, 2000); - //Delay - await Task.Delay(randomMsDelay, UnloadToken); - } - } - catch (KeyNotFoundException kne) - { - Log.Error("Missing required broker configuration variables {ke}", kne.Message); - } - catch (TaskCanceledException) - { - //Normal unload/exit - } - catch (Exception ex) - { - Log.Error(ex); - } - finally - { - BrokerHeartBeatToken = null; - } - Log.Debug("Registration worker exited"); - } - - #endregion - - #region Cluster - - private async Task<ReadOnlyJsonWebKey> GetCachePrivate() - { - return await this.TryGetSecretAsync("cache_private_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Failed to load the cache private key"); - } - - private async Task<ReadOnlyJsonWebKey> GetBrokerPublic() - { - return await this.TryGetSecretAsync("broker_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Failed to load the broker's public key"); - } - - - /// <summary> - /// Starts a self-contained process-long task to discover other cache servers - /// from a shared broker server - /// </summary> - /// <param name="cacheStore">The cache store to synchronize</param> - /// <param name="brokerAddress">The broker server's address</param> - /// <param name="serverId">The node-id of the current server</param> - /// <param name="clientConf">The configuration to use when initializing synchronization clients</param> - /// <returns>A task that resolves when the plugin unloads</returns> - private async Task RunClientAsync(ICacheStore cacheStore, Uri brokerAddress) - { - TimeSpan noServerDelay = TimeSpan.FromSeconds(10); - - //The node id is just the dns hostname of the current machine - string nodeId = Dns.GetHostName(); - - ListServerRequest listRequest = new(brokerAddress); - try - { - //Get the broker config element - IConfigScope clusterConf = this.GetConfig("cluster"); - - int serverCheckMs = clusterConf["update_interval_sec"].GetInt32() * 1000; - - //Setup signing and verification certificates - ReadOnlyJsonWebKey cacheSig = await GetCachePrivate(); - ReadOnlyJsonWebKey brokerPub = await GetBrokerPublic(); - - //Import certificates - listRequest.WithVerificationKey(brokerPub) - .WithSigningKey(cacheSig); - - //Main event loop - Log.Information("Begining cluster node discovery"); - - ILogProvider? debugLog = this.IsDebug() ? Log : null; - - while (true) - { - //Load the server list - ActiveServer[]? servers; - while (true) - { - try - { - debugLog?.Information("[CACHE] Requesting server list from broker"); - - //Get server list - servers = await FBMDataCacheExtensions.ListServersAsync(listRequest, UnloadToken); - - //Servers are loaded, so continue - break; - } - catch(HttpRequestException he) when(he.InnerException is SocketException) - { - Log.Warn("Failed to connect to cache broker, trying again"); - } - catch (TimeoutException) - { - Log.Warn("Failed to connect to cache broker server within the specified timeout period"); - } - catch (Exception ex) - { - Log.Warn(ex, "Failed to get server list from broker"); - } - - //Gen random ms delay - int randomMsDelay = RandomNumberGenerator.GetInt32(1000, 2000); - - //Delay - await Task.Delay(randomMsDelay, UnloadToken); - } - - if(servers == null || servers.Length == 0) - { - Log.Information("No cluster nodes found, retrying"); - //Delay - await Task.Delay(noServerDelay, UnloadToken); - continue; - } - - - //Lock on sever set while enumerating - lock (ServerLock) - { - //Select servers that are not the current server and are not already being monitored - IEnumerable<ActiveServer> serversToConnectTo = servers.Where(s => !nodeId.Equals(s.ServerId, StringComparison.OrdinalIgnoreCase)); - - //Connect to servers - foreach (ActiveServer server in serversToConnectTo) - { - //Make sure were not currently connected to the server - if (!ListeningServers.Contains(server)) - { - //Add the server to the set - ListeningServers.Add(server); - - //Run listener background task - _ = this.ObserveWork(() => RunSyncTaskAsync(server, cacheStore, nodeId)); - } - } - } - - //Delay until next check cycle - await Task.Delay(serverCheckMs, UnloadToken); - } - } - catch (FileNotFoundException) - { - Log.Error("Client/cluster private cluster key file was not found or could not be read"); - } - catch (KeyNotFoundException) - { - Log.Error("Missing required cluster configuration varables"); - } - catch (TaskCanceledException) - { - //normal exit/unload - } - catch (Exception ex) - { - Log.Error(ex); - } - finally - { - listRequest.Dispose(); - } - Log.Debug("Cluster sync worker exited"); - } - - private async Task RunSyncTaskAsync(ActiveServer server, ICacheStore cacheStore, string nodeId) - { - //Setup timeout for get operations to avoid deadlocks - TimeSpan getTimeout = TimeSpan.FromSeconds(30); - - //Setup client - FBMClient client = new(ClientConfig); - try - { - async Task UpdateRecordAsync(string objectId, string newId) - { - //Get request message - FBMRequest modRequest = client.RentRequest(); - try - { - //Set action as get/create - modRequest.WriteHeader(HeaderCommand.Action, Actions.Get); - //Set session-id header - modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId); - - //Make request - using FBMResponse response = await client.SendAsync(modRequest, getTimeout, UnloadToken); - - response.ThrowIfNotSet(); - - //Check response code - string status = response.Headers.First(static s => s.Header == HeaderCommand.Status).Value.ToString(); - if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal)) - { - //Update the record - await cacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response); - Log.Debug("Updated object {id}", objectId); - } - else - { - Log.Warn("Object {id} was missing on the remote server", objectId); - } - } - finally - { - client.ReturnRequest(modRequest); - } - } - - { - //Sign and verify requests with the cache private key since we are a peer - ReadOnlyJsonWebKey cachePriv = await GetCachePrivate(); - - //Configure cache - client.GetCacheConfiguration() - .WithVerificationKey(cachePriv) - .WithSigningCertificate(cachePriv) - .WithNodeId(nodeId) //set nodeid since were listening for changes - .WithTls(false); - } - - Log.Information("Connecting to {server}...", server.ServerId); - - //Connect to the server - await client.ConnectToCacheAsync(server, UnloadToken); - - //Wroker task callback method - async Task BgWorkerAsync() - { - //Listen for changes - while (true) - { - //Wait for changes - WaitForChangeResult changedObject = await client.WaitForChangeAsync(UnloadToken); - - Log.Debug("Object changed {typ} {obj}", changedObject.Status, changedObject.CurrentId); - - switch (changedObject.Status) - { - case ResponseCodes.NotFound: - Log.Warn("Server cache not properly configured, worker exiting"); - return; - case "deleted": - //Delete the object from the store - await cacheStore.DeleteItemAsync(changedObject.CurrentId); - break; - case "modified": - //Reload the record from the store - await UpdateRecordAsync(changedObject.CurrentId, changedObject.NewId); - break; - } - } - } - - Log.Information("Connected to {server}, starting queue listeners", server.ServerId); - - //Start worker tasks - List<Task> workerTasks = new(); - for(int i = 0; i < Environment.ProcessorCount; i++) - { - workerTasks.Add(Task.Run(BgWorkerAsync)); - } - - //Wait for sync workers to exit - await Task.WhenAll(workerTasks); - } - catch (InvalidResponseException ie) - { - //See if the plugin is unloading - if (!UnloadToken.IsCancellationRequested) - { - Log.Debug("Server responded with invalid response packet, disconnected. reason {reason}", ie); - } - //Disconnect client gracefully - try - { - await client.DisconnectAsync(); - } - catch (Exception ex) - { - Log.Error(ex); - } - } - catch (OperationCanceledException) - { - //Plugin unloading, Try to disconnect - try - { - await client.DisconnectAsync(); - } - catch(Exception ex) - { - Log.Error(ex); - } - } - catch(Exception ex) - { - Log.Warn("Lost connection to server {h}, {m}", server.ServerId, ex); - } - finally - { - //Remove server from active list, since its been disconnected - RemoveServer(server); - client.Dispose(); - } - } - protected override void ProcessHostCommand(string cmd) { - Log.Debug(cmd); + throw new NotImplementedException(); } - - - #endregion } } |