From 4d8cfc10382105b0acbd94df93ad3d05ff91db54 Mon Sep 17 00:00:00 2001 From: vnugent Date: Wed, 6 Mar 2024 21:30:58 -0500 Subject: refactor: #2 Centralize server state, default discovery endpoints & more --- .../src/ApiModel/CacheSiteAdapter.cs | 19 +- .../src/Clustering/CacheClientConfiguration.cs | 11 +- .../src/Clustering/CacheNodeConfiguration.cs | 15 +- .../src/Clustering/INodeDiscoveryEnumerator.cs | 6 +- .../src/Clustering/NodeDiscoveryCollection.cs | 45 +-- .../src/FBMDataCacheExtensions.cs | 361 ++------------------- .../src/VNCacheClusterClient.cs | 78 +++++ .../src/VNCacheClusterManager.cs | 300 +++++++++++++++++ lib/VNLib.Data.Caching/src/ClientExtensions.cs | 2 +- .../src/BucketLocalManagerFactory.cs | 5 +- .../src/Cache/CacheConfiguration.cs | 53 --- .../src/Cache/CacheEventQueueManager.cs | 264 --------------- .../src/Cache/CacheListenerPubQueue.cs | 11 +- .../src/Cache/CacheMemoryConfiguration.cs | 58 ++++ plugins/ObjectCacheServer/src/Cache/CacheStore.cs | 73 +---- .../ObjectCacheServer/src/Cache/CacheSystemUtil.cs | 8 +- .../src/Cache/PeerEventQueueManager.cs | 248 ++++++++++++++ .../src/Clustering/CacheNodeReplicationMaanger.cs | 48 +-- .../src/Clustering/CachePeerMonitor.cs | 11 +- .../src/Clustering/PeerDiscoveryManager.cs | 93 +----- .../src/Endpoints/CacheNegotationManager.cs | 49 ++- .../src/Endpoints/ConnectEndpoint.cs | 91 +++--- .../src/Endpoints/PeerDiscoveryEndpoint.cs | 60 ++-- .../src/Endpoints/WellKnownEndpoint.cs | 4 +- plugins/ObjectCacheServer/src/NodeConfig.cs | 55 ++-- .../src/ObjectCacheServerEntry.cs | 45 +-- .../src/ObjectCacheSystemState.cs | 215 ++++++++++++ .../src/Clustering/ClusterNodeIndex.cs | 14 +- .../src/FBMCacheClient.cs | 45 +-- .../src/VnCacheClientConfig.cs | 12 +- 30 files changed, 1203 insertions(+), 1096 deletions(-) create mode 100644 lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterClient.cs create mode 100644 lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterManager.cs delete mode 100644 plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs delete mode 100644 plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs create mode 100644 plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs create mode 100644 plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs create mode 100644 plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs diff --git a/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs index 99acfd5..b6add7d 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: VNLib.Data.Caching.Extensions @@ -22,6 +22,7 @@ * along with this program. If not, see https://www.gnu.org/licenses/. */ +using System; using System.Net; using System.Text; using System.Threading; @@ -39,6 +40,22 @@ namespace VNLib.Data.Caching.Extensions.ApiModel /// internal sealed class CacheSiteAdapter : RestSiteAdapterBase { + /* + * Lazy to defer errors for debuggong + */ + private static readonly Lazy _lazy = new(() => ConfigureAdapter(2)); + + internal static CacheSiteAdapter Instance => _lazy.Value; + + private static CacheSiteAdapter ConfigureAdapter(int maxClients) + { + CacheSiteAdapter adapter = new(maxClients); + //Configure the site endpoints + adapter.BuildEndpoints(ServiceEndpoints.Definition); + return adapter; + } + + protected override RestClientPool Pool { get; } public CacheSiteAdapter(int maxClients) diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs index 5a15d33..263f41a 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: VNLib.Data.Caching.Extensions @@ -36,15 +36,10 @@ namespace VNLib.Data.Caching.Extensions.Clustering /// public class CacheClientConfiguration { - /// - /// Stores available cache servers to be used for discovery, and connections - /// - public INodeDiscoveryCollection NodeCollection { get; } = new NodeDiscoveryCollection(); - /// /// The authentication manager to use for signing and verifying messages to and from the cache servers /// - public ICacheAuthManager AuthManager { get; private set; } + public ICacheAuthManager AuthManager { get; private set; } = null!; /// /// The error handler to use for handling errors that occur during the discovery process @@ -89,7 +84,7 @@ namespace VNLib.Data.Caching.Extensions.Clustering public CacheClientConfiguration WithInitialPeers(IEnumerable peers) { //Check null - _ = peers ?? throw new ArgumentNullException(nameof(peers)); + ArgumentNullException.ThrowIfNull(peers); //Store peer array WellKnownNodes = peers.ToArray(); diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs index 6b7ab48..c21ed05 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: VNLib.Data.Caching.Extensions @@ -23,6 +23,7 @@ */ using System; +using System.Runtime.CompilerServices; namespace VNLib.Data.Caching.Extensions.Clustering { @@ -85,8 +86,14 @@ namespace VNLib.Data.Caching.Extensions.Clustering return this; } + internal StrongBox NodeIdRef { get; } = new(string.Empty); + /// - public string NodeId { get; private set; } = null!; + public string NodeId + { + get => NodeIdRef.Value!; + private set => NodeIdRef.Value = value; + } /// /// Specifies the current server's cluster node id. If this @@ -99,10 +106,6 @@ namespace VNLib.Data.Caching.Extensions.Clustering public CacheNodeConfiguration WithNodeId(string nodeId) { NodeId = nodeId ?? throw new ArgumentNullException(nameof(nodeId)); - - //Update the node id in the node collection - (NodeCollection as NodeDiscoveryCollection)!.SetSelfId(nodeId); - return this; } diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs index 677088a..e57e69b 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: VNLib.Data.Caching.Extensions @@ -28,7 +28,9 @@ using System.Collections.Generic; namespace VNLib.Data.Caching.Extensions.Clustering { /// - /// A custom enumerator for the node discovery process + /// A custom enumerator for the node discovery process. Simplifies the recursive processes + /// of discovering nodes in a cluster to a simple enumeration process. It allows for real-time + /// updates to the collection of discovered nodes as a union operation. /// public interface INodeDiscoveryEnumerator : IEnumerator { diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs index b0e53e1..16b96a3 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs @@ -1,12 +1,12 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: VNLib.Data.Caching.Extensions * File: NodeDiscoveryCollection.cs * -* NodeDiscoveryCollection.cs is part of VNLib.Data.Caching.Extensions which is part of the larger -* VNLib collection of libraries and utilities. +* NodeDiscoveryCollection.cs is part of VNLib.Data.Caching.Extensions which is part +* of the larger VNLib collection of libraries and utilities. * * VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as @@ -26,24 +26,18 @@ using System; using System.Linq; using System.Collections; using System.Collections.Generic; +using System.Runtime.CompilerServices; + +using VNLib.Utils.Extensions; namespace VNLib.Data.Caching.Extensions.Clustering { /// /// Represents a collection of available cache nodes from a discovery process /// - public sealed class NodeDiscoveryCollection : INodeDiscoveryCollection + public sealed class NodeDiscoveryCollection(StrongBox? selfId) : INodeDiscoveryCollection { - private string? _selfId; - private LinkedList _peers; - - /// - /// Initializes a new empty - /// - public NodeDiscoveryCollection() - { - _peers = new(); - } + private LinkedList _peers = new(); /// /// Manually adds nodes to the collection that were not discovered through the discovery process @@ -62,39 +56,34 @@ namespace VNLib.Data.Caching.Extensions.Clustering } /// - /// Sets the id of the current node, so it can be excluded from discovery + /// Removes a vector of nodes from the internal collection /// - /// The id of the current node to exclude - public void SetSelfId(string? selfId) => _selfId = selfId; + /// The vector containg nodes to remove from the collection + public void RemoveManualNodes(IEnumerable nodes) => nodes.ForEach(n => _peers.Remove(n)); /// - public INodeDiscoveryEnumerator BeginDiscovery() - { - return new NodeEnumerator(new(), _selfId); - } + public INodeDiscoveryEnumerator BeginDiscovery() => new NodeEnumerator(new(), selfId?.Value); /// public INodeDiscoveryEnumerator BeginDiscovery(IEnumerable initialPeers) { + ArgumentNullException.ThrowIfNull(initialPeers); + //Init new enumerator with the initial peers - return new NodeEnumerator(new(initialPeers), _selfId); + return new NodeEnumerator(new(initialPeers), selfId?.Value); } /// public void CompleteDiscovery(INodeDiscoveryEnumerator enumerator) { - _ = enumerator ?? throw new ArgumentNullException(nameof(enumerator)); + ArgumentNullException.ThrowIfNull(enumerator); //Capture all nodes from the enumerator and store them as our current peers _peers = (enumerator as NodeEnumerator)!.Peers; } /// - public CacheNodeAdvertisment[] GetAllNodes() - { - //Capture all current peers - return _peers.ToArray(); - } + public CacheNodeAdvertisment[] GetAllNodes() => _peers.ToArray(); private sealed record class NodeEnumerator(LinkedList Peers, string? SelfNodeId) : INodeDiscoveryEnumerator { diff --git a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs index bd86461..47aadd9 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs @@ -31,7 +31,6 @@ using System.Threading; using System.Threading.Tasks; using System.Collections.Generic; using System.Security.Cryptography; -using System.Runtime.CompilerServices; using RestSharp; @@ -74,22 +73,7 @@ namespace VNLib.Data.Caching.Extensions /// The advertisment header for cache node discovery /// public const string X_NODE_DISCOVERY_HEADER = "X-Cache-Node-Discovery"; - - /* - * Lazy to defer errors for debuggong - */ - private static readonly Lazy SiteAdapter = new(() => ConfigureAdapter(2)); - - private static CacheSiteAdapter ConfigureAdapter(int maxClients) - { - CacheSiteAdapter adapter = new(maxClients); - //Configure the site endpoints - adapter.BuildEndpoints(ServiceEndpoints.Definition); - return adapter; - } - - private static readonly ConditionalWeakTable ClientCacheConfig = new(); - + /// /// Gets a preconfigured object caching /// protocl @@ -101,7 +85,7 @@ namespace VNLib.Data.Caching.Extensions /// A preconfigured for object caching public static FBMClientConfig GetDefaultConfig(IUnmangedHeap heap, int maxMessageSize, TimeSpan timeout = default, ILogProvider? debugLog = null) { - return GetDefaultConfig(new FallbackFBMMemoryManager(heap), maxMessageSize, timeout, debugLog); + return GetDefaultConfig(new SharedHeapFBMMemoryManager(heap), maxMessageSize, timeout, debugLog); } /// @@ -153,236 +137,22 @@ namespace VNLib.Data.Caching.Extensions { client.Config.DebugLog?.Debug("{debug}: {data}", "[CACHE]", message); } - - /// - /// Discovers ALL possible cache nodes itteritivley, first by collecting the configuration - /// from the initial peers. - /// This will make connections to all discoverable servers - /// - /// - /// A token to cancel the operation - /// - /// - /// - public static async Task DiscoverNodesAsync(this CacheClientConfiguration config, CancellationToken cancellation) - { - //Make sure at least one node defined - if(config?.WellKnownNodes == null || config.WellKnownNodes.Length == 0) - { - throw new ArgumentException("There must be at least one cache node defined in the client configuration"); - } - - //Get the initial advertisments that arent null - CacheNodeAdvertisment[] initialPeers = await ResolveWellKnownAsync(config, cancellation); - - if (initialPeers.Length == 0) - { - throw new CacheDiscoveryFailureException("There must be at least one available cache node to continue discovery"); - } - - await DiscoverNodesAsync(config, initialPeers, cancellation); - } /// - /// Resolves the initial well-known cache nodes into their advertisments + /// Gets the discovery manager for the current client configuration. Just a + /// convience method. /// - /// - /// A token to cancel the operation - /// An array of resolved nodes - public static async Task ResolveWellKnownAsync(this CacheClientConfiguration config, CancellationToken cancellation) - { - _ = config?.WellKnownNodes ?? throw new ArgumentNullException(nameof(config)); - - Task[] initialAdds = new Task[config.WellKnownNodes.Length]; - - //Discover initial advertisments from well-known addresses - for (int i = 0; i < config.WellKnownNodes.Length; i++) - { - initialAdds[i] = DiscoverNodeConfigAsync(config.WellKnownNodes[i], config, cancellation); - } - - //Wait for all initial adds to complete - await Task.WhenAll(initialAdds); - - //Get the initial advertisments that arent null - return initialAdds.Select(static x => x.Result!).Where(static s => s != null).ToArray(); - } + /// + /// The new instance around your config + public static VNCacheClusterManager GetDiscoveryManager(this CacheClientConfiguration conf) => new(conf); /// - /// Discovers ALL possible cache nodes itteritivley from the current collection of initial peers. - /// This will make connections to all discoverable servers and update the client configuration, with all - /// discovered peers + /// Converts the cache client configuration to a cluster client /// /// - /// Accepts an array of initial peers to override the endpoint discovery process - /// A token to cancel the operation - /// A task that completes when all nodes have been discovered - /// - /// - public static async Task DiscoverNodesAsync(this CacheClientConfiguration config, CacheNodeAdvertisment[] initialPeers, CancellationToken cancellation) - { - //Make sure at least one node defined - if (initialPeers == null || initialPeers.Length == 0) - { - throw new ArgumentException("There must be at least one initial peer"); - } - - //Get the discovery enumerator with the initial peers - INodeDiscoveryEnumerator enumerator = config.NodeCollection.BeginDiscovery(initialPeers); - - //Start the discovery process - await DiscoverNodesAsync(enumerator, config, config.ErrorHandler, cancellation); - - //Commit nodes - config.NodeCollection.CompleteDiscovery(enumerator); - } - - private static async Task DiscoverNodesAsync( - INodeDiscoveryEnumerator enumerator, - CacheClientConfiguration config, - ICacheDiscoveryErrorHandler? errHandler, - CancellationToken cancellation - ) - { - //Loop through servers - while (enumerator.MoveNext()) - { - //Make sure the node has a discovery endpoint - if (enumerator.Current.DiscoveryEndpoint == null) - { - //Skip this node - continue; - } - - /* - * We are allowed to save nodes that do not have a discovery endpoint, but we cannot - * discover nodes from them we can only use them as cache - */ - - //add a random delay to avoid spamming servers - await Task.Delay((int)Random.Shared.NextInt64(100, 500), cancellation); - - try - { - //Discover nodes from the current node - CacheNodeAdvertisment[]? nodes = await GetCacheNodesAsync(enumerator.Current, config, cancellation); - - if (nodes != null) - { - //Add nodes to the collection - enumerator.OnPeerDiscoveryComplete(nodes); - } - } - //Catch exceptions when an error handler is defined - catch(Exception ex) when (errHandler != null) - { - //Handle the error - errHandler.OnDiscoveryError(enumerator.Current, ex); - } - catch(Exception ex) - { - throw new CacheDiscoveryFailureException($"Failed to discovery peer node {enumerator.Current?.NodeId}, cannot continue", ex); - } - } - } - - private static async Task DiscoverNodeConfigAsync(Uri serverUri, CacheClientConfiguration config, CancellationToken cancellation) - { - try - { - GetConfigRequest req = new (serverUri, config); - - //Site adapter verifies response messages so we dont need to check on the response - byte[] data = await SiteAdapter.Value.ExecuteAsync(req, cancellation).AsBytes() - ?? throw new CacheDiscoveryFailureException($"No data returned from desired cache node"); - - //Response is jwt - using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data); - - //The entire payload is just the single serialzed advertisment - using JsonDocument doc = responseJwt.GetPayload(); - - return doc.RootElement.GetProperty("sub").Deserialize(); - } - //Bypass cdfe when error handler is null - catch(CacheDiscoveryFailureException) when(config.ErrorHandler == null) - { - throw; - } - //Catch exceptions when an error handler is defined - catch (Exception ex) when (config.ErrorHandler != null) - { - //Handle the error - config.ErrorHandler.OnDiscoveryError(null!, ex); - return null; - } - catch (Exception ex) - { - throw new CacheDiscoveryFailureException("Failed to discover node configuration", ex); - } - } - - /// - /// Contacts the given server's discovery endpoint to discover a list of available - /// servers we can connect to - /// - /// An advertisment of a server to discover other nodes from - /// A token to cancel the operationS - /// The cache configuration object - /// The list of active servers - /// - /// - /// - public static async Task GetCacheNodesAsync(CacheNodeAdvertisment advert, CacheClientConfiguration config, CancellationToken cancellationToken = default) - { - _ = advert ?? throw new ArgumentNullException(nameof(advert)); - _ = config ?? throw new ArgumentNullException(nameof(config)); - _ = advert.DiscoveryEndpoint ?? throw new ArgumentException("Advertisment does not expose an advertisment endpoint"); - - DiscoveryRequest req = new (advert.DiscoveryEndpoint, config); - - //Site adapter verifies response messages so we dont need to check on the response - byte[] data = await SiteAdapter.Value.ExecuteAsync(req, cancellationToken).AsBytes() - ?? throw new InvalidOperationException($"No data returned from node {advert.NodeId}"); - - //Response is jwt - using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data); - - using JsonDocument doc = responseJwt.GetPayload(); - return doc.RootElement.GetProperty("peers").Deserialize(); - } - - /// - /// Allows for configuration of an - /// for a connection to a cache server - /// - /// - /// A fluent api configuration builder for the current client - public static CacheClientConfiguration GetCacheConfiguration(this FBMClientFactory client) => ClientCacheConfig.GetOrCreateValue(client); - - /// - /// Explicitly set the client cache configuration for the current client - /// - /// - /// The cache node configuration - /// The config instance - public static CacheClientConfiguration SetCacheConfiguration(this FBMClientFactory client, CacheClientConfiguration config) - { - ClientCacheConfig.AddOrUpdate(client, config); - return config; - } - - /// - /// Explicitly set the cache node configuration for the current client - /// - /// - /// The cache node configuration - /// The config instance - public static CacheNodeConfiguration SetCacheConfiguration(this FBMClientFactory client, CacheNodeConfiguration nodeConfig) - { - ClientCacheConfig.AddOrUpdate(client, nodeConfig); - return nodeConfig; - } + /// The FBM client factory instance to use + /// The new cluster client instance + public static VNCacheClusterClient ToClusterClient(this CacheClientConfiguration config, FBMClientFactory factory) => new(config, factory); /// /// Waits for the client to disconnect from the server while observing @@ -396,6 +166,8 @@ namespace VNLib.Data.Caching.Extensions /// A task that complets when the connecion has been closed successfully public static async Task WaitForExitAsync(this FBMClient client, CancellationToken token = default) { + ArgumentNullException.ThrowIfNull(client); + client.LogDebug("Waiting for cache client to exit"); //Get task for cancellation @@ -419,88 +191,6 @@ namespace VNLib.Data.Caching.Extensions } } - /// - /// Discovers all available nodes for the current client config - /// - /// - /// A token to cancel the operation - /// A task that completes when all nodes have been discovered - public static Task DiscoverAvailableNodesAsync(this FBMClientFactory client, CancellationToken cancellation = default) - { - //Get stored config - CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); - - //Discover all nodes - return conf.DiscoverNodesAsync(cancellation); - } - - /// - /// Connects to a random server from the servers discovered during a cache server discovery - /// - /// - /// A token to cancel the operation - /// The server that the connection was made with - /// - /// - /// - /// - /// - /// - public static async Task ConnectToRandomCacheAsync(this FBMClientFactory client, CancellationToken cancellation = default) - { - //Get stored config - CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); - - //Get all available nodes, or at least the initial peers - CacheNodeAdvertisment[]? adverts = conf.NodeCollection.GetAllNodes() ?? throw new ArgumentException("No cache nodes discovered, cannot connect"); - - //Select random node from all available nodes - CacheNodeAdvertisment randomServer = adverts.SelectRandom(); - - //Connect to the random server - await ConnectToCacheAsync(client, randomServer, cancellation); - - //Return the random server we connected to - return randomServer; - } - - /// - /// Connects to the specified server on the configured cache client - /// - /// - /// The server to connect to - /// A token to cancel the operation - /// A task that resolves when the client is connected to the cache server - /// - /// - /// - /// - /// - /// - public static async Task ConnectToCacheAsync(this FBMClientFactory factory, CacheNodeAdvertisment server, CancellationToken token = default) - { - _ = factory ?? throw new ArgumentNullException(nameof(factory)); - _ = server ?? throw new ArgumentNullException(nameof(server)); - - //Get stored config - CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(factory); - - //Create new client - FBMClient client = factory.CreateClient(); - - try - { - //Connect to server (no server id because client not replication server) - await ConnectToCacheAsync(client, conf, server, token); - return client; - } - catch - { - client.Dispose(); - throw; - } - } - /// /// Connects to the specified server on the configured cache client /// @@ -517,9 +207,9 @@ namespace VNLib.Data.Caching.Extensions /// public static Task ConnectToCacheAsync(this FBMClient client, CacheNodeAdvertisment server, CacheClientConfiguration explicitConfig, CancellationToken token = default) { - _ = client ?? throw new ArgumentNullException(nameof(client)); - _ = server ?? throw new ArgumentNullException(nameof(server)); - + ArgumentNullException.ThrowIfNull(client); + ArgumentNullException.ThrowIfNull(server); + //Connect to server (no server id because client not replication server) return ConnectToCacheAsync(client, explicitConfig, server, token); } @@ -546,7 +236,7 @@ namespace VNLib.Data.Caching.Extensions NegotationRequest req = new(server.ConnectEndpoint, config); //Exec negotiation - RestResponse response = await SiteAdapter.Value.ExecuteAsync(req, token); + RestResponse response = await CacheSiteAdapter.Instance.ExecuteAsync(req, token); /* * JWT will already be veified by the endpoint adapter, so we @@ -582,6 +272,12 @@ namespace VNLib.Data.Caching.Extensions Scheme = config.UseTls ? "wss://" : "ws://" }; + //if the server is specifying https urls, then attempt to upgrade to wss + if (server.ConnectEndpoint.Scheme == Uri.UriSchemeHttps) + { + uriBuilder.Scheme = "wss://"; + } + //Connect async await client.ConnectAsync(uriBuilder.Uri, token); } @@ -658,7 +354,7 @@ namespace VNLib.Data.Caching.Extensions /// public static bool VerifyUpgradeToken(this ICacheAuthManager man, string signature, string token, bool isPeer) { - _ = man ?? throw new ArgumentNullException(nameof(man)); + ArgumentNullException.ThrowIfNull(man); //get the hash of the token byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256); @@ -704,8 +400,15 @@ namespace VNLib.Data.Caching.Extensions /// The advertisment message to verify /// The advertisment message if successfully verified, or null otherwise /// - public static CacheNodeAdvertisment? VerifyPeerAdvertisment(this ICacheAuthManager config, string message) + public static CacheNodeAdvertisment? VerifyPeerAdvertisment(this ICacheAuthManager config, string? message) { + ArgumentNullException.ThrowIfNull(config); + + if (string.IsNullOrWhiteSpace(message)) + { + return null; + } + using JsonWebToken jwt = JsonWebToken.Parse(message); //Verify the signature diff --git a/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterClient.cs b/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterClient.cs new file mode 100644 index 0000000..050e1a3 --- /dev/null +++ b/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterClient.cs @@ -0,0 +1,78 @@ +/* +* Copyright (c) 2024 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Extensions +* File: VNCacheClusterClient.cs +* +* VNCacheClusterClient.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Security; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Net.Messaging.FBM; +using VNLib.Net.Messaging.FBM.Client; +using VNLib.Data.Caching.Extensions.Clustering; + +namespace VNLib.Data.Caching.Extensions +{ + /// + /// Manages client connections to cluster nodes with discovery from + /// instance. + /// + /// The client configuration to use when discovering or connecting to cache nodes + /// The fbm client factory instance + public class VNCacheClusterClient(CacheClientConfiguration config, FBMClientFactory factory) + : VNCacheClusterManager(config) + { + + /// + /// Connects to the specified server on the configured cache client + /// + /// + /// The server to connect to + /// A token to cancel the operation + /// A task that resolves when the client is connected to the cache server + /// + /// + /// + /// + /// + /// + public async Task ConnectToCacheAsync(CacheNodeAdvertisment server, CancellationToken token = default) + { + ArgumentNullException.ThrowIfNull(server); + + FBMClient client = factory.CreateClient(); + + try + { + //Connect to server (no server id because client not replication server) + await client.ConnectToCacheAsync(server, config, token); + return client; + } + catch + { + client.Dispose(); + throw; + } + } + } +} diff --git a/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterManager.cs b/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterManager.cs new file mode 100644 index 0000000..4a1a6bd --- /dev/null +++ b/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterManager.cs @@ -0,0 +1,300 @@ +/* +* Copyright (c) 2024 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Extensions +* File: VNCacheClusterManager.cs +* +* VNCacheClusterManager.cs is part of VNLib.Data.Caching.Extensions +* which is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Security; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using System.Collections.Generic; + +using RestSharp; + +using VNLib.Hashing.IdentityUtility; +using VNLib.Net.Rest.Client.Construction; +using VNLib.Data.Caching.Extensions.ApiModel; +using VNLib.Data.Caching.Extensions.Clustering; + +namespace VNLib.Data.Caching.Extensions +{ + + /// + /// A VNCache cluster client discovery maanger. Used to simplify the discovery + /// of cache nodes + /// + /// The client configuration instance + public class VNCacheClusterManager(CacheClientConfiguration config) + { + /// + /// The internal collection of discovered nodes + /// + protected NodeDiscoveryCollection NodeCollection { get; } = GetNodeCollection(config); + + /// + /// Gets the collection of discovered nodes within the manager + /// + public INodeDiscoveryCollection DiscoveredNodes => NodeCollection; + + /// + /// The underlying instance + /// + public CacheClientConfiguration Config => config; + + /// + /// Adds an array of nodes manually to the collection of discovered cluster nodes + /// + /// + public void AddManualNodes(params CacheNodeAdvertisment[] nodes) => AddManualNodes(nodes.AsEnumerable()); + + /// + /// Adds an array of nodes manually to the collection of discovered cluster nodes + /// + /// + public void AddManualNodes(IEnumerable nodes) => NodeCollection.AddManualNodes(nodes); + + /// + /// Removes an array of nodes manually from the collection of discovered cluster nodes + /// + /// + public void RemoveManualNodes(params CacheNodeAdvertisment[] nodes) => RemoveManualNodes(nodes.AsEnumerable()); + + /// + /// Removes an array of nodes manually from the collection of discovered cluster nodes + /// + /// + public void RemoveManualNodes(IEnumerable nodes) => NodeCollection.RemoveManualNodes(nodes); + + /// + /// Resolves the initial well-known cache nodes into their advertisments + /// + /// A token to cancel the operation + /// An array of resolved nodes + public async Task ResolveWellKnownAsync(CancellationToken cancellation) + { + Task[] initialAdds = new Task[config.WellKnownNodes.Length]; + + //Discover initial advertisments from well-known addresses + for (int i = 0; i < config.WellKnownNodes.Length; i++) + { + initialAdds[i] = DiscoverNodeConfigAsync(config.WellKnownNodes[i], cancellation); + } + + //Wait for all initial adds to complete + await Task.WhenAll(initialAdds); + + //Get the initial advertisments that arent null + return initialAdds.Select(static x => x.Result!).Where(static s => s != null).ToArray(); + } + + /// + /// Discovers ALL possible cache nodes itteritivley, first by collecting the configuration + /// from the initial peers. + /// This will make connections to all discoverable servers + /// + /// A token to cancel the operation + /// + /// + /// + /// + /// This method simply combines the and + /// methods into a single operation + /// + public async Task DiscoverNodesAsync(CancellationToken cancellation) + { + //Make sure at least one node defined + if (config?.WellKnownNodes == null || config.WellKnownNodes.Length == 0) + { + throw new ArgumentException("There must be at least one cache node defined in the client configuration"); + } + + /* + * Connect to well-known nodes from the client configuration to discovery its layout. + * + */ + CacheNodeAdvertisment[] initialPeers = await ResolveWellKnownAsync(cancellation); + + if (initialPeers.Length == 0) + { + throw new CacheDiscoveryFailureException("There must be at least one available cache node to continue discovery"); + } + + await DiscoverNodesAsync(initialPeers, cancellation); + } + + /// + /// Discovers ALL possible cache nodes itteritivley from the current collection of initial peers. + /// This will make connections to all discoverable servers and update the client configuration, with all + /// discovered peers + /// + /// Accepts an array of initial peers to override the endpoint discovery process + /// A token to cancel the operation + /// A task that completes when all nodes have been discovered + /// + /// + public async Task DiscoverNodesAsync(CacheNodeAdvertisment[] initialPeers, CancellationToken cancellation) + { + //Make sure at least one node defined + ArgumentNullException.ThrowIfNull(initialPeers); + ArgumentOutOfRangeException.ThrowIfZero(initialPeers.Length); + + //Get the discovery enumerator with the initial peers + using INodeDiscoveryEnumerator enumerator = NodeCollection.BeginDiscovery(initialPeers); + + //Start the discovery process + await DiscoverNodesAsync(enumerator, config, config.ErrorHandler, cancellation); + + //Commit discovered nodes to stored node collection + NodeCollection.CompleteDiscovery(enumerator); + } + + private static async Task DiscoverNodesAsync( + INodeDiscoveryEnumerator enumerator, + CacheClientConfiguration config, + ICacheDiscoveryErrorHandler? errHandler, + CancellationToken cancellation + ) + { + //Loop through servers + while (enumerator.MoveNext()) + { + //Make sure the node has a discovery endpoint + if (enumerator.Current.DiscoveryEndpoint == null) + { + //Skip this node + continue; + } + + /* + * We are allowed to save nodes that do not have a discovery endpoint, but we cannot + * discover nodes from them we can only use them as cache + */ + + //add a random delay to avoid spamming servers + await Task.Delay((int)Random.Shared.NextInt64(100, 500), cancellation); + + try + { + //Discover nodes from the current node + CacheNodeAdvertisment[]? nodes = await GetCacheNodesAsync(enumerator.Current, config, cancellation); + + if (nodes != null) + { + //Add nodes to the collection + enumerator.OnPeerDiscoveryComplete(nodes); + } + } + //Catch exceptions when an error handler is defined + catch (Exception ex) when (errHandler != null) + { + //Handle the error + errHandler.OnDiscoveryError(enumerator.Current, ex); + } + catch (Exception ex) + { + throw new CacheDiscoveryFailureException($"Failed to discovery peer node {enumerator.Current?.NodeId}, cannot continue", ex); + } + } + } + + /// + /// Contacts the given server's discovery endpoint to discover a list of available + /// servers we can connect to + /// + /// An advertisment of a server to discover other nodes from + /// A token to cancel the operationS + /// The cache configuration object + /// The list of active servers + /// + /// + /// + public static async Task GetCacheNodesAsync(CacheNodeAdvertisment advert, CacheClientConfiguration config, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(advert); + ArgumentNullException.ThrowIfNull(config); + ArgumentNullException.ThrowIfNull(advert.DiscoveryEndpoint, nameof(advert.DiscoveryEndpoint)); + + DiscoveryRequest req = new (advert.DiscoveryEndpoint, config); + + //Site adapter verifies response messages so we dont need to check on the response + byte[] data = await CacheSiteAdapter.Instance.ExecuteAsync(req, cancellationToken).AsBytes() + ?? throw new InvalidOperationException($"No data returned from node {advert.NodeId}"); + + //Response is jwt + using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data); + + using JsonDocument doc = responseJwt.GetPayload(); + return doc.RootElement.GetProperty("peers").Deserialize(); + } + + + /* + * This method will connect to a given well-known (cache config endpoint) and discover the + * servers configuration (endpoint config) + * + * This function exists so clients only need a single endpoint to connect to, and the server + * will return it's signed configuration data (including cluster network information) + */ + private async Task DiscoverNodeConfigAsync(Uri serverUri, CancellationToken cancellation) + { + try + { + GetConfigRequest req = new (serverUri, config); + + //Site adapter verifies response messages so we dont need to check on the response + byte[] data = await CacheSiteAdapter.Instance.ExecuteAsync(req, cancellation).AsBytes() + ?? throw new CacheDiscoveryFailureException($"No data returned from desired cache node"); + + //Response is jwt + using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data); + + //The entire payload is just the single serialzed advertisment + using JsonDocument doc = responseJwt.GetPayload(); + + return doc.RootElement.GetProperty("sub").Deserialize(); + } + //Bypass cdfe when error handler is null (avoid nesting)` + catch (CacheDiscoveryFailureException) when (config.ErrorHandler == null) + { + throw; + } + //Catch exceptions when an error handler is defined + catch (Exception ex) when (config.ErrorHandler != null) + { + //Handle the error + config.ErrorHandler.OnDiscoveryError(null!, ex); + return null; + } + catch (Exception ex) + { + throw new CacheDiscoveryFailureException("Failed to discover node configuration", ex); + } + } + + private static NodeDiscoveryCollection GetNodeCollection(CacheClientConfiguration config) + { + return config is CacheNodeConfiguration cnc ? new (cnc.NodeIdRef!) : new (null); + } + } +} diff --git a/lib/VNLib.Data.Caching/src/ClientExtensions.cs b/lib/VNLib.Data.Caching/src/ClientExtensions.cs index f0eee06..e0aa744 100644 --- a/lib/VNLib.Data.Caching/src/ClientExtensions.cs +++ b/lib/VNLib.Data.Caching/src/ClientExtensions.cs @@ -451,7 +451,7 @@ namespace VNLib.Data.Caching /// /// The instance to store change event data to /// A token to cancel the deuque operation - /// A that contains information about the modified element + /// A task that completes when a change has occured /// /// public static async Task WaitForChangeAsync(this FBMClient client, WaitForChangeResult change, CancellationToken cancellationToken = default) diff --git a/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs b/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs index e7fa3e1..944aa4b 100644 --- a/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs +++ b/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs @@ -33,11 +33,10 @@ using System.Runtime.CompilerServices; using VNLib.Plugins; using VNLib.Utils; using VNLib.Utils.Memory; -using VNLib.Utils.Extensions; -using VNLib.Plugins.Extensions.Loading; using VNLib.Utils.Memory.Diagnostics; using VNLib.Utils.Logging; - +using VNLib.Utils.Extensions; +using VNLib.Plugins.Extensions.Loading; /* * How bucket local memory works: * diff --git a/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs b/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs deleted file mode 100644 index bd15d24..0000000 --- a/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs +++ /dev/null @@ -1,53 +0,0 @@ -/* -* Copyright (c) 2023 Vaughn Nugent -* -* Library: VNLib -* Package: ObjectCacheServer -* File: CacheConfiguration.cs -* -* CacheConfiguration.cs is part of ObjectCacheServer which is part of the larger -* VNLib collection of libraries and utilities. -* -* ObjectCacheServer is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* ObjectCacheServer is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System.Text.Json.Serialization; - -namespace VNLib.Data.Caching.ObjectCache.Server.Cache -{ - internal sealed class CacheConfiguration - { - [JsonPropertyName("buffer_recv_max")] - public int MaxRecvBufferSize { get; set; } = 1000 * 1024; - [JsonPropertyName("buffer_recv_min")] - public int MinRecvBufferSize { get; set; } = 8 * 1024; - - - [JsonPropertyName("buffer_header_max")] - public int MaxHeaderBufferSize { get; set; } = 2 * 1024; - [JsonPropertyName("buffer_header_min")] - public int MinHeaderBufferSize { get; set; } = 128; - - - [JsonPropertyName("max_message_size")] - public int MaxMessageSize { get; set; } = 1000 * 1024; - - - [JsonPropertyName("max_cache")] - public uint MaxCacheEntries { get; set; } = 10000; - - [JsonPropertyName("buckets")] - public uint BucketCount { get; set; } = 10; - } -} diff --git a/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs b/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs deleted file mode 100644 index e3c613d..0000000 --- a/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs +++ /dev/null @@ -1,264 +0,0 @@ -/* -* Copyright (c) 2023 Vaughn Nugent -* -* Library: VNLib -* Package: ObjectCacheServer -* File: CacheEventQueueManager.cs -* -* CacheEventQueueManager.cs is part of ObjectCacheServer which is -* part of the larger VNLib collection of libraries and utilities. -* -* ObjectCacheServer is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* ObjectCacheServer is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using System.Threading.Channels; -using System.Collections.Generic; - -using VNLib.Plugins; -using VNLib.Utils.Async; -using VNLib.Utils.Logging; -using VNLib.Plugins.Extensions.Loading; -using VNLib.Plugins.Extensions.Loading.Events; - - -namespace VNLib.Data.Caching.ObjectCache.Server.Cache -{ - internal sealed class CacheEventQueueManager : ICacheEventQueueManager, IDisposable, IIntervalScheduleable - { - private readonly int MaxQueueDepth; - - private readonly object SubLock; - private readonly LinkedList Subscribers; - - private readonly object StoreLock; - private readonly Dictionary QueueStore; - - - public CacheEventQueueManager(PluginBase plugin) - { - //Get node config - NodeConfig config = plugin.GetOrCreateSingleton(); - - //Get max queue depth - MaxQueueDepth = config.MaxQueueDepth; - - /* - * Schedule purge interval to clean up stale queues - */ - plugin.ScheduleInterval(this, config.EventQueuePurgeInterval); - - SubLock = new(); - Subscribers = new(); - - StoreLock = new(); - QueueStore = new(StringComparer.OrdinalIgnoreCase); - } - - /// - public IPeerEventQueue Subscribe(ICachePeer peer) - { - NodeQueue? nq; - - bool isNew = false; - - //Enter sync lock - lock (StoreLock) - { - //Try to recover the queue for the node - if (!QueueStore.TryGetValue(peer.NodeId, out nq)) - { - //Create new queue - nq = new(peer.NodeId, MaxQueueDepth); - QueueStore.Add(peer.NodeId, nq); - isNew = true; - } - - //Increment listener count - nq.Listeners++; - } - - //Publish new peer to subscribers list - if (isNew) - { - lock (SubLock) - { - //Add peer to subscribers list - Subscribers.AddLast(nq); - } - } - - //Return the node's queue - return nq; - } - - /// - public void Unsubscribe(ICachePeer peer) - { - //Detach a listener for a node - lock (StoreLock) - { - //Get the queue and decrement the listener count - NodeQueue nq = QueueStore[peer.NodeId]; - nq.Listeners--; - } - } - - /// - public void PublishSingle(ChangeEvent change) - { - //Wait to enter the sub lock - lock (SubLock) - { - //Loop through ll the fast way - LinkedListNode? q = Subscribers.First; - - while (q != null) - { - //Pub single event node - q.Value.PublishChange(change); - - //Get next queue - q = q.Next; - } - } - } - - /// - public void PublishMultiple(Span changes) - { - //Wait to enter the sub lock - lock (SubLock) - { - //Loop through ll the fast way - LinkedListNode? q = Subscribers.First; - - while (q != null) - { - //Publish multiple - q.Value.PublishChanges(changes); - - //Get next queue - q = q.Next; - } - } - } - - /// - public void PurgeStaleSubscribers() - { - //Enter locks - lock (SubLock) - { - lock (StoreLock) - { - //Get all stale queues (queues without listeners) - NodeQueue[] staleQueues = QueueStore.Values.Where(static nq => nq.Listeners == 0).ToArray(); - - foreach (NodeQueue nq in staleQueues) - { - //Remove from store - QueueStore.Remove(nq.NodeId); - - //remove from subscribers - Subscribers.Remove(nq); - } - } - } - } - - //Interval to purge stale subscribers - Task IIntervalScheduleable.OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken) - { - log.Debug("Purging stale peer event queues"); - - PurgeStaleSubscribers(); - - return Task.CompletedTask; - } - - void IDisposable.Dispose() - { - QueueStore.Clear(); - Subscribers.Clear(); - } - - /* - * Holds queues for each node and keeps track of the number of listeners - * attached to the queue - */ - - private sealed class NodeQueue : IPeerEventQueue - { - public int Listeners; - - public string NodeId { get; } - - public AsyncQueue Queue { get; } - - public NodeQueue(string nodeId, int maxDepth) - { - NodeId = nodeId; - - /* - * Create a bounded channel that acts as a lru and evicts - * the oldest item when the queue is full - * - * There will also only ever be a single thread writing events - * to the queue - */ - - BoundedChannelOptions queueOptions = new(maxDepth) - { - AllowSynchronousContinuations = true, - SingleReader = false, - SingleWriter = true, - //Drop oldest item in queue if full - FullMode = BoundedChannelFullMode.DropOldest, - }; - - //Init queue/channel - Queue = new(queueOptions); - } - - public void PublishChange(ChangeEvent change) - { - Queue.TryEnque(change); - } - - public void PublishChanges(Span changes) - { - for (int i = 0; i < changes.Length; i++) - { - Queue.TryEnque(changes[i]); - } - } - - /// - public ValueTask DequeueAsync(CancellationToken cancellation) - { - return Queue.DequeueAsync(cancellation); - } - - /// - public bool TryDequeue(out ChangeEvent change) - { - return Queue.TryDequeue(out change); - } - } - } -} diff --git a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs index 6942828..16fda39 100644 --- a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs +++ b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -50,11 +50,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache private readonly AsyncQueue _listenerQueue; private readonly ILogProvider _logProvider; - private readonly ICacheEventQueueManager _queueManager; + private readonly PeerEventQueueManager _queueManager; public CacheListenerPubQueue(PluginBase plugin) { - _queueManager = plugin.GetOrCreateSingleton(); + _queueManager = plugin.GetOrCreateSingleton(); _logProvider = plugin.Log.CreateScope(LOG_SCOPE_NAME); //Init local queue to store published events @@ -110,10 +110,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache } /// - public bool IsEnabled([NotNullWhen(true)] IPeerEventQueue? userState) - { - return userState is IPeerEventQueue; - } + public bool IsEnabled([NotNullWhen(true)] IPeerEventQueue? userState) => userState is not null; /// public void PublishEvent(ChangeEvent changeEvent) diff --git a/plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs b/plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs new file mode 100644 index 0000000..c404cc5 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs @@ -0,0 +1,58 @@ +/* +* Copyright (c) 2024 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: CacheMemoryConfiguration.cs +* +* CacheMemoryConfiguration.cs is part of ObjectCacheServer which +* is part of the larger VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Text.Json.Serialization; + +namespace VNLib.Data.Caching.ObjectCache.Server.Cache +{ + internal sealed class CacheMemoryConfiguration + { + [JsonPropertyName("buffer_recv_max")] + public int MaxRecvBufferSize { get; set; } = 1000 * 1024; + [JsonPropertyName("buffer_recv_min")] + public int MinRecvBufferSize { get; set; } = 8 * 1024; + + + [JsonPropertyName("buffer_header_max")] + public int MaxHeaderBufferSize { get; set; } = 2 * 1024; + + [JsonPropertyName("buffer_header_min")] + public int MinHeaderBufferSize { get; set; } = 128; + + + [JsonPropertyName("max_message_size")] + public int MaxMessageSize { get; set; } = 1000 * 1024; + + + [JsonPropertyName("max_cache")] + public uint MaxCacheEntries { get; set; } = 10000; + + [JsonPropertyName("buckets")] + public uint BucketCount { get; set; } = 10; + + + [JsonPropertyName("memory_lib_path")] + public string? ExternLibPath { get; set; } + } +} diff --git a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs index 86df849..81f4843 100644 --- a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs +++ b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs @@ -26,32 +26,24 @@ using System; using System.Threading; using System.Threading.Tasks; -using VNLib.Utils.Logging; -using VNLib.Net.Messaging.FBM; -using VNLib.Plugins; using VNLib.Plugins.Extensions.Loading; namespace VNLib.Data.Caching.ObjectCache.Server.Cache { + /* * Implements the blob cache store, which is an abstraction around the blob cache listener. * This allows for publishing local events (say from other nodes) to keep caches in sync. */ [ConfigurationName("cache")] - internal sealed class CacheStore(PluginBase plugin, IConfigScope config) : ICacheStore, IDisposable + internal sealed class CacheStore(IBlobCacheTable table) : ICacheStore { - /// - /// Gets the underlying cache listener - /// - public BlobCacheListener Listener { get; } = InitializeCache((ObjectCacheServerEntry)plugin, config); - - /// ValueTask ICacheStore.AddOrUpdateBlobAsync(string objectId, string? alternateId, ObjectDataGet bodyData, T state, CancellationToken token) { - return Listener.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token); + return table.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token); } /// @@ -63,64 +55,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache /// ValueTask ICacheStore.DeleteItemAsync(string id, CancellationToken token) { - return Listener.Cache.DeleteObjectAsync(id, token); - } - - private static BlobCacheListener InitializeCache(ObjectCacheServerEntry plugin, IConfigScope config) - { - const string CacheConfigTemplate = -@" -Cache Configuration: - Max memory: {max} Mb - Buckets: {bc} - Entries per-bucket: {mc} -"; - - //Deserialize the cache config - CacheConfiguration cacheConf = config.Deserialze(); - - if (cacheConf.MaxCacheEntries < 2) - { - throw new ArgumentException("You must configure a 'max_cache' size larger than 1 item"); - } - - //Suggestion - if (cacheConf.MaxCacheEntries < 200) - { - plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache"); - } - - //calculate the max memory usage - ulong maxByteSize = cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize; - - //Log the cache config - plugin.Log.Information(CacheConfigTemplate, - maxByteSize / (1024 * 1000), - cacheConf.BucketCount, - cacheConf.MaxCacheEntries - ); - - //Get the event listener - ICacheListenerEventQueue queue = plugin.GetOrCreateSingleton(); - - //Get the memory manager - ICacheMemoryManagerFactory manager = plugin.GetOrCreateSingleton(); - - //Load the blob cache table system - IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, manager, cacheConf); - - FallbackFBMMemoryManager fbmMemManager = new(plugin.ListenerHeap); - - //Endpoint only allows for a single reader - return new(bc, queue, plugin.Log, fbmMemManager); - } - - /* - * Cleaned up by the plugin on exit - */ - public void Dispose() - { - Listener.Dispose(); + return table.DeleteObjectAsync(id, token); } } } diff --git a/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs index b7bf83f..8f196b0 100644 --- a/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs +++ b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs @@ -29,6 +29,7 @@ using System.Text.Json; using VNLib.Utils.Resources; using VNLib.Plugins; using VNLib.Plugins.Extensions.Loading; +using VNLib.Utils.Extensions; namespace VNLib.Data.Caching.ObjectCache.Server.Cache { @@ -49,7 +50,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache /// The cache configuration object /// The loaded implementation /// - public static IBlobCacheTable LoadMemoryCacheSystem(this PluginBase plugin, IConfigScope config, ICacheMemoryManagerFactory heap, CacheConfiguration cacheConf) + public static IBlobCacheTable LoadMemoryCacheSystem(this PluginBase plugin, IConfigScope config, ICacheMemoryManagerFactory heap, CacheMemoryConfiguration cacheConf) { #pragma warning disable CA2000 // Dispose objects before losing scope @@ -94,10 +95,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache if(initMethod != null) { //Itterate all buckets - foreach (IBlobCacheBucket bucket in table) - { - initMethod.Invoke(bucket.Id); - } + table.ForEach(bucket => initMethod(bucket.Id)); } } diff --git a/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs b/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs new file mode 100644 index 0000000..12cf37a --- /dev/null +++ b/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs @@ -0,0 +1,248 @@ +/* +* Copyright (c) 2024 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: PeerEventQueueManager.cs +* +* PeerEventQueueManager.cs is part of ObjectCacheServer which is +* part of the larger VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Channels; +using System.Collections.Generic; + +using VNLib.Plugins; +using VNLib.Utils.Async; +using VNLib.Utils.Logging; +using VNLib.Plugins.Extensions.Loading; +using VNLib.Plugins.Extensions.Loading.Events; + + +namespace VNLib.Data.Caching.ObjectCache.Server.Cache +{ + internal sealed class PeerEventQueueManager : ICacheEventQueueManager, IIntervalScheduleable + { + private readonly int MaxQueueDepth; + + private readonly object SubLock = new(); + private readonly LinkedList Subscribers = []; + + private readonly object StoreLock = new(); + private readonly Dictionary QueueStore = new(StringComparer.OrdinalIgnoreCase); + + public PeerEventQueueManager(PluginBase plugin, NodeConfig config) + { + MaxQueueDepth = config.MaxQueueDepth; + + /* + * Schedule purge interval to clean up stale queues + */ + plugin.ScheduleInterval(this, config.EventQueuePurgeInterval); + + //Cleanup disposeables on unload + _ = plugin.RegisterForUnload(() => + { + QueueStore.Clear(); + Subscribers.Clear(); + }); + } + + /// + public IPeerEventQueue Subscribe(ICachePeer peer) + { + PeerEventListenerQueue? nq; + + bool isNew = false; + + //Enter sync lock + lock (StoreLock) + { + //Try to recover the queue for the node + if (!QueueStore.TryGetValue(peer.NodeId, out nq)) + { + //Create new queue since an existing queue was not found + nq = new(peer.NodeId, MaxQueueDepth); + QueueStore.Add(peer.NodeId, nq); + isNew = true; + } + + //Increment listener count since a new listener has attached + nq.Listeners++; + } + + //Publish new peer to subscribers list + if (isNew) + { + lock (SubLock) + { + //Add peer to subscribers list + Subscribers.AddLast(nq); + } + } + + //Return the node's queue + return nq; + } + + /// + public void Unsubscribe(ICachePeer peer) + { + /* + * The reason I am not purging queues that no longer have listeners + * now is because it is possible that a listener needed to detach because of + * a network issue and will be reconnecting shortly. If the node doesnt + * come back before the next purge interval, it's events will be purged. + * + * Point is: there is a reason for the garbage collection style purging + */ + + //Detach a listener for a node + lock (StoreLock) + { + //Get the queue and decrement the listener count + PeerEventListenerQueue nq = QueueStore[peer.NodeId]; + nq.Listeners--; + } + } + + /// + public void PublishSingle(ChangeEvent change) + { + //Wait to enter the sub lock + lock (SubLock) + { + //Loop through ll the fast way + LinkedListNode? q = Subscribers.First; + + while (q != null) + { + //Pub single event node + q.Value.PublishChange(change); + + //Get next queue + q = q.Next; + } + } + } + + /// + public void PublishMultiple(Span changes) + { + //Wait to enter the sub lock + lock (SubLock) + { + //Loop through ll the fast way + LinkedListNode? q = Subscribers.First; + + while (q != null) + { + //Publish multiple + q.Value.PublishChanges(changes); + + //Get next queue + q = q.Next; + } + } + } + + /// + public void PurgeStaleSubscribers() + { + //Enter locks + lock (SubLock) + { + lock (StoreLock) + { + //Get all stale queues (queues without listeners) + PeerEventListenerQueue[] staleQueues = QueueStore.Values.Where(static nq => nq.Listeners == 0).ToArray(); + + foreach (PeerEventListenerQueue nq in staleQueues) + { + //Remove from store + QueueStore.Remove(nq.NodeId); + + //remove from subscribers + Subscribers.Remove(nq); + } + } + } + } + + //Interval to purge stale subscribers + Task IIntervalScheduleable.OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken) + { + log.Debug("Purging stale peer event queues"); + + PurgeStaleSubscribers(); + + return Task.CompletedTask; + } + + + /* + * Holds queues for each node and keeps track of the number of listeners + * attached to the queue + * + * The role of this class is to store change events for a given peer node, + * and return them when the peer requests them. It also keeps track of the + * number of active listeners (server connections) to the queue. + */ + + private sealed class PeerEventListenerQueue(string nodeId, int maxDepth) : IPeerEventQueue + { + public int Listeners; + + public string NodeId => nodeId; + + /* + * Create a bounded channel that acts as a lru and evicts + * the oldest item when the queue is full + * + * There will also only ever be a single thread writing events + * to the queue + */ + private readonly AsyncQueue Queue = new(new BoundedChannelOptions(maxDepth) + { + AllowSynchronousContinuations = true, + SingleReader = false, + SingleWriter = true, + //Drop oldest item in queue if full + FullMode = BoundedChannelFullMode.DropOldest, + }); + + public void PublishChange(ChangeEvent change) => Queue.TryEnque(change); + + public void PublishChanges(Span changes) + { + for (int i = 0; i < changes.Length; i++) + { + Queue.TryEnque(changes[i]); + } + } + + /// + public ValueTask DequeueAsync(CancellationToken cancellation) => Queue.DequeueAsync(cancellation); + + /// + public bool TryDequeue(out ChangeEvent change) => Queue.TryDequeue(out change); + } + } +} diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs index a240dde..0a1bb4d 100644 --- a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs +++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -36,7 +36,6 @@ using VNLib.Net.Messaging.FBM; using VNLib.Net.Messaging.FBM.Client; using VNLib.Plugins.Extensions.Loading; using VNLib.Data.Caching.Extensions.Clustering; -using VNLib.Data.Caching.ObjectCache.Server.Cache; namespace VNLib.Data.Caching.ObjectCache.Server.Clustering { @@ -56,38 +55,37 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork { private const string LOG_SCOPE_NAME = "REPL"; + private const string FBM_LOG_SCOPE_NAME = "REPL-CLNT"; private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10); private const int MAX_MESSAGE_SIZE = 12 * 1024; private readonly PluginBase _plugin; private readonly ILogProvider _log; - private readonly NodeConfig _nodeConfig; - private readonly ICacheStore _cacheStore; - private readonly ICachePeerAdapter _peerAdapter; private readonly FBMClientFactory _clientFactory; - + private readonly ObjectCacheSystemState _sysState; + private readonly bool _isDebug; private int _openConnections; public CacheNodeReplicationMaanger(PluginBase plugin) { - //Load the node config - _nodeConfig = plugin.GetOrCreateSingleton(); - _cacheStore = plugin.GetOrCreateSingleton(); - _peerAdapter = plugin.GetOrCreateSingleton(); + _sysState = plugin.GetOrCreateSingleton(); //Init fbm config with fixed message size FBMClientConfig clientConfig = FBMDataCacheExtensions.GetDefaultConfig( - (plugin as ObjectCacheServerEntry)!.ListenerHeap, + _sysState.SharedCacheHeap, MAX_MESSAGE_SIZE, - debugLog: plugin.IsDebug() ? plugin.Log : null + debugLog: plugin.IsDebug() ? plugin.Log.CreateScope(FBM_LOG_SCOPE_NAME) : null ); //Init ws fallback factory and client factory - FBMFallbackClientWsFactory wsFactory = new(); - _clientFactory = new(in clientConfig, wsFactory); + _clientFactory = new( + ref clientConfig, + new FBMFallbackClientWsFactory(), + (int)_sysState.Configuration.MaxPeerConnections + ); _plugin = plugin; _isDebug = plugin.IsDebug(); @@ -103,7 +101,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering while (true) { //Get all new peers - CacheNodeAdvertisment[] peers = _peerAdapter.GetNewPeers(); + CacheNodeAdvertisment[] peers = _sysState.PeerDiscovery.GetNewPeers(); if (peers.Length == 0 && _isDebug) { @@ -111,7 +109,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering } //Make sure we don't exceed the max connections - if(_openConnections >= _nodeConfig.MaxPeerConnections) + if(_openConnections >= _sysState.Configuration.MaxPeerConnections) { if (_isDebug) { @@ -150,13 +148,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering private async Task OnNewPeerDoWorkAsync(CacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken) { - _ = newPeer ?? throw new ArgumentNullException(nameof(newPeer)); + ArgumentNullException.ThrowIfNull(newPeer); //Setup client FBMClient client = _clientFactory.CreateClient(); //Add peer to monitor - _peerAdapter.OnPeerListenerAttached(newPeer); + _sysState.PeerDiscovery.OnPeerListenerAttached(newPeer); Interlocked.Increment(ref _openConnections); @@ -165,12 +163,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId); //Connect to the server - await client.ConnectToCacheAsync(newPeer, _nodeConfig.Config, exitToken); + await client.ConnectToCacheAsync(newPeer, _sysState.Configuration.Config, exitToken); log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId); //Start worker tasks - List workerTasks = new(); + List workerTasks = []; for (int i = 0; i < Environment.ProcessorCount; i++) { @@ -187,6 +185,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering //Disconnect client gracefully await client.DisconnectAsync(CancellationToken.None); } + catch(FBMServerNegiationException fbm) + { + log.Error("Failed to negotiate buffer configuration, check your cache memory configuration. Error:{err}", fbm.Message); + } catch (InvalidResponseException ie) { //See if the plugin is unloading @@ -227,7 +229,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering client.Dispose(); //Notify monitor of disconnect - _peerAdapter.OnPeerListenerDetatched(newPeer); + _sysState.PeerDiscovery.OnPeerListenerDetatched(newPeer); } } @@ -259,7 +261,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering return; case "deleted": //Delete the object from the store - await _cacheStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None); + await _sysState.InternalStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None); break; case "modified": //Reload the record from the store @@ -297,7 +299,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal)) { //Update the record - await _cacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation); + await _sysState.InternalStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation); log.Debug("Updated object {id}", objectId); } else diff --git a/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs index c49a54b..c3fbd8e 100644 --- a/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs +++ b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -29,7 +29,6 @@ using System.Collections.Generic; using VNLib.Utils; using VNLib.Utils.Extensions; -using VNLib.Plugins; namespace VNLib.Data.Caching.ObjectCache.Server.Clustering { @@ -37,12 +36,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering internal sealed class CachePeerMonitor : VnDisposeable, IPeerMonitor { - private readonly LinkedList peers = new(); + private readonly List peers = new(); private readonly ManualResetEvent newPeerTrigger = new (false); - public CachePeerMonitor(PluginBase plugin) - { } - /// /// Waits for new peers to connect to the server /// @@ -70,7 +66,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering //When a peer is connected we can add it to the list so the replication manager can see it lock(peers) { - peers.AddLast(peer); + peers.Add(peer); } //Trigger monitor when change occurs @@ -92,6 +88,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering protected override void Free() { + peers.Clear(); newPeerTrigger.Dispose(); } } diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs index 6475f9c..f22e1dd 100644 --- a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs +++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -24,14 +24,11 @@ using System; using System.Linq; -using System.Net.Http; using System.Threading; -using System.Net.Sockets; using System.Threading.Tasks; using System.Collections.Generic; using VNLib.Utils.Logging; -using VNLib.Plugins; using VNLib.Plugins.Extensions.Loading; using VNLib.Data.Caching.Extensions; using VNLib.Data.Caching.Extensions.Clustering; @@ -43,9 +40,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering * This class is responsible for resolving and discovering peer nodes in the cluster network. */ - internal sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter + internal sealed class PeerDiscoveryManager(NodeConfig config, ILogProvider Log, bool IsDebug, bool HasWellKnown) : IAsyncBackgroundWork, ICachePeerAdapter { - private const string LOG_SCOPE_NAME = "DISC"; + internal const string LOG_SCOPE_NAME = "DISC"; + /* * The initial discovery delay. This allows for the server to initialize before * starting the discovery process. This will probably be a shorter delay @@ -54,43 +52,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(15); private static readonly TimeSpan WhenWellKnownResolveFailed = TimeSpan.FromSeconds(20); - - private readonly List _connectedPeers; - private readonly NodeConfig Config; - private readonly CachePeerMonitor Monitor; - private readonly ILogProvider Log; - private readonly bool IsDebug; - private readonly bool HasWellKnown; - - public PeerDiscoveryManager(PluginBase plugin) - { - //Get config - Config = plugin.GetOrCreateSingleton(); - - //Get the known peers array from config, its allowed to be null for master nodes - IConfigScope? config = plugin.TryGetConfig("known_peers"); - string[] kownPeers = config?.Deserialze() ?? Array.Empty(); - - //Add known peers to the monitor - Config.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s))); - - HasWellKnown = kownPeers.Length > 0; - - //Get the peer monitor - Monitor = plugin.GetOrCreateSingleton(); - - _connectedPeers = new(); - - //Create scoped logger - Log = plugin.Log.CreateScope(LOG_SCOPE_NAME); - - Log.Information("Inital peer nodes: {nodes}", kownPeers); - - //Setup discovery error handler - Config.Config.WithErrorHandler(new ErrorHandler(Log)); - - IsDebug = plugin.IsDebug(); - } + private readonly List _connectedPeers = []; + private readonly CachePeerMonitor Monitor = new(); + private readonly VNCacheClusterManager clusterMan = new(config.Config); async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) { @@ -124,7 +88,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering } //Resolve all known peers - CacheNodeAdvertisment[] wellKnown = await Config.Config.ResolveWellKnownAsync(exitToken); + CacheNodeAdvertisment[] wellKnown = await clusterMan.ResolveWellKnownAsync(exitToken); wellKnownFailed = wellKnown.Length == 0; //Use the monitor to get the initial peers @@ -136,13 +100,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering if (allAds.Length > 0) { //Discover all known nodes - await Config.Config.DiscoverNodesAsync(allAds, exitToken); + await clusterMan.DiscoverNodesAsync(allAds, exitToken); } //Log the discovered nodes if verbose logging is enabled if (IsDebug) { - CacheNodeAdvertisment[] found = Config.Config.NodeCollection.GetAllNodes(); + CacheNodeAdvertisment[] found = clusterMan.DiscoveredNodes.GetAllNodes(); Log.Debug("Discovered {count} nodes\n\t{nodes}", found.Length, found.Select(static s => s.NodeId)); } @@ -177,7 +141,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering else { //Delay the next discovery - await Task.Delay(Config.DiscoveryInterval, exitToken); + await Task.Delay(config.DiscoveryInterval, exitToken); } } } @@ -188,7 +152,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering } finally { - + Monitor.Dispose(); } //Wait for the watcher to exit @@ -197,10 +161,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering private IEnumerable GetMonitorAds() { + string selfId = (clusterMan.Config as CacheNodeConfiguration)!.NodeId; return Monitor.GetAllPeers() .Where(static p => p.Advertisment != null) //Without us - .Where(n => n.NodeId != Config.Config.NodeId) + .Where(n => !string.Equals(n.NodeId, selfId, StringComparison.OrdinalIgnoreCase)) .Select(static p => p.Advertisment!); } @@ -222,7 +187,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering //Use the monitor to get the initial peers IEnumerable ads = GetMonitorAds(); - ((NodeDiscoveryCollection)Config.Config.NodeCollection).AddManualNodes(ads); + clusterMan.AddManualNodes(ads); } } catch (OperationCanceledException) @@ -239,7 +204,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering lock (_connectedPeers) { //Get all discovered peers - CacheNodeAdvertisment[] peers = Config.Config.NodeCollection.GetAllNodes(); + CacheNodeAdvertisment[] peers = clusterMan.DiscoveredNodes.GetAllNodes(); //Get the difference between the discovered peers and the connected peers return peers.Except(_connectedPeers).ToArray(); @@ -265,31 +230,5 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering _connectedPeers.Remove(peer); } } - - - private sealed record class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler - { - public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex) - { - - if (ex is HttpRequestException hre) - { - if (hre.InnerException is SocketException se) - { - //traisnport failed - Logger.Warn("Failed to connect to server {serv} because {err}", errorNode, se.Message); - } - else - { - Logger.Error("Failed to connect to node {n}\n{err}", errorNode, hre); - } - } - else - { - Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex); - } - - } - } } } diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs index d1591f8..48f4448 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -45,7 +45,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints public bool IsPeer { get; set; } } - internal sealed class CacheNegotationManager + internal sealed class CacheNegotationManager(PluginBase plugin) { /* * Cache keys are centralized and may be shared between all cache server nodes. This means @@ -64,21 +64,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30); - private readonly string AudienceLocalServerId; - private readonly NodeConfig _nodeConfig; - private readonly CacheConfiguration _cacheConfig; + private readonly string AudienceLocalServerId = Guid.NewGuid().ToString("N"); - public CacheNegotationManager(PluginBase plugin) - { - //Get node configuration - _nodeConfig = plugin.GetOrCreateSingleton(); + private readonly ObjectCacheSystemState _sysState = plugin.GetOrCreateSingleton(); - //Get the cache store configuration - _cacheConfig = plugin.GetConfigForType().Deserialze(); + private NodeConfig NodeConfig => _sysState.Configuration; - AudienceLocalServerId = Guid.NewGuid().ToString("N"); - } - + private CacheMemoryConfiguration CacheConfig => _sysState.MemoryConfiguration; public bool IsClientNegotiationValid(string authToken, out ClientNegotiationState state) { @@ -88,12 +80,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints using JsonWebToken jwt = JsonWebToken.Parse(authToken); //verify signature for client - if (_nodeConfig.KeyStore.VerifyJwt(jwt, false)) + if (NodeConfig.KeyStore.VerifyJwt(jwt, false)) { //Validated as normal client } //May be signed by a cache server - else if (_nodeConfig.KeyStore.VerifyJwt(jwt, true)) + else if (NodeConfig.KeyStore.VerifyJwt(jwt, true)) { //Set peer and verified flag since the another cache server signed the request state.IsPeer = true; @@ -117,12 +109,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints return true; } - public JsonWebToken ConfirmCLientNegotiation(ClientNegotiationState state, IPAddress clientIp, DateTimeOffset now) + public JsonWebToken ConfirmClientNegotiation(ClientNegotiationState state, IPAddress clientIp, DateTimeOffset now) { //Verified, now we can create an auth message with a short expiration JsonWebToken auth = new(); - auth.WriteHeader(_nodeConfig.KeyStore.GetJwtHeader()); + auth.WriteHeader(NodeConfig.KeyStore.GetJwtHeader()); auth.InitPayloadClaim() .AddClaim("aud", AudienceLocalServerId) .AddClaim("iat", now.ToUnixTimeSeconds()) @@ -136,24 +128,29 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints //Set ip address .AddClaim("ip", clientIp.ToString()) //Add negotiaion args - .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, _cacheConfig.MaxHeaderBufferSize) - .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, _cacheConfig.MaxRecvBufferSize) - .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, _cacheConfig.MaxMessageSize) + .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, CacheConfig.MaxHeaderBufferSize) + .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, CacheConfig.MaxRecvBufferSize) + .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, CacheConfig.MaxMessageSize) .CommitClaims(); //Sign the auth message from our private key - _nodeConfig.KeyStore.SignJwt(auth); + NodeConfig.KeyStore.SignJwt(auth); return auth; } - public bool ValidateUpgrade(string upgradeToken, string tokenSignature, DateTimeOffset now, IPAddress connectionIp, ref string? nodeId, ref bool isPeer) + public bool ValidateUpgrade(string? upgradeToken, string? tokenSignature, DateTimeOffset now, IPAddress connectionIp, ref string? nodeId, ref bool isPeer) { + if(string.IsNullOrWhiteSpace(upgradeToken) || string.IsNullOrWhiteSpace(tokenSignature)) + { + return false; + } + //Parse jwt using JsonWebToken jwt = JsonWebToken.Parse(upgradeToken); //verify signature against the cache public key, since this server must have signed it - if (!_nodeConfig.KeyStore.VerifyCachePeer(jwt)) + if (!NodeConfig.KeyStore.VerifyCachePeer(jwt)) { return false; } @@ -175,7 +172,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } //Check node ip address matches if required - if (_nodeConfig.VerifyIp) + if (NodeConfig.VerifyIp) { if (!doc.RootElement.TryGetProperty("ip", out JsonElement ipEl)) { @@ -201,7 +198,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } //Verify token signature against a fellow cache public key - return _nodeConfig.KeyStore.VerifyUpgradeToken(tokenSignature, upgradeToken, isPeer); + return NodeConfig.KeyStore.VerifyUpgradeToken(tokenSignature, upgradeToken, isPeer); } } } diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs index 816e6c3..d6b733c 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -55,11 +55,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { internal const string LOG_SCOPE_NAME = "CONEP"; - - private readonly ICacheEventQueueManager PubSubManager; - private readonly IPeerMonitor Peers; - private readonly BlobCacheListener Store; - private readonly NodeConfig NodeConfiguration; + + private readonly ObjectCacheSystemState _sysState; + + private PeerEventQueueManager PubSubManager => _sysState.PeerEventQueue; + private CachePeerMonitor Peers => _sysState.PeerMonitor; + private BlobCacheListener Listener => _sysState.Listener; + private NodeConfig NodeConfiguration => _sysState.Configuration; + private readonly CacheNegotationManager AuthManager; private uint _connectedClients; @@ -72,7 +75,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints /// /// The cache store configuration /// - public CacheConfiguration CacheConfig { get; } + public CacheMemoryConfiguration CacheConfig => _sysState.MemoryConfiguration; //Loosen up protection settings /// @@ -83,24 +86,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints public ConnectEndpoint(PluginBase plugin) { - //Get node configuration - NodeConfiguration = plugin.GetOrCreateSingleton(); + _sysState = plugin.GetOrCreateSingleton(); //Init from config and create a new log scope InitPathAndLog(NodeConfiguration.ConnectPath, plugin.Log.CreateScope(LOG_SCOPE_NAME)); - - //Setup pub/sub manager - PubSubManager = plugin.GetOrCreateSingleton(); - - //Get peer monitor - Peers = plugin.GetOrCreateSingleton(); - - //Init the cache store - Store = plugin.GetOrCreateSingleton().Listener; - - //Get the cache store configuration - CacheConfig = plugin.GetConfigForType().Deserialze(); - + //Get the auth manager AuthManager = plugin.GetOrCreateSingleton(); } @@ -127,6 +117,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { //Parse jwt from authoriation string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; + if (string.IsNullOrWhiteSpace(jwtAuth)) { return VirtualClose(entity, HttpStatusCode.Forbidden); @@ -149,26 +140,34 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } //Verified, now we can create an auth message with a short expiration - using JsonWebToken auth = AuthManager.ConfirmCLientNegotiation(state, entity.TrustedRemoteIp, entity.RequestedTimeUtc); + using JsonWebToken auth = AuthManager.ConfirmClientNegotiation(state, entity.TrustedRemoteIp, entity.RequestedTimeUtc); - //Close response + //Close response by sending a copy of the signed token entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer); return VfReturnType.VirtualSkip; } protected override VfReturnType WebsocketRequested(HttpEntity entity) { + /* + * Check to see if any more connections are allowed, + * otherwise deny the connection + * + * This is done here to prevent the server from being overloaded + * on a new connection. It would be ideal to not grant new tokens + * but malicious clients could cache a bunch of tokens and use them + * later, exhausting resources. + */ + if(_connectedClients >= NodeConfiguration.MaxConcurrentConnections) + { + return VirtualClose(entity, HttpStatusCode.ServiceUnavailable); + } + //Parse jwt from authorization string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; string? clientSignature = entity.Server.Headers[FBMDataCacheExtensions.X_UPGRADE_SIG_HEADER]; string? optionalDiscovery = entity.Server.Headers[FBMDataCacheExtensions.X_NODE_DISCOVERY_HEADER]; - //Not null - if (string.IsNullOrWhiteSpace(jwtAuth) || string.IsNullOrWhiteSpace(clientSignature)) - { - return VfReturnType.Forbidden; - } - string? nodeId = null; bool isPeer = false; @@ -178,15 +177,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints return VirtualClose(entity, HttpStatusCode.Unauthorized); } - CacheNodeAdvertisment? discoveryAd = null; - /* * If the client is a peer server, it may offer a signed advertisment * that this node will have the duty of making available to other peers * if it is valid */ - if (isPeer && !string.IsNullOrWhiteSpace(optionalDiscovery)) + CacheNodeAdvertisment? discoveryAd = null; + + if (isPeer) { discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(optionalDiscovery); } @@ -196,11 +195,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints try { //Get query config suggestions from the client - string recvBufCmd = entity.QueryArgs[FBMClient.REQ_RECV_BUF_QUERY_ARG]; - string maxHeaderCharCmd = entity.QueryArgs[FBMClient.REQ_HEAD_BUF_QUERY_ARG]; - string maxMessageSizeCmd = entity.QueryArgs[FBMClient.REQ_MAX_MESS_QUERY_ARG]; + string? recvBufCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_RECV_BUF_QUERY_ARG); + string? maxHeaderCharCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_HEAD_BUF_QUERY_ARG); + string? maxMessageSizeCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_MAX_MESS_QUERY_ARG); - //Parse recv buffer size int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : CacheConfig.MinRecvBufferSize; int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : CacheConfig.MinHeaderBufferSize; int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : CacheConfig.MaxMessageSize; @@ -253,9 +251,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints Peers.OnPeerConnected(state); //Register plugin exit token to cancel the connected socket - CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll); - - //Inc connected count + await using CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll); + Interlocked.Increment(ref _connectedClients); try @@ -280,7 +277,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints try { //Begin listening for messages with a queue - await Store.ListenAsync(wss, queue, args); + await Listener.ListenAsync(wss, queue, args); } finally { @@ -291,7 +288,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints else { //Begin listening for messages without a queue - await Store.ListenAsync(wss, null!, args); + await Listener.ListenAsync(wss, null!, args); } } catch (OperationCanceledException) @@ -303,15 +300,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } catch (Exception ex) { - Log.Debug(ex); + //If debug logging is enabled print a more detailed error message + Log.Error("An error occured on websocket connection: node {con} -> {error}", state.NodeId, ex.Message); + Log.Debug("Websocket connection error: node {con}\n{error}", state.NodeId, ex); } - - //Dec connected count + Interlocked.Decrement(ref _connectedClients); - //Unregister the token - reg.Unregister(); - //Notify monitor of disconnect Peers.OnPeerDisconnected(state); diff --git a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs index 7d376b8..56fe8cd 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -40,25 +40,29 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { internal sealed class PeerDiscoveryEndpoint : ResourceEndpointBase { - private readonly IPeerMonitor PeerMonitor; - private readonly NodeConfig Config; + private readonly ObjectCacheSystemState SysState; + + private CacheAuthKeyStore KeyStore => SysState.Configuration.KeyStore; + + private CachePeerMonitor PeerMonitor => SysState.PeerMonitor; + + private CacheNodeConfiguration NodeConfig => SysState.Configuration.Config; - //Loosen up protection settings /// protected override ProtectionSettings EndpointProtectionSettings { get; } = new() { - DisableSessionsRequired = true + /* + * Sessions will not be used or required for this endpoint. + * We should also assume the session system is not even loaded + */ + DisableSessionsRequired = true }; public PeerDiscoveryEndpoint(PluginBase plugin) { - //Get the peer monitor - PeerMonitor = plugin.GetOrCreateSingleton(); - - //Get the node config - Config = plugin.GetOrCreateSingleton(); + SysState = plugin.GetOrCreateSingleton(); - InitPathAndLog(Config.DiscoveryPath, plugin.Log); + InitPathAndLog(SysState.Configuration.DiscoveryPath!, plugin.Log); } protected override VfReturnType Get(HttpEntity entity) @@ -68,36 +72,41 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints if(string.IsNullOrWhiteSpace(authToken)) { - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; + return VirtualClose(entity, HttpStatusCode.Unauthorized); } string subject = string.Empty; string challenge = string.Empty; - //Parse auth token - using(JsonWebToken jwt = JsonWebToken.Parse(authToken)) + try { + //Parse auth token + using JsonWebToken jwt = JsonWebToken.Parse(authToken); + //try to verify against cache node first - if (!Config.KeyStore.VerifyJwt(jwt, true)) + if (!KeyStore.VerifyJwt(jwt, true)) { //failed... //try to verify against client key - if (!Config.KeyStore.VerifyJwt(jwt, false)) + if (!KeyStore.VerifyJwt(jwt, false)) { //invalid token - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; + return VirtualClose(entity, HttpStatusCode.Unauthorized); } } using JsonDocument payload = jwt.GetPayload(); //Get client info to pass back - subject = payload.RootElement.TryGetProperty("sub", out JsonElement subEl) ? subEl.GetString() ?? string.Empty : string.Empty; + subject = payload.RootElement.TryGetProperty("sub", out JsonElement subEl) ? subEl.GetString() ?? string.Empty : string.Empty; challenge = payload.RootElement.GetProperty("chl").GetString() ?? string.Empty; } + catch (FormatException) + { + //If tokens are invalid format, let the client know instead of a server error + return VfReturnType.BadRequest; + } //Valid key, get peer list to send to client CacheNodeAdvertisment[] peers = PeerMonitor.GetAllPeers() @@ -109,10 +118,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints using JsonWebToken response = new(); //set header from cache config - response.WriteHeader(Config.KeyStore.GetJwtHeader()); + response.WriteHeader(KeyStore.GetJwtHeader()); response.InitPayloadClaim() - .AddClaim("iss", Config.Config.NodeId) + .AddClaim("iss", NodeConfig.NodeId) //Audience is the requestor id .AddClaim("sub", subject) .AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds()) @@ -122,10 +131,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints .AddClaim("chl", challenge) .CommitClaims(); - //Sign the response - Config.KeyStore.SignJwt(response); - - //Send response to client + + KeyStore.SignJwt(response); + entity.CloseResponse(HttpStatusCode.OK, Net.Http.ContentType.Text, response.DataBuffer); return VfReturnType.VirtualSkip; } diff --git a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs index 87a471b..04380c5 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -59,7 +59,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints public WellKnownEndpoint(PluginBase plugin) { //Get the node config - NodeConfig nodeConfig = plugin.GetOrCreateSingleton(); + NodeConfig nodeConfig = plugin.GetOrCreateSingleton().Configuration; //serialize the config, discovery may not be enabled _advertisment = nodeConfig.Config.Advertisment; diff --git a/plugins/ObjectCacheServer/src/NodeConfig.cs b/plugins/ObjectCacheServer/src/NodeConfig.cs index 3a2e10e..4dd9f4a 100644 --- a/plugins/ObjectCacheServer/src/NodeConfig.cs +++ b/plugins/ObjectCacheServer/src/NodeConfig.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -66,6 +66,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server /// public uint MaxPeerConnections { get; } = 10; + /// + /// The maxium number of concurrent client connections to allow + /// before rejecting new connections + /// + public uint MaxConcurrentConnections { get; } + public NodeConfig(PluginBase plugin, IConfigScope config) { //Get the port of the primary webserver @@ -92,32 +98,23 @@ namespace VNLib.Data.Caching.ObjectCache.Server //Init key store KeyStore = new(plugin); - - DiscoveryInterval = config["discovery_interval_sec"].GetTimeSpan(TimeParseType.Seconds); - - //Get the event queue purge interval - EventQueuePurgeInterval = config["queue_purge_interval_sec"].GetTimeSpan(TimeParseType.Seconds); - - //Get the max queue depth - MaxQueueDepth = (int)config["max_queue_depth"].GetUInt32(); - - - //Get the connect path - ConnectPath = config["connect_path"].GetString() ?? throw new KeyNotFoundException("Missing required key 'connect_path' in cluster config"); - - //Get the verify ip setting - VerifyIp = config["verify_ip"].GetBoolean(); + DiscoveryInterval = config.GetRequiredProperty("discovery_interval_sec", p => p.GetTimeSpan(TimeParseType.Seconds)); + EventQueuePurgeInterval = config.GetRequiredProperty("queue_purge_interval_sec", p => p.GetTimeSpan(TimeParseType.Seconds)); + MaxQueueDepth = (int)config.GetRequiredProperty("max_queue_depth", p => p.GetUInt32()); + ConnectPath = config.GetRequiredProperty("connect_path", p => p.GetString()!); + VerifyIp = config.GetRequiredProperty("verify_ip", p => p.GetBoolean()); + WellKnownPath = config.GetValueOrDefault("well_known_path", p => p.GetString()!, DefaultPath); + MaxPeerConnections = config.GetValueOrDefault("max_peers", p => p.GetUInt32(), 10u); Uri connectEp = BuildUri(usingTls, hostname, port, ConnectPath); Uri? discoveryEp = null; - Config = new(); - //Setup cache node config - Config.WithCacheEndpoint(connectEp) - .WithNodeId(nodeId) - .WithAuthenticator(KeyStore) - .WithTls(usingTls); + (Config = new()) + .WithCacheEndpoint(connectEp) + .WithNodeId(nodeId) + .WithAuthenticator(KeyStore) + .WithTls(usingTls); //Get the discovery path (optional) if (config.TryGetValue("discovery_path", out JsonElement discoveryPathEl)) @@ -133,20 +130,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server } } - //Allow custom well-known path - if(config.TryGetValue("well_known_path", out JsonElement wkEl)) - { - WellKnownPath = wkEl.GetString() ?? DefaultPath; - } - //Default if not set - WellKnownPath ??= DefaultPath; - - //Get the max peer connections - if (config.TryGetValue("max_peers", out JsonElement maxPeerEl)) - { - MaxPeerConnections = maxPeerEl.GetUInt32(); - } - const string CacheConfigTemplate = @" Cluster Configuration: diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs index aada787..b970cee 100644 --- a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs +++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -23,13 +23,10 @@ */ using System; -using System.Threading; using System.Collections.Generic; using VNLib.Plugins; -using VNLib.Utils.Memory; using VNLib.Utils.Logging; -using VNLib.Utils.Memory.Diagnostics; using VNLib.Plugins.Extensions.Loading; using VNLib.Plugins.Extensions.Loading.Routing; @@ -42,39 +39,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server public sealed class ObjectCacheServerEntry : PluginBase { public override string PluginName => "ObjectCache.Service"; - - private readonly Lazy _cacheHeap; - - internal IUnmangedHeap ListenerHeap => _cacheHeap.Value; - - public ObjectCacheServerEntry() - { - //Init heap - _cacheHeap = new Lazy(InitializeHeap, LazyThreadSafetyMode.PublicationOnly); - } - - internal IUnmangedHeap InitializeHeap() - { - //Create default heap - IUnmangedHeap _heap = MemoryUtil.InitializeNewHeapForProcess(); - try - { - //If the plugin is in debug mode enable heap tracking - return this.IsDebug() ? new TrackedHeapWrapper(_heap, true) : _heap; - } - catch - { - _heap.Dispose(); - throw; - } - } + protected override void OnLoad() { try { - //Get the node configuration first - NodeConfig config = this.GetOrCreateSingleton(); + //Initialize the cache node builder + ObjectCacheSystemState builder = this.GetOrCreateSingleton(); + builder.Initialize(); //Route well-known endpoint this.Route(); @@ -86,7 +59,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server _ = this.GetOrCreateSingleton(); //Setup discovery endpoint - if(!string.IsNullOrWhiteSpace(config.DiscoveryPath)) + if(!string.IsNullOrWhiteSpace(builder.Configuration.DiscoveryPath)) { this.Route(); } @@ -101,12 +74,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server protected override void OnUnLoad() { - //dispose heap if initialized - if(_cacheHeap.IsValueCreated) - { - _cacheHeap.Value.Dispose(); - } - Log.Information("Plugin unloaded"); } diff --git a/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs new file mode 100644 index 0000000..6183956 --- /dev/null +++ b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs @@ -0,0 +1,215 @@ +/* +* Copyright (c) 2024 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: ObjectCacheSystemState.cs +* +* ObjectCacheSystemState.cs is part of ObjectCacheServer which is +* part of the larger VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Net.Http; +using System.Net.Sockets; + +using VNLib.Utils.Logging; +using VNLib.Utils.Memory; +using VNLib.Utils.Memory.Diagnostics; +using VNLib.Net.Messaging.FBM; +using VNLib.Plugins; +using VNLib.Plugins.Extensions.Loading; + +using VNLib.Data.Caching.Extensions.Clustering; +using VNLib.Data.Caching.ObjectCache.Server.Cache; +using VNLib.Data.Caching.ObjectCache.Server.Clustering; + +namespace VNLib.Data.Caching.ObjectCache.Server +{ + [ConfigurationName("cache")] + internal sealed class ObjectCacheSystemState(PluginBase plugin, IConfigScope config) : IDisposable + { + const string LISTENER_LOG_SCOPE = "CacheListener"; + + public BlobCacheListener Listener { get; private set; } = null!; + + public ICacheStore InternalStore { get; private set; } = null!; + + /// + /// Used for miscellaneous shared memory allocations (like the cache listener) + /// + public IUnmangedHeap SharedCacheHeap { get; private set; } = null!; + + /// + /// The plugin-wide, shared node configuration + /// + public NodeConfig Configuration { get; } = plugin.GetOrCreateSingleton(); + + /// + /// The peer discovery manager + /// + public PeerDiscoveryManager PeerDiscovery { get; private set; } = null!; + + /// + /// System wide peer monitor + /// + public CachePeerMonitor PeerMonitor { get; } = new(); + + public CacheMemoryConfiguration MemoryConfiguration { get; } = config.Deserialze(); + + /// + /// The system wide peer event queue manager + /// + public PeerEventQueueManager PeerEventQueue { get; private set; } + + void IDisposable.Dispose() + { + SharedCacheHeap.Dispose(); + Listener.Dispose(); + } + + /// + /// Initializes the cache node state + /// + public void Initialize() + { + CacheMemoryConfiguration cacheConf = MemoryConfiguration; + + ArgumentOutOfRangeException.ThrowIfLessThan(cacheConf.MaxCacheEntries, 2u); + + //Suggestion + if (cacheConf.MaxCacheEntries < 200) + { + plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache"); + } + + LogMemConfiguration(); + + //If the plugin is in debug mode enable heap tracking + SharedCacheHeap = plugin.IsDebug() ? + new TrackedHeapWrapper(MemoryUtil.InitializeNewHeapForProcess(), true) + : MemoryUtil.InitializeNewHeapForProcess(); + + ConfigurePeerDiscovery(); + + ConfigureCacheListener(); + + PeerEventQueue = new(plugin, Configuration); + } + + private void ConfigurePeerDiscovery() + { + //Get the known peers array from config, its allowed to be null for master nodes + IConfigScope? config = plugin.TryGetConfig("known_peers"); + string[] kownPeers = config?.Deserialze() ?? []; + + ILogProvider discLogger = plugin.Log.CreateScope(PeerDiscoveryManager.LOG_SCOPE_NAME); + + Configuration.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s))) + .WithErrorHandler(new ErrorHandler(discLogger)); + + discLogger.Information("Inital peer nodes: {nodes}", kownPeers); + + PeerDiscovery = new PeerDiscoveryManager( + Configuration, + discLogger, + plugin.IsDebug(), + kownPeers.Length > 0 + ); + + //Discovery manager needs to be scheduled for background work to run the discovery loop + _ = plugin.ObserveWork(PeerDiscovery, 10); + } + + private void ConfigureCacheListener() + { + /* + * Allow loading external managed dll for a bucket-local memory manager + */ + ICacheMemoryManagerFactory manager; + + if (string.IsNullOrWhiteSpace(MemoryConfiguration.ExternLibPath)) + { + //Get the memory manager + manager = plugin.GetOrCreateSingleton(); + } + else + { + manager = plugin.CreateServiceExternal(MemoryConfiguration.ExternLibPath); + } + + //Endpoint only allows for a single reader + Listener = new( + plugin.LoadMemoryCacheSystem(config, manager, MemoryConfiguration), + plugin.GetOrCreateSingleton(), + plugin.Log.CreateScope(LISTENER_LOG_SCOPE), + new SharedHeapFBMMemoryManager(SharedCacheHeap) + ); + + InternalStore = new CacheStore(Listener.Cache); + } + + private void LogMemConfiguration() + { + const string CacheConfigTemplate = +@" +Cache Configuration: + Max memory: {max} Mb + Buckets: {bc} + Entries per-bucket: {mc} + HeapTracking: {ht} +"; + + CacheMemoryConfiguration cacheConf = MemoryConfiguration; + + //calculate the max memory usage + ulong maxByteSize = cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize; + + //Log the cache config + plugin.Log.Information( + CacheConfigTemplate, + maxByteSize / (1024 * 1000), + cacheConf.BucketCount, + cacheConf.MaxCacheEntries, + plugin.IsDebug() + ); + } + + private sealed class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler + { + public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex) + { + if (ex is HttpRequestException hre) + { + if (hre.InnerException is SocketException se) + { + //transport failed + Logger.Warn("Failed to connect to server {serv} because {err}", errorNode, se.Message); + } + else + { + Logger.Error("Failed to connect to node {n}\n{err}", errorNode, hre); + } + } + else + { + Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex); + } + } + } + } +} diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs index c9cd746..e9dcbc5 100644 --- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs @@ -46,7 +46,7 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering * it in the app domain. */ - public static IClusterNodeIndex CreateIndex(CacheClientConfiguration config) + public static IClusterNodeIndex CreateIndex(VNCacheClusterManager cluster) { /* TEMPORARY: * Named semaphores are only supported on Windows, which allowed synchronized communication between @@ -75,7 +75,7 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering if (remoteIndex == null) { //Create a new index and store it in the app domain - IClusterNodeIndex index = new LocalHandler(config); + IClusterNodeIndex index = new LocalHandler(cluster); AppDomain.CurrentDomain.SetData(APP_DOMAIN_KEY, index); return index; } @@ -92,7 +92,7 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering } else { - return new LocalHandler(config); + return new LocalHandler(cluster); } } @@ -114,15 +114,15 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering * Unless VNLib.Core supports a new way to safley share types across ALCs, this is my solution. */ - sealed class LocalHandler(CacheClientConfiguration Config) : IClusterNodeIndex, IIntervalScheduleable + sealed class LocalHandler(VNCacheClusterManager cluster) : IClusterNodeIndex, IIntervalScheduleable { private Task _currentUpdate = Task.CompletedTask; /// public CacheNodeAdvertisment? GetNextNode() { - //Get all nodes - CacheNodeAdvertisment[] ads = Config.NodeCollection.GetAllNodes(); + //Get all discovered nodes + CacheNodeAdvertisment[] ads = cluster.DiscoveredNodes.GetAllNodes(); //Just get a random node from the collection for now return ads.Length > 0 ? ads.SelectRandom() : null; } @@ -134,7 +134,7 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering public Task OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken) { //Run discovery operation and update the task - _currentUpdate = Config.DiscoverNodesAsync(cancellationToken); + _currentUpdate = cluster.DiscoverNodesAsync(cancellationToken); return Task.CompletedTask; } diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs index 73783dc..a8f86f9 100644 --- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs @@ -59,7 +59,7 @@ namespace VNLib.Data.Caching.Providers.VNCache private readonly VnCacheClientConfig _config; private readonly IClusterNodeIndex _index; - private readonly FBMClientFactory _clientFactory; + private readonly VNCacheClusterClient _cluster; private readonly TimeSpan _initNodeDelay; private bool _isConnected; @@ -83,9 +83,8 @@ namespace VNLib.Data.Caching.Providers.VNCache { ILogProvider scoped = plugin.Log.CreateScope(LOG_NAME); - //Set authenticator and error handler - _clientFactory.GetCacheConfiguration() - .WithAuthenticator(new AuthManager(plugin)) + //When in plugin context, we can use plugin local secrets and a log-based error handler + _cluster.Config.WithAuthenticator(new AuthManager(plugin)) .WithErrorHandler(new DiscoveryErrHAndler(scoped)); //Only the master index is schedulable @@ -116,17 +115,20 @@ namespace VNLib.Data.Caching.Providers.VNCache //Init the client with default settings FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(BufferHeap, (int)config.MaxBlobSize, config.RequestTimeout, debugLog); - - FBMFallbackClientWsFactory wsFactory = new(); - _clientFactory = new(in conf, wsFactory); - - //Add the configuration to the client - _clientFactory.GetCacheConfiguration() + + FBMClientFactory clientFactory = new( + in conf, + new FBMFallbackClientWsFactory(), + 10 + ); + + _cluster = (new CacheClientConfiguration()) .WithTls(config.UseTls) - .WithInitialPeers(config.GetInitialNodeUris()); + .WithInitialPeers(config.GetInitialNodeUris()) + .ToClusterClient(clientFactory); //Init index - _index = ClusterNodeIndex.CreateIndex(_clientFactory.GetCacheConfiguration()); + _index = ClusterNodeIndex.CreateIndex(_cluster); } /* @@ -216,7 +218,7 @@ namespace VNLib.Data.Caching.Providers.VNCache pluginLog.Debug("Connecting to {node}", node); //Connect to the node and save new client - _client = await _clientFactory.ConnectToCacheAsync(node, exitToken); + _client = await _cluster.ConnectToCacheAsync(node, exitToken); if (pluginLog.IsEnabled(LogLevel.Debug)) { @@ -327,22 +329,11 @@ namespace VNLib.Data.Caching.Providers.VNCache /// public override object GetUnderlyingStore() => _client ?? throw new InvalidOperationException("The client is not currently connected"); - private sealed class AuthManager : ICacheAuthManager + private sealed class AuthManager(PluginBase plugin) : ICacheAuthManager { - private IAsyncLazy _sigKey; - private IAsyncLazy _verKey; - - public AuthManager(PluginBase plugin) - { - //Lazy load keys - - //Get the signing key - _sigKey = plugin.GetSecretAsync("client_private_key").ToLazy(static r => r.GetJsonWebKey()); - - //Lazy load cache public key - _verKey = plugin.GetSecretAsync("cache_public_key").ToLazy(static r => r.GetJsonWebKey()); - } + private IAsyncLazy _sigKey = plugin.GetSecretAsync("client_private_key").ToLazy(static r => r.GetJsonWebKey()); + private IAsyncLazy _verKey = plugin.GetSecretAsync("cache_public_key").ToLazy(static r => r.GetJsonWebKey()); public async Task AwaitLazyKeyLoad() { diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs index 383c979..0d6cd34 100644 --- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: VNLib.Data.Caching.Providers.VNCache @@ -33,6 +33,8 @@ namespace VNLib.Data.Caching.Providers.VNCache /// public class VnCacheClientConfig : VNCacheConfig { + const string DefaultWellKnownEndpoint = "/.well-known/vncache"; + /// /// The broker server address /// @@ -88,7 +90,13 @@ namespace VNLib.Data.Caching.Providers.VNCache public Uri[] GetInitialNodeUris() { _ = InitialNodes ?? throw new InvalidOperationException("Initial nodes have not been set"); - return InitialNodes.Select(static x => new Uri(x, UriKind.Absolute)).ToArray(); + return InitialNodes.Select(static x => + { + //Append a default well known endpoint if the path is just a root + Uri ur = new (x, UriKind.Absolute); + return ur.LocalPath == "/" ? new Uri(ur, DefaultWellKnownEndpoint) : ur; + }) + .ToArray(); } /// -- cgit