diff options
28 files changed, 965 insertions, 858 deletions
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs index 99acfd5..b6add7d 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: VNLib.Data.Caching.Extensions @@ -22,6 +22,7 @@ * along with this program. If not, see https://www.gnu.org/licenses/. */ +using System; using System.Net; using System.Text; using System.Threading; @@ -39,6 +40,22 @@ namespace VNLib.Data.Caching.Extensions.ApiModel /// </summary> internal sealed class CacheSiteAdapter : RestSiteAdapterBase { + /* + * Lazy to defer errors for debuggong + */ + private static readonly Lazy<CacheSiteAdapter> _lazy = new(() => ConfigureAdapter(2)); + + internal static CacheSiteAdapter Instance => _lazy.Value; + + private static CacheSiteAdapter ConfigureAdapter(int maxClients) + { + CacheSiteAdapter adapter = new(maxClients); + //Configure the site endpoints + adapter.BuildEndpoints(ServiceEndpoints.Definition); + return adapter; + } + + protected override RestClientPool Pool { get; } public CacheSiteAdapter(int maxClients) diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs index 5a15d33..263f41a 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: VNLib.Data.Caching.Extensions @@ -37,14 +37,9 @@ namespace VNLib.Data.Caching.Extensions.Clustering public class CacheClientConfiguration { /// <summary> - /// Stores available cache servers to be used for discovery, and connections - /// </summary> - public INodeDiscoveryCollection NodeCollection { get; } = new NodeDiscoveryCollection(); - - /// <summary> /// The authentication manager to use for signing and verifying messages to and from the cache servers /// </summary> - public ICacheAuthManager AuthManager { get; private set; } + public ICacheAuthManager AuthManager { get; private set; } = null!; /// <summary> /// The error handler to use for handling errors that occur during the discovery process @@ -89,7 +84,7 @@ namespace VNLib.Data.Caching.Extensions.Clustering public CacheClientConfiguration WithInitialPeers(IEnumerable<Uri> peers) { //Check null - _ = peers ?? throw new ArgumentNullException(nameof(peers)); + ArgumentNullException.ThrowIfNull(peers); //Store peer array WellKnownNodes = peers.ToArray(); diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs index 6b7ab48..c21ed05 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: VNLib.Data.Caching.Extensions @@ -23,6 +23,7 @@ */ using System; +using System.Runtime.CompilerServices; namespace VNLib.Data.Caching.Extensions.Clustering { @@ -85,8 +86,14 @@ namespace VNLib.Data.Caching.Extensions.Clustering return this; } + internal StrongBox<string> NodeIdRef { get; } = new(string.Empty); + ///<inheritdoc/> - public string NodeId { get; private set; } = null!; + public string NodeId + { + get => NodeIdRef.Value!; + private set => NodeIdRef.Value = value; + } /// <summary> /// Specifies the current server's cluster node id. If this @@ -99,10 +106,6 @@ namespace VNLib.Data.Caching.Extensions.Clustering public CacheNodeConfiguration WithNodeId(string nodeId) { NodeId = nodeId ?? throw new ArgumentNullException(nameof(nodeId)); - - //Update the node id in the node collection - (NodeCollection as NodeDiscoveryCollection)!.SetSelfId(nodeId); - return this; } diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs index 677088a..e57e69b 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: VNLib.Data.Caching.Extensions @@ -28,7 +28,9 @@ using System.Collections.Generic; namespace VNLib.Data.Caching.Extensions.Clustering { /// <summary> - /// A custom enumerator for the node discovery process + /// A custom enumerator for the node discovery process. Simplifies the recursive processes + /// of discovering nodes in a cluster to a simple enumeration process. It allows for real-time + /// updates to the collection of discovered nodes as a union operation. /// </summary> public interface INodeDiscoveryEnumerator : IEnumerator<CacheNodeAdvertisment> { diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs index b0e53e1..16b96a3 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs @@ -1,12 +1,12 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: VNLib.Data.Caching.Extensions * File: NodeDiscoveryCollection.cs * -* NodeDiscoveryCollection.cs is part of VNLib.Data.Caching.Extensions which is part of the larger -* VNLib collection of libraries and utilities. +* NodeDiscoveryCollection.cs is part of VNLib.Data.Caching.Extensions which is part +* of the larger VNLib collection of libraries and utilities. * * VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as @@ -26,24 +26,18 @@ using System; using System.Linq; using System.Collections; using System.Collections.Generic; +using System.Runtime.CompilerServices; + +using VNLib.Utils.Extensions; namespace VNLib.Data.Caching.Extensions.Clustering { /// <summary> /// Represents a collection of available cache nodes from a discovery process /// </summary> - public sealed class NodeDiscoveryCollection : INodeDiscoveryCollection + public sealed class NodeDiscoveryCollection(StrongBox<string?>? selfId) : INodeDiscoveryCollection { - private string? _selfId; - private LinkedList<CacheNodeAdvertisment> _peers; - - /// <summary> - /// Initializes a new empty <see cref="NodeDiscoveryCollection"/> - /// </summary> - public NodeDiscoveryCollection() - { - _peers = new(); - } + private LinkedList<CacheNodeAdvertisment> _peers = new(); /// <summary> /// Manually adds nodes to the collection that were not discovered through the discovery process @@ -62,39 +56,34 @@ namespace VNLib.Data.Caching.Extensions.Clustering } /// <summary> - /// Sets the id of the current node, so it can be excluded from discovery + /// Removes a vector of nodes from the internal collection /// </summary> - /// <param name="selfId">The id of the current node to exclude</param> - public void SetSelfId(string? selfId) => _selfId = selfId; + /// <param name="nodes">The vector containg nodes to remove from the collection</param> + public void RemoveManualNodes(IEnumerable<CacheNodeAdvertisment> nodes) => nodes.ForEach(n => _peers.Remove(n)); ///<inheritdoc/> - public INodeDiscoveryEnumerator BeginDiscovery() - { - return new NodeEnumerator(new(), _selfId); - } + public INodeDiscoveryEnumerator BeginDiscovery() => new NodeEnumerator(new(), selfId?.Value); ///<inheritdoc/> public INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<CacheNodeAdvertisment> initialPeers) { + ArgumentNullException.ThrowIfNull(initialPeers); + //Init new enumerator with the initial peers - return new NodeEnumerator(new(initialPeers), _selfId); + return new NodeEnumerator(new(initialPeers), selfId?.Value); } ///<inheritdoc/> public void CompleteDiscovery(INodeDiscoveryEnumerator enumerator) { - _ = enumerator ?? throw new ArgumentNullException(nameof(enumerator)); + ArgumentNullException.ThrowIfNull(enumerator); //Capture all nodes from the enumerator and store them as our current peers _peers = (enumerator as NodeEnumerator)!.Peers; } ///<inheritdoc/> - public CacheNodeAdvertisment[] GetAllNodes() - { - //Capture all current peers - return _peers.ToArray(); - } + public CacheNodeAdvertisment[] GetAllNodes() => _peers.ToArray(); private sealed record class NodeEnumerator(LinkedList<CacheNodeAdvertisment> Peers, string? SelfNodeId) : INodeDiscoveryEnumerator { diff --git a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs index bd86461..47aadd9 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs @@ -31,7 +31,6 @@ using System.Threading; using System.Threading.Tasks; using System.Collections.Generic; using System.Security.Cryptography; -using System.Runtime.CompilerServices; using RestSharp; @@ -74,22 +73,7 @@ namespace VNLib.Data.Caching.Extensions /// The advertisment header for cache node discovery /// </summary> public const string X_NODE_DISCOVERY_HEADER = "X-Cache-Node-Discovery"; - - /* - * Lazy to defer errors for debuggong - */ - private static readonly Lazy<CacheSiteAdapter> SiteAdapter = new(() => ConfigureAdapter(2)); - - private static CacheSiteAdapter ConfigureAdapter(int maxClients) - { - CacheSiteAdapter adapter = new(maxClients); - //Configure the site endpoints - adapter.BuildEndpoints(ServiceEndpoints.Definition); - return adapter; - } - - private static readonly ConditionalWeakTable<FBMClientFactory, CacheClientConfiguration> ClientCacheConfig = new(); - + /// <summary> /// Gets a <see cref="FBMClientConfig"/> preconfigured object caching /// protocl @@ -101,7 +85,7 @@ namespace VNLib.Data.Caching.Extensions /// <returns>A preconfigured <see cref="FBMClientConfig"/> for object caching</returns> public static FBMClientConfig GetDefaultConfig(IUnmangedHeap heap, int maxMessageSize, TimeSpan timeout = default, ILogProvider? debugLog = null) { - return GetDefaultConfig(new FallbackFBMMemoryManager(heap), maxMessageSize, timeout, debugLog); + return GetDefaultConfig(new SharedHeapFBMMemoryManager(heap), maxMessageSize, timeout, debugLog); } /// <summary> @@ -153,236 +137,22 @@ namespace VNLib.Data.Caching.Extensions { client.Config.DebugLog?.Debug("{debug}: {data}", "[CACHE]", message); } - - /// <summary> - /// Discovers ALL possible cache nodes itteritivley, first by collecting the configuration - /// from the initial peers. - /// This will make connections to all discoverable servers - /// </summary> - /// <param name="config"></param> - /// <param name="cancellation">A token to cancel the operation</param> - /// <returns></returns> - /// <exception cref="ArgumentException"></exception> - /// <exception cref="CacheDiscoveryFailureException"></exception> - public static async Task DiscoverNodesAsync(this CacheClientConfiguration config, CancellationToken cancellation) - { - //Make sure at least one node defined - if(config?.WellKnownNodes == null || config.WellKnownNodes.Length == 0) - { - throw new ArgumentException("There must be at least one cache node defined in the client configuration"); - } - - //Get the initial advertisments that arent null - CacheNodeAdvertisment[] initialPeers = await ResolveWellKnownAsync(config, cancellation); - - if (initialPeers.Length == 0) - { - throw new CacheDiscoveryFailureException("There must be at least one available cache node to continue discovery"); - } - - await DiscoverNodesAsync(config, initialPeers, cancellation); - } /// <summary> - /// Resolves the initial well-known cache nodes into their advertisments + /// Gets the discovery manager for the current client configuration. Just a + /// convience method. /// </summary> - /// <param name="config"></param> - /// <param name="cancellation">A token to cancel the operation</param> - /// <returns>An array of resolved nodes</returns> - public static async Task<CacheNodeAdvertisment[]> ResolveWellKnownAsync(this CacheClientConfiguration config, CancellationToken cancellation) - { - _ = config?.WellKnownNodes ?? throw new ArgumentNullException(nameof(config)); - - Task<CacheNodeAdvertisment?>[] initialAdds = new Task<CacheNodeAdvertisment?>[config.WellKnownNodes.Length]; - - //Discover initial advertisments from well-known addresses - for (int i = 0; i < config.WellKnownNodes.Length; i++) - { - initialAdds[i] = DiscoverNodeConfigAsync(config.WellKnownNodes[i], config, cancellation); - } - - //Wait for all initial adds to complete - await Task.WhenAll(initialAdds); - - //Get the initial advertisments that arent null - return initialAdds.Select(static x => x.Result!).Where(static s => s != null).ToArray(); - } + /// <param name="conf"></param> + /// <returns>The new <see cref="VNCacheClusterManager"/> instance around your config</returns> + public static VNCacheClusterManager GetDiscoveryManager(this CacheClientConfiguration conf) => new(conf); /// <summary> - /// Discovers ALL possible cache nodes itteritivley from the current collection of initial peers. - /// This will make connections to all discoverable servers and update the client configuration, with all - /// discovered peers + /// Converts the cache client configuration to a cluster client /// </summary> /// <param name="config"></param> - /// <param name="initialPeers">Accepts an array of initial peers to override the endpoint discovery process</param> - /// <param name="cancellation">A token to cancel the operation</param> - /// <returns>A task that completes when all nodes have been discovered</returns> - /// <exception cref="ArgumentException"></exception> - /// <exception cref="CacheDiscoveryFailureException"></exception> - public static async Task DiscoverNodesAsync(this CacheClientConfiguration config, CacheNodeAdvertisment[] initialPeers, CancellationToken cancellation) - { - //Make sure at least one node defined - if (initialPeers == null || initialPeers.Length == 0) - { - throw new ArgumentException("There must be at least one initial peer"); - } - - //Get the discovery enumerator with the initial peers - INodeDiscoveryEnumerator enumerator = config.NodeCollection.BeginDiscovery(initialPeers); - - //Start the discovery process - await DiscoverNodesAsync(enumerator, config, config.ErrorHandler, cancellation); - - //Commit nodes - config.NodeCollection.CompleteDiscovery(enumerator); - } - - private static async Task DiscoverNodesAsync( - INodeDiscoveryEnumerator enumerator, - CacheClientConfiguration config, - ICacheDiscoveryErrorHandler? errHandler, - CancellationToken cancellation - ) - { - //Loop through servers - while (enumerator.MoveNext()) - { - //Make sure the node has a discovery endpoint - if (enumerator.Current.DiscoveryEndpoint == null) - { - //Skip this node - continue; - } - - /* - * We are allowed to save nodes that do not have a discovery endpoint, but we cannot - * discover nodes from them we can only use them as cache - */ - - //add a random delay to avoid spamming servers - await Task.Delay((int)Random.Shared.NextInt64(100, 500), cancellation); - - try - { - //Discover nodes from the current node - CacheNodeAdvertisment[]? nodes = await GetCacheNodesAsync(enumerator.Current, config, cancellation); - - if (nodes != null) - { - //Add nodes to the collection - enumerator.OnPeerDiscoveryComplete(nodes); - } - } - //Catch exceptions when an error handler is defined - catch(Exception ex) when (errHandler != null) - { - //Handle the error - errHandler.OnDiscoveryError(enumerator.Current, ex); - } - catch(Exception ex) - { - throw new CacheDiscoveryFailureException($"Failed to discovery peer node {enumerator.Current?.NodeId}, cannot continue", ex); - } - } - } - - private static async Task<CacheNodeAdvertisment?> DiscoverNodeConfigAsync(Uri serverUri, CacheClientConfiguration config, CancellationToken cancellation) - { - try - { - GetConfigRequest req = new (serverUri, config); - - //Site adapter verifies response messages so we dont need to check on the response - byte[] data = await SiteAdapter.Value.ExecuteAsync(req, cancellation).AsBytes() - ?? throw new CacheDiscoveryFailureException($"No data returned from desired cache node"); - - //Response is jwt - using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data); - - //The entire payload is just the single serialzed advertisment - using JsonDocument doc = responseJwt.GetPayload(); - - return doc.RootElement.GetProperty("sub").Deserialize<CacheNodeAdvertisment>(); - } - //Bypass cdfe when error handler is null - catch(CacheDiscoveryFailureException) when(config.ErrorHandler == null) - { - throw; - } - //Catch exceptions when an error handler is defined - catch (Exception ex) when (config.ErrorHandler != null) - { - //Handle the error - config.ErrorHandler.OnDiscoveryError(null!, ex); - return null; - } - catch (Exception ex) - { - throw new CacheDiscoveryFailureException("Failed to discover node configuration", ex); - } - } - - /// <summary> - /// Contacts the given server's discovery endpoint to discover a list of available - /// servers we can connect to - /// </summary> - /// <param name="advert">An advertisment of a server to discover other nodes from</param> - /// <param name="cancellationToken">A token to cancel the operationS</param> - /// <param name="config">The cache configuration object</param> - /// <returns>The list of active servers</returns> - /// <exception cref="SecurityException"></exception> - /// <exception cref="ArgumentException"></exception> - /// <exception cref="ArgumentNullException"></exception> - public static async Task<CacheNodeAdvertisment[]?> GetCacheNodesAsync(CacheNodeAdvertisment advert, CacheClientConfiguration config, CancellationToken cancellationToken = default) - { - _ = advert ?? throw new ArgumentNullException(nameof(advert)); - _ = config ?? throw new ArgumentNullException(nameof(config)); - _ = advert.DiscoveryEndpoint ?? throw new ArgumentException("Advertisment does not expose an advertisment endpoint"); - - DiscoveryRequest req = new (advert.DiscoveryEndpoint, config); - - //Site adapter verifies response messages so we dont need to check on the response - byte[] data = await SiteAdapter.Value.ExecuteAsync(req, cancellationToken).AsBytes() - ?? throw new InvalidOperationException($"No data returned from node {advert.NodeId}"); - - //Response is jwt - using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data); - - using JsonDocument doc = responseJwt.GetPayload(); - return doc.RootElement.GetProperty("peers").Deserialize<CacheNodeAdvertisment[]>(); - } - - /// <summary> - /// Allows for configuration of an <see cref="FBMClient"/> - /// for a connection to a cache server - /// </summary> - /// <param name="client"></param> - /// <returns>A fluent api configuration builder for the current client</returns> - public static CacheClientConfiguration GetCacheConfiguration(this FBMClientFactory client) => ClientCacheConfig.GetOrCreateValue(client); - - /// <summary> - /// Explicitly set the client cache configuration for the current client - /// </summary> - /// <param name="client"></param> - /// <param name="config">The cache node configuration</param> - /// <returns>The config instance</returns> - public static CacheClientConfiguration SetCacheConfiguration(this FBMClientFactory client, CacheClientConfiguration config) - { - ClientCacheConfig.AddOrUpdate(client, config); - return config; - } - - /// <summary> - /// Explicitly set the cache node configuration for the current client - /// </summary> - /// <param name="client"></param> - /// <param name="nodeConfig">The cache node configuration</param> - /// <returns>The config instance</returns> - public static CacheNodeConfiguration SetCacheConfiguration(this FBMClientFactory client, CacheNodeConfiguration nodeConfig) - { - ClientCacheConfig.AddOrUpdate(client, nodeConfig); - return nodeConfig; - } + /// <param name="factory">The FBM client factory instance to use</param> + /// <returns>The new cluster client instance</returns> + public static VNCacheClusterClient ToClusterClient(this CacheClientConfiguration config, FBMClientFactory factory) => new(config, factory); /// <summary> /// Waits for the client to disconnect from the server while observing @@ -396,6 +166,8 @@ namespace VNLib.Data.Caching.Extensions /// <returns>A task that complets when the connecion has been closed successfully</returns> public static async Task WaitForExitAsync(this FBMClient client, CancellationToken token = default) { + ArgumentNullException.ThrowIfNull(client); + client.LogDebug("Waiting for cache client to exit"); //Get task for cancellation @@ -420,88 +192,6 @@ namespace VNLib.Data.Caching.Extensions } /// <summary> - /// Discovers all available nodes for the current client config - /// </summary> - /// <param name="client"></param> - /// <param name="cancellation">A token to cancel the operation</param> - /// <returns>A task that completes when all nodes have been discovered</returns> - public static Task DiscoverAvailableNodesAsync(this FBMClientFactory client, CancellationToken cancellation = default) - { - //Get stored config - CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); - - //Discover all nodes - return conf.DiscoverNodesAsync(cancellation); - } - - /// <summary> - /// Connects to a random server from the servers discovered during a cache server discovery - /// </summary> - /// <param name="client"></param> - /// <param name="cancellation">A token to cancel the operation</param> - /// <returns>The server that the connection was made with</returns> - /// <exception cref="FBMException"></exception> - /// <exception cref="FBMServerNegiationException"></exception> - /// <exception cref="ArgumentException"></exception> - /// <exception cref="ArgumentNullException"></exception> - /// <exception cref="SecurityException"></exception> - /// <exception cref="ObjectDisposedException"></exception> - public static async Task<CacheNodeAdvertisment> ConnectToRandomCacheAsync(this FBMClientFactory client, CancellationToken cancellation = default) - { - //Get stored config - CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); - - //Get all available nodes, or at least the initial peers - CacheNodeAdvertisment[]? adverts = conf.NodeCollection.GetAllNodes() ?? throw new ArgumentException("No cache nodes discovered, cannot connect"); - - //Select random node from all available nodes - CacheNodeAdvertisment randomServer = adverts.SelectRandom(); - - //Connect to the random server - await ConnectToCacheAsync(client, randomServer, cancellation); - - //Return the random server we connected to - return randomServer; - } - - /// <summary> - /// Connects to the specified server on the configured cache client - /// </summary> - /// <param name="factory"></param> - /// <param name="server">The server to connect to</param> - /// <param name="token">A token to cancel the operation</param> - /// <returns>A task that resolves when the client is connected to the cache server</returns> - /// <exception cref="FBMException"></exception> - /// <exception cref="FBMServerNegiationException"></exception> - /// <exception cref="ArgumentException"></exception> - /// <exception cref="ArgumentNullException"></exception> - /// <exception cref="SecurityException"></exception> - /// <exception cref="ObjectDisposedException"></exception> - public static async Task<FBMClient> ConnectToCacheAsync(this FBMClientFactory factory, CacheNodeAdvertisment server, CancellationToken token = default) - { - _ = factory ?? throw new ArgumentNullException(nameof(factory)); - _ = server ?? throw new ArgumentNullException(nameof(server)); - - //Get stored config - CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(factory); - - //Create new client - FBMClient client = factory.CreateClient(); - - try - { - //Connect to server (no server id because client not replication server) - await ConnectToCacheAsync(client, conf, server, token); - return client; - } - catch - { - client.Dispose(); - throw; - } - } - - /// <summary> /// Connects to the specified server on the configured cache client /// </summary> /// <param name="client"></param> @@ -517,9 +207,9 @@ namespace VNLib.Data.Caching.Extensions /// <exception cref="ObjectDisposedException"></exception> public static Task ConnectToCacheAsync(this FBMClient client, CacheNodeAdvertisment server, CacheClientConfiguration explicitConfig, CancellationToken token = default) { - _ = client ?? throw new ArgumentNullException(nameof(client)); - _ = server ?? throw new ArgumentNullException(nameof(server)); - + ArgumentNullException.ThrowIfNull(client); + ArgumentNullException.ThrowIfNull(server); + //Connect to server (no server id because client not replication server) return ConnectToCacheAsync(client, explicitConfig, server, token); } @@ -546,7 +236,7 @@ namespace VNLib.Data.Caching.Extensions NegotationRequest req = new(server.ConnectEndpoint, config); //Exec negotiation - RestResponse response = await SiteAdapter.Value.ExecuteAsync(req, token); + RestResponse response = await CacheSiteAdapter.Instance.ExecuteAsync(req, token); /* * JWT will already be veified by the endpoint adapter, so we @@ -582,6 +272,12 @@ namespace VNLib.Data.Caching.Extensions Scheme = config.UseTls ? "wss://" : "ws://" }; + //if the server is specifying https urls, then attempt to upgrade to wss + if (server.ConnectEndpoint.Scheme == Uri.UriSchemeHttps) + { + uriBuilder.Scheme = "wss://"; + } + //Connect async await client.ConnectAsync(uriBuilder.Uri, token); } @@ -658,7 +354,7 @@ namespace VNLib.Data.Caching.Extensions /// <exception cref="CryptographicException"></exception> public static bool VerifyUpgradeToken(this ICacheAuthManager man, string signature, string token, bool isPeer) { - _ = man ?? throw new ArgumentNullException(nameof(man)); + ArgumentNullException.ThrowIfNull(man); //get the hash of the token byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256); @@ -704,8 +400,15 @@ namespace VNLib.Data.Caching.Extensions /// <param name="message">The advertisment message to verify</param> /// <returns>The advertisment message if successfully verified, or null otherwise</returns> /// <exception cref="FormatException"></exception> - public static CacheNodeAdvertisment? VerifyPeerAdvertisment(this ICacheAuthManager config, string message) + public static CacheNodeAdvertisment? VerifyPeerAdvertisment(this ICacheAuthManager config, string? message) { + ArgumentNullException.ThrowIfNull(config); + + if (string.IsNullOrWhiteSpace(message)) + { + return null; + } + using JsonWebToken jwt = JsonWebToken.Parse(message); //Verify the signature diff --git a/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterClient.cs b/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterClient.cs new file mode 100644 index 0000000..050e1a3 --- /dev/null +++ b/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterClient.cs @@ -0,0 +1,78 @@ +/* +* Copyright (c) 2024 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Extensions +* File: VNCacheClusterClient.cs +* +* VNCacheClusterClient.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Security; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Net.Messaging.FBM; +using VNLib.Net.Messaging.FBM.Client; +using VNLib.Data.Caching.Extensions.Clustering; + +namespace VNLib.Data.Caching.Extensions +{ + /// <summary> + /// Manages client connections to cluster nodes with discovery from <see cref="VNCacheClusterManager"/> + /// instance. + /// </summary> + /// <param name="config">The client configuration to use when discovering or connecting to cache nodes</param> + /// <param name="factory">The fbm client factory instance</param> + public class VNCacheClusterClient(CacheClientConfiguration config, FBMClientFactory factory) + : VNCacheClusterManager(config) + { + + /// <summary> + /// Connects to the specified server on the configured cache client + /// </summary> + /// <param name="factory"></param> + /// <param name="server">The server to connect to</param> + /// <param name="token">A token to cancel the operation</param> + /// <returns>A task that resolves when the client is connected to the cache server</returns> + /// <exception cref="FBMException"></exception> + /// <exception cref="FBMServerNegiationException"></exception> + /// <exception cref="ArgumentException"></exception> + /// <exception cref="ArgumentNullException"></exception> + /// <exception cref="SecurityException"></exception> + /// <exception cref="ObjectDisposedException"></exception> + public async Task<FBMClient> ConnectToCacheAsync(CacheNodeAdvertisment server, CancellationToken token = default) + { + ArgumentNullException.ThrowIfNull(server); + + FBMClient client = factory.CreateClient(); + + try + { + //Connect to server (no server id because client not replication server) + await client.ConnectToCacheAsync(server, config, token); + return client; + } + catch + { + client.Dispose(); + throw; + } + } + } +} diff --git a/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterManager.cs b/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterManager.cs new file mode 100644 index 0000000..4a1a6bd --- /dev/null +++ b/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterManager.cs @@ -0,0 +1,300 @@ +/* +* Copyright (c) 2024 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Extensions +* File: VNCacheClusterManager.cs +* +* VNCacheClusterManager.cs is part of VNLib.Data.Caching.Extensions +* which is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Security; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using System.Collections.Generic; + +using RestSharp; + +using VNLib.Hashing.IdentityUtility; +using VNLib.Net.Rest.Client.Construction; +using VNLib.Data.Caching.Extensions.ApiModel; +using VNLib.Data.Caching.Extensions.Clustering; + +namespace VNLib.Data.Caching.Extensions +{ + + /// <summary> + /// A VNCache cluster client discovery maanger. Used to simplify the discovery + /// of cache nodes + /// </summary> + /// <param name="config">The client configuration instance</param> + public class VNCacheClusterManager(CacheClientConfiguration config) + { + /// <summary> + /// The internal collection of discovered nodes + /// </summary> + protected NodeDiscoveryCollection NodeCollection { get; } = GetNodeCollection(config); + + /// <summary> + /// Gets the collection of discovered nodes within the manager + /// </summary> + public INodeDiscoveryCollection DiscoveredNodes => NodeCollection; + + /// <summary> + /// The underlying <see cref="CacheClientConfiguration"/> instance + /// </summary> + public CacheClientConfiguration Config => config; + + /// <summary> + /// Adds an array of nodes manually to the collection of discovered cluster nodes + /// </summary> + /// <param name="nodes"></param> + public void AddManualNodes(params CacheNodeAdvertisment[] nodes) => AddManualNodes(nodes.AsEnumerable()); + + /// <summary> + /// Adds an array of nodes manually to the collection of discovered cluster nodes + /// </summary> + /// <param name="nodes"></param> + public void AddManualNodes(IEnumerable<CacheNodeAdvertisment> nodes) => NodeCollection.AddManualNodes(nodes); + + /// <summary> + /// Removes an array of nodes manually from the collection of discovered cluster nodes + /// </summary> + /// <param name="nodes"></param> + public void RemoveManualNodes(params CacheNodeAdvertisment[] nodes) => RemoveManualNodes(nodes.AsEnumerable()); + + /// <summary> + /// Removes an array of nodes manually from the collection of discovered cluster nodes + /// </summary> + /// <param name="nodes"></param> + public void RemoveManualNodes(IEnumerable<CacheNodeAdvertisment> nodes) => NodeCollection.RemoveManualNodes(nodes); + + /// <summary> + /// Resolves the initial well-known cache nodes into their advertisments + /// </summary> + /// <param name="cancellation">A token to cancel the operation</param> + /// <returns>An array of resolved nodes</returns> + public async Task<CacheNodeAdvertisment[]> ResolveWellKnownAsync(CancellationToken cancellation) + { + Task<CacheNodeAdvertisment?>[] initialAdds = new Task<CacheNodeAdvertisment?>[config.WellKnownNodes.Length]; + + //Discover initial advertisments from well-known addresses + for (int i = 0; i < config.WellKnownNodes.Length; i++) + { + initialAdds[i] = DiscoverNodeConfigAsync(config.WellKnownNodes[i], cancellation); + } + + //Wait for all initial adds to complete + await Task.WhenAll(initialAdds); + + //Get the initial advertisments that arent null + return initialAdds.Select(static x => x.Result!).Where(static s => s != null).ToArray(); + } + + /// <summary> + /// Discovers ALL possible cache nodes itteritivley, first by collecting the configuration + /// from the initial peers. + /// This will make connections to all discoverable servers + /// </summary> + /// <param name="cancellation">A token to cancel the operation</param> + /// <returns></returns> + /// <exception cref="ArgumentException"></exception> + /// <exception cref="CacheDiscoveryFailureException"></exception> + /// <remarks> + /// This method simply combines the <see cref="ResolveWellKnownAsync"/> and <see cref="DiscoverNodesAsync"/> + /// methods into a single operation + /// </remarks> + public async Task DiscoverNodesAsync(CancellationToken cancellation) + { + //Make sure at least one node defined + if (config?.WellKnownNodes == null || config.WellKnownNodes.Length == 0) + { + throw new ArgumentException("There must be at least one cache node defined in the client configuration"); + } + + /* + * Connect to well-known nodes from the client configuration to discovery its layout. + * + */ + CacheNodeAdvertisment[] initialPeers = await ResolveWellKnownAsync(cancellation); + + if (initialPeers.Length == 0) + { + throw new CacheDiscoveryFailureException("There must be at least one available cache node to continue discovery"); + } + + await DiscoverNodesAsync(initialPeers, cancellation); + } + + /// <summary> + /// Discovers ALL possible cache nodes itteritivley from the current collection of initial peers. + /// This will make connections to all discoverable servers and update the client configuration, with all + /// discovered peers + /// </summary> + /// <param name="initialPeers">Accepts an array of initial peers to override the endpoint discovery process</param> + /// <param name="cancellation">A token to cancel the operation</param> + /// <returns>A task that completes when all nodes have been discovered</returns> + /// <exception cref="ArgumentException"></exception> + /// <exception cref="CacheDiscoveryFailureException"></exception> + public async Task DiscoverNodesAsync(CacheNodeAdvertisment[] initialPeers, CancellationToken cancellation) + { + //Make sure at least one node defined + ArgumentNullException.ThrowIfNull(initialPeers); + ArgumentOutOfRangeException.ThrowIfZero(initialPeers.Length); + + //Get the discovery enumerator with the initial peers + using INodeDiscoveryEnumerator enumerator = NodeCollection.BeginDiscovery(initialPeers); + + //Start the discovery process + await DiscoverNodesAsync(enumerator, config, config.ErrorHandler, cancellation); + + //Commit discovered nodes to stored node collection + NodeCollection.CompleteDiscovery(enumerator); + } + + private static async Task DiscoverNodesAsync( + INodeDiscoveryEnumerator enumerator, + CacheClientConfiguration config, + ICacheDiscoveryErrorHandler? errHandler, + CancellationToken cancellation + ) + { + //Loop through servers + while (enumerator.MoveNext()) + { + //Make sure the node has a discovery endpoint + if (enumerator.Current.DiscoveryEndpoint == null) + { + //Skip this node + continue; + } + + /* + * We are allowed to save nodes that do not have a discovery endpoint, but we cannot + * discover nodes from them we can only use them as cache + */ + + //add a random delay to avoid spamming servers + await Task.Delay((int)Random.Shared.NextInt64(100, 500), cancellation); + + try + { + //Discover nodes from the current node + CacheNodeAdvertisment[]? nodes = await GetCacheNodesAsync(enumerator.Current, config, cancellation); + + if (nodes != null) + { + //Add nodes to the collection + enumerator.OnPeerDiscoveryComplete(nodes); + } + } + //Catch exceptions when an error handler is defined + catch (Exception ex) when (errHandler != null) + { + //Handle the error + errHandler.OnDiscoveryError(enumerator.Current, ex); + } + catch (Exception ex) + { + throw new CacheDiscoveryFailureException($"Failed to discovery peer node {enumerator.Current?.NodeId}, cannot continue", ex); + } + } + } + + /// <summary> + /// Contacts the given server's discovery endpoint to discover a list of available + /// servers we can connect to + /// </summary> + /// <param name="advert">An advertisment of a server to discover other nodes from</param> + /// <param name="cancellationToken">A token to cancel the operationS</param> + /// <param name="config">The cache configuration object</param> + /// <returns>The list of active servers</returns> + /// <exception cref="SecurityException"></exception> + /// <exception cref="ArgumentException"></exception> + /// <exception cref="ArgumentNullException"></exception> + public static async Task<CacheNodeAdvertisment[]?> GetCacheNodesAsync(CacheNodeAdvertisment advert, CacheClientConfiguration config, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(advert); + ArgumentNullException.ThrowIfNull(config); + ArgumentNullException.ThrowIfNull(advert.DiscoveryEndpoint, nameof(advert.DiscoveryEndpoint)); + + DiscoveryRequest req = new (advert.DiscoveryEndpoint, config); + + //Site adapter verifies response messages so we dont need to check on the response + byte[] data = await CacheSiteAdapter.Instance.ExecuteAsync(req, cancellationToken).AsBytes() + ?? throw new InvalidOperationException($"No data returned from node {advert.NodeId}"); + + //Response is jwt + using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data); + + using JsonDocument doc = responseJwt.GetPayload(); + return doc.RootElement.GetProperty("peers").Deserialize<CacheNodeAdvertisment[]>(); + } + + + /* + * This method will connect to a given well-known (cache config endpoint) and discover the + * servers configuration (endpoint config) + * + * This function exists so clients only need a single endpoint to connect to, and the server + * will return it's signed configuration data (including cluster network information) + */ + private async Task<CacheNodeAdvertisment?> DiscoverNodeConfigAsync(Uri serverUri, CancellationToken cancellation) + { + try + { + GetConfigRequest req = new (serverUri, config); + + //Site adapter verifies response messages so we dont need to check on the response + byte[] data = await CacheSiteAdapter.Instance.ExecuteAsync(req, cancellation).AsBytes() + ?? throw new CacheDiscoveryFailureException($"No data returned from desired cache node"); + + //Response is jwt + using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data); + + //The entire payload is just the single serialzed advertisment + using JsonDocument doc = responseJwt.GetPayload(); + + return doc.RootElement.GetProperty("sub").Deserialize<CacheNodeAdvertisment>(); + } + //Bypass cdfe when error handler is null (avoid nesting)` + catch (CacheDiscoveryFailureException) when (config.ErrorHandler == null) + { + throw; + } + //Catch exceptions when an error handler is defined + catch (Exception ex) when (config.ErrorHandler != null) + { + //Handle the error + config.ErrorHandler.OnDiscoveryError(null!, ex); + return null; + } + catch (Exception ex) + { + throw new CacheDiscoveryFailureException("Failed to discover node configuration", ex); + } + } + + private static NodeDiscoveryCollection GetNodeCollection(CacheClientConfiguration config) + { + return config is CacheNodeConfiguration cnc ? new (cnc.NodeIdRef!) : new (null); + } + } +} diff --git a/lib/VNLib.Data.Caching/src/ClientExtensions.cs b/lib/VNLib.Data.Caching/src/ClientExtensions.cs index f0eee06..e0aa744 100644 --- a/lib/VNLib.Data.Caching/src/ClientExtensions.cs +++ b/lib/VNLib.Data.Caching/src/ClientExtensions.cs @@ -451,7 +451,7 @@ namespace VNLib.Data.Caching /// <param name="client"></param> /// <param name="change">The instance to store change event data to</param> /// <param name="cancellationToken">A token to cancel the deuque operation</param> - /// <returns>A <see cref="WaitForChangeResult"/> that contains information about the modified element</returns> + /// <returns>A task that completes when a change has occured</returns> /// <exception cref="InvalidResponseException"></exception> /// <exception cref="InvalidOperationException"></exception> public static async Task WaitForChangeAsync(this FBMClient client, WaitForChangeResult change, CancellationToken cancellationToken = default) diff --git a/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs b/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs index e7fa3e1..944aa4b 100644 --- a/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs +++ b/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs @@ -33,11 +33,10 @@ using System.Runtime.CompilerServices; using VNLib.Plugins; using VNLib.Utils; using VNLib.Utils.Memory; -using VNLib.Utils.Extensions; -using VNLib.Plugins.Extensions.Loading; using VNLib.Utils.Memory.Diagnostics; using VNLib.Utils.Logging; - +using VNLib.Utils.Extensions; +using VNLib.Plugins.Extensions.Loading; /* * How bucket local memory works: * diff --git a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs index 6942828..16fda39 100644 --- a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs +++ b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -50,11 +50,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache private readonly AsyncQueue<ChangeEvent> _listenerQueue; private readonly ILogProvider _logProvider; - private readonly ICacheEventQueueManager _queueManager; + private readonly PeerEventQueueManager _queueManager; public CacheListenerPubQueue(PluginBase plugin) { - _queueManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>(); + _queueManager = plugin.GetOrCreateSingleton<PeerEventQueueManager>(); _logProvider = plugin.Log.CreateScope(LOG_SCOPE_NAME); //Init local queue to store published events @@ -110,10 +110,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache } ///<inheritdoc/> - public bool IsEnabled([NotNullWhen(true)] IPeerEventQueue? userState) - { - return userState is IPeerEventQueue; - } + public bool IsEnabled([NotNullWhen(true)] IPeerEventQueue? userState) => userState is not null; ///<inheritdoc/> public void PublishEvent(ChangeEvent changeEvent) diff --git a/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs b/plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs index bd15d24..c404cc5 100644 --- a/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs +++ b/plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs @@ -1,12 +1,12 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer -* File: CacheConfiguration.cs +* File: CacheMemoryConfiguration.cs * -* CacheConfiguration.cs is part of ObjectCacheServer which is part of the larger -* VNLib collection of libraries and utilities. +* CacheMemoryConfiguration.cs is part of ObjectCacheServer which +* is part of the larger VNLib collection of libraries and utilities. * * ObjectCacheServer is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as @@ -26,7 +26,7 @@ using System.Text.Json.Serialization; namespace VNLib.Data.Caching.ObjectCache.Server.Cache { - internal sealed class CacheConfiguration + internal sealed class CacheMemoryConfiguration { [JsonPropertyName("buffer_recv_max")] public int MaxRecvBufferSize { get; set; } = 1000 * 1024; @@ -36,6 +36,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache [JsonPropertyName("buffer_header_max")] public int MaxHeaderBufferSize { get; set; } = 2 * 1024; + [JsonPropertyName("buffer_header_min")] public int MinHeaderBufferSize { get; set; } = 128; @@ -49,5 +50,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache [JsonPropertyName("buckets")] public uint BucketCount { get; set; } = 10; + + + [JsonPropertyName("memory_lib_path")] + public string? ExternLibPath { get; set; } } } diff --git a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs index 86df849..81f4843 100644 --- a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs +++ b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs @@ -26,32 +26,24 @@ using System; using System.Threading; using System.Threading.Tasks; -using VNLib.Utils.Logging; -using VNLib.Net.Messaging.FBM; -using VNLib.Plugins; using VNLib.Plugins.Extensions.Loading; namespace VNLib.Data.Caching.ObjectCache.Server.Cache { + /* * Implements the blob cache store, which is an abstraction around the blob cache listener. * This allows for publishing local events (say from other nodes) to keep caches in sync. */ [ConfigurationName("cache")] - internal sealed class CacheStore(PluginBase plugin, IConfigScope config) : ICacheStore, IDisposable + internal sealed class CacheStore(IBlobCacheTable table) : ICacheStore { - /// <summary> - /// Gets the underlying cache listener - /// </summary> - public BlobCacheListener<IPeerEventQueue> Listener { get; } = InitializeCache((ObjectCacheServerEntry)plugin, config); - - ///<inheritdoc/> ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, ObjectDataGet<T> bodyData, T state, CancellationToken token) { - return Listener.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token); + return table.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token); } ///<inheritdoc/> @@ -63,64 +55,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache ///<inheritdoc/> ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token) { - return Listener.Cache.DeleteObjectAsync(id, token); - } - - private static BlobCacheListener<IPeerEventQueue> InitializeCache(ObjectCacheServerEntry plugin, IConfigScope config) - { - const string CacheConfigTemplate = -@" -Cache Configuration: - Max memory: {max} Mb - Buckets: {bc} - Entries per-bucket: {mc} -"; - - //Deserialize the cache config - CacheConfiguration cacheConf = config.Deserialze<CacheConfiguration>(); - - if (cacheConf.MaxCacheEntries < 2) - { - throw new ArgumentException("You must configure a 'max_cache' size larger than 1 item"); - } - - //Suggestion - if (cacheConf.MaxCacheEntries < 200) - { - plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache"); - } - - //calculate the max memory usage - ulong maxByteSize = cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize; - - //Log the cache config - plugin.Log.Information(CacheConfigTemplate, - maxByteSize / (1024 * 1000), - cacheConf.BucketCount, - cacheConf.MaxCacheEntries - ); - - //Get the event listener - ICacheListenerEventQueue<IPeerEventQueue> queue = plugin.GetOrCreateSingleton<CacheListenerPubQueue>(); - - //Get the memory manager - ICacheMemoryManagerFactory manager = plugin.GetOrCreateSingleton<BucketLocalManagerFactory>(); - - //Load the blob cache table system - IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, manager, cacheConf); - - FallbackFBMMemoryManager fbmMemManager = new(plugin.ListenerHeap); - - //Endpoint only allows for a single reader - return new(bc, queue, plugin.Log, fbmMemManager); - } - - /* - * Cleaned up by the plugin on exit - */ - public void Dispose() - { - Listener.Dispose(); + return table.DeleteObjectAsync(id, token); } } } diff --git a/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs index b7bf83f..8f196b0 100644 --- a/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs +++ b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs @@ -29,6 +29,7 @@ using System.Text.Json; using VNLib.Utils.Resources; using VNLib.Plugins; using VNLib.Plugins.Extensions.Loading; +using VNLib.Utils.Extensions; namespace VNLib.Data.Caching.ObjectCache.Server.Cache { @@ -49,7 +50,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache /// <param name="cacheConf">The cache configuration object</param> /// <returns>The loaded <see cref="IBlobCacheTable"/> implementation</returns> /// <exception cref="FileNotFoundException"></exception> - public static IBlobCacheTable LoadMemoryCacheSystem(this PluginBase plugin, IConfigScope config, ICacheMemoryManagerFactory heap, CacheConfiguration cacheConf) + public static IBlobCacheTable LoadMemoryCacheSystem(this PluginBase plugin, IConfigScope config, ICacheMemoryManagerFactory heap, CacheMemoryConfiguration cacheConf) { #pragma warning disable CA2000 // Dispose objects before losing scope @@ -94,10 +95,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache if(initMethod != null) { //Itterate all buckets - foreach (IBlobCacheBucket bucket in table) - { - initMethod.Invoke(bucket.Id); - } + table.ForEach(bucket => initMethod(bucket.Id)); } } diff --git a/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs b/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs index e3c613d..12cf37a 100644 --- a/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs +++ b/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs @@ -1,11 +1,11 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer -* File: CacheEventQueueManager.cs +* File: PeerEventQueueManager.cs * -* CacheEventQueueManager.cs is part of ObjectCacheServer which is +* PeerEventQueueManager.cs is part of ObjectCacheServer which is * part of the larger VNLib collection of libraries and utilities. * * ObjectCacheServer is free software: you can redistribute it and/or modify @@ -38,41 +38,37 @@ using VNLib.Plugins.Extensions.Loading.Events; namespace VNLib.Data.Caching.ObjectCache.Server.Cache { - internal sealed class CacheEventQueueManager : ICacheEventQueueManager, IDisposable, IIntervalScheduleable + internal sealed class PeerEventQueueManager : ICacheEventQueueManager, IIntervalScheduleable { private readonly int MaxQueueDepth; - private readonly object SubLock; - private readonly LinkedList<NodeQueue> Subscribers; + private readonly object SubLock = new(); + private readonly LinkedList<PeerEventListenerQueue> Subscribers = []; - private readonly object StoreLock; - private readonly Dictionary<string, NodeQueue> QueueStore; + private readonly object StoreLock = new(); + private readonly Dictionary<string, PeerEventListenerQueue> QueueStore = new(StringComparer.OrdinalIgnoreCase); - - public CacheEventQueueManager(PluginBase plugin) + public PeerEventQueueManager(PluginBase plugin, NodeConfig config) { - //Get node config - NodeConfig config = plugin.GetOrCreateSingleton<NodeConfig>(); - - //Get max queue depth MaxQueueDepth = config.MaxQueueDepth; /* - * Schedule purge interval to clean up stale queues - */ + * Schedule purge interval to clean up stale queues + */ plugin.ScheduleInterval(this, config.EventQueuePurgeInterval); - - SubLock = new(); - Subscribers = new(); - - StoreLock = new(); - QueueStore = new(StringComparer.OrdinalIgnoreCase); + + //Cleanup disposeables on unload + _ = plugin.RegisterForUnload(() => + { + QueueStore.Clear(); + Subscribers.Clear(); + }); } ///<inheritdoc/> public IPeerEventQueue Subscribe(ICachePeer peer) { - NodeQueue? nq; + PeerEventListenerQueue? nq; bool isNew = false; @@ -82,13 +78,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache //Try to recover the queue for the node if (!QueueStore.TryGetValue(peer.NodeId, out nq)) { - //Create new queue + //Create new queue since an existing queue was not found nq = new(peer.NodeId, MaxQueueDepth); QueueStore.Add(peer.NodeId, nq); isNew = true; } - //Increment listener count + //Increment listener count since a new listener has attached nq.Listeners++; } @@ -109,11 +105,20 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache ///<inheritdoc/> public void Unsubscribe(ICachePeer peer) { + /* + * The reason I am not purging queues that no longer have listeners + * now is because it is possible that a listener needed to detach because of + * a network issue and will be reconnecting shortly. If the node doesnt + * come back before the next purge interval, it's events will be purged. + * + * Point is: there is a reason for the garbage collection style purging + */ + //Detach a listener for a node lock (StoreLock) { //Get the queue and decrement the listener count - NodeQueue nq = QueueStore[peer.NodeId]; + PeerEventListenerQueue nq = QueueStore[peer.NodeId]; nq.Listeners--; } } @@ -125,7 +130,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache lock (SubLock) { //Loop through ll the fast way - LinkedListNode<NodeQueue>? q = Subscribers.First; + LinkedListNode<PeerEventListenerQueue>? q = Subscribers.First; while (q != null) { @@ -145,7 +150,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache lock (SubLock) { //Loop through ll the fast way - LinkedListNode<NodeQueue>? q = Subscribers.First; + LinkedListNode<PeerEventListenerQueue>? q = Subscribers.First; while (q != null) { @@ -167,9 +172,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache lock (StoreLock) { //Get all stale queues (queues without listeners) - NodeQueue[] staleQueues = QueueStore.Values.Where(static nq => nq.Listeners == 0).ToArray(); + PeerEventListenerQueue[] staleQueues = QueueStore.Values.Where(static nq => nq.Listeners == 0).ToArray(); - foreach (NodeQueue nq in staleQueues) + foreach (PeerEventListenerQueue nq in staleQueues) { //Remove from store QueueStore.Remove(nq.NodeId); @@ -191,54 +196,39 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache return Task.CompletedTask; } - void IDisposable.Dispose() - { - QueueStore.Clear(); - Subscribers.Clear(); - } /* * Holds queues for each node and keeps track of the number of listeners * attached to the queue + * + * The role of this class is to store change events for a given peer node, + * and return them when the peer requests them. It also keeps track of the + * number of active listeners (server connections) to the queue. */ - private sealed class NodeQueue : IPeerEventQueue + private sealed class PeerEventListenerQueue(string nodeId, int maxDepth) : IPeerEventQueue { public int Listeners; - public string NodeId { get; } - - public AsyncQueue<ChangeEvent> Queue { get; } + public string NodeId => nodeId; - public NodeQueue(string nodeId, int maxDepth) + /* + * Create a bounded channel that acts as a lru and evicts + * the oldest item when the queue is full + * + * There will also only ever be a single thread writing events + * to the queue + */ + private readonly AsyncQueue<ChangeEvent> Queue = new(new BoundedChannelOptions(maxDepth) { - NodeId = nodeId; - - /* - * Create a bounded channel that acts as a lru and evicts - * the oldest item when the queue is full - * - * There will also only ever be a single thread writing events - * to the queue - */ - - BoundedChannelOptions queueOptions = new(maxDepth) - { - AllowSynchronousContinuations = true, - SingleReader = false, - SingleWriter = true, - //Drop oldest item in queue if full - FullMode = BoundedChannelFullMode.DropOldest, - }; - - //Init queue/channel - Queue = new(queueOptions); - } + AllowSynchronousContinuations = true, + SingleReader = false, + SingleWriter = true, + //Drop oldest item in queue if full + FullMode = BoundedChannelFullMode.DropOldest, + }); - public void PublishChange(ChangeEvent change) - { - Queue.TryEnque(change); - } + public void PublishChange(ChangeEvent change) => Queue.TryEnque(change); public void PublishChanges(Span<ChangeEvent> changes) { @@ -249,16 +239,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache } ///<inheritdoc/> - public ValueTask<ChangeEvent> DequeueAsync(CancellationToken cancellation) - { - return Queue.DequeueAsync(cancellation); - } + public ValueTask<ChangeEvent> DequeueAsync(CancellationToken cancellation) => Queue.DequeueAsync(cancellation); ///<inheritdoc/> - public bool TryDequeue(out ChangeEvent change) - { - return Queue.TryDequeue(out change); - } + public bool TryDequeue(out ChangeEvent change) => Queue.TryDequeue(out change); } } } diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs index a240dde..0a1bb4d 100644 --- a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs +++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -36,7 +36,6 @@ using VNLib.Net.Messaging.FBM; using VNLib.Net.Messaging.FBM.Client; using VNLib.Plugins.Extensions.Loading; using VNLib.Data.Caching.Extensions.Clustering; -using VNLib.Data.Caching.ObjectCache.Server.Cache; namespace VNLib.Data.Caching.ObjectCache.Server.Clustering { @@ -56,38 +55,37 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork { private const string LOG_SCOPE_NAME = "REPL"; + private const string FBM_LOG_SCOPE_NAME = "REPL-CLNT"; private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10); private const int MAX_MESSAGE_SIZE = 12 * 1024; private readonly PluginBase _plugin; private readonly ILogProvider _log; - private readonly NodeConfig _nodeConfig; - private readonly ICacheStore _cacheStore; - private readonly ICachePeerAdapter _peerAdapter; private readonly FBMClientFactory _clientFactory; - + private readonly ObjectCacheSystemState _sysState; + private readonly bool _isDebug; private int _openConnections; public CacheNodeReplicationMaanger(PluginBase plugin) { - //Load the node config - _nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>(); - _cacheStore = plugin.GetOrCreateSingleton<CacheStore>(); - _peerAdapter = plugin.GetOrCreateSingleton<PeerDiscoveryManager>(); + _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>(); //Init fbm config with fixed message size FBMClientConfig clientConfig = FBMDataCacheExtensions.GetDefaultConfig( - (plugin as ObjectCacheServerEntry)!.ListenerHeap, + _sysState.SharedCacheHeap, MAX_MESSAGE_SIZE, - debugLog: plugin.IsDebug() ? plugin.Log : null + debugLog: plugin.IsDebug() ? plugin.Log.CreateScope(FBM_LOG_SCOPE_NAME) : null ); //Init ws fallback factory and client factory - FBMFallbackClientWsFactory wsFactory = new(); - _clientFactory = new(in clientConfig, wsFactory); + _clientFactory = new( + ref clientConfig, + new FBMFallbackClientWsFactory(), + (int)_sysState.Configuration.MaxPeerConnections + ); _plugin = plugin; _isDebug = plugin.IsDebug(); @@ -103,7 +101,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering while (true) { //Get all new peers - CacheNodeAdvertisment[] peers = _peerAdapter.GetNewPeers(); + CacheNodeAdvertisment[] peers = _sysState.PeerDiscovery.GetNewPeers(); if (peers.Length == 0 && _isDebug) { @@ -111,7 +109,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering } //Make sure we don't exceed the max connections - if(_openConnections >= _nodeConfig.MaxPeerConnections) + if(_openConnections >= _sysState.Configuration.MaxPeerConnections) { if (_isDebug) { @@ -150,13 +148,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering private async Task OnNewPeerDoWorkAsync(CacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken) { - _ = newPeer ?? throw new ArgumentNullException(nameof(newPeer)); + ArgumentNullException.ThrowIfNull(newPeer); //Setup client FBMClient client = _clientFactory.CreateClient(); //Add peer to monitor - _peerAdapter.OnPeerListenerAttached(newPeer); + _sysState.PeerDiscovery.OnPeerListenerAttached(newPeer); Interlocked.Increment(ref _openConnections); @@ -165,12 +163,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId); //Connect to the server - await client.ConnectToCacheAsync(newPeer, _nodeConfig.Config, exitToken); + await client.ConnectToCacheAsync(newPeer, _sysState.Configuration.Config, exitToken); log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId); //Start worker tasks - List<Task> workerTasks = new(); + List<Task> workerTasks = []; for (int i = 0; i < Environment.ProcessorCount; i++) { @@ -187,6 +185,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering //Disconnect client gracefully await client.DisconnectAsync(CancellationToken.None); } + catch(FBMServerNegiationException fbm) + { + log.Error("Failed to negotiate buffer configuration, check your cache memory configuration. Error:{err}", fbm.Message); + } catch (InvalidResponseException ie) { //See if the plugin is unloading @@ -227,7 +229,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering client.Dispose(); //Notify monitor of disconnect - _peerAdapter.OnPeerListenerDetatched(newPeer); + _sysState.PeerDiscovery.OnPeerListenerDetatched(newPeer); } } @@ -259,7 +261,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering return; case "deleted": //Delete the object from the store - await _cacheStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None); + await _sysState.InternalStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None); break; case "modified": //Reload the record from the store @@ -297,7 +299,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal)) { //Update the record - await _cacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation); + await _sysState.InternalStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation); log.Debug("Updated object {id}", objectId); } else diff --git a/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs index c49a54b..c3fbd8e 100644 --- a/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs +++ b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -29,7 +29,6 @@ using System.Collections.Generic; using VNLib.Utils; using VNLib.Utils.Extensions; -using VNLib.Plugins; namespace VNLib.Data.Caching.ObjectCache.Server.Clustering { @@ -37,12 +36,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering internal sealed class CachePeerMonitor : VnDisposeable, IPeerMonitor { - private readonly LinkedList<ICachePeer> peers = new(); + private readonly List<ICachePeer> peers = new(); private readonly ManualResetEvent newPeerTrigger = new (false); - public CachePeerMonitor(PluginBase plugin) - { } - /// <summary> /// Waits for new peers to connect to the server /// </summary> @@ -70,7 +66,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering //When a peer is connected we can add it to the list so the replication manager can see it lock(peers) { - peers.AddLast(peer); + peers.Add(peer); } //Trigger monitor when change occurs @@ -92,6 +88,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering protected override void Free() { + peers.Clear(); newPeerTrigger.Dispose(); } } diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs index 6475f9c..f22e1dd 100644 --- a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs +++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -24,14 +24,11 @@ using System; using System.Linq; -using System.Net.Http; using System.Threading; -using System.Net.Sockets; using System.Threading.Tasks; using System.Collections.Generic; using VNLib.Utils.Logging; -using VNLib.Plugins; using VNLib.Plugins.Extensions.Loading; using VNLib.Data.Caching.Extensions; using VNLib.Data.Caching.Extensions.Clustering; @@ -43,9 +40,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering * This class is responsible for resolving and discovering peer nodes in the cluster network. */ - internal sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter + internal sealed class PeerDiscoveryManager(NodeConfig config, ILogProvider Log, bool IsDebug, bool HasWellKnown) : IAsyncBackgroundWork, ICachePeerAdapter { - private const string LOG_SCOPE_NAME = "DISC"; + internal const string LOG_SCOPE_NAME = "DISC"; + /* * The initial discovery delay. This allows for the server to initialize before * starting the discovery process. This will probably be a shorter delay @@ -54,43 +52,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(15); private static readonly TimeSpan WhenWellKnownResolveFailed = TimeSpan.FromSeconds(20); - - private readonly List<CacheNodeAdvertisment> _connectedPeers; - private readonly NodeConfig Config; - private readonly CachePeerMonitor Monitor; - private readonly ILogProvider Log; - private readonly bool IsDebug; - private readonly bool HasWellKnown; - - public PeerDiscoveryManager(PluginBase plugin) - { - //Get config - Config = plugin.GetOrCreateSingleton<NodeConfig>(); - - //Get the known peers array from config, its allowed to be null for master nodes - IConfigScope? config = plugin.TryGetConfig("known_peers"); - string[] kownPeers = config?.Deserialze<string[]>() ?? Array.Empty<string>(); - - //Add known peers to the monitor - Config.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s))); - - HasWellKnown = kownPeers.Length > 0; - - //Get the peer monitor - Monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>(); - - _connectedPeers = new(); - - //Create scoped logger - Log = plugin.Log.CreateScope(LOG_SCOPE_NAME); - - Log.Information("Inital peer nodes: {nodes}", kownPeers); - - //Setup discovery error handler - Config.Config.WithErrorHandler(new ErrorHandler(Log)); - - IsDebug = plugin.IsDebug(); - } + private readonly List<CacheNodeAdvertisment> _connectedPeers = []; + private readonly CachePeerMonitor Monitor = new(); + private readonly VNCacheClusterManager clusterMan = new(config.Config); async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) { @@ -124,7 +88,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering } //Resolve all known peers - CacheNodeAdvertisment[] wellKnown = await Config.Config.ResolveWellKnownAsync(exitToken); + CacheNodeAdvertisment[] wellKnown = await clusterMan.ResolveWellKnownAsync(exitToken); wellKnownFailed = wellKnown.Length == 0; //Use the monitor to get the initial peers @@ -136,13 +100,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering if (allAds.Length > 0) { //Discover all known nodes - await Config.Config.DiscoverNodesAsync(allAds, exitToken); + await clusterMan.DiscoverNodesAsync(allAds, exitToken); } //Log the discovered nodes if verbose logging is enabled if (IsDebug) { - CacheNodeAdvertisment[] found = Config.Config.NodeCollection.GetAllNodes(); + CacheNodeAdvertisment[] found = clusterMan.DiscoveredNodes.GetAllNodes(); Log.Debug("Discovered {count} nodes\n\t{nodes}", found.Length, found.Select(static s => s.NodeId)); } @@ -177,7 +141,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering else { //Delay the next discovery - await Task.Delay(Config.DiscoveryInterval, exitToken); + await Task.Delay(config.DiscoveryInterval, exitToken); } } } @@ -188,7 +152,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering } finally { - + Monitor.Dispose(); } //Wait for the watcher to exit @@ -197,10 +161,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering private IEnumerable<CacheNodeAdvertisment> GetMonitorAds() { + string selfId = (clusterMan.Config as CacheNodeConfiguration)!.NodeId; return Monitor.GetAllPeers() .Where(static p => p.Advertisment != null) //Without us - .Where(n => n.NodeId != Config.Config.NodeId) + .Where(n => !string.Equals(n.NodeId, selfId, StringComparison.OrdinalIgnoreCase)) .Select(static p => p.Advertisment!); } @@ -222,7 +187,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering //Use the monitor to get the initial peers IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds(); - ((NodeDiscoveryCollection)Config.Config.NodeCollection).AddManualNodes(ads); + clusterMan.AddManualNodes(ads); } } catch (OperationCanceledException) @@ -239,7 +204,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering lock (_connectedPeers) { //Get all discovered peers - CacheNodeAdvertisment[] peers = Config.Config.NodeCollection.GetAllNodes(); + CacheNodeAdvertisment[] peers = clusterMan.DiscoveredNodes.GetAllNodes(); //Get the difference between the discovered peers and the connected peers return peers.Except(_connectedPeers).ToArray(); @@ -265,31 +230,5 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering _connectedPeers.Remove(peer); } } - - - private sealed record class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler - { - public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex) - { - - if (ex is HttpRequestException hre) - { - if (hre.InnerException is SocketException se) - { - //traisnport failed - Logger.Warn("Failed to connect to server {serv} because {err}", errorNode, se.Message); - } - else - { - Logger.Error("Failed to connect to node {n}\n{err}", errorNode, hre); - } - } - else - { - Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex); - } - - } - } } } diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs index d1591f8..48f4448 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -45,7 +45,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints public bool IsPeer { get; set; } } - internal sealed class CacheNegotationManager + internal sealed class CacheNegotationManager(PluginBase plugin) { /* * Cache keys are centralized and may be shared between all cache server nodes. This means @@ -64,21 +64,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30); - private readonly string AudienceLocalServerId; - private readonly NodeConfig _nodeConfig; - private readonly CacheConfiguration _cacheConfig; + private readonly string AudienceLocalServerId = Guid.NewGuid().ToString("N"); - public CacheNegotationManager(PluginBase plugin) - { - //Get node configuration - _nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>(); + private readonly ObjectCacheSystemState _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>(); - //Get the cache store configuration - _cacheConfig = plugin.GetConfigForType<CacheStore>().Deserialze<CacheConfiguration>(); + private NodeConfig NodeConfig => _sysState.Configuration; - AudienceLocalServerId = Guid.NewGuid().ToString("N"); - } - + private CacheMemoryConfiguration CacheConfig => _sysState.MemoryConfiguration; public bool IsClientNegotiationValid(string authToken, out ClientNegotiationState state) { @@ -88,12 +80,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints using JsonWebToken jwt = JsonWebToken.Parse(authToken); //verify signature for client - if (_nodeConfig.KeyStore.VerifyJwt(jwt, false)) + if (NodeConfig.KeyStore.VerifyJwt(jwt, false)) { //Validated as normal client } //May be signed by a cache server - else if (_nodeConfig.KeyStore.VerifyJwt(jwt, true)) + else if (NodeConfig.KeyStore.VerifyJwt(jwt, true)) { //Set peer and verified flag since the another cache server signed the request state.IsPeer = true; @@ -117,12 +109,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints return true; } - public JsonWebToken ConfirmCLientNegotiation(ClientNegotiationState state, IPAddress clientIp, DateTimeOffset now) + public JsonWebToken ConfirmClientNegotiation(ClientNegotiationState state, IPAddress clientIp, DateTimeOffset now) { //Verified, now we can create an auth message with a short expiration JsonWebToken auth = new(); - auth.WriteHeader(_nodeConfig.KeyStore.GetJwtHeader()); + auth.WriteHeader(NodeConfig.KeyStore.GetJwtHeader()); auth.InitPayloadClaim() .AddClaim("aud", AudienceLocalServerId) .AddClaim("iat", now.ToUnixTimeSeconds()) @@ -136,24 +128,29 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints //Set ip address .AddClaim("ip", clientIp.ToString()) //Add negotiaion args - .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, _cacheConfig.MaxHeaderBufferSize) - .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, _cacheConfig.MaxRecvBufferSize) - .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, _cacheConfig.MaxMessageSize) + .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, CacheConfig.MaxHeaderBufferSize) + .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, CacheConfig.MaxRecvBufferSize) + .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, CacheConfig.MaxMessageSize) .CommitClaims(); //Sign the auth message from our private key - _nodeConfig.KeyStore.SignJwt(auth); + NodeConfig.KeyStore.SignJwt(auth); return auth; } - public bool ValidateUpgrade(string upgradeToken, string tokenSignature, DateTimeOffset now, IPAddress connectionIp, ref string? nodeId, ref bool isPeer) + public bool ValidateUpgrade(string? upgradeToken, string? tokenSignature, DateTimeOffset now, IPAddress connectionIp, ref string? nodeId, ref bool isPeer) { + if(string.IsNullOrWhiteSpace(upgradeToken) || string.IsNullOrWhiteSpace(tokenSignature)) + { + return false; + } + //Parse jwt using JsonWebToken jwt = JsonWebToken.Parse(upgradeToken); //verify signature against the cache public key, since this server must have signed it - if (!_nodeConfig.KeyStore.VerifyCachePeer(jwt)) + if (!NodeConfig.KeyStore.VerifyCachePeer(jwt)) { return false; } @@ -175,7 +172,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } //Check node ip address matches if required - if (_nodeConfig.VerifyIp) + if (NodeConfig.VerifyIp) { if (!doc.RootElement.TryGetProperty("ip", out JsonElement ipEl)) { @@ -201,7 +198,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } //Verify token signature against a fellow cache public key - return _nodeConfig.KeyStore.VerifyUpgradeToken(tokenSignature, upgradeToken, isPeer); + return NodeConfig.KeyStore.VerifyUpgradeToken(tokenSignature, upgradeToken, isPeer); } } } diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs index 816e6c3..d6b733c 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -55,11 +55,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { internal const string LOG_SCOPE_NAME = "CONEP"; - - private readonly ICacheEventQueueManager PubSubManager; - private readonly IPeerMonitor Peers; - private readonly BlobCacheListener<IPeerEventQueue> Store; - private readonly NodeConfig NodeConfiguration; + + private readonly ObjectCacheSystemState _sysState; + + private PeerEventQueueManager PubSubManager => _sysState.PeerEventQueue; + private CachePeerMonitor Peers => _sysState.PeerMonitor; + private BlobCacheListener<IPeerEventQueue> Listener => _sysState.Listener; + private NodeConfig NodeConfiguration => _sysState.Configuration; + private readonly CacheNegotationManager AuthManager; private uint _connectedClients; @@ -72,7 +75,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints /// <summary> /// The cache store configuration /// </summary> - public CacheConfiguration CacheConfig { get; } + public CacheMemoryConfiguration CacheConfig => _sysState.MemoryConfiguration; //Loosen up protection settings ///<inheritdoc/> @@ -83,24 +86,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints public ConnectEndpoint(PluginBase plugin) { - //Get node configuration - NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>(); + _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>(); //Init from config and create a new log scope InitPathAndLog(NodeConfiguration.ConnectPath, plugin.Log.CreateScope(LOG_SCOPE_NAME)); - - //Setup pub/sub manager - PubSubManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>(); - - //Get peer monitor - Peers = plugin.GetOrCreateSingleton<CachePeerMonitor>(); - - //Init the cache store - Store = plugin.GetOrCreateSingleton<CacheStore>().Listener; - - //Get the cache store configuration - CacheConfig = plugin.GetConfigForType<CacheStore>().Deserialze<CacheConfiguration>(); - + //Get the auth manager AuthManager = plugin.GetOrCreateSingleton<CacheNegotationManager>(); } @@ -127,6 +117,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { //Parse jwt from authoriation string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; + if (string.IsNullOrWhiteSpace(jwtAuth)) { return VirtualClose(entity, HttpStatusCode.Forbidden); @@ -149,26 +140,34 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } //Verified, now we can create an auth message with a short expiration - using JsonWebToken auth = AuthManager.ConfirmCLientNegotiation(state, entity.TrustedRemoteIp, entity.RequestedTimeUtc); + using JsonWebToken auth = AuthManager.ConfirmClientNegotiation(state, entity.TrustedRemoteIp, entity.RequestedTimeUtc); - //Close response + //Close response by sending a copy of the signed token entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer); return VfReturnType.VirtualSkip; } protected override VfReturnType WebsocketRequested(HttpEntity entity) { + /* + * Check to see if any more connections are allowed, + * otherwise deny the connection + * + * This is done here to prevent the server from being overloaded + * on a new connection. It would be ideal to not grant new tokens + * but malicious clients could cache a bunch of tokens and use them + * later, exhausting resources. + */ + if(_connectedClients >= NodeConfiguration.MaxConcurrentConnections) + { + return VirtualClose(entity, HttpStatusCode.ServiceUnavailable); + } + //Parse jwt from authorization string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; string? clientSignature = entity.Server.Headers[FBMDataCacheExtensions.X_UPGRADE_SIG_HEADER]; string? optionalDiscovery = entity.Server.Headers[FBMDataCacheExtensions.X_NODE_DISCOVERY_HEADER]; - //Not null - if (string.IsNullOrWhiteSpace(jwtAuth) || string.IsNullOrWhiteSpace(clientSignature)) - { - return VfReturnType.Forbidden; - } - string? nodeId = null; bool isPeer = false; @@ -178,15 +177,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints return VirtualClose(entity, HttpStatusCode.Unauthorized); } - CacheNodeAdvertisment? discoveryAd = null; - /* * If the client is a peer server, it may offer a signed advertisment * that this node will have the duty of making available to other peers * if it is valid */ - if (isPeer && !string.IsNullOrWhiteSpace(optionalDiscovery)) + CacheNodeAdvertisment? discoveryAd = null; + + if (isPeer) { discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(optionalDiscovery); } @@ -196,11 +195,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints try { //Get query config suggestions from the client - string recvBufCmd = entity.QueryArgs[FBMClient.REQ_RECV_BUF_QUERY_ARG]; - string maxHeaderCharCmd = entity.QueryArgs[FBMClient.REQ_HEAD_BUF_QUERY_ARG]; - string maxMessageSizeCmd = entity.QueryArgs[FBMClient.REQ_MAX_MESS_QUERY_ARG]; + string? recvBufCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_RECV_BUF_QUERY_ARG); + string? maxHeaderCharCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_HEAD_BUF_QUERY_ARG); + string? maxMessageSizeCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_MAX_MESS_QUERY_ARG); - //Parse recv buffer size int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : CacheConfig.MinRecvBufferSize; int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : CacheConfig.MinHeaderBufferSize; int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : CacheConfig.MaxMessageSize; @@ -253,9 +251,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints Peers.OnPeerConnected(state); //Register plugin exit token to cancel the connected socket - CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll); - - //Inc connected count + await using CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll); + Interlocked.Increment(ref _connectedClients); try @@ -280,7 +277,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints try { //Begin listening for messages with a queue - await Store.ListenAsync(wss, queue, args); + await Listener.ListenAsync(wss, queue, args); } finally { @@ -291,7 +288,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints else { //Begin listening for messages without a queue - await Store.ListenAsync(wss, null!, args); + await Listener.ListenAsync(wss, null!, args); } } catch (OperationCanceledException) @@ -303,15 +300,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } catch (Exception ex) { - Log.Debug(ex); + //If debug logging is enabled print a more detailed error message + Log.Error("An error occured on websocket connection: node {con} -> {error}", state.NodeId, ex.Message); + Log.Debug("Websocket connection error: node {con}\n{error}", state.NodeId, ex); } - - //Dec connected count + Interlocked.Decrement(ref _connectedClients); - //Unregister the token - reg.Unregister(); - //Notify monitor of disconnect Peers.OnPeerDisconnected(state); diff --git a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs index 7d376b8..56fe8cd 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -40,25 +40,29 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { internal sealed class PeerDiscoveryEndpoint : ResourceEndpointBase { - private readonly IPeerMonitor PeerMonitor; - private readonly NodeConfig Config; + private readonly ObjectCacheSystemState SysState; + + private CacheAuthKeyStore KeyStore => SysState.Configuration.KeyStore; + + private CachePeerMonitor PeerMonitor => SysState.PeerMonitor; + + private CacheNodeConfiguration NodeConfig => SysState.Configuration.Config; - //Loosen up protection settings ///<inheritdoc/> protected override ProtectionSettings EndpointProtectionSettings { get; } = new() { - DisableSessionsRequired = true + /* + * Sessions will not be used or required for this endpoint. + * We should also assume the session system is not even loaded + */ + DisableSessionsRequired = true }; public PeerDiscoveryEndpoint(PluginBase plugin) { - //Get the peer monitor - PeerMonitor = plugin.GetOrCreateSingleton<CachePeerMonitor>(); - - //Get the node config - Config = plugin.GetOrCreateSingleton<NodeConfig>(); + SysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>(); - InitPathAndLog(Config.DiscoveryPath, plugin.Log); + InitPathAndLog(SysState.Configuration.DiscoveryPath!, plugin.Log); } protected override VfReturnType Get(HttpEntity entity) @@ -68,36 +72,41 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints if(string.IsNullOrWhiteSpace(authToken)) { - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; + return VirtualClose(entity, HttpStatusCode.Unauthorized); } string subject = string.Empty; string challenge = string.Empty; - //Parse auth token - using(JsonWebToken jwt = JsonWebToken.Parse(authToken)) + try { + //Parse auth token + using JsonWebToken jwt = JsonWebToken.Parse(authToken); + //try to verify against cache node first - if (!Config.KeyStore.VerifyJwt(jwt, true)) + if (!KeyStore.VerifyJwt(jwt, true)) { //failed... //try to verify against client key - if (!Config.KeyStore.VerifyJwt(jwt, false)) + if (!KeyStore.VerifyJwt(jwt, false)) { //invalid token - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; + return VirtualClose(entity, HttpStatusCode.Unauthorized); } } using JsonDocument payload = jwt.GetPayload(); //Get client info to pass back - subject = payload.RootElement.TryGetProperty("sub", out JsonElement subEl) ? subEl.GetString() ?? string.Empty : string.Empty; + subject = payload.RootElement.TryGetProperty("sub", out JsonElement subEl) ? subEl.GetString() ?? string.Empty : string.Empty; challenge = payload.RootElement.GetProperty("chl").GetString() ?? string.Empty; } + catch (FormatException) + { + //If tokens are invalid format, let the client know instead of a server error + return VfReturnType.BadRequest; + } //Valid key, get peer list to send to client CacheNodeAdvertisment[] peers = PeerMonitor.GetAllPeers() @@ -109,10 +118,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints using JsonWebToken response = new(); //set header from cache config - response.WriteHeader(Config.KeyStore.GetJwtHeader()); + response.WriteHeader(KeyStore.GetJwtHeader()); response.InitPayloadClaim() - .AddClaim("iss", Config.Config.NodeId) + .AddClaim("iss", NodeConfig.NodeId) //Audience is the requestor id .AddClaim("sub", subject) .AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds()) @@ -122,10 +131,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints .AddClaim("chl", challenge) .CommitClaims(); - //Sign the response - Config.KeyStore.SignJwt(response); - - //Send response to client + + KeyStore.SignJwt(response); + entity.CloseResponse(HttpStatusCode.OK, Net.Http.ContentType.Text, response.DataBuffer); return VfReturnType.VirtualSkip; } diff --git a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs index 87a471b..04380c5 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -59,7 +59,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints public WellKnownEndpoint(PluginBase plugin) { //Get the node config - NodeConfig nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>(); + NodeConfig nodeConfig = plugin.GetOrCreateSingleton<ObjectCacheSystemState>().Configuration; //serialize the config, discovery may not be enabled _advertisment = nodeConfig.Config.Advertisment; diff --git a/plugins/ObjectCacheServer/src/NodeConfig.cs b/plugins/ObjectCacheServer/src/NodeConfig.cs index 3a2e10e..4dd9f4a 100644 --- a/plugins/ObjectCacheServer/src/NodeConfig.cs +++ b/plugins/ObjectCacheServer/src/NodeConfig.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -66,6 +66,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server /// </summary> public uint MaxPeerConnections { get; } = 10; + /// <summary> + /// The maxium number of concurrent client connections to allow + /// before rejecting new connections + /// </summary> + public uint MaxConcurrentConnections { get; } + public NodeConfig(PluginBase plugin, IConfigScope config) { //Get the port of the primary webserver @@ -92,32 +98,23 @@ namespace VNLib.Data.Caching.ObjectCache.Server //Init key store KeyStore = new(plugin); - - DiscoveryInterval = config["discovery_interval_sec"].GetTimeSpan(TimeParseType.Seconds); - - //Get the event queue purge interval - EventQueuePurgeInterval = config["queue_purge_interval_sec"].GetTimeSpan(TimeParseType.Seconds); - - //Get the max queue depth - MaxQueueDepth = (int)config["max_queue_depth"].GetUInt32(); - - - //Get the connect path - ConnectPath = config["connect_path"].GetString() ?? throw new KeyNotFoundException("Missing required key 'connect_path' in cluster config"); - - //Get the verify ip setting - VerifyIp = config["verify_ip"].GetBoolean(); + DiscoveryInterval = config.GetRequiredProperty("discovery_interval_sec", p => p.GetTimeSpan(TimeParseType.Seconds)); + EventQueuePurgeInterval = config.GetRequiredProperty("queue_purge_interval_sec", p => p.GetTimeSpan(TimeParseType.Seconds)); + MaxQueueDepth = (int)config.GetRequiredProperty("max_queue_depth", p => p.GetUInt32()); + ConnectPath = config.GetRequiredProperty("connect_path", p => p.GetString()!); + VerifyIp = config.GetRequiredProperty("verify_ip", p => p.GetBoolean()); + WellKnownPath = config.GetValueOrDefault("well_known_path", p => p.GetString()!, DefaultPath); + MaxPeerConnections = config.GetValueOrDefault("max_peers", p => p.GetUInt32(), 10u); Uri connectEp = BuildUri(usingTls, hostname, port, ConnectPath); Uri? discoveryEp = null; - Config = new(); - //Setup cache node config - Config.WithCacheEndpoint(connectEp) - .WithNodeId(nodeId) - .WithAuthenticator(KeyStore) - .WithTls(usingTls); + (Config = new()) + .WithCacheEndpoint(connectEp) + .WithNodeId(nodeId) + .WithAuthenticator(KeyStore) + .WithTls(usingTls); //Get the discovery path (optional) if (config.TryGetValue("discovery_path", out JsonElement discoveryPathEl)) @@ -133,20 +130,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server } } - //Allow custom well-known path - if(config.TryGetValue("well_known_path", out JsonElement wkEl)) - { - WellKnownPath = wkEl.GetString() ?? DefaultPath; - } - //Default if not set - WellKnownPath ??= DefaultPath; - - //Get the max peer connections - if (config.TryGetValue("max_peers", out JsonElement maxPeerEl)) - { - MaxPeerConnections = maxPeerEl.GetUInt32(); - } - const string CacheConfigTemplate = @" Cluster Configuration: diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs index aada787..b970cee 100644 --- a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs +++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -23,13 +23,10 @@ */ using System; -using System.Threading; using System.Collections.Generic; using VNLib.Plugins; -using VNLib.Utils.Memory; using VNLib.Utils.Logging; -using VNLib.Utils.Memory.Diagnostics; using VNLib.Plugins.Extensions.Loading; using VNLib.Plugins.Extensions.Loading.Routing; @@ -42,39 +39,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server public sealed class ObjectCacheServerEntry : PluginBase { public override string PluginName => "ObjectCache.Service"; - - private readonly Lazy<IUnmangedHeap> _cacheHeap; - - internal IUnmangedHeap ListenerHeap => _cacheHeap.Value; - - public ObjectCacheServerEntry() - { - //Init heap - _cacheHeap = new Lazy<IUnmangedHeap>(InitializeHeap, LazyThreadSafetyMode.PublicationOnly); - } - - internal IUnmangedHeap InitializeHeap() - { - //Create default heap - IUnmangedHeap _heap = MemoryUtil.InitializeNewHeapForProcess(); - try - { - //If the plugin is in debug mode enable heap tracking - return this.IsDebug() ? new TrackedHeapWrapper(_heap, true) : _heap; - } - catch - { - _heap.Dispose(); - throw; - } - } + protected override void OnLoad() { try { - //Get the node configuration first - NodeConfig config = this.GetOrCreateSingleton<NodeConfig>(); + //Initialize the cache node builder + ObjectCacheSystemState builder = this.GetOrCreateSingleton<ObjectCacheSystemState>(); + builder.Initialize(); //Route well-known endpoint this.Route<WellKnownEndpoint>(); @@ -86,7 +59,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server _ = this.GetOrCreateSingleton<CacheNodeReplicationMaanger>(); //Setup discovery endpoint - if(!string.IsNullOrWhiteSpace(config.DiscoveryPath)) + if(!string.IsNullOrWhiteSpace(builder.Configuration.DiscoveryPath)) { this.Route<PeerDiscoveryEndpoint>(); } @@ -101,12 +74,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server protected override void OnUnLoad() { - //dispose heap if initialized - if(_cacheHeap.IsValueCreated) - { - _cacheHeap.Value.Dispose(); - } - Log.Information("Plugin unloaded"); } diff --git a/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs new file mode 100644 index 0000000..6183956 --- /dev/null +++ b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs @@ -0,0 +1,215 @@ +/* +* Copyright (c) 2024 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: ObjectCacheSystemState.cs +* +* ObjectCacheSystemState.cs is part of ObjectCacheServer which is +* part of the larger VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Net.Http; +using System.Net.Sockets; + +using VNLib.Utils.Logging; +using VNLib.Utils.Memory; +using VNLib.Utils.Memory.Diagnostics; +using VNLib.Net.Messaging.FBM; +using VNLib.Plugins; +using VNLib.Plugins.Extensions.Loading; + +using VNLib.Data.Caching.Extensions.Clustering; +using VNLib.Data.Caching.ObjectCache.Server.Cache; +using VNLib.Data.Caching.ObjectCache.Server.Clustering; + +namespace VNLib.Data.Caching.ObjectCache.Server +{ + [ConfigurationName("cache")] + internal sealed class ObjectCacheSystemState(PluginBase plugin, IConfigScope config) : IDisposable + { + const string LISTENER_LOG_SCOPE = "CacheListener"; + + public BlobCacheListener<IPeerEventQueue> Listener { get; private set; } = null!; + + public ICacheStore InternalStore { get; private set; } = null!; + + /// <summary> + /// Used for miscellaneous shared memory allocations (like the cache listener) + /// </summary> + public IUnmangedHeap SharedCacheHeap { get; private set; } = null!; + + /// <summary> + /// The plugin-wide, shared node configuration + /// </summary> + public NodeConfig Configuration { get; } = plugin.GetOrCreateSingleton<NodeConfig>(); + + /// <summary> + /// The peer discovery manager + /// </summary> + public PeerDiscoveryManager PeerDiscovery { get; private set; } = null!; + + /// <summary> + /// System wide peer monitor + /// </summary> + public CachePeerMonitor PeerMonitor { get; } = new(); + + public CacheMemoryConfiguration MemoryConfiguration { get; } = config.Deserialze<CacheMemoryConfiguration>(); + + /// <summary> + /// The system wide peer event queue manager + /// </summary> + public PeerEventQueueManager PeerEventQueue { get; private set; } + + void IDisposable.Dispose() + { + SharedCacheHeap.Dispose(); + Listener.Dispose(); + } + + /// <summary> + /// Initializes the cache node state + /// </summary> + public void Initialize() + { + CacheMemoryConfiguration cacheConf = MemoryConfiguration; + + ArgumentOutOfRangeException.ThrowIfLessThan(cacheConf.MaxCacheEntries, 2u); + + //Suggestion + if (cacheConf.MaxCacheEntries < 200) + { + plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache"); + } + + LogMemConfiguration(); + + //If the plugin is in debug mode enable heap tracking + SharedCacheHeap = plugin.IsDebug() ? + new TrackedHeapWrapper(MemoryUtil.InitializeNewHeapForProcess(), true) + : MemoryUtil.InitializeNewHeapForProcess(); + + ConfigurePeerDiscovery(); + + ConfigureCacheListener(); + + PeerEventQueue = new(plugin, Configuration); + } + + private void ConfigurePeerDiscovery() + { + //Get the known peers array from config, its allowed to be null for master nodes + IConfigScope? config = plugin.TryGetConfig("known_peers"); + string[] kownPeers = config?.Deserialze<string[]>() ?? []; + + ILogProvider discLogger = plugin.Log.CreateScope(PeerDiscoveryManager.LOG_SCOPE_NAME); + + Configuration.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s))) + .WithErrorHandler(new ErrorHandler(discLogger)); + + discLogger.Information("Inital peer nodes: {nodes}", kownPeers); + + PeerDiscovery = new PeerDiscoveryManager( + Configuration, + discLogger, + plugin.IsDebug(), + kownPeers.Length > 0 + ); + + //Discovery manager needs to be scheduled for background work to run the discovery loop + _ = plugin.ObserveWork(PeerDiscovery, 10); + } + + private void ConfigureCacheListener() + { + /* + * Allow loading external managed dll for a bucket-local memory manager + */ + ICacheMemoryManagerFactory manager; + + if (string.IsNullOrWhiteSpace(MemoryConfiguration.ExternLibPath)) + { + //Get the memory manager + manager = plugin.GetOrCreateSingleton<BucketLocalManagerFactory>(); + } + else + { + manager = plugin.CreateServiceExternal<ICacheMemoryManagerFactory>(MemoryConfiguration.ExternLibPath); + } + + //Endpoint only allows for a single reader + Listener = new( + plugin.LoadMemoryCacheSystem(config, manager, MemoryConfiguration), + plugin.GetOrCreateSingleton<CacheListenerPubQueue>(), + plugin.Log.CreateScope(LISTENER_LOG_SCOPE), + new SharedHeapFBMMemoryManager(SharedCacheHeap) + ); + + InternalStore = new CacheStore(Listener.Cache); + } + + private void LogMemConfiguration() + { + const string CacheConfigTemplate = +@" +Cache Configuration: + Max memory: {max} Mb + Buckets: {bc} + Entries per-bucket: {mc} + HeapTracking: {ht} +"; + + CacheMemoryConfiguration cacheConf = MemoryConfiguration; + + //calculate the max memory usage + ulong maxByteSize = cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize; + + //Log the cache config + plugin.Log.Information( + CacheConfigTemplate, + maxByteSize / (1024 * 1000), + cacheConf.BucketCount, + cacheConf.MaxCacheEntries, + plugin.IsDebug() + ); + } + + private sealed class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler + { + public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex) + { + if (ex is HttpRequestException hre) + { + if (hre.InnerException is SocketException se) + { + //transport failed + Logger.Warn("Failed to connect to server {serv} because {err}", errorNode, se.Message); + } + else + { + Logger.Error("Failed to connect to node {n}\n{err}", errorNode, hre); + } + } + else + { + Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex); + } + } + } + } +} diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs index c9cd746..e9dcbc5 100644 --- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs @@ -46,7 +46,7 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering * it in the app domain. */ - public static IClusterNodeIndex CreateIndex(CacheClientConfiguration config) + public static IClusterNodeIndex CreateIndex(VNCacheClusterManager cluster) { /* TEMPORARY: * Named semaphores are only supported on Windows, which allowed synchronized communication between @@ -75,7 +75,7 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering if (remoteIndex == null) { //Create a new index and store it in the app domain - IClusterNodeIndex index = new LocalHandler(config); + IClusterNodeIndex index = new LocalHandler(cluster); AppDomain.CurrentDomain.SetData(APP_DOMAIN_KEY, index); return index; } @@ -92,7 +92,7 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering } else { - return new LocalHandler(config); + return new LocalHandler(cluster); } } @@ -114,15 +114,15 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering * Unless VNLib.Core supports a new way to safley share types across ALCs, this is my solution. */ - sealed class LocalHandler(CacheClientConfiguration Config) : IClusterNodeIndex, IIntervalScheduleable + sealed class LocalHandler(VNCacheClusterManager cluster) : IClusterNodeIndex, IIntervalScheduleable { private Task _currentUpdate = Task.CompletedTask; ///<inheritdoc/> public CacheNodeAdvertisment? GetNextNode() { - //Get all nodes - CacheNodeAdvertisment[] ads = Config.NodeCollection.GetAllNodes(); + //Get all discovered nodes + CacheNodeAdvertisment[] ads = cluster.DiscoveredNodes.GetAllNodes(); //Just get a random node from the collection for now return ads.Length > 0 ? ads.SelectRandom() : null; } @@ -134,7 +134,7 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering public Task OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken) { //Run discovery operation and update the task - _currentUpdate = Config.DiscoverNodesAsync(cancellationToken); + _currentUpdate = cluster.DiscoverNodesAsync(cancellationToken); return Task.CompletedTask; } diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs index 73783dc..a8f86f9 100644 --- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs @@ -59,7 +59,7 @@ namespace VNLib.Data.Caching.Providers.VNCache private readonly VnCacheClientConfig _config; private readonly IClusterNodeIndex _index; - private readonly FBMClientFactory _clientFactory; + private readonly VNCacheClusterClient _cluster; private readonly TimeSpan _initNodeDelay; private bool _isConnected; @@ -83,9 +83,8 @@ namespace VNLib.Data.Caching.Providers.VNCache { ILogProvider scoped = plugin.Log.CreateScope(LOG_NAME); - //Set authenticator and error handler - _clientFactory.GetCacheConfiguration() - .WithAuthenticator(new AuthManager(plugin)) + //When in plugin context, we can use plugin local secrets and a log-based error handler + _cluster.Config.WithAuthenticator(new AuthManager(plugin)) .WithErrorHandler(new DiscoveryErrHAndler(scoped)); //Only the master index is schedulable @@ -116,17 +115,20 @@ namespace VNLib.Data.Caching.Providers.VNCache //Init the client with default settings FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(BufferHeap, (int)config.MaxBlobSize, config.RequestTimeout, debugLog); - - FBMFallbackClientWsFactory wsFactory = new(); - _clientFactory = new(in conf, wsFactory); - - //Add the configuration to the client - _clientFactory.GetCacheConfiguration() + + FBMClientFactory clientFactory = new( + in conf, + new FBMFallbackClientWsFactory(), + 10 + ); + + _cluster = (new CacheClientConfiguration()) .WithTls(config.UseTls) - .WithInitialPeers(config.GetInitialNodeUris()); + .WithInitialPeers(config.GetInitialNodeUris()) + .ToClusterClient(clientFactory); //Init index - _index = ClusterNodeIndex.CreateIndex(_clientFactory.GetCacheConfiguration()); + _index = ClusterNodeIndex.CreateIndex(_cluster); } /* @@ -216,7 +218,7 @@ namespace VNLib.Data.Caching.Providers.VNCache pluginLog.Debug("Connecting to {node}", node); //Connect to the node and save new client - _client = await _clientFactory.ConnectToCacheAsync(node, exitToken); + _client = await _cluster.ConnectToCacheAsync(node, exitToken); if (pluginLog.IsEnabled(LogLevel.Debug)) { @@ -327,22 +329,11 @@ namespace VNLib.Data.Caching.Providers.VNCache ///<inheritdoc/> public override object GetUnderlyingStore() => _client ?? throw new InvalidOperationException("The client is not currently connected"); - private sealed class AuthManager : ICacheAuthManager + private sealed class AuthManager(PluginBase plugin) : ICacheAuthManager { - private IAsyncLazy<ReadOnlyJsonWebKey> _sigKey; - private IAsyncLazy<ReadOnlyJsonWebKey> _verKey; - - public AuthManager(PluginBase plugin) - { - //Lazy load keys - - //Get the signing key - _sigKey = plugin.GetSecretAsync("client_private_key").ToLazy(static r => r.GetJsonWebKey()); - - //Lazy load cache public key - _verKey = plugin.GetSecretAsync("cache_public_key").ToLazy(static r => r.GetJsonWebKey()); - } + private IAsyncLazy<ReadOnlyJsonWebKey> _sigKey = plugin.GetSecretAsync("client_private_key").ToLazy(static r => r.GetJsonWebKey()); + private IAsyncLazy<ReadOnlyJsonWebKey> _verKey = plugin.GetSecretAsync("cache_public_key").ToLazy(static r => r.GetJsonWebKey()); public async Task AwaitLazyKeyLoad() { diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs index 383c979..0d6cd34 100644 --- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: VNLib.Data.Caching.Providers.VNCache @@ -33,6 +33,8 @@ namespace VNLib.Data.Caching.Providers.VNCache /// </summary> public class VnCacheClientConfig : VNCacheConfig { + const string DefaultWellKnownEndpoint = "/.well-known/vncache"; + /// <summary> /// The broker server address /// </summary> @@ -88,7 +90,13 @@ namespace VNLib.Data.Caching.Providers.VNCache public Uri[] GetInitialNodeUris() { _ = InitialNodes ?? throw new InvalidOperationException("Initial nodes have not been set"); - return InitialNodes.Select(static x => new Uri(x, UriKind.Absolute)).ToArray(); + return InitialNodes.Select(static x => + { + //Append a default well known endpoint if the path is just a root + Uri ur = new (x, UriKind.Absolute); + return ur.LocalPath == "/" ? new Uri(ur, DefaultWellKnownEndpoint) : ur; + }) + .ToArray(); } ///<inheritdoc/> |