aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2024-03-06 21:30:58 -0500
committerLibravatar vnugent <public@vaughnnugent.com>2024-03-06 21:30:58 -0500
commit4d8cfc10382105b0acbd94df93ad3d05ff91db54 (patch)
treed9795c60b2e2a4871eddff43311866784c1c054b
parent016a96a80cce025a86c6cf26707738f6a2eb2658 (diff)
refactor: #2 Centralize server state, default discovery endpoints & more
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs19
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs11
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs15
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs6
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs45
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs361
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterClient.cs78
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterManager.cs300
-rw-r--r--lib/VNLib.Data.Caching/src/ClientExtensions.cs2
-rw-r--r--plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs5
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs11
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs (renamed from plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs)15
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheStore.cs73
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs8
-rw-r--r--plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs (renamed from plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs)132
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs48
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs11
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs93
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs49
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs91
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs60
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs4
-rw-r--r--plugins/ObjectCacheServer/src/NodeConfig.cs55
-rw-r--r--plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs45
-rw-r--r--plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs215
-rw-r--r--plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs14
-rw-r--r--plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs45
-rw-r--r--plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs12
28 files changed, 965 insertions, 858 deletions
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs
index 99acfd5..b6add7d 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Data.Caching.Extensions
@@ -22,6 +22,7 @@
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
+using System;
using System.Net;
using System.Text;
using System.Threading;
@@ -39,6 +40,22 @@ namespace VNLib.Data.Caching.Extensions.ApiModel
/// </summary>
internal sealed class CacheSiteAdapter : RestSiteAdapterBase
{
+ /*
+ * Lazy to defer errors for debuggong
+ */
+ private static readonly Lazy<CacheSiteAdapter> _lazy = new(() => ConfigureAdapter(2));
+
+ internal static CacheSiteAdapter Instance => _lazy.Value;
+
+ private static CacheSiteAdapter ConfigureAdapter(int maxClients)
+ {
+ CacheSiteAdapter adapter = new(maxClients);
+ //Configure the site endpoints
+ adapter.BuildEndpoints(ServiceEndpoints.Definition);
+ return adapter;
+ }
+
+
protected override RestClientPool Pool { get; }
public CacheSiteAdapter(int maxClients)
diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs
index 5a15d33..263f41a 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Data.Caching.Extensions
@@ -37,14 +37,9 @@ namespace VNLib.Data.Caching.Extensions.Clustering
public class CacheClientConfiguration
{
/// <summary>
- /// Stores available cache servers to be used for discovery, and connections
- /// </summary>
- public INodeDiscoveryCollection NodeCollection { get; } = new NodeDiscoveryCollection();
-
- /// <summary>
/// The authentication manager to use for signing and verifying messages to and from the cache servers
/// </summary>
- public ICacheAuthManager AuthManager { get; private set; }
+ public ICacheAuthManager AuthManager { get; private set; } = null!;
/// <summary>
/// The error handler to use for handling errors that occur during the discovery process
@@ -89,7 +84,7 @@ namespace VNLib.Data.Caching.Extensions.Clustering
public CacheClientConfiguration WithInitialPeers(IEnumerable<Uri> peers)
{
//Check null
- _ = peers ?? throw new ArgumentNullException(nameof(peers));
+ ArgumentNullException.ThrowIfNull(peers);
//Store peer array
WellKnownNodes = peers.ToArray();
diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs
index 6b7ab48..c21ed05 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Data.Caching.Extensions
@@ -23,6 +23,7 @@
*/
using System;
+using System.Runtime.CompilerServices;
namespace VNLib.Data.Caching.Extensions.Clustering
{
@@ -85,8 +86,14 @@ namespace VNLib.Data.Caching.Extensions.Clustering
return this;
}
+ internal StrongBox<string> NodeIdRef { get; } = new(string.Empty);
+
///<inheritdoc/>
- public string NodeId { get; private set; } = null!;
+ public string NodeId
+ {
+ get => NodeIdRef.Value!;
+ private set => NodeIdRef.Value = value;
+ }
/// <summary>
/// Specifies the current server's cluster node id. If this
@@ -99,10 +106,6 @@ namespace VNLib.Data.Caching.Extensions.Clustering
public CacheNodeConfiguration WithNodeId(string nodeId)
{
NodeId = nodeId ?? throw new ArgumentNullException(nameof(nodeId));
-
- //Update the node id in the node collection
- (NodeCollection as NodeDiscoveryCollection)!.SetSelfId(nodeId);
-
return this;
}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs
index 677088a..e57e69b 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Data.Caching.Extensions
@@ -28,7 +28,9 @@ using System.Collections.Generic;
namespace VNLib.Data.Caching.Extensions.Clustering
{
/// <summary>
- /// A custom enumerator for the node discovery process
+ /// A custom enumerator for the node discovery process. Simplifies the recursive processes
+ /// of discovering nodes in a cluster to a simple enumeration process. It allows for real-time
+ /// updates to the collection of discovered nodes as a union operation.
/// </summary>
public interface INodeDiscoveryEnumerator : IEnumerator<CacheNodeAdvertisment>
{
diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs
index b0e53e1..16b96a3 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs
@@ -1,12 +1,12 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Data.Caching.Extensions
* File: NodeDiscoveryCollection.cs
*
-* NodeDiscoveryCollection.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
-* VNLib collection of libraries and utilities.
+* NodeDiscoveryCollection.cs is part of VNLib.Data.Caching.Extensions which is part
+* of the larger VNLib collection of libraries and utilities.
*
* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
@@ -26,24 +26,18 @@ using System;
using System.Linq;
using System.Collections;
using System.Collections.Generic;
+using System.Runtime.CompilerServices;
+
+using VNLib.Utils.Extensions;
namespace VNLib.Data.Caching.Extensions.Clustering
{
/// <summary>
/// Represents a collection of available cache nodes from a discovery process
/// </summary>
- public sealed class NodeDiscoveryCollection : INodeDiscoveryCollection
+ public sealed class NodeDiscoveryCollection(StrongBox<string?>? selfId) : INodeDiscoveryCollection
{
- private string? _selfId;
- private LinkedList<CacheNodeAdvertisment> _peers;
-
- /// <summary>
- /// Initializes a new empty <see cref="NodeDiscoveryCollection"/>
- /// </summary>
- public NodeDiscoveryCollection()
- {
- _peers = new();
- }
+ private LinkedList<CacheNodeAdvertisment> _peers = new();
/// <summary>
/// Manually adds nodes to the collection that were not discovered through the discovery process
@@ -62,39 +56,34 @@ namespace VNLib.Data.Caching.Extensions.Clustering
}
/// <summary>
- /// Sets the id of the current node, so it can be excluded from discovery
+ /// Removes a vector of nodes from the internal collection
/// </summary>
- /// <param name="selfId">The id of the current node to exclude</param>
- public void SetSelfId(string? selfId) => _selfId = selfId;
+ /// <param name="nodes">The vector containg nodes to remove from the collection</param>
+ public void RemoveManualNodes(IEnumerable<CacheNodeAdvertisment> nodes) => nodes.ForEach(n => _peers.Remove(n));
///<inheritdoc/>
- public INodeDiscoveryEnumerator BeginDiscovery()
- {
- return new NodeEnumerator(new(), _selfId);
- }
+ public INodeDiscoveryEnumerator BeginDiscovery() => new NodeEnumerator(new(), selfId?.Value);
///<inheritdoc/>
public INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<CacheNodeAdvertisment> initialPeers)
{
+ ArgumentNullException.ThrowIfNull(initialPeers);
+
//Init new enumerator with the initial peers
- return new NodeEnumerator(new(initialPeers), _selfId);
+ return new NodeEnumerator(new(initialPeers), selfId?.Value);
}
///<inheritdoc/>
public void CompleteDiscovery(INodeDiscoveryEnumerator enumerator)
{
- _ = enumerator ?? throw new ArgumentNullException(nameof(enumerator));
+ ArgumentNullException.ThrowIfNull(enumerator);
//Capture all nodes from the enumerator and store them as our current peers
_peers = (enumerator as NodeEnumerator)!.Peers;
}
///<inheritdoc/>
- public CacheNodeAdvertisment[] GetAllNodes()
- {
- //Capture all current peers
- return _peers.ToArray();
- }
+ public CacheNodeAdvertisment[] GetAllNodes() => _peers.ToArray();
private sealed record class NodeEnumerator(LinkedList<CacheNodeAdvertisment> Peers, string? SelfNodeId) : INodeDiscoveryEnumerator
{
diff --git a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
index bd86461..47aadd9 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
@@ -31,7 +31,6 @@ using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Security.Cryptography;
-using System.Runtime.CompilerServices;
using RestSharp;
@@ -74,22 +73,7 @@ namespace VNLib.Data.Caching.Extensions
/// The advertisment header for cache node discovery
/// </summary>
public const string X_NODE_DISCOVERY_HEADER = "X-Cache-Node-Discovery";
-
- /*
- * Lazy to defer errors for debuggong
- */
- private static readonly Lazy<CacheSiteAdapter> SiteAdapter = new(() => ConfigureAdapter(2));
-
- private static CacheSiteAdapter ConfigureAdapter(int maxClients)
- {
- CacheSiteAdapter adapter = new(maxClients);
- //Configure the site endpoints
- adapter.BuildEndpoints(ServiceEndpoints.Definition);
- return adapter;
- }
-
- private static readonly ConditionalWeakTable<FBMClientFactory, CacheClientConfiguration> ClientCacheConfig = new();
-
+
/// <summary>
/// Gets a <see cref="FBMClientConfig"/> preconfigured object caching
/// protocl
@@ -101,7 +85,7 @@ namespace VNLib.Data.Caching.Extensions
/// <returns>A preconfigured <see cref="FBMClientConfig"/> for object caching</returns>
public static FBMClientConfig GetDefaultConfig(IUnmangedHeap heap, int maxMessageSize, TimeSpan timeout = default, ILogProvider? debugLog = null)
{
- return GetDefaultConfig(new FallbackFBMMemoryManager(heap), maxMessageSize, timeout, debugLog);
+ return GetDefaultConfig(new SharedHeapFBMMemoryManager(heap), maxMessageSize, timeout, debugLog);
}
/// <summary>
@@ -153,236 +137,22 @@ namespace VNLib.Data.Caching.Extensions
{
client.Config.DebugLog?.Debug("{debug}: {data}", "[CACHE]", message);
}
-
- /// <summary>
- /// Discovers ALL possible cache nodes itteritivley, first by collecting the configuration
- /// from the initial peers.
- /// This will make connections to all discoverable servers
- /// </summary>
- /// <param name="config"></param>
- /// <param name="cancellation">A token to cancel the operation</param>
- /// <returns></returns>
- /// <exception cref="ArgumentException"></exception>
- /// <exception cref="CacheDiscoveryFailureException"></exception>
- public static async Task DiscoverNodesAsync(this CacheClientConfiguration config, CancellationToken cancellation)
- {
- //Make sure at least one node defined
- if(config?.WellKnownNodes == null || config.WellKnownNodes.Length == 0)
- {
- throw new ArgumentException("There must be at least one cache node defined in the client configuration");
- }
-
- //Get the initial advertisments that arent null
- CacheNodeAdvertisment[] initialPeers = await ResolveWellKnownAsync(config, cancellation);
-
- if (initialPeers.Length == 0)
- {
- throw new CacheDiscoveryFailureException("There must be at least one available cache node to continue discovery");
- }
-
- await DiscoverNodesAsync(config, initialPeers, cancellation);
- }
/// <summary>
- /// Resolves the initial well-known cache nodes into their advertisments
+ /// Gets the discovery manager for the current client configuration. Just a
+ /// convience method.
/// </summary>
- /// <param name="config"></param>
- /// <param name="cancellation">A token to cancel the operation</param>
- /// <returns>An array of resolved nodes</returns>
- public static async Task<CacheNodeAdvertisment[]> ResolveWellKnownAsync(this CacheClientConfiguration config, CancellationToken cancellation)
- {
- _ = config?.WellKnownNodes ?? throw new ArgumentNullException(nameof(config));
-
- Task<CacheNodeAdvertisment?>[] initialAdds = new Task<CacheNodeAdvertisment?>[config.WellKnownNodes.Length];
-
- //Discover initial advertisments from well-known addresses
- for (int i = 0; i < config.WellKnownNodes.Length; i++)
- {
- initialAdds[i] = DiscoverNodeConfigAsync(config.WellKnownNodes[i], config, cancellation);
- }
-
- //Wait for all initial adds to complete
- await Task.WhenAll(initialAdds);
-
- //Get the initial advertisments that arent null
- return initialAdds.Select(static x => x.Result!).Where(static s => s != null).ToArray();
- }
+ /// <param name="conf"></param>
+ /// <returns>The new <see cref="VNCacheClusterManager"/> instance around your config</returns>
+ public static VNCacheClusterManager GetDiscoveryManager(this CacheClientConfiguration conf) => new(conf);
/// <summary>
- /// Discovers ALL possible cache nodes itteritivley from the current collection of initial peers.
- /// This will make connections to all discoverable servers and update the client configuration, with all
- /// discovered peers
+ /// Converts the cache client configuration to a cluster client
/// </summary>
/// <param name="config"></param>
- /// <param name="initialPeers">Accepts an array of initial peers to override the endpoint discovery process</param>
- /// <param name="cancellation">A token to cancel the operation</param>
- /// <returns>A task that completes when all nodes have been discovered</returns>
- /// <exception cref="ArgumentException"></exception>
- /// <exception cref="CacheDiscoveryFailureException"></exception>
- public static async Task DiscoverNodesAsync(this CacheClientConfiguration config, CacheNodeAdvertisment[] initialPeers, CancellationToken cancellation)
- {
- //Make sure at least one node defined
- if (initialPeers == null || initialPeers.Length == 0)
- {
- throw new ArgumentException("There must be at least one initial peer");
- }
-
- //Get the discovery enumerator with the initial peers
- INodeDiscoveryEnumerator enumerator = config.NodeCollection.BeginDiscovery(initialPeers);
-
- //Start the discovery process
- await DiscoverNodesAsync(enumerator, config, config.ErrorHandler, cancellation);
-
- //Commit nodes
- config.NodeCollection.CompleteDiscovery(enumerator);
- }
-
- private static async Task DiscoverNodesAsync(
- INodeDiscoveryEnumerator enumerator,
- CacheClientConfiguration config,
- ICacheDiscoveryErrorHandler? errHandler,
- CancellationToken cancellation
- )
- {
- //Loop through servers
- while (enumerator.MoveNext())
- {
- //Make sure the node has a discovery endpoint
- if (enumerator.Current.DiscoveryEndpoint == null)
- {
- //Skip this node
- continue;
- }
-
- /*
- * We are allowed to save nodes that do not have a discovery endpoint, but we cannot
- * discover nodes from them we can only use them as cache
- */
-
- //add a random delay to avoid spamming servers
- await Task.Delay((int)Random.Shared.NextInt64(100, 500), cancellation);
-
- try
- {
- //Discover nodes from the current node
- CacheNodeAdvertisment[]? nodes = await GetCacheNodesAsync(enumerator.Current, config, cancellation);
-
- if (nodes != null)
- {
- //Add nodes to the collection
- enumerator.OnPeerDiscoveryComplete(nodes);
- }
- }
- //Catch exceptions when an error handler is defined
- catch(Exception ex) when (errHandler != null)
- {
- //Handle the error
- errHandler.OnDiscoveryError(enumerator.Current, ex);
- }
- catch(Exception ex)
- {
- throw new CacheDiscoveryFailureException($"Failed to discovery peer node {enumerator.Current?.NodeId}, cannot continue", ex);
- }
- }
- }
-
- private static async Task<CacheNodeAdvertisment?> DiscoverNodeConfigAsync(Uri serverUri, CacheClientConfiguration config, CancellationToken cancellation)
- {
- try
- {
- GetConfigRequest req = new (serverUri, config);
-
- //Site adapter verifies response messages so we dont need to check on the response
- byte[] data = await SiteAdapter.Value.ExecuteAsync(req, cancellation).AsBytes()
- ?? throw new CacheDiscoveryFailureException($"No data returned from desired cache node");
-
- //Response is jwt
- using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data);
-
- //The entire payload is just the single serialzed advertisment
- using JsonDocument doc = responseJwt.GetPayload();
-
- return doc.RootElement.GetProperty("sub").Deserialize<CacheNodeAdvertisment>();
- }
- //Bypass cdfe when error handler is null
- catch(CacheDiscoveryFailureException) when(config.ErrorHandler == null)
- {
- throw;
- }
- //Catch exceptions when an error handler is defined
- catch (Exception ex) when (config.ErrorHandler != null)
- {
- //Handle the error
- config.ErrorHandler.OnDiscoveryError(null!, ex);
- return null;
- }
- catch (Exception ex)
- {
- throw new CacheDiscoveryFailureException("Failed to discover node configuration", ex);
- }
- }
-
- /// <summary>
- /// Contacts the given server's discovery endpoint to discover a list of available
- /// servers we can connect to
- /// </summary>
- /// <param name="advert">An advertisment of a server to discover other nodes from</param>
- /// <param name="cancellationToken">A token to cancel the operationS</param>
- /// <param name="config">The cache configuration object</param>
- /// <returns>The list of active servers</returns>
- /// <exception cref="SecurityException"></exception>
- /// <exception cref="ArgumentException"></exception>
- /// <exception cref="ArgumentNullException"></exception>
- public static async Task<CacheNodeAdvertisment[]?> GetCacheNodesAsync(CacheNodeAdvertisment advert, CacheClientConfiguration config, CancellationToken cancellationToken = default)
- {
- _ = advert ?? throw new ArgumentNullException(nameof(advert));
- _ = config ?? throw new ArgumentNullException(nameof(config));
- _ = advert.DiscoveryEndpoint ?? throw new ArgumentException("Advertisment does not expose an advertisment endpoint");
-
- DiscoveryRequest req = new (advert.DiscoveryEndpoint, config);
-
- //Site adapter verifies response messages so we dont need to check on the response
- byte[] data = await SiteAdapter.Value.ExecuteAsync(req, cancellationToken).AsBytes()
- ?? throw new InvalidOperationException($"No data returned from node {advert.NodeId}");
-
- //Response is jwt
- using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data);
-
- using JsonDocument doc = responseJwt.GetPayload();
- return doc.RootElement.GetProperty("peers").Deserialize<CacheNodeAdvertisment[]>();
- }
-
- /// <summary>
- /// Allows for configuration of an <see cref="FBMClient"/>
- /// for a connection to a cache server
- /// </summary>
- /// <param name="client"></param>
- /// <returns>A fluent api configuration builder for the current client</returns>
- public static CacheClientConfiguration GetCacheConfiguration(this FBMClientFactory client) => ClientCacheConfig.GetOrCreateValue(client);
-
- /// <summary>
- /// Explicitly set the client cache configuration for the current client
- /// </summary>
- /// <param name="client"></param>
- /// <param name="config">The cache node configuration</param>
- /// <returns>The config instance</returns>
- public static CacheClientConfiguration SetCacheConfiguration(this FBMClientFactory client, CacheClientConfiguration config)
- {
- ClientCacheConfig.AddOrUpdate(client, config);
- return config;
- }
-
- /// <summary>
- /// Explicitly set the cache node configuration for the current client
- /// </summary>
- /// <param name="client"></param>
- /// <param name="nodeConfig">The cache node configuration</param>
- /// <returns>The config instance</returns>
- public static CacheNodeConfiguration SetCacheConfiguration(this FBMClientFactory client, CacheNodeConfiguration nodeConfig)
- {
- ClientCacheConfig.AddOrUpdate(client, nodeConfig);
- return nodeConfig;
- }
+ /// <param name="factory">The FBM client factory instance to use</param>
+ /// <returns>The new cluster client instance</returns>
+ public static VNCacheClusterClient ToClusterClient(this CacheClientConfiguration config, FBMClientFactory factory) => new(config, factory);
/// <summary>
/// Waits for the client to disconnect from the server while observing
@@ -396,6 +166,8 @@ namespace VNLib.Data.Caching.Extensions
/// <returns>A task that complets when the connecion has been closed successfully</returns>
public static async Task WaitForExitAsync(this FBMClient client, CancellationToken token = default)
{
+ ArgumentNullException.ThrowIfNull(client);
+
client.LogDebug("Waiting for cache client to exit");
//Get task for cancellation
@@ -420,88 +192,6 @@ namespace VNLib.Data.Caching.Extensions
}
/// <summary>
- /// Discovers all available nodes for the current client config
- /// </summary>
- /// <param name="client"></param>
- /// <param name="cancellation">A token to cancel the operation</param>
- /// <returns>A task that completes when all nodes have been discovered</returns>
- public static Task DiscoverAvailableNodesAsync(this FBMClientFactory client, CancellationToken cancellation = default)
- {
- //Get stored config
- CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
-
- //Discover all nodes
- return conf.DiscoverNodesAsync(cancellation);
- }
-
- /// <summary>
- /// Connects to a random server from the servers discovered during a cache server discovery
- /// </summary>
- /// <param name="client"></param>
- /// <param name="cancellation">A token to cancel the operation</param>
- /// <returns>The server that the connection was made with</returns>
- /// <exception cref="FBMException"></exception>
- /// <exception cref="FBMServerNegiationException"></exception>
- /// <exception cref="ArgumentException"></exception>
- /// <exception cref="ArgumentNullException"></exception>
- /// <exception cref="SecurityException"></exception>
- /// <exception cref="ObjectDisposedException"></exception>
- public static async Task<CacheNodeAdvertisment> ConnectToRandomCacheAsync(this FBMClientFactory client, CancellationToken cancellation = default)
- {
- //Get stored config
- CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
-
- //Get all available nodes, or at least the initial peers
- CacheNodeAdvertisment[]? adverts = conf.NodeCollection.GetAllNodes() ?? throw new ArgumentException("No cache nodes discovered, cannot connect");
-
- //Select random node from all available nodes
- CacheNodeAdvertisment randomServer = adverts.SelectRandom();
-
- //Connect to the random server
- await ConnectToCacheAsync(client, randomServer, cancellation);
-
- //Return the random server we connected to
- return randomServer;
- }
-
- /// <summary>
- /// Connects to the specified server on the configured cache client
- /// </summary>
- /// <param name="factory"></param>
- /// <param name="server">The server to connect to</param>
- /// <param name="token">A token to cancel the operation</param>
- /// <returns>A task that resolves when the client is connected to the cache server</returns>
- /// <exception cref="FBMException"></exception>
- /// <exception cref="FBMServerNegiationException"></exception>
- /// <exception cref="ArgumentException"></exception>
- /// <exception cref="ArgumentNullException"></exception>
- /// <exception cref="SecurityException"></exception>
- /// <exception cref="ObjectDisposedException"></exception>
- public static async Task<FBMClient> ConnectToCacheAsync(this FBMClientFactory factory, CacheNodeAdvertisment server, CancellationToken token = default)
- {
- _ = factory ?? throw new ArgumentNullException(nameof(factory));
- _ = server ?? throw new ArgumentNullException(nameof(server));
-
- //Get stored config
- CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(factory);
-
- //Create new client
- FBMClient client = factory.CreateClient();
-
- try
- {
- //Connect to server (no server id because client not replication server)
- await ConnectToCacheAsync(client, conf, server, token);
- return client;
- }
- catch
- {
- client.Dispose();
- throw;
- }
- }
-
- /// <summary>
/// Connects to the specified server on the configured cache client
/// </summary>
/// <param name="client"></param>
@@ -517,9 +207,9 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="ObjectDisposedException"></exception>
public static Task ConnectToCacheAsync(this FBMClient client, CacheNodeAdvertisment server, CacheClientConfiguration explicitConfig, CancellationToken token = default)
{
- _ = client ?? throw new ArgumentNullException(nameof(client));
- _ = server ?? throw new ArgumentNullException(nameof(server));
-
+ ArgumentNullException.ThrowIfNull(client);
+ ArgumentNullException.ThrowIfNull(server);
+
//Connect to server (no server id because client not replication server)
return ConnectToCacheAsync(client, explicitConfig, server, token);
}
@@ -546,7 +236,7 @@ namespace VNLib.Data.Caching.Extensions
NegotationRequest req = new(server.ConnectEndpoint, config);
//Exec negotiation
- RestResponse response = await SiteAdapter.Value.ExecuteAsync(req, token);
+ RestResponse response = await CacheSiteAdapter.Instance.ExecuteAsync(req, token);
/*
* JWT will already be veified by the endpoint adapter, so we
@@ -582,6 +272,12 @@ namespace VNLib.Data.Caching.Extensions
Scheme = config.UseTls ? "wss://" : "ws://"
};
+ //if the server is specifying https urls, then attempt to upgrade to wss
+ if (server.ConnectEndpoint.Scheme == Uri.UriSchemeHttps)
+ {
+ uriBuilder.Scheme = "wss://";
+ }
+
//Connect async
await client.ConnectAsync(uriBuilder.Uri, token);
}
@@ -658,7 +354,7 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="CryptographicException"></exception>
public static bool VerifyUpgradeToken(this ICacheAuthManager man, string signature, string token, bool isPeer)
{
- _ = man ?? throw new ArgumentNullException(nameof(man));
+ ArgumentNullException.ThrowIfNull(man);
//get the hash of the token
byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256);
@@ -704,8 +400,15 @@ namespace VNLib.Data.Caching.Extensions
/// <param name="message">The advertisment message to verify</param>
/// <returns>The advertisment message if successfully verified, or null otherwise</returns>
/// <exception cref="FormatException"></exception>
- public static CacheNodeAdvertisment? VerifyPeerAdvertisment(this ICacheAuthManager config, string message)
+ public static CacheNodeAdvertisment? VerifyPeerAdvertisment(this ICacheAuthManager config, string? message)
{
+ ArgumentNullException.ThrowIfNull(config);
+
+ if (string.IsNullOrWhiteSpace(message))
+ {
+ return null;
+ }
+
using JsonWebToken jwt = JsonWebToken.Parse(message);
//Verify the signature
diff --git a/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterClient.cs b/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterClient.cs
new file mode 100644
index 0000000..050e1a3
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterClient.cs
@@ -0,0 +1,78 @@
+/*
+* Copyright (c) 2024 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: VNCacheClusterClient.cs
+*
+* VNCacheClusterClient.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Security;
+using System.Threading;
+using System.Threading.Tasks;
+
+using VNLib.Net.Messaging.FBM;
+using VNLib.Net.Messaging.FBM.Client;
+using VNLib.Data.Caching.Extensions.Clustering;
+
+namespace VNLib.Data.Caching.Extensions
+{
+ /// <summary>
+ /// Manages client connections to cluster nodes with discovery from <see cref="VNCacheClusterManager"/>
+ /// instance.
+ /// </summary>
+ /// <param name="config">The client configuration to use when discovering or connecting to cache nodes</param>
+ /// <param name="factory">The fbm client factory instance</param>
+ public class VNCacheClusterClient(CacheClientConfiguration config, FBMClientFactory factory)
+ : VNCacheClusterManager(config)
+ {
+
+ /// <summary>
+ /// Connects to the specified server on the configured cache client
+ /// </summary>
+ /// <param name="factory"></param>
+ /// <param name="server">The server to connect to</param>
+ /// <param name="token">A token to cancel the operation</param>
+ /// <returns>A task that resolves when the client is connected to the cache server</returns>
+ /// <exception cref="FBMException"></exception>
+ /// <exception cref="FBMServerNegiationException"></exception>
+ /// <exception cref="ArgumentException"></exception>
+ /// <exception cref="ArgumentNullException"></exception>
+ /// <exception cref="SecurityException"></exception>
+ /// <exception cref="ObjectDisposedException"></exception>
+ public async Task<FBMClient> ConnectToCacheAsync(CacheNodeAdvertisment server, CancellationToken token = default)
+ {
+ ArgumentNullException.ThrowIfNull(server);
+
+ FBMClient client = factory.CreateClient();
+
+ try
+ {
+ //Connect to server (no server id because client not replication server)
+ await client.ConnectToCacheAsync(server, config, token);
+ return client;
+ }
+ catch
+ {
+ client.Dispose();
+ throw;
+ }
+ }
+ }
+}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterManager.cs b/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterManager.cs
new file mode 100644
index 0000000..4a1a6bd
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterManager.cs
@@ -0,0 +1,300 @@
+/*
+* Copyright (c) 2024 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: VNCacheClusterManager.cs
+*
+* VNCacheClusterManager.cs is part of VNLib.Data.Caching.Extensions
+* which is part of the larger VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Linq;
+using System.Security;
+using System.Text.Json;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+
+using RestSharp;
+
+using VNLib.Hashing.IdentityUtility;
+using VNLib.Net.Rest.Client.Construction;
+using VNLib.Data.Caching.Extensions.ApiModel;
+using VNLib.Data.Caching.Extensions.Clustering;
+
+namespace VNLib.Data.Caching.Extensions
+{
+
+ /// <summary>
+ /// A VNCache cluster client discovery maanger. Used to simplify the discovery
+ /// of cache nodes
+ /// </summary>
+ /// <param name="config">The client configuration instance</param>
+ public class VNCacheClusterManager(CacheClientConfiguration config)
+ {
+ /// <summary>
+ /// The internal collection of discovered nodes
+ /// </summary>
+ protected NodeDiscoveryCollection NodeCollection { get; } = GetNodeCollection(config);
+
+ /// <summary>
+ /// Gets the collection of discovered nodes within the manager
+ /// </summary>
+ public INodeDiscoveryCollection DiscoveredNodes => NodeCollection;
+
+ /// <summary>
+ /// The underlying <see cref="CacheClientConfiguration"/> instance
+ /// </summary>
+ public CacheClientConfiguration Config => config;
+
+ /// <summary>
+ /// Adds an array of nodes manually to the collection of discovered cluster nodes
+ /// </summary>
+ /// <param name="nodes"></param>
+ public void AddManualNodes(params CacheNodeAdvertisment[] nodes) => AddManualNodes(nodes.AsEnumerable());
+
+ /// <summary>
+ /// Adds an array of nodes manually to the collection of discovered cluster nodes
+ /// </summary>
+ /// <param name="nodes"></param>
+ public void AddManualNodes(IEnumerable<CacheNodeAdvertisment> nodes) => NodeCollection.AddManualNodes(nodes);
+
+ /// <summary>
+ /// Removes an array of nodes manually from the collection of discovered cluster nodes
+ /// </summary>
+ /// <param name="nodes"></param>
+ public void RemoveManualNodes(params CacheNodeAdvertisment[] nodes) => RemoveManualNodes(nodes.AsEnumerable());
+
+ /// <summary>
+ /// Removes an array of nodes manually from the collection of discovered cluster nodes
+ /// </summary>
+ /// <param name="nodes"></param>
+ public void RemoveManualNodes(IEnumerable<CacheNodeAdvertisment> nodes) => NodeCollection.RemoveManualNodes(nodes);
+
+ /// <summary>
+ /// Resolves the initial well-known cache nodes into their advertisments
+ /// </summary>
+ /// <param name="cancellation">A token to cancel the operation</param>
+ /// <returns>An array of resolved nodes</returns>
+ public async Task<CacheNodeAdvertisment[]> ResolveWellKnownAsync(CancellationToken cancellation)
+ {
+ Task<CacheNodeAdvertisment?>[] initialAdds = new Task<CacheNodeAdvertisment?>[config.WellKnownNodes.Length];
+
+ //Discover initial advertisments from well-known addresses
+ for (int i = 0; i < config.WellKnownNodes.Length; i++)
+ {
+ initialAdds[i] = DiscoverNodeConfigAsync(config.WellKnownNodes[i], cancellation);
+ }
+
+ //Wait for all initial adds to complete
+ await Task.WhenAll(initialAdds);
+
+ //Get the initial advertisments that arent null
+ return initialAdds.Select(static x => x.Result!).Where(static s => s != null).ToArray();
+ }
+
+ /// <summary>
+ /// Discovers ALL possible cache nodes itteritivley, first by collecting the configuration
+ /// from the initial peers.
+ /// This will make connections to all discoverable servers
+ /// </summary>
+ /// <param name="cancellation">A token to cancel the operation</param>
+ /// <returns></returns>
+ /// <exception cref="ArgumentException"></exception>
+ /// <exception cref="CacheDiscoveryFailureException"></exception>
+ /// <remarks>
+ /// This method simply combines the <see cref="ResolveWellKnownAsync"/> and <see cref="DiscoverNodesAsync"/>
+ /// methods into a single operation
+ /// </remarks>
+ public async Task DiscoverNodesAsync(CancellationToken cancellation)
+ {
+ //Make sure at least one node defined
+ if (config?.WellKnownNodes == null || config.WellKnownNodes.Length == 0)
+ {
+ throw new ArgumentException("There must be at least one cache node defined in the client configuration");
+ }
+
+ /*
+ * Connect to well-known nodes from the client configuration to discovery its layout.
+ *
+ */
+ CacheNodeAdvertisment[] initialPeers = await ResolveWellKnownAsync(cancellation);
+
+ if (initialPeers.Length == 0)
+ {
+ throw new CacheDiscoveryFailureException("There must be at least one available cache node to continue discovery");
+ }
+
+ await DiscoverNodesAsync(initialPeers, cancellation);
+ }
+
+ /// <summary>
+ /// Discovers ALL possible cache nodes itteritivley from the current collection of initial peers.
+ /// This will make connections to all discoverable servers and update the client configuration, with all
+ /// discovered peers
+ /// </summary>
+ /// <param name="initialPeers">Accepts an array of initial peers to override the endpoint discovery process</param>
+ /// <param name="cancellation">A token to cancel the operation</param>
+ /// <returns>A task that completes when all nodes have been discovered</returns>
+ /// <exception cref="ArgumentException"></exception>
+ /// <exception cref="CacheDiscoveryFailureException"></exception>
+ public async Task DiscoverNodesAsync(CacheNodeAdvertisment[] initialPeers, CancellationToken cancellation)
+ {
+ //Make sure at least one node defined
+ ArgumentNullException.ThrowIfNull(initialPeers);
+ ArgumentOutOfRangeException.ThrowIfZero(initialPeers.Length);
+
+ //Get the discovery enumerator with the initial peers
+ using INodeDiscoveryEnumerator enumerator = NodeCollection.BeginDiscovery(initialPeers);
+
+ //Start the discovery process
+ await DiscoverNodesAsync(enumerator, config, config.ErrorHandler, cancellation);
+
+ //Commit discovered nodes to stored node collection
+ NodeCollection.CompleteDiscovery(enumerator);
+ }
+
+ private static async Task DiscoverNodesAsync(
+ INodeDiscoveryEnumerator enumerator,
+ CacheClientConfiguration config,
+ ICacheDiscoveryErrorHandler? errHandler,
+ CancellationToken cancellation
+ )
+ {
+ //Loop through servers
+ while (enumerator.MoveNext())
+ {
+ //Make sure the node has a discovery endpoint
+ if (enumerator.Current.DiscoveryEndpoint == null)
+ {
+ //Skip this node
+ continue;
+ }
+
+ /*
+ * We are allowed to save nodes that do not have a discovery endpoint, but we cannot
+ * discover nodes from them we can only use them as cache
+ */
+
+ //add a random delay to avoid spamming servers
+ await Task.Delay((int)Random.Shared.NextInt64(100, 500), cancellation);
+
+ try
+ {
+ //Discover nodes from the current node
+ CacheNodeAdvertisment[]? nodes = await GetCacheNodesAsync(enumerator.Current, config, cancellation);
+
+ if (nodes != null)
+ {
+ //Add nodes to the collection
+ enumerator.OnPeerDiscoveryComplete(nodes);
+ }
+ }
+ //Catch exceptions when an error handler is defined
+ catch (Exception ex) when (errHandler != null)
+ {
+ //Handle the error
+ errHandler.OnDiscoveryError(enumerator.Current, ex);
+ }
+ catch (Exception ex)
+ {
+ throw new CacheDiscoveryFailureException($"Failed to discovery peer node {enumerator.Current?.NodeId}, cannot continue", ex);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Contacts the given server's discovery endpoint to discover a list of available
+ /// servers we can connect to
+ /// </summary>
+ /// <param name="advert">An advertisment of a server to discover other nodes from</param>
+ /// <param name="cancellationToken">A token to cancel the operationS</param>
+ /// <param name="config">The cache configuration object</param>
+ /// <returns>The list of active servers</returns>
+ /// <exception cref="SecurityException"></exception>
+ /// <exception cref="ArgumentException"></exception>
+ /// <exception cref="ArgumentNullException"></exception>
+ public static async Task<CacheNodeAdvertisment[]?> GetCacheNodesAsync(CacheNodeAdvertisment advert, CacheClientConfiguration config, CancellationToken cancellationToken = default)
+ {
+ ArgumentNullException.ThrowIfNull(advert);
+ ArgumentNullException.ThrowIfNull(config);
+ ArgumentNullException.ThrowIfNull(advert.DiscoveryEndpoint, nameof(advert.DiscoveryEndpoint));
+
+ DiscoveryRequest req = new (advert.DiscoveryEndpoint, config);
+
+ //Site adapter verifies response messages so we dont need to check on the response
+ byte[] data = await CacheSiteAdapter.Instance.ExecuteAsync(req, cancellationToken).AsBytes()
+ ?? throw new InvalidOperationException($"No data returned from node {advert.NodeId}");
+
+ //Response is jwt
+ using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data);
+
+ using JsonDocument doc = responseJwt.GetPayload();
+ return doc.RootElement.GetProperty("peers").Deserialize<CacheNodeAdvertisment[]>();
+ }
+
+
+ /*
+ * This method will connect to a given well-known (cache config endpoint) and discover the
+ * servers configuration (endpoint config)
+ *
+ * This function exists so clients only need a single endpoint to connect to, and the server
+ * will return it's signed configuration data (including cluster network information)
+ */
+ private async Task<CacheNodeAdvertisment?> DiscoverNodeConfigAsync(Uri serverUri, CancellationToken cancellation)
+ {
+ try
+ {
+ GetConfigRequest req = new (serverUri, config);
+
+ //Site adapter verifies response messages so we dont need to check on the response
+ byte[] data = await CacheSiteAdapter.Instance.ExecuteAsync(req, cancellation).AsBytes()
+ ?? throw new CacheDiscoveryFailureException($"No data returned from desired cache node");
+
+ //Response is jwt
+ using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data);
+
+ //The entire payload is just the single serialzed advertisment
+ using JsonDocument doc = responseJwt.GetPayload();
+
+ return doc.RootElement.GetProperty("sub").Deserialize<CacheNodeAdvertisment>();
+ }
+ //Bypass cdfe when error handler is null (avoid nesting)`
+ catch (CacheDiscoveryFailureException) when (config.ErrorHandler == null)
+ {
+ throw;
+ }
+ //Catch exceptions when an error handler is defined
+ catch (Exception ex) when (config.ErrorHandler != null)
+ {
+ //Handle the error
+ config.ErrorHandler.OnDiscoveryError(null!, ex);
+ return null;
+ }
+ catch (Exception ex)
+ {
+ throw new CacheDiscoveryFailureException("Failed to discover node configuration", ex);
+ }
+ }
+
+ private static NodeDiscoveryCollection GetNodeCollection(CacheClientConfiguration config)
+ {
+ return config is CacheNodeConfiguration cnc ? new (cnc.NodeIdRef!) : new (null);
+ }
+ }
+}
diff --git a/lib/VNLib.Data.Caching/src/ClientExtensions.cs b/lib/VNLib.Data.Caching/src/ClientExtensions.cs
index f0eee06..e0aa744 100644
--- a/lib/VNLib.Data.Caching/src/ClientExtensions.cs
+++ b/lib/VNLib.Data.Caching/src/ClientExtensions.cs
@@ -451,7 +451,7 @@ namespace VNLib.Data.Caching
/// <param name="client"></param>
/// <param name="change">The instance to store change event data to</param>
/// <param name="cancellationToken">A token to cancel the deuque operation</param>
- /// <returns>A <see cref="WaitForChangeResult"/> that contains information about the modified element</returns>
+ /// <returns>A task that completes when a change has occured</returns>
/// <exception cref="InvalidResponseException"></exception>
/// <exception cref="InvalidOperationException"></exception>
public static async Task WaitForChangeAsync(this FBMClient client, WaitForChangeResult change, CancellationToken cancellationToken = default)
diff --git a/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs b/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs
index e7fa3e1..944aa4b 100644
--- a/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs
+++ b/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs
@@ -33,11 +33,10 @@ using System.Runtime.CompilerServices;
using VNLib.Plugins;
using VNLib.Utils;
using VNLib.Utils.Memory;
-using VNLib.Utils.Extensions;
-using VNLib.Plugins.Extensions.Loading;
using VNLib.Utils.Memory.Diagnostics;
using VNLib.Utils.Logging;
-
+using VNLib.Utils.Extensions;
+using VNLib.Plugins.Extensions.Loading;
/*
* How bucket local memory works:
*
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs
index 6942828..16fda39 100644
--- a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs
+++ b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -50,11 +50,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
private readonly AsyncQueue<ChangeEvent> _listenerQueue;
private readonly ILogProvider _logProvider;
- private readonly ICacheEventQueueManager _queueManager;
+ private readonly PeerEventQueueManager _queueManager;
public CacheListenerPubQueue(PluginBase plugin)
{
- _queueManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>();
+ _queueManager = plugin.GetOrCreateSingleton<PeerEventQueueManager>();
_logProvider = plugin.Log.CreateScope(LOG_SCOPE_NAME);
//Init local queue to store published events
@@ -110,10 +110,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
}
///<inheritdoc/>
- public bool IsEnabled([NotNullWhen(true)] IPeerEventQueue? userState)
- {
- return userState is IPeerEventQueue;
- }
+ public bool IsEnabled([NotNullWhen(true)] IPeerEventQueue? userState) => userState is not null;
///<inheritdoc/>
public void PublishEvent(ChangeEvent changeEvent)
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs b/plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs
index bd15d24..c404cc5 100644
--- a/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs
+++ b/plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs
@@ -1,12 +1,12 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
-* File: CacheConfiguration.cs
+* File: CacheMemoryConfiguration.cs
*
-* CacheConfiguration.cs is part of ObjectCacheServer which is part of the larger
-* VNLib collection of libraries and utilities.
+* CacheMemoryConfiguration.cs is part of ObjectCacheServer which
+* is part of the larger VNLib collection of libraries and utilities.
*
* ObjectCacheServer is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
@@ -26,7 +26,7 @@ using System.Text.Json.Serialization;
namespace VNLib.Data.Caching.ObjectCache.Server.Cache
{
- internal sealed class CacheConfiguration
+ internal sealed class CacheMemoryConfiguration
{
[JsonPropertyName("buffer_recv_max")]
public int MaxRecvBufferSize { get; set; } = 1000 * 1024;
@@ -36,6 +36,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
[JsonPropertyName("buffer_header_max")]
public int MaxHeaderBufferSize { get; set; } = 2 * 1024;
+
[JsonPropertyName("buffer_header_min")]
public int MinHeaderBufferSize { get; set; } = 128;
@@ -49,5 +50,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
[JsonPropertyName("buckets")]
public uint BucketCount { get; set; } = 10;
+
+
+ [JsonPropertyName("memory_lib_path")]
+ public string? ExternLibPath { get; set; }
}
}
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs
index 86df849..81f4843 100644
--- a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs
+++ b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs
@@ -26,32 +26,24 @@ using System;
using System.Threading;
using System.Threading.Tasks;
-using VNLib.Utils.Logging;
-using VNLib.Net.Messaging.FBM;
-using VNLib.Plugins;
using VNLib.Plugins.Extensions.Loading;
namespace VNLib.Data.Caching.ObjectCache.Server.Cache
{
+
/*
* Implements the blob cache store, which is an abstraction around the blob cache listener.
* This allows for publishing local events (say from other nodes) to keep caches in sync.
*/
[ConfigurationName("cache")]
- internal sealed class CacheStore(PluginBase plugin, IConfigScope config) : ICacheStore, IDisposable
+ internal sealed class CacheStore(IBlobCacheTable table) : ICacheStore
{
- /// <summary>
- /// Gets the underlying cache listener
- /// </summary>
- public BlobCacheListener<IPeerEventQueue> Listener { get; } = InitializeCache((ObjectCacheServerEntry)plugin, config);
-
-
///<inheritdoc/>
ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, ObjectDataGet<T> bodyData, T state, CancellationToken token)
{
- return Listener.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token);
+ return table.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token);
}
///<inheritdoc/>
@@ -63,64 +55,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
///<inheritdoc/>
ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token)
{
- return Listener.Cache.DeleteObjectAsync(id, token);
- }
-
- private static BlobCacheListener<IPeerEventQueue> InitializeCache(ObjectCacheServerEntry plugin, IConfigScope config)
- {
- const string CacheConfigTemplate =
-@"
-Cache Configuration:
- Max memory: {max} Mb
- Buckets: {bc}
- Entries per-bucket: {mc}
-";
-
- //Deserialize the cache config
- CacheConfiguration cacheConf = config.Deserialze<CacheConfiguration>();
-
- if (cacheConf.MaxCacheEntries < 2)
- {
- throw new ArgumentException("You must configure a 'max_cache' size larger than 1 item");
- }
-
- //Suggestion
- if (cacheConf.MaxCacheEntries < 200)
- {
- plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache");
- }
-
- //calculate the max memory usage
- ulong maxByteSize = cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize;
-
- //Log the cache config
- plugin.Log.Information(CacheConfigTemplate,
- maxByteSize / (1024 * 1000),
- cacheConf.BucketCount,
- cacheConf.MaxCacheEntries
- );
-
- //Get the event listener
- ICacheListenerEventQueue<IPeerEventQueue> queue = plugin.GetOrCreateSingleton<CacheListenerPubQueue>();
-
- //Get the memory manager
- ICacheMemoryManagerFactory manager = plugin.GetOrCreateSingleton<BucketLocalManagerFactory>();
-
- //Load the blob cache table system
- IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, manager, cacheConf);
-
- FallbackFBMMemoryManager fbmMemManager = new(plugin.ListenerHeap);
-
- //Endpoint only allows for a single reader
- return new(bc, queue, plugin.Log, fbmMemManager);
- }
-
- /*
- * Cleaned up by the plugin on exit
- */
- public void Dispose()
- {
- Listener.Dispose();
+ return table.DeleteObjectAsync(id, token);
}
}
}
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs
index b7bf83f..8f196b0 100644
--- a/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs
+++ b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs
@@ -29,6 +29,7 @@ using System.Text.Json;
using VNLib.Utils.Resources;
using VNLib.Plugins;
using VNLib.Plugins.Extensions.Loading;
+using VNLib.Utils.Extensions;
namespace VNLib.Data.Caching.ObjectCache.Server.Cache
{
@@ -49,7 +50,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
/// <param name="cacheConf">The cache configuration object</param>
/// <returns>The loaded <see cref="IBlobCacheTable"/> implementation</returns>
/// <exception cref="FileNotFoundException"></exception>
- public static IBlobCacheTable LoadMemoryCacheSystem(this PluginBase plugin, IConfigScope config, ICacheMemoryManagerFactory heap, CacheConfiguration cacheConf)
+ public static IBlobCacheTable LoadMemoryCacheSystem(this PluginBase plugin, IConfigScope config, ICacheMemoryManagerFactory heap, CacheMemoryConfiguration cacheConf)
{
#pragma warning disable CA2000 // Dispose objects before losing scope
@@ -94,10 +95,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
if(initMethod != null)
{
//Itterate all buckets
- foreach (IBlobCacheBucket bucket in table)
- {
- initMethod.Invoke(bucket.Id);
- }
+ table.ForEach(bucket => initMethod(bucket.Id));
}
}
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs b/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs
index e3c613d..12cf37a 100644
--- a/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs
+++ b/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs
@@ -1,11 +1,11 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
-* File: CacheEventQueueManager.cs
+* File: PeerEventQueueManager.cs
*
-* CacheEventQueueManager.cs is part of ObjectCacheServer which is
+* PeerEventQueueManager.cs is part of ObjectCacheServer which is
* part of the larger VNLib collection of libraries and utilities.
*
* ObjectCacheServer is free software: you can redistribute it and/or modify
@@ -38,41 +38,37 @@ using VNLib.Plugins.Extensions.Loading.Events;
namespace VNLib.Data.Caching.ObjectCache.Server.Cache
{
- internal sealed class CacheEventQueueManager : ICacheEventQueueManager, IDisposable, IIntervalScheduleable
+ internal sealed class PeerEventQueueManager : ICacheEventQueueManager, IIntervalScheduleable
{
private readonly int MaxQueueDepth;
- private readonly object SubLock;
- private readonly LinkedList<NodeQueue> Subscribers;
+ private readonly object SubLock = new();
+ private readonly LinkedList<PeerEventListenerQueue> Subscribers = [];
- private readonly object StoreLock;
- private readonly Dictionary<string, NodeQueue> QueueStore;
+ private readonly object StoreLock = new();
+ private readonly Dictionary<string, PeerEventListenerQueue> QueueStore = new(StringComparer.OrdinalIgnoreCase);
-
- public CacheEventQueueManager(PluginBase plugin)
+ public PeerEventQueueManager(PluginBase plugin, NodeConfig config)
{
- //Get node config
- NodeConfig config = plugin.GetOrCreateSingleton<NodeConfig>();
-
- //Get max queue depth
MaxQueueDepth = config.MaxQueueDepth;
/*
- * Schedule purge interval to clean up stale queues
- */
+ * Schedule purge interval to clean up stale queues
+ */
plugin.ScheduleInterval(this, config.EventQueuePurgeInterval);
-
- SubLock = new();
- Subscribers = new();
-
- StoreLock = new();
- QueueStore = new(StringComparer.OrdinalIgnoreCase);
+
+ //Cleanup disposeables on unload
+ _ = plugin.RegisterForUnload(() =>
+ {
+ QueueStore.Clear();
+ Subscribers.Clear();
+ });
}
///<inheritdoc/>
public IPeerEventQueue Subscribe(ICachePeer peer)
{
- NodeQueue? nq;
+ PeerEventListenerQueue? nq;
bool isNew = false;
@@ -82,13 +78,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
//Try to recover the queue for the node
if (!QueueStore.TryGetValue(peer.NodeId, out nq))
{
- //Create new queue
+ //Create new queue since an existing queue was not found
nq = new(peer.NodeId, MaxQueueDepth);
QueueStore.Add(peer.NodeId, nq);
isNew = true;
}
- //Increment listener count
+ //Increment listener count since a new listener has attached
nq.Listeners++;
}
@@ -109,11 +105,20 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
///<inheritdoc/>
public void Unsubscribe(ICachePeer peer)
{
+ /*
+ * The reason I am not purging queues that no longer have listeners
+ * now is because it is possible that a listener needed to detach because of
+ * a network issue and will be reconnecting shortly. If the node doesnt
+ * come back before the next purge interval, it's events will be purged.
+ *
+ * Point is: there is a reason for the garbage collection style purging
+ */
+
//Detach a listener for a node
lock (StoreLock)
{
//Get the queue and decrement the listener count
- NodeQueue nq = QueueStore[peer.NodeId];
+ PeerEventListenerQueue nq = QueueStore[peer.NodeId];
nq.Listeners--;
}
}
@@ -125,7 +130,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
lock (SubLock)
{
//Loop through ll the fast way
- LinkedListNode<NodeQueue>? q = Subscribers.First;
+ LinkedListNode<PeerEventListenerQueue>? q = Subscribers.First;
while (q != null)
{
@@ -145,7 +150,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
lock (SubLock)
{
//Loop through ll the fast way
- LinkedListNode<NodeQueue>? q = Subscribers.First;
+ LinkedListNode<PeerEventListenerQueue>? q = Subscribers.First;
while (q != null)
{
@@ -167,9 +172,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
lock (StoreLock)
{
//Get all stale queues (queues without listeners)
- NodeQueue[] staleQueues = QueueStore.Values.Where(static nq => nq.Listeners == 0).ToArray();
+ PeerEventListenerQueue[] staleQueues = QueueStore.Values.Where(static nq => nq.Listeners == 0).ToArray();
- foreach (NodeQueue nq in staleQueues)
+ foreach (PeerEventListenerQueue nq in staleQueues)
{
//Remove from store
QueueStore.Remove(nq.NodeId);
@@ -191,54 +196,39 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
return Task.CompletedTask;
}
- void IDisposable.Dispose()
- {
- QueueStore.Clear();
- Subscribers.Clear();
- }
/*
* Holds queues for each node and keeps track of the number of listeners
* attached to the queue
+ *
+ * The role of this class is to store change events for a given peer node,
+ * and return them when the peer requests them. It also keeps track of the
+ * number of active listeners (server connections) to the queue.
*/
- private sealed class NodeQueue : IPeerEventQueue
+ private sealed class PeerEventListenerQueue(string nodeId, int maxDepth) : IPeerEventQueue
{
public int Listeners;
- public string NodeId { get; }
-
- public AsyncQueue<ChangeEvent> Queue { get; }
+ public string NodeId => nodeId;
- public NodeQueue(string nodeId, int maxDepth)
+ /*
+ * Create a bounded channel that acts as a lru and evicts
+ * the oldest item when the queue is full
+ *
+ * There will also only ever be a single thread writing events
+ * to the queue
+ */
+ private readonly AsyncQueue<ChangeEvent> Queue = new(new BoundedChannelOptions(maxDepth)
{
- NodeId = nodeId;
-
- /*
- * Create a bounded channel that acts as a lru and evicts
- * the oldest item when the queue is full
- *
- * There will also only ever be a single thread writing events
- * to the queue
- */
-
- BoundedChannelOptions queueOptions = new(maxDepth)
- {
- AllowSynchronousContinuations = true,
- SingleReader = false,
- SingleWriter = true,
- //Drop oldest item in queue if full
- FullMode = BoundedChannelFullMode.DropOldest,
- };
-
- //Init queue/channel
- Queue = new(queueOptions);
- }
+ AllowSynchronousContinuations = true,
+ SingleReader = false,
+ SingleWriter = true,
+ //Drop oldest item in queue if full
+ FullMode = BoundedChannelFullMode.DropOldest,
+ });
- public void PublishChange(ChangeEvent change)
- {
- Queue.TryEnque(change);
- }
+ public void PublishChange(ChangeEvent change) => Queue.TryEnque(change);
public void PublishChanges(Span<ChangeEvent> changes)
{
@@ -249,16 +239,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
}
///<inheritdoc/>
- public ValueTask<ChangeEvent> DequeueAsync(CancellationToken cancellation)
- {
- return Queue.DequeueAsync(cancellation);
- }
+ public ValueTask<ChangeEvent> DequeueAsync(CancellationToken cancellation) => Queue.DequeueAsync(cancellation);
///<inheritdoc/>
- public bool TryDequeue(out ChangeEvent change)
- {
- return Queue.TryDequeue(out change);
- }
+ public bool TryDequeue(out ChangeEvent change) => Queue.TryDequeue(out change);
}
}
}
diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
index a240dde..0a1bb4d 100644
--- a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -36,7 +36,6 @@ using VNLib.Net.Messaging.FBM;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.Extensions.Clustering;
-using VNLib.Data.Caching.ObjectCache.Server.Cache;
namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
@@ -56,38 +55,37 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork
{
private const string LOG_SCOPE_NAME = "REPL";
+ private const string FBM_LOG_SCOPE_NAME = "REPL-CLNT";
private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10);
private const int MAX_MESSAGE_SIZE = 12 * 1024;
private readonly PluginBase _plugin;
private readonly ILogProvider _log;
- private readonly NodeConfig _nodeConfig;
- private readonly ICacheStore _cacheStore;
- private readonly ICachePeerAdapter _peerAdapter;
private readonly FBMClientFactory _clientFactory;
-
+ private readonly ObjectCacheSystemState _sysState;
+
private readonly bool _isDebug;
private int _openConnections;
public CacheNodeReplicationMaanger(PluginBase plugin)
{
- //Load the node config
- _nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>();
- _cacheStore = plugin.GetOrCreateSingleton<CacheStore>();
- _peerAdapter = plugin.GetOrCreateSingleton<PeerDiscoveryManager>();
+ _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>();
//Init fbm config with fixed message size
FBMClientConfig clientConfig = FBMDataCacheExtensions.GetDefaultConfig(
- (plugin as ObjectCacheServerEntry)!.ListenerHeap,
+ _sysState.SharedCacheHeap,
MAX_MESSAGE_SIZE,
- debugLog: plugin.IsDebug() ? plugin.Log : null
+ debugLog: plugin.IsDebug() ? plugin.Log.CreateScope(FBM_LOG_SCOPE_NAME) : null
);
//Init ws fallback factory and client factory
- FBMFallbackClientWsFactory wsFactory = new();
- _clientFactory = new(in clientConfig, wsFactory);
+ _clientFactory = new(
+ ref clientConfig,
+ new FBMFallbackClientWsFactory(),
+ (int)_sysState.Configuration.MaxPeerConnections
+ );
_plugin = plugin;
_isDebug = plugin.IsDebug();
@@ -103,7 +101,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
while (true)
{
//Get all new peers
- CacheNodeAdvertisment[] peers = _peerAdapter.GetNewPeers();
+ CacheNodeAdvertisment[] peers = _sysState.PeerDiscovery.GetNewPeers();
if (peers.Length == 0 && _isDebug)
{
@@ -111,7 +109,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
}
//Make sure we don't exceed the max connections
- if(_openConnections >= _nodeConfig.MaxPeerConnections)
+ if(_openConnections >= _sysState.Configuration.MaxPeerConnections)
{
if (_isDebug)
{
@@ -150,13 +148,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
private async Task OnNewPeerDoWorkAsync(CacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken)
{
- _ = newPeer ?? throw new ArgumentNullException(nameof(newPeer));
+ ArgumentNullException.ThrowIfNull(newPeer);
//Setup client
FBMClient client = _clientFactory.CreateClient();
//Add peer to monitor
- _peerAdapter.OnPeerListenerAttached(newPeer);
+ _sysState.PeerDiscovery.OnPeerListenerAttached(newPeer);
Interlocked.Increment(ref _openConnections);
@@ -165,12 +163,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId);
//Connect to the server
- await client.ConnectToCacheAsync(newPeer, _nodeConfig.Config, exitToken);
+ await client.ConnectToCacheAsync(newPeer, _sysState.Configuration.Config, exitToken);
log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId);
//Start worker tasks
- List<Task> workerTasks = new();
+ List<Task> workerTasks = [];
for (int i = 0; i < Environment.ProcessorCount; i++)
{
@@ -187,6 +185,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//Disconnect client gracefully
await client.DisconnectAsync(CancellationToken.None);
}
+ catch(FBMServerNegiationException fbm)
+ {
+ log.Error("Failed to negotiate buffer configuration, check your cache memory configuration. Error:{err}", fbm.Message);
+ }
catch (InvalidResponseException ie)
{
//See if the plugin is unloading
@@ -227,7 +229,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
client.Dispose();
//Notify monitor of disconnect
- _peerAdapter.OnPeerListenerDetatched(newPeer);
+ _sysState.PeerDiscovery.OnPeerListenerDetatched(newPeer);
}
}
@@ -259,7 +261,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
return;
case "deleted":
//Delete the object from the store
- await _cacheStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None);
+ await _sysState.InternalStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None);
break;
case "modified":
//Reload the record from the store
@@ -297,7 +299,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal))
{
//Update the record
- await _cacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation);
+ await _sysState.InternalStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation);
log.Debug("Updated object {id}", objectId);
}
else
diff --git a/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs
index c49a54b..c3fbd8e 100644
--- a/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -29,7 +29,6 @@ using System.Collections.Generic;
using VNLib.Utils;
using VNLib.Utils.Extensions;
-using VNLib.Plugins;
namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
@@ -37,12 +36,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
internal sealed class CachePeerMonitor : VnDisposeable, IPeerMonitor
{
- private readonly LinkedList<ICachePeer> peers = new();
+ private readonly List<ICachePeer> peers = new();
private readonly ManualResetEvent newPeerTrigger = new (false);
- public CachePeerMonitor(PluginBase plugin)
- { }
-
/// <summary>
/// Waits for new peers to connect to the server
/// </summary>
@@ -70,7 +66,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//When a peer is connected we can add it to the list so the replication manager can see it
lock(peers)
{
- peers.AddLast(peer);
+ peers.Add(peer);
}
//Trigger monitor when change occurs
@@ -92,6 +88,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
protected override void Free()
{
+ peers.Clear();
newPeerTrigger.Dispose();
}
}
diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
index 6475f9c..f22e1dd 100644
--- a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -24,14 +24,11 @@
using System;
using System.Linq;
-using System.Net.Http;
using System.Threading;
-using System.Net.Sockets;
using System.Threading.Tasks;
using System.Collections.Generic;
using VNLib.Utils.Logging;
-using VNLib.Plugins;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.Extensions;
using VNLib.Data.Caching.Extensions.Clustering;
@@ -43,9 +40,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
* This class is responsible for resolving and discovering peer nodes in the cluster network.
*/
- internal sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter
+ internal sealed class PeerDiscoveryManager(NodeConfig config, ILogProvider Log, bool IsDebug, bool HasWellKnown) : IAsyncBackgroundWork, ICachePeerAdapter
{
- private const string LOG_SCOPE_NAME = "DISC";
+ internal const string LOG_SCOPE_NAME = "DISC";
+
/*
* The initial discovery delay. This allows for the server to initialize before
* starting the discovery process. This will probably be a shorter delay
@@ -54,43 +52,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(15);
private static readonly TimeSpan WhenWellKnownResolveFailed = TimeSpan.FromSeconds(20);
-
- private readonly List<CacheNodeAdvertisment> _connectedPeers;
- private readonly NodeConfig Config;
- private readonly CachePeerMonitor Monitor;
- private readonly ILogProvider Log;
- private readonly bool IsDebug;
- private readonly bool HasWellKnown;
-
- public PeerDiscoveryManager(PluginBase plugin)
- {
- //Get config
- Config = plugin.GetOrCreateSingleton<NodeConfig>();
-
- //Get the known peers array from config, its allowed to be null for master nodes
- IConfigScope? config = plugin.TryGetConfig("known_peers");
- string[] kownPeers = config?.Deserialze<string[]>() ?? Array.Empty<string>();
-
- //Add known peers to the monitor
- Config.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s)));
-
- HasWellKnown = kownPeers.Length > 0;
-
- //Get the peer monitor
- Monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
-
- _connectedPeers = new();
-
- //Create scoped logger
- Log = plugin.Log.CreateScope(LOG_SCOPE_NAME);
-
- Log.Information("Inital peer nodes: {nodes}", kownPeers);
-
- //Setup discovery error handler
- Config.Config.WithErrorHandler(new ErrorHandler(Log));
-
- IsDebug = plugin.IsDebug();
- }
+ private readonly List<CacheNodeAdvertisment> _connectedPeers = [];
+ private readonly CachePeerMonitor Monitor = new();
+ private readonly VNCacheClusterManager clusterMan = new(config.Config);
async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
{
@@ -124,7 +88,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
}
//Resolve all known peers
- CacheNodeAdvertisment[] wellKnown = await Config.Config.ResolveWellKnownAsync(exitToken);
+ CacheNodeAdvertisment[] wellKnown = await clusterMan.ResolveWellKnownAsync(exitToken);
wellKnownFailed = wellKnown.Length == 0;
//Use the monitor to get the initial peers
@@ -136,13 +100,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
if (allAds.Length > 0)
{
//Discover all known nodes
- await Config.Config.DiscoverNodesAsync(allAds, exitToken);
+ await clusterMan.DiscoverNodesAsync(allAds, exitToken);
}
//Log the discovered nodes if verbose logging is enabled
if (IsDebug)
{
- CacheNodeAdvertisment[] found = Config.Config.NodeCollection.GetAllNodes();
+ CacheNodeAdvertisment[] found = clusterMan.DiscoveredNodes.GetAllNodes();
Log.Debug("Discovered {count} nodes\n\t{nodes}", found.Length, found.Select(static s => s.NodeId));
}
@@ -177,7 +141,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
else
{
//Delay the next discovery
- await Task.Delay(Config.DiscoveryInterval, exitToken);
+ await Task.Delay(config.DiscoveryInterval, exitToken);
}
}
}
@@ -188,7 +152,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
}
finally
{
-
+ Monitor.Dispose();
}
//Wait for the watcher to exit
@@ -197,10 +161,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
private IEnumerable<CacheNodeAdvertisment> GetMonitorAds()
{
+ string selfId = (clusterMan.Config as CacheNodeConfiguration)!.NodeId;
return Monitor.GetAllPeers()
.Where(static p => p.Advertisment != null)
//Without us
- .Where(n => n.NodeId != Config.Config.NodeId)
+ .Where(n => !string.Equals(n.NodeId, selfId, StringComparison.OrdinalIgnoreCase))
.Select(static p => p.Advertisment!);
}
@@ -222,7 +187,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//Use the monitor to get the initial peers
IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds();
- ((NodeDiscoveryCollection)Config.Config.NodeCollection).AddManualNodes(ads);
+ clusterMan.AddManualNodes(ads);
}
}
catch (OperationCanceledException)
@@ -239,7 +204,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
lock (_connectedPeers)
{
//Get all discovered peers
- CacheNodeAdvertisment[] peers = Config.Config.NodeCollection.GetAllNodes();
+ CacheNodeAdvertisment[] peers = clusterMan.DiscoveredNodes.GetAllNodes();
//Get the difference between the discovered peers and the connected peers
return peers.Except(_connectedPeers).ToArray();
@@ -265,31 +230,5 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
_connectedPeers.Remove(peer);
}
}
-
-
- private sealed record class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
- {
- public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
- {
-
- if (ex is HttpRequestException hre)
- {
- if (hre.InnerException is SocketException se)
- {
- //traisnport failed
- Logger.Warn("Failed to connect to server {serv} because {err}", errorNode, se.Message);
- }
- else
- {
- Logger.Error("Failed to connect to node {n}\n{err}", errorNode, hre);
- }
- }
- else
- {
- Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex);
- }
-
- }
- }
}
}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs
index d1591f8..48f4448 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -45,7 +45,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
public bool IsPeer { get; set; }
}
- internal sealed class CacheNegotationManager
+ internal sealed class CacheNegotationManager(PluginBase plugin)
{
/*
* Cache keys are centralized and may be shared between all cache server nodes. This means
@@ -64,21 +64,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
- private readonly string AudienceLocalServerId;
- private readonly NodeConfig _nodeConfig;
- private readonly CacheConfiguration _cacheConfig;
+ private readonly string AudienceLocalServerId = Guid.NewGuid().ToString("N");
- public CacheNegotationManager(PluginBase plugin)
- {
- //Get node configuration
- _nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>();
+ private readonly ObjectCacheSystemState _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>();
- //Get the cache store configuration
- _cacheConfig = plugin.GetConfigForType<CacheStore>().Deserialze<CacheConfiguration>();
+ private NodeConfig NodeConfig => _sysState.Configuration;
- AudienceLocalServerId = Guid.NewGuid().ToString("N");
- }
-
+ private CacheMemoryConfiguration CacheConfig => _sysState.MemoryConfiguration;
public bool IsClientNegotiationValid(string authToken, out ClientNegotiationState state)
{
@@ -88,12 +80,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
using JsonWebToken jwt = JsonWebToken.Parse(authToken);
//verify signature for client
- if (_nodeConfig.KeyStore.VerifyJwt(jwt, false))
+ if (NodeConfig.KeyStore.VerifyJwt(jwt, false))
{
//Validated as normal client
}
//May be signed by a cache server
- else if (_nodeConfig.KeyStore.VerifyJwt(jwt, true))
+ else if (NodeConfig.KeyStore.VerifyJwt(jwt, true))
{
//Set peer and verified flag since the another cache server signed the request
state.IsPeer = true;
@@ -117,12 +109,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
return true;
}
- public JsonWebToken ConfirmCLientNegotiation(ClientNegotiationState state, IPAddress clientIp, DateTimeOffset now)
+ public JsonWebToken ConfirmClientNegotiation(ClientNegotiationState state, IPAddress clientIp, DateTimeOffset now)
{
//Verified, now we can create an auth message with a short expiration
JsonWebToken auth = new();
- auth.WriteHeader(_nodeConfig.KeyStore.GetJwtHeader());
+ auth.WriteHeader(NodeConfig.KeyStore.GetJwtHeader());
auth.InitPayloadClaim()
.AddClaim("aud", AudienceLocalServerId)
.AddClaim("iat", now.ToUnixTimeSeconds())
@@ -136,24 +128,29 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
//Set ip address
.AddClaim("ip", clientIp.ToString())
//Add negotiaion args
- .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, _cacheConfig.MaxHeaderBufferSize)
- .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, _cacheConfig.MaxRecvBufferSize)
- .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, _cacheConfig.MaxMessageSize)
+ .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, CacheConfig.MaxHeaderBufferSize)
+ .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, CacheConfig.MaxRecvBufferSize)
+ .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, CacheConfig.MaxMessageSize)
.CommitClaims();
//Sign the auth message from our private key
- _nodeConfig.KeyStore.SignJwt(auth);
+ NodeConfig.KeyStore.SignJwt(auth);
return auth;
}
- public bool ValidateUpgrade(string upgradeToken, string tokenSignature, DateTimeOffset now, IPAddress connectionIp, ref string? nodeId, ref bool isPeer)
+ public bool ValidateUpgrade(string? upgradeToken, string? tokenSignature, DateTimeOffset now, IPAddress connectionIp, ref string? nodeId, ref bool isPeer)
{
+ if(string.IsNullOrWhiteSpace(upgradeToken) || string.IsNullOrWhiteSpace(tokenSignature))
+ {
+ return false;
+ }
+
//Parse jwt
using JsonWebToken jwt = JsonWebToken.Parse(upgradeToken);
//verify signature against the cache public key, since this server must have signed it
- if (!_nodeConfig.KeyStore.VerifyCachePeer(jwt))
+ if (!NodeConfig.KeyStore.VerifyCachePeer(jwt))
{
return false;
}
@@ -175,7 +172,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
//Check node ip address matches if required
- if (_nodeConfig.VerifyIp)
+ if (NodeConfig.VerifyIp)
{
if (!doc.RootElement.TryGetProperty("ip", out JsonElement ipEl))
{
@@ -201,7 +198,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
//Verify token signature against a fellow cache public key
- return _nodeConfig.KeyStore.VerifyUpgradeToken(tokenSignature, upgradeToken, isPeer);
+ return NodeConfig.KeyStore.VerifyUpgradeToken(tokenSignature, upgradeToken, isPeer);
}
}
}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
index 816e6c3..d6b733c 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -55,11 +55,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
internal const string LOG_SCOPE_NAME = "CONEP";
-
- private readonly ICacheEventQueueManager PubSubManager;
- private readonly IPeerMonitor Peers;
- private readonly BlobCacheListener<IPeerEventQueue> Store;
- private readonly NodeConfig NodeConfiguration;
+
+ private readonly ObjectCacheSystemState _sysState;
+
+ private PeerEventQueueManager PubSubManager => _sysState.PeerEventQueue;
+ private CachePeerMonitor Peers => _sysState.PeerMonitor;
+ private BlobCacheListener<IPeerEventQueue> Listener => _sysState.Listener;
+ private NodeConfig NodeConfiguration => _sysState.Configuration;
+
private readonly CacheNegotationManager AuthManager;
private uint _connectedClients;
@@ -72,7 +75,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
/// <summary>
/// The cache store configuration
/// </summary>
- public CacheConfiguration CacheConfig { get; }
+ public CacheMemoryConfiguration CacheConfig => _sysState.MemoryConfiguration;
//Loosen up protection settings
///<inheritdoc/>
@@ -83,24 +86,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
public ConnectEndpoint(PluginBase plugin)
{
- //Get node configuration
- NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>();
+ _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>();
//Init from config and create a new log scope
InitPathAndLog(NodeConfiguration.ConnectPath, plugin.Log.CreateScope(LOG_SCOPE_NAME));
-
- //Setup pub/sub manager
- PubSubManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>();
-
- //Get peer monitor
- Peers = plugin.GetOrCreateSingleton<CachePeerMonitor>();
-
- //Init the cache store
- Store = plugin.GetOrCreateSingleton<CacheStore>().Listener;
-
- //Get the cache store configuration
- CacheConfig = plugin.GetConfigForType<CacheStore>().Deserialze<CacheConfiguration>();
-
+
//Get the auth manager
AuthManager = plugin.GetOrCreateSingleton<CacheNegotationManager>();
}
@@ -127,6 +117,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
//Parse jwt from authoriation
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
+
if (string.IsNullOrWhiteSpace(jwtAuth))
{
return VirtualClose(entity, HttpStatusCode.Forbidden);
@@ -149,26 +140,34 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
//Verified, now we can create an auth message with a short expiration
- using JsonWebToken auth = AuthManager.ConfirmCLientNegotiation(state, entity.TrustedRemoteIp, entity.RequestedTimeUtc);
+ using JsonWebToken auth = AuthManager.ConfirmClientNegotiation(state, entity.TrustedRemoteIp, entity.RequestedTimeUtc);
- //Close response
+ //Close response by sending a copy of the signed token
entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer);
return VfReturnType.VirtualSkip;
}
protected override VfReturnType WebsocketRequested(HttpEntity entity)
{
+ /*
+ * Check to see if any more connections are allowed,
+ * otherwise deny the connection
+ *
+ * This is done here to prevent the server from being overloaded
+ * on a new connection. It would be ideal to not grant new tokens
+ * but malicious clients could cache a bunch of tokens and use them
+ * later, exhausting resources.
+ */
+ if(_connectedClients >= NodeConfiguration.MaxConcurrentConnections)
+ {
+ return VirtualClose(entity, HttpStatusCode.ServiceUnavailable);
+ }
+
//Parse jwt from authorization
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
string? clientSignature = entity.Server.Headers[FBMDataCacheExtensions.X_UPGRADE_SIG_HEADER];
string? optionalDiscovery = entity.Server.Headers[FBMDataCacheExtensions.X_NODE_DISCOVERY_HEADER];
- //Not null
- if (string.IsNullOrWhiteSpace(jwtAuth) || string.IsNullOrWhiteSpace(clientSignature))
- {
- return VfReturnType.Forbidden;
- }
-
string? nodeId = null;
bool isPeer = false;
@@ -178,15 +177,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
return VirtualClose(entity, HttpStatusCode.Unauthorized);
}
- CacheNodeAdvertisment? discoveryAd = null;
-
/*
* If the client is a peer server, it may offer a signed advertisment
* that this node will have the duty of making available to other peers
* if it is valid
*/
- if (isPeer && !string.IsNullOrWhiteSpace(optionalDiscovery))
+ CacheNodeAdvertisment? discoveryAd = null;
+
+ if (isPeer)
{
discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(optionalDiscovery);
}
@@ -196,11 +195,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
try
{
//Get query config suggestions from the client
- string recvBufCmd = entity.QueryArgs[FBMClient.REQ_RECV_BUF_QUERY_ARG];
- string maxHeaderCharCmd = entity.QueryArgs[FBMClient.REQ_HEAD_BUF_QUERY_ARG];
- string maxMessageSizeCmd = entity.QueryArgs[FBMClient.REQ_MAX_MESS_QUERY_ARG];
+ string? recvBufCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_RECV_BUF_QUERY_ARG);
+ string? maxHeaderCharCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_HEAD_BUF_QUERY_ARG);
+ string? maxMessageSizeCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_MAX_MESS_QUERY_ARG);
- //Parse recv buffer size
int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : CacheConfig.MinRecvBufferSize;
int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : CacheConfig.MinHeaderBufferSize;
int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : CacheConfig.MaxMessageSize;
@@ -253,9 +251,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
Peers.OnPeerConnected(state);
//Register plugin exit token to cancel the connected socket
- CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll);
-
- //Inc connected count
+ await using CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll);
+
Interlocked.Increment(ref _connectedClients);
try
@@ -280,7 +277,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
try
{
//Begin listening for messages with a queue
- await Store.ListenAsync(wss, queue, args);
+ await Listener.ListenAsync(wss, queue, args);
}
finally
{
@@ -291,7 +288,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
else
{
//Begin listening for messages without a queue
- await Store.ListenAsync(wss, null!, args);
+ await Listener.ListenAsync(wss, null!, args);
}
}
catch (OperationCanceledException)
@@ -303,15 +300,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
catch (Exception ex)
{
- Log.Debug(ex);
+ //If debug logging is enabled print a more detailed error message
+ Log.Error("An error occured on websocket connection: node {con} -> {error}", state.NodeId, ex.Message);
+ Log.Debug("Websocket connection error: node {con}\n{error}", state.NodeId, ex);
}
-
- //Dec connected count
+
Interlocked.Decrement(ref _connectedClients);
- //Unregister the token
- reg.Unregister();
-
//Notify monitor of disconnect
Peers.OnPeerDisconnected(state);
diff --git a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
index 7d376b8..56fe8cd 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -40,25 +40,29 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
internal sealed class PeerDiscoveryEndpoint : ResourceEndpointBase
{
- private readonly IPeerMonitor PeerMonitor;
- private readonly NodeConfig Config;
+ private readonly ObjectCacheSystemState SysState;
+
+ private CacheAuthKeyStore KeyStore => SysState.Configuration.KeyStore;
+
+ private CachePeerMonitor PeerMonitor => SysState.PeerMonitor;
+
+ private CacheNodeConfiguration NodeConfig => SysState.Configuration.Config;
- //Loosen up protection settings
///<inheritdoc/>
protected override ProtectionSettings EndpointProtectionSettings { get; } = new()
{
- DisableSessionsRequired = true
+ /*
+ * Sessions will not be used or required for this endpoint.
+ * We should also assume the session system is not even loaded
+ */
+ DisableSessionsRequired = true
};
public PeerDiscoveryEndpoint(PluginBase plugin)
{
- //Get the peer monitor
- PeerMonitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
-
- //Get the node config
- Config = plugin.GetOrCreateSingleton<NodeConfig>();
+ SysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>();
- InitPathAndLog(Config.DiscoveryPath, plugin.Log);
+ InitPathAndLog(SysState.Configuration.DiscoveryPath!, plugin.Log);
}
protected override VfReturnType Get(HttpEntity entity)
@@ -68,36 +72,41 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
if(string.IsNullOrWhiteSpace(authToken))
{
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
+ return VirtualClose(entity, HttpStatusCode.Unauthorized);
}
string subject = string.Empty;
string challenge = string.Empty;
- //Parse auth token
- using(JsonWebToken jwt = JsonWebToken.Parse(authToken))
+ try
{
+ //Parse auth token
+ using JsonWebToken jwt = JsonWebToken.Parse(authToken);
+
//try to verify against cache node first
- if (!Config.KeyStore.VerifyJwt(jwt, true))
+ if (!KeyStore.VerifyJwt(jwt, true))
{
//failed...
//try to verify against client key
- if (!Config.KeyStore.VerifyJwt(jwt, false))
+ if (!KeyStore.VerifyJwt(jwt, false))
{
//invalid token
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
+ return VirtualClose(entity, HttpStatusCode.Unauthorized);
}
}
using JsonDocument payload = jwt.GetPayload();
//Get client info to pass back
- subject = payload.RootElement.TryGetProperty("sub", out JsonElement subEl) ? subEl.GetString() ?? string.Empty : string.Empty;
+ subject = payload.RootElement.TryGetProperty("sub", out JsonElement subEl) ? subEl.GetString() ?? string.Empty : string.Empty;
challenge = payload.RootElement.GetProperty("chl").GetString() ?? string.Empty;
}
+ catch (FormatException)
+ {
+ //If tokens are invalid format, let the client know instead of a server error
+ return VfReturnType.BadRequest;
+ }
//Valid key, get peer list to send to client
CacheNodeAdvertisment[] peers = PeerMonitor.GetAllPeers()
@@ -109,10 +118,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
using JsonWebToken response = new();
//set header from cache config
- response.WriteHeader(Config.KeyStore.GetJwtHeader());
+ response.WriteHeader(KeyStore.GetJwtHeader());
response.InitPayloadClaim()
- .AddClaim("iss", Config.Config.NodeId)
+ .AddClaim("iss", NodeConfig.NodeId)
//Audience is the requestor id
.AddClaim("sub", subject)
.AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds())
@@ -122,10 +131,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
.AddClaim("chl", challenge)
.CommitClaims();
- //Sign the response
- Config.KeyStore.SignJwt(response);
-
- //Send response to client
+
+ KeyStore.SignJwt(response);
+
entity.CloseResponse(HttpStatusCode.OK, Net.Http.ContentType.Text, response.DataBuffer);
return VfReturnType.VirtualSkip;
}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs
index 87a471b..04380c5 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -59,7 +59,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
public WellKnownEndpoint(PluginBase plugin)
{
//Get the node config
- NodeConfig nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>();
+ NodeConfig nodeConfig = plugin.GetOrCreateSingleton<ObjectCacheSystemState>().Configuration;
//serialize the config, discovery may not be enabled
_advertisment = nodeConfig.Config.Advertisment;
diff --git a/plugins/ObjectCacheServer/src/NodeConfig.cs b/plugins/ObjectCacheServer/src/NodeConfig.cs
index 3a2e10e..4dd9f4a 100644
--- a/plugins/ObjectCacheServer/src/NodeConfig.cs
+++ b/plugins/ObjectCacheServer/src/NodeConfig.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -66,6 +66,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server
/// </summary>
public uint MaxPeerConnections { get; } = 10;
+ /// <summary>
+ /// The maxium number of concurrent client connections to allow
+ /// before rejecting new connections
+ /// </summary>
+ public uint MaxConcurrentConnections { get; }
+
public NodeConfig(PluginBase plugin, IConfigScope config)
{
//Get the port of the primary webserver
@@ -92,32 +98,23 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//Init key store
KeyStore = new(plugin);
-
- DiscoveryInterval = config["discovery_interval_sec"].GetTimeSpan(TimeParseType.Seconds);
-
- //Get the event queue purge interval
- EventQueuePurgeInterval = config["queue_purge_interval_sec"].GetTimeSpan(TimeParseType.Seconds);
-
- //Get the max queue depth
- MaxQueueDepth = (int)config["max_queue_depth"].GetUInt32();
-
-
- //Get the connect path
- ConnectPath = config["connect_path"].GetString() ?? throw new KeyNotFoundException("Missing required key 'connect_path' in cluster config");
-
- //Get the verify ip setting
- VerifyIp = config["verify_ip"].GetBoolean();
+ DiscoveryInterval = config.GetRequiredProperty("discovery_interval_sec", p => p.GetTimeSpan(TimeParseType.Seconds));
+ EventQueuePurgeInterval = config.GetRequiredProperty("queue_purge_interval_sec", p => p.GetTimeSpan(TimeParseType.Seconds));
+ MaxQueueDepth = (int)config.GetRequiredProperty("max_queue_depth", p => p.GetUInt32());
+ ConnectPath = config.GetRequiredProperty("connect_path", p => p.GetString()!);
+ VerifyIp = config.GetRequiredProperty("verify_ip", p => p.GetBoolean());
+ WellKnownPath = config.GetValueOrDefault("well_known_path", p => p.GetString()!, DefaultPath);
+ MaxPeerConnections = config.GetValueOrDefault("max_peers", p => p.GetUInt32(), 10u);
Uri connectEp = BuildUri(usingTls, hostname, port, ConnectPath);
Uri? discoveryEp = null;
- Config = new();
-
//Setup cache node config
- Config.WithCacheEndpoint(connectEp)
- .WithNodeId(nodeId)
- .WithAuthenticator(KeyStore)
- .WithTls(usingTls);
+ (Config = new())
+ .WithCacheEndpoint(connectEp)
+ .WithNodeId(nodeId)
+ .WithAuthenticator(KeyStore)
+ .WithTls(usingTls);
//Get the discovery path (optional)
if (config.TryGetValue("discovery_path", out JsonElement discoveryPathEl))
@@ -133,20 +130,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server
}
}
- //Allow custom well-known path
- if(config.TryGetValue("well_known_path", out JsonElement wkEl))
- {
- WellKnownPath = wkEl.GetString() ?? DefaultPath;
- }
- //Default if not set
- WellKnownPath ??= DefaultPath;
-
- //Get the max peer connections
- if (config.TryGetValue("max_peers", out JsonElement maxPeerEl))
- {
- MaxPeerConnections = maxPeerEl.GetUInt32();
- }
-
const string CacheConfigTemplate =
@"
Cluster Configuration:
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
index aada787..b970cee 100644
--- a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
+++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -23,13 +23,10 @@
*/
using System;
-using System.Threading;
using System.Collections.Generic;
using VNLib.Plugins;
-using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
-using VNLib.Utils.Memory.Diagnostics;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Extensions.Loading.Routing;
@@ -42,39 +39,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server
public sealed class ObjectCacheServerEntry : PluginBase
{
public override string PluginName => "ObjectCache.Service";
-
- private readonly Lazy<IUnmangedHeap> _cacheHeap;
-
- internal IUnmangedHeap ListenerHeap => _cacheHeap.Value;
-
- public ObjectCacheServerEntry()
- {
- //Init heap
- _cacheHeap = new Lazy<IUnmangedHeap>(InitializeHeap, LazyThreadSafetyMode.PublicationOnly);
- }
-
- internal IUnmangedHeap InitializeHeap()
- {
- //Create default heap
- IUnmangedHeap _heap = MemoryUtil.InitializeNewHeapForProcess();
- try
- {
- //If the plugin is in debug mode enable heap tracking
- return this.IsDebug() ? new TrackedHeapWrapper(_heap, true) : _heap;
- }
- catch
- {
- _heap.Dispose();
- throw;
- }
- }
+
protected override void OnLoad()
{
try
{
- //Get the node configuration first
- NodeConfig config = this.GetOrCreateSingleton<NodeConfig>();
+ //Initialize the cache node builder
+ ObjectCacheSystemState builder = this.GetOrCreateSingleton<ObjectCacheSystemState>();
+ builder.Initialize();
//Route well-known endpoint
this.Route<WellKnownEndpoint>();
@@ -86,7 +59,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
_ = this.GetOrCreateSingleton<CacheNodeReplicationMaanger>();
//Setup discovery endpoint
- if(!string.IsNullOrWhiteSpace(config.DiscoveryPath))
+ if(!string.IsNullOrWhiteSpace(builder.Configuration.DiscoveryPath))
{
this.Route<PeerDiscoveryEndpoint>();
}
@@ -101,12 +74,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server
protected override void OnUnLoad()
{
- //dispose heap if initialized
- if(_cacheHeap.IsValueCreated)
- {
- _cacheHeap.Value.Dispose();
- }
-
Log.Information("Plugin unloaded");
}
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs
new file mode 100644
index 0000000..6183956
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs
@@ -0,0 +1,215 @@
+/*
+* Copyright (c) 2024 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: ObjectCacheSystemState.cs
+*
+* ObjectCacheSystemState.cs is part of ObjectCacheServer which is
+* part of the larger VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Linq;
+using System.Net.Http;
+using System.Net.Sockets;
+
+using VNLib.Utils.Logging;
+using VNLib.Utils.Memory;
+using VNLib.Utils.Memory.Diagnostics;
+using VNLib.Net.Messaging.FBM;
+using VNLib.Plugins;
+using VNLib.Plugins.Extensions.Loading;
+
+using VNLib.Data.Caching.Extensions.Clustering;
+using VNLib.Data.Caching.ObjectCache.Server.Cache;
+using VNLib.Data.Caching.ObjectCache.Server.Clustering;
+
+namespace VNLib.Data.Caching.ObjectCache.Server
+{
+ [ConfigurationName("cache")]
+ internal sealed class ObjectCacheSystemState(PluginBase plugin, IConfigScope config) : IDisposable
+ {
+ const string LISTENER_LOG_SCOPE = "CacheListener";
+
+ public BlobCacheListener<IPeerEventQueue> Listener { get; private set; } = null!;
+
+ public ICacheStore InternalStore { get; private set; } = null!;
+
+ /// <summary>
+ /// Used for miscellaneous shared memory allocations (like the cache listener)
+ /// </summary>
+ public IUnmangedHeap SharedCacheHeap { get; private set; } = null!;
+
+ /// <summary>
+ /// The plugin-wide, shared node configuration
+ /// </summary>
+ public NodeConfig Configuration { get; } = plugin.GetOrCreateSingleton<NodeConfig>();
+
+ /// <summary>
+ /// The peer discovery manager
+ /// </summary>
+ public PeerDiscoveryManager PeerDiscovery { get; private set; } = null!;
+
+ /// <summary>
+ /// System wide peer monitor
+ /// </summary>
+ public CachePeerMonitor PeerMonitor { get; } = new();
+
+ public CacheMemoryConfiguration MemoryConfiguration { get; } = config.Deserialze<CacheMemoryConfiguration>();
+
+ /// <summary>
+ /// The system wide peer event queue manager
+ /// </summary>
+ public PeerEventQueueManager PeerEventQueue { get; private set; }
+
+ void IDisposable.Dispose()
+ {
+ SharedCacheHeap.Dispose();
+ Listener.Dispose();
+ }
+
+ /// <summary>
+ /// Initializes the cache node state
+ /// </summary>
+ public void Initialize()
+ {
+ CacheMemoryConfiguration cacheConf = MemoryConfiguration;
+
+ ArgumentOutOfRangeException.ThrowIfLessThan(cacheConf.MaxCacheEntries, 2u);
+
+ //Suggestion
+ if (cacheConf.MaxCacheEntries < 200)
+ {
+ plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache");
+ }
+
+ LogMemConfiguration();
+
+ //If the plugin is in debug mode enable heap tracking
+ SharedCacheHeap = plugin.IsDebug() ?
+ new TrackedHeapWrapper(MemoryUtil.InitializeNewHeapForProcess(), true)
+ : MemoryUtil.InitializeNewHeapForProcess();
+
+ ConfigurePeerDiscovery();
+
+ ConfigureCacheListener();
+
+ PeerEventQueue = new(plugin, Configuration);
+ }
+
+ private void ConfigurePeerDiscovery()
+ {
+ //Get the known peers array from config, its allowed to be null for master nodes
+ IConfigScope? config = plugin.TryGetConfig("known_peers");
+ string[] kownPeers = config?.Deserialze<string[]>() ?? [];
+
+ ILogProvider discLogger = plugin.Log.CreateScope(PeerDiscoveryManager.LOG_SCOPE_NAME);
+
+ Configuration.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s)))
+ .WithErrorHandler(new ErrorHandler(discLogger));
+
+ discLogger.Information("Inital peer nodes: {nodes}", kownPeers);
+
+ PeerDiscovery = new PeerDiscoveryManager(
+ Configuration,
+ discLogger,
+ plugin.IsDebug(),
+ kownPeers.Length > 0
+ );
+
+ //Discovery manager needs to be scheduled for background work to run the discovery loop
+ _ = plugin.ObserveWork(PeerDiscovery, 10);
+ }
+
+ private void ConfigureCacheListener()
+ {
+ /*
+ * Allow loading external managed dll for a bucket-local memory manager
+ */
+ ICacheMemoryManagerFactory manager;
+
+ if (string.IsNullOrWhiteSpace(MemoryConfiguration.ExternLibPath))
+ {
+ //Get the memory manager
+ manager = plugin.GetOrCreateSingleton<BucketLocalManagerFactory>();
+ }
+ else
+ {
+ manager = plugin.CreateServiceExternal<ICacheMemoryManagerFactory>(MemoryConfiguration.ExternLibPath);
+ }
+
+ //Endpoint only allows for a single reader
+ Listener = new(
+ plugin.LoadMemoryCacheSystem(config, manager, MemoryConfiguration),
+ plugin.GetOrCreateSingleton<CacheListenerPubQueue>(),
+ plugin.Log.CreateScope(LISTENER_LOG_SCOPE),
+ new SharedHeapFBMMemoryManager(SharedCacheHeap)
+ );
+
+ InternalStore = new CacheStore(Listener.Cache);
+ }
+
+ private void LogMemConfiguration()
+ {
+ const string CacheConfigTemplate =
+@"
+Cache Configuration:
+ Max memory: {max} Mb
+ Buckets: {bc}
+ Entries per-bucket: {mc}
+ HeapTracking: {ht}
+";
+
+ CacheMemoryConfiguration cacheConf = MemoryConfiguration;
+
+ //calculate the max memory usage
+ ulong maxByteSize = cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize;
+
+ //Log the cache config
+ plugin.Log.Information(
+ CacheConfigTemplate,
+ maxByteSize / (1024 * 1000),
+ cacheConf.BucketCount,
+ cacheConf.MaxCacheEntries,
+ plugin.IsDebug()
+ );
+ }
+
+ private sealed class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
+ {
+ public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
+ {
+ if (ex is HttpRequestException hre)
+ {
+ if (hre.InnerException is SocketException se)
+ {
+ //transport failed
+ Logger.Warn("Failed to connect to server {serv} because {err}", errorNode, se.Message);
+ }
+ else
+ {
+ Logger.Error("Failed to connect to node {n}\n{err}", errorNode, hre);
+ }
+ }
+ else
+ {
+ Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex);
+ }
+ }
+ }
+ }
+}
diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs
index c9cd746..e9dcbc5 100644
--- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs
+++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs
@@ -46,7 +46,7 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering
* it in the app domain.
*/
- public static IClusterNodeIndex CreateIndex(CacheClientConfiguration config)
+ public static IClusterNodeIndex CreateIndex(VNCacheClusterManager cluster)
{
/* TEMPORARY:
* Named semaphores are only supported on Windows, which allowed synchronized communication between
@@ -75,7 +75,7 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering
if (remoteIndex == null)
{
//Create a new index and store it in the app domain
- IClusterNodeIndex index = new LocalHandler(config);
+ IClusterNodeIndex index = new LocalHandler(cluster);
AppDomain.CurrentDomain.SetData(APP_DOMAIN_KEY, index);
return index;
}
@@ -92,7 +92,7 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering
}
else
{
- return new LocalHandler(config);
+ return new LocalHandler(cluster);
}
}
@@ -114,15 +114,15 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering
* Unless VNLib.Core supports a new way to safley share types across ALCs, this is my solution.
*/
- sealed class LocalHandler(CacheClientConfiguration Config) : IClusterNodeIndex, IIntervalScheduleable
+ sealed class LocalHandler(VNCacheClusterManager cluster) : IClusterNodeIndex, IIntervalScheduleable
{
private Task _currentUpdate = Task.CompletedTask;
///<inheritdoc/>
public CacheNodeAdvertisment? GetNextNode()
{
- //Get all nodes
- CacheNodeAdvertisment[] ads = Config.NodeCollection.GetAllNodes();
+ //Get all discovered nodes
+ CacheNodeAdvertisment[] ads = cluster.DiscoveredNodes.GetAllNodes();
//Just get a random node from the collection for now
return ads.Length > 0 ? ads.SelectRandom() : null;
}
@@ -134,7 +134,7 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering
public Task OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken)
{
//Run discovery operation and update the task
- _currentUpdate = Config.DiscoverNodesAsync(cancellationToken);
+ _currentUpdate = cluster.DiscoverNodesAsync(cancellationToken);
return Task.CompletedTask;
}
diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
index 73783dc..a8f86f9 100644
--- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
+++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
@@ -59,7 +59,7 @@ namespace VNLib.Data.Caching.Providers.VNCache
private readonly VnCacheClientConfig _config;
private readonly IClusterNodeIndex _index;
- private readonly FBMClientFactory _clientFactory;
+ private readonly VNCacheClusterClient _cluster;
private readonly TimeSpan _initNodeDelay;
private bool _isConnected;
@@ -83,9 +83,8 @@ namespace VNLib.Data.Caching.Providers.VNCache
{
ILogProvider scoped = plugin.Log.CreateScope(LOG_NAME);
- //Set authenticator and error handler
- _clientFactory.GetCacheConfiguration()
- .WithAuthenticator(new AuthManager(plugin))
+ //When in plugin context, we can use plugin local secrets and a log-based error handler
+ _cluster.Config.WithAuthenticator(new AuthManager(plugin))
.WithErrorHandler(new DiscoveryErrHAndler(scoped));
//Only the master index is schedulable
@@ -116,17 +115,20 @@ namespace VNLib.Data.Caching.Providers.VNCache
//Init the client with default settings
FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(BufferHeap, (int)config.MaxBlobSize, config.RequestTimeout, debugLog);
-
- FBMFallbackClientWsFactory wsFactory = new();
- _clientFactory = new(in conf, wsFactory);
-
- //Add the configuration to the client
- _clientFactory.GetCacheConfiguration()
+
+ FBMClientFactory clientFactory = new(
+ in conf,
+ new FBMFallbackClientWsFactory(),
+ 10
+ );
+
+ _cluster = (new CacheClientConfiguration())
.WithTls(config.UseTls)
- .WithInitialPeers(config.GetInitialNodeUris());
+ .WithInitialPeers(config.GetInitialNodeUris())
+ .ToClusterClient(clientFactory);
//Init index
- _index = ClusterNodeIndex.CreateIndex(_clientFactory.GetCacheConfiguration());
+ _index = ClusterNodeIndex.CreateIndex(_cluster);
}
/*
@@ -216,7 +218,7 @@ namespace VNLib.Data.Caching.Providers.VNCache
pluginLog.Debug("Connecting to {node}", node);
//Connect to the node and save new client
- _client = await _clientFactory.ConnectToCacheAsync(node, exitToken);
+ _client = await _cluster.ConnectToCacheAsync(node, exitToken);
if (pluginLog.IsEnabled(LogLevel.Debug))
{
@@ -327,22 +329,11 @@ namespace VNLib.Data.Caching.Providers.VNCache
///<inheritdoc/>
public override object GetUnderlyingStore() => _client ?? throw new InvalidOperationException("The client is not currently connected");
- private sealed class AuthManager : ICacheAuthManager
+ private sealed class AuthManager(PluginBase plugin) : ICacheAuthManager
{
- private IAsyncLazy<ReadOnlyJsonWebKey> _sigKey;
- private IAsyncLazy<ReadOnlyJsonWebKey> _verKey;
-
- public AuthManager(PluginBase plugin)
- {
- //Lazy load keys
-
- //Get the signing key
- _sigKey = plugin.GetSecretAsync("client_private_key").ToLazy(static r => r.GetJsonWebKey());
-
- //Lazy load cache public key
- _verKey = plugin.GetSecretAsync("cache_public_key").ToLazy(static r => r.GetJsonWebKey());
- }
+ private IAsyncLazy<ReadOnlyJsonWebKey> _sigKey = plugin.GetSecretAsync("client_private_key").ToLazy(static r => r.GetJsonWebKey());
+ private IAsyncLazy<ReadOnlyJsonWebKey> _verKey = plugin.GetSecretAsync("cache_public_key").ToLazy(static r => r.GetJsonWebKey());
public async Task AwaitLazyKeyLoad()
{
diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs
index 383c979..0d6cd34 100644
--- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs
+++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Data.Caching.Providers.VNCache
@@ -33,6 +33,8 @@ namespace VNLib.Data.Caching.Providers.VNCache
/// </summary>
public class VnCacheClientConfig : VNCacheConfig
{
+ const string DefaultWellKnownEndpoint = "/.well-known/vncache";
+
/// <summary>
/// The broker server address
/// </summary>
@@ -88,7 +90,13 @@ namespace VNLib.Data.Caching.Providers.VNCache
public Uri[] GetInitialNodeUris()
{
_ = InitialNodes ?? throw new InvalidOperationException("Initial nodes have not been set");
- return InitialNodes.Select(static x => new Uri(x, UriKind.Absolute)).ToArray();
+ return InitialNodes.Select(static x =>
+ {
+ //Append a default well known endpoint if the path is just a root
+ Uri ur = new (x, UriKind.Absolute);
+ return ur.LocalPath == "/" ? new Uri(ur, DefaultWellKnownEndpoint) : ur;
+ })
+ .ToArray();
}
///<inheritdoc/>