aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Taskfile.yaml5
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs38
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs11
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs15
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/Clustering/ICacheDiscoveryErrorHandler.cs9
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs6
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs45
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs361
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterClient.cs78
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterManager.cs306
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheTable.cs15
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/ICacheEntryMemoryManager.cs8
-rw-r--r--lib/VNLib.Data.Caching/src/ClientExtensions.cs2
-rw-r--r--plugins/ObjectCacheServer/Taskfile.yaml88
-rw-r--r--plugins/ObjectCacheServer/server/config/config.json104
-rw-r--r--plugins/ObjectCacheServer/server/container/Dockerfile82
-rw-r--r--plugins/ObjectCacheServer/server/container/Taskfile.yaml80
-rw-r--r--plugins/ObjectCacheServer/server/container/config-templates/ObjectCacheServer-template.json54
-rw-r--r--plugins/ObjectCacheServer/server/container/config-templates/config-template.json105
-rw-r--r--plugins/ObjectCacheServer/server/container/docker-compose.yaml45
-rw-r--r--plugins/ObjectCacheServer/server/container/run.sh15
-rw-r--r--plugins/ObjectCacheServer/server/install.ps126
-rw-r--r--plugins/ObjectCacheServer/server/install.taskfile.yaml20
-rw-r--r--plugins/ObjectCacheServer/server/taskfile.yaml193
-rw-r--r--plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs53
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs31
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs (renamed from plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs)15
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheStore.cs131
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs8
-rw-r--r--plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs (renamed from plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs)132
-rw-r--r--plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs14
-rw-r--r--plugins/ObjectCacheServer/src/CacheConstants.cs107
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs85
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs11
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs130
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs53
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs94
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs58
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs10
-rw-r--r--plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs71
-rw-r--r--plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs331
-rw-r--r--plugins/ObjectCacheServer/src/ServerClusterConfig.cs (renamed from plugins/ObjectCacheServer/src/NodeConfig.cs)139
-rw-r--r--plugins/VNLib.Data.Caching.Providers.Redis/src/VNLib.Data.Caching.Providers.Redis.csproj2
-rw-r--r--plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs14
-rw-r--r--plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs88
-rw-r--r--plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs12
46 files changed, 2232 insertions, 1068 deletions
diff --git a/Taskfile.yaml b/Taskfile.yaml
index bb95e21..48aef59 100644
--- a/Taskfile.yaml
+++ b/Taskfile.yaml
@@ -68,7 +68,6 @@ tasks:
clean:
dir: '{{.USER_WORKING_DIR}}'
cmds:
- - cmd: powershell Remove-Item -Recurse './bin'
- ignore_error: true
- - cmd: powershell Remove-Item -Recurse './obj'
+ - for: ['./bin', './obj']
+ cmd: powershell Remove-Item -Recurse '{{.ITEM}}'
ignore_error: true
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs
index 99acfd5..6edb912 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Data.Caching.Extensions
@@ -22,7 +22,9 @@
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
+using System;
using System.Net;
+using System.Security;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@@ -39,6 +41,22 @@ namespace VNLib.Data.Caching.Extensions.ApiModel
/// </summary>
internal sealed class CacheSiteAdapter : RestSiteAdapterBase
{
+ /*
+ * Lazy to defer errors for debuggong
+ */
+ private static readonly Lazy<CacheSiteAdapter> _lazy = new(() => ConfigureAdapter(2));
+
+ internal static CacheSiteAdapter Instance => _lazy.Value;
+
+ private static CacheSiteAdapter ConfigureAdapter(int maxClients)
+ {
+ CacheSiteAdapter adapter = new(maxClients);
+ //Configure the site endpoints
+ adapter.BuildEndpoints(ServiceEndpoints.Definition);
+ return adapter;
+ }
+
+
protected override RestClientPool Pool { get; }
public CacheSiteAdapter(int maxClients)
@@ -55,7 +73,23 @@ namespace VNLib.Data.Caching.Extensions.ApiModel
}
public override void OnResponse(RestResponse response)
- { }
+ {
+ switch(response.StatusCode)
+ {
+ case HttpStatusCode.OK:
+ break;
+ case HttpStatusCode.Unauthorized:
+ throw new SecurityException("Unauthorized access to cache service");
+ case HttpStatusCode.Forbidden:
+ throw new SecurityException("Forbidden access to cache service");
+ case HttpStatusCode.NotFound:
+ throw new InvalidOperationException("Cache service not found");
+ case HttpStatusCode.InternalServerError:
+ throw new InvalidOperationException("Cache service internal error");
+ default:
+ throw new InvalidOperationException($"Cache service error: {response.StatusCode}");
+ }
+ }
public override Task WaitAsync(CancellationToken cancellation = default)
{
diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs
index 5a15d33..263f41a 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Data.Caching.Extensions
@@ -37,14 +37,9 @@ namespace VNLib.Data.Caching.Extensions.Clustering
public class CacheClientConfiguration
{
/// <summary>
- /// Stores available cache servers to be used for discovery, and connections
- /// </summary>
- public INodeDiscoveryCollection NodeCollection { get; } = new NodeDiscoveryCollection();
-
- /// <summary>
/// The authentication manager to use for signing and verifying messages to and from the cache servers
/// </summary>
- public ICacheAuthManager AuthManager { get; private set; }
+ public ICacheAuthManager AuthManager { get; private set; } = null!;
/// <summary>
/// The error handler to use for handling errors that occur during the discovery process
@@ -89,7 +84,7 @@ namespace VNLib.Data.Caching.Extensions.Clustering
public CacheClientConfiguration WithInitialPeers(IEnumerable<Uri> peers)
{
//Check null
- _ = peers ?? throw new ArgumentNullException(nameof(peers));
+ ArgumentNullException.ThrowIfNull(peers);
//Store peer array
WellKnownNodes = peers.ToArray();
diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs
index 6b7ab48..c21ed05 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Data.Caching.Extensions
@@ -23,6 +23,7 @@
*/
using System;
+using System.Runtime.CompilerServices;
namespace VNLib.Data.Caching.Extensions.Clustering
{
@@ -85,8 +86,14 @@ namespace VNLib.Data.Caching.Extensions.Clustering
return this;
}
+ internal StrongBox<string> NodeIdRef { get; } = new(string.Empty);
+
///<inheritdoc/>
- public string NodeId { get; private set; } = null!;
+ public string NodeId
+ {
+ get => NodeIdRef.Value!;
+ private set => NodeIdRef.Value = value;
+ }
/// <summary>
/// Specifies the current server's cluster node id. If this
@@ -99,10 +106,6 @@ namespace VNLib.Data.Caching.Extensions.Clustering
public CacheNodeConfiguration WithNodeId(string nodeId)
{
NodeId = nodeId ?? throw new ArgumentNullException(nameof(nodeId));
-
- //Update the node id in the node collection
- (NodeCollection as NodeDiscoveryCollection)!.SetSelfId(nodeId);
-
return this;
}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/ICacheDiscoveryErrorHandler.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/ICacheDiscoveryErrorHandler.cs
index 984ce3d..f27f1fb 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/Clustering/ICacheDiscoveryErrorHandler.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/ICacheDiscoveryErrorHandler.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Data.Caching.Extensions
@@ -37,5 +37,12 @@ namespace VNLib.Data.Caching.Extensions.Clustering
/// <param name="errorNode">The node that the error occured on</param>
/// <param name="ex">The exception that caused the invocation</param>
void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex);
+
+ /// <summary>
+ /// Invoked when an error occurs during the discovery process
+ /// </summary>
+ /// <param name="errorAddress">The server address that failed to connect</param>
+ /// <param name="ex">The exception that caused the invocation</param>
+ void OnDiscoveryError(Uri errorAddress, Exception ex);
}
}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs
index 677088a..e57e69b 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Data.Caching.Extensions
@@ -28,7 +28,9 @@ using System.Collections.Generic;
namespace VNLib.Data.Caching.Extensions.Clustering
{
/// <summary>
- /// A custom enumerator for the node discovery process
+ /// A custom enumerator for the node discovery process. Simplifies the recursive processes
+ /// of discovering nodes in a cluster to a simple enumeration process. It allows for real-time
+ /// updates to the collection of discovered nodes as a union operation.
/// </summary>
public interface INodeDiscoveryEnumerator : IEnumerator<CacheNodeAdvertisment>
{
diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs
index b0e53e1..16b96a3 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs
@@ -1,12 +1,12 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Data.Caching.Extensions
* File: NodeDiscoveryCollection.cs
*
-* NodeDiscoveryCollection.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
-* VNLib collection of libraries and utilities.
+* NodeDiscoveryCollection.cs is part of VNLib.Data.Caching.Extensions which is part
+* of the larger VNLib collection of libraries and utilities.
*
* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
@@ -26,24 +26,18 @@ using System;
using System.Linq;
using System.Collections;
using System.Collections.Generic;
+using System.Runtime.CompilerServices;
+
+using VNLib.Utils.Extensions;
namespace VNLib.Data.Caching.Extensions.Clustering
{
/// <summary>
/// Represents a collection of available cache nodes from a discovery process
/// </summary>
- public sealed class NodeDiscoveryCollection : INodeDiscoveryCollection
+ public sealed class NodeDiscoveryCollection(StrongBox<string?>? selfId) : INodeDiscoveryCollection
{
- private string? _selfId;
- private LinkedList<CacheNodeAdvertisment> _peers;
-
- /// <summary>
- /// Initializes a new empty <see cref="NodeDiscoveryCollection"/>
- /// </summary>
- public NodeDiscoveryCollection()
- {
- _peers = new();
- }
+ private LinkedList<CacheNodeAdvertisment> _peers = new();
/// <summary>
/// Manually adds nodes to the collection that were not discovered through the discovery process
@@ -62,39 +56,34 @@ namespace VNLib.Data.Caching.Extensions.Clustering
}
/// <summary>
- /// Sets the id of the current node, so it can be excluded from discovery
+ /// Removes a vector of nodes from the internal collection
/// </summary>
- /// <param name="selfId">The id of the current node to exclude</param>
- public void SetSelfId(string? selfId) => _selfId = selfId;
+ /// <param name="nodes">The vector containg nodes to remove from the collection</param>
+ public void RemoveManualNodes(IEnumerable<CacheNodeAdvertisment> nodes) => nodes.ForEach(n => _peers.Remove(n));
///<inheritdoc/>
- public INodeDiscoveryEnumerator BeginDiscovery()
- {
- return new NodeEnumerator(new(), _selfId);
- }
+ public INodeDiscoveryEnumerator BeginDiscovery() => new NodeEnumerator(new(), selfId?.Value);
///<inheritdoc/>
public INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<CacheNodeAdvertisment> initialPeers)
{
+ ArgumentNullException.ThrowIfNull(initialPeers);
+
//Init new enumerator with the initial peers
- return new NodeEnumerator(new(initialPeers), _selfId);
+ return new NodeEnumerator(new(initialPeers), selfId?.Value);
}
///<inheritdoc/>
public void CompleteDiscovery(INodeDiscoveryEnumerator enumerator)
{
- _ = enumerator ?? throw new ArgumentNullException(nameof(enumerator));
+ ArgumentNullException.ThrowIfNull(enumerator);
//Capture all nodes from the enumerator and store them as our current peers
_peers = (enumerator as NodeEnumerator)!.Peers;
}
///<inheritdoc/>
- public CacheNodeAdvertisment[] GetAllNodes()
- {
- //Capture all current peers
- return _peers.ToArray();
- }
+ public CacheNodeAdvertisment[] GetAllNodes() => _peers.ToArray();
private sealed record class NodeEnumerator(LinkedList<CacheNodeAdvertisment> Peers, string? SelfNodeId) : INodeDiscoveryEnumerator
{
diff --git a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
index bd86461..47aadd9 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
@@ -31,7 +31,6 @@ using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Security.Cryptography;
-using System.Runtime.CompilerServices;
using RestSharp;
@@ -74,22 +73,7 @@ namespace VNLib.Data.Caching.Extensions
/// The advertisment header for cache node discovery
/// </summary>
public const string X_NODE_DISCOVERY_HEADER = "X-Cache-Node-Discovery";
-
- /*
- * Lazy to defer errors for debuggong
- */
- private static readonly Lazy<CacheSiteAdapter> SiteAdapter = new(() => ConfigureAdapter(2));
-
- private static CacheSiteAdapter ConfigureAdapter(int maxClients)
- {
- CacheSiteAdapter adapter = new(maxClients);
- //Configure the site endpoints
- adapter.BuildEndpoints(ServiceEndpoints.Definition);
- return adapter;
- }
-
- private static readonly ConditionalWeakTable<FBMClientFactory, CacheClientConfiguration> ClientCacheConfig = new();
-
+
/// <summary>
/// Gets a <see cref="FBMClientConfig"/> preconfigured object caching
/// protocl
@@ -101,7 +85,7 @@ namespace VNLib.Data.Caching.Extensions
/// <returns>A preconfigured <see cref="FBMClientConfig"/> for object caching</returns>
public static FBMClientConfig GetDefaultConfig(IUnmangedHeap heap, int maxMessageSize, TimeSpan timeout = default, ILogProvider? debugLog = null)
{
- return GetDefaultConfig(new FallbackFBMMemoryManager(heap), maxMessageSize, timeout, debugLog);
+ return GetDefaultConfig(new SharedHeapFBMMemoryManager(heap), maxMessageSize, timeout, debugLog);
}
/// <summary>
@@ -153,236 +137,22 @@ namespace VNLib.Data.Caching.Extensions
{
client.Config.DebugLog?.Debug("{debug}: {data}", "[CACHE]", message);
}
-
- /// <summary>
- /// Discovers ALL possible cache nodes itteritivley, first by collecting the configuration
- /// from the initial peers.
- /// This will make connections to all discoverable servers
- /// </summary>
- /// <param name="config"></param>
- /// <param name="cancellation">A token to cancel the operation</param>
- /// <returns></returns>
- /// <exception cref="ArgumentException"></exception>
- /// <exception cref="CacheDiscoveryFailureException"></exception>
- public static async Task DiscoverNodesAsync(this CacheClientConfiguration config, CancellationToken cancellation)
- {
- //Make sure at least one node defined
- if(config?.WellKnownNodes == null || config.WellKnownNodes.Length == 0)
- {
- throw new ArgumentException("There must be at least one cache node defined in the client configuration");
- }
-
- //Get the initial advertisments that arent null
- CacheNodeAdvertisment[] initialPeers = await ResolveWellKnownAsync(config, cancellation);
-
- if (initialPeers.Length == 0)
- {
- throw new CacheDiscoveryFailureException("There must be at least one available cache node to continue discovery");
- }
-
- await DiscoverNodesAsync(config, initialPeers, cancellation);
- }
/// <summary>
- /// Resolves the initial well-known cache nodes into their advertisments
+ /// Gets the discovery manager for the current client configuration. Just a
+ /// convience method.
/// </summary>
- /// <param name="config"></param>
- /// <param name="cancellation">A token to cancel the operation</param>
- /// <returns>An array of resolved nodes</returns>
- public static async Task<CacheNodeAdvertisment[]> ResolveWellKnownAsync(this CacheClientConfiguration config, CancellationToken cancellation)
- {
- _ = config?.WellKnownNodes ?? throw new ArgumentNullException(nameof(config));
-
- Task<CacheNodeAdvertisment?>[] initialAdds = new Task<CacheNodeAdvertisment?>[config.WellKnownNodes.Length];
-
- //Discover initial advertisments from well-known addresses
- for (int i = 0; i < config.WellKnownNodes.Length; i++)
- {
- initialAdds[i] = DiscoverNodeConfigAsync(config.WellKnownNodes[i], config, cancellation);
- }
-
- //Wait for all initial adds to complete
- await Task.WhenAll(initialAdds);
-
- //Get the initial advertisments that arent null
- return initialAdds.Select(static x => x.Result!).Where(static s => s != null).ToArray();
- }
+ /// <param name="conf"></param>
+ /// <returns>The new <see cref="VNCacheClusterManager"/> instance around your config</returns>
+ public static VNCacheClusterManager GetDiscoveryManager(this CacheClientConfiguration conf) => new(conf);
/// <summary>
- /// Discovers ALL possible cache nodes itteritivley from the current collection of initial peers.
- /// This will make connections to all discoverable servers and update the client configuration, with all
- /// discovered peers
+ /// Converts the cache client configuration to a cluster client
/// </summary>
/// <param name="config"></param>
- /// <param name="initialPeers">Accepts an array of initial peers to override the endpoint discovery process</param>
- /// <param name="cancellation">A token to cancel the operation</param>
- /// <returns>A task that completes when all nodes have been discovered</returns>
- /// <exception cref="ArgumentException"></exception>
- /// <exception cref="CacheDiscoveryFailureException"></exception>
- public static async Task DiscoverNodesAsync(this CacheClientConfiguration config, CacheNodeAdvertisment[] initialPeers, CancellationToken cancellation)
- {
- //Make sure at least one node defined
- if (initialPeers == null || initialPeers.Length == 0)
- {
- throw new ArgumentException("There must be at least one initial peer");
- }
-
- //Get the discovery enumerator with the initial peers
- INodeDiscoveryEnumerator enumerator = config.NodeCollection.BeginDiscovery(initialPeers);
-
- //Start the discovery process
- await DiscoverNodesAsync(enumerator, config, config.ErrorHandler, cancellation);
-
- //Commit nodes
- config.NodeCollection.CompleteDiscovery(enumerator);
- }
-
- private static async Task DiscoverNodesAsync(
- INodeDiscoveryEnumerator enumerator,
- CacheClientConfiguration config,
- ICacheDiscoveryErrorHandler? errHandler,
- CancellationToken cancellation
- )
- {
- //Loop through servers
- while (enumerator.MoveNext())
- {
- //Make sure the node has a discovery endpoint
- if (enumerator.Current.DiscoveryEndpoint == null)
- {
- //Skip this node
- continue;
- }
-
- /*
- * We are allowed to save nodes that do not have a discovery endpoint, but we cannot
- * discover nodes from them we can only use them as cache
- */
-
- //add a random delay to avoid spamming servers
- await Task.Delay((int)Random.Shared.NextInt64(100, 500), cancellation);
-
- try
- {
- //Discover nodes from the current node
- CacheNodeAdvertisment[]? nodes = await GetCacheNodesAsync(enumerator.Current, config, cancellation);
-
- if (nodes != null)
- {
- //Add nodes to the collection
- enumerator.OnPeerDiscoveryComplete(nodes);
- }
- }
- //Catch exceptions when an error handler is defined
- catch(Exception ex) when (errHandler != null)
- {
- //Handle the error
- errHandler.OnDiscoveryError(enumerator.Current, ex);
- }
- catch(Exception ex)
- {
- throw new CacheDiscoveryFailureException($"Failed to discovery peer node {enumerator.Current?.NodeId}, cannot continue", ex);
- }
- }
- }
-
- private static async Task<CacheNodeAdvertisment?> DiscoverNodeConfigAsync(Uri serverUri, CacheClientConfiguration config, CancellationToken cancellation)
- {
- try
- {
- GetConfigRequest req = new (serverUri, config);
-
- //Site adapter verifies response messages so we dont need to check on the response
- byte[] data = await SiteAdapter.Value.ExecuteAsync(req, cancellation).AsBytes()
- ?? throw new CacheDiscoveryFailureException($"No data returned from desired cache node");
-
- //Response is jwt
- using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data);
-
- //The entire payload is just the single serialzed advertisment
- using JsonDocument doc = responseJwt.GetPayload();
-
- return doc.RootElement.GetProperty("sub").Deserialize<CacheNodeAdvertisment>();
- }
- //Bypass cdfe when error handler is null
- catch(CacheDiscoveryFailureException) when(config.ErrorHandler == null)
- {
- throw;
- }
- //Catch exceptions when an error handler is defined
- catch (Exception ex) when (config.ErrorHandler != null)
- {
- //Handle the error
- config.ErrorHandler.OnDiscoveryError(null!, ex);
- return null;
- }
- catch (Exception ex)
- {
- throw new CacheDiscoveryFailureException("Failed to discover node configuration", ex);
- }
- }
-
- /// <summary>
- /// Contacts the given server's discovery endpoint to discover a list of available
- /// servers we can connect to
- /// </summary>
- /// <param name="advert">An advertisment of a server to discover other nodes from</param>
- /// <param name="cancellationToken">A token to cancel the operationS</param>
- /// <param name="config">The cache configuration object</param>
- /// <returns>The list of active servers</returns>
- /// <exception cref="SecurityException"></exception>
- /// <exception cref="ArgumentException"></exception>
- /// <exception cref="ArgumentNullException"></exception>
- public static async Task<CacheNodeAdvertisment[]?> GetCacheNodesAsync(CacheNodeAdvertisment advert, CacheClientConfiguration config, CancellationToken cancellationToken = default)
- {
- _ = advert ?? throw new ArgumentNullException(nameof(advert));
- _ = config ?? throw new ArgumentNullException(nameof(config));
- _ = advert.DiscoveryEndpoint ?? throw new ArgumentException("Advertisment does not expose an advertisment endpoint");
-
- DiscoveryRequest req = new (advert.DiscoveryEndpoint, config);
-
- //Site adapter verifies response messages so we dont need to check on the response
- byte[] data = await SiteAdapter.Value.ExecuteAsync(req, cancellationToken).AsBytes()
- ?? throw new InvalidOperationException($"No data returned from node {advert.NodeId}");
-
- //Response is jwt
- using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data);
-
- using JsonDocument doc = responseJwt.GetPayload();
- return doc.RootElement.GetProperty("peers").Deserialize<CacheNodeAdvertisment[]>();
- }
-
- /// <summary>
- /// Allows for configuration of an <see cref="FBMClient"/>
- /// for a connection to a cache server
- /// </summary>
- /// <param name="client"></param>
- /// <returns>A fluent api configuration builder for the current client</returns>
- public static CacheClientConfiguration GetCacheConfiguration(this FBMClientFactory client) => ClientCacheConfig.GetOrCreateValue(client);
-
- /// <summary>
- /// Explicitly set the client cache configuration for the current client
- /// </summary>
- /// <param name="client"></param>
- /// <param name="config">The cache node configuration</param>
- /// <returns>The config instance</returns>
- public static CacheClientConfiguration SetCacheConfiguration(this FBMClientFactory client, CacheClientConfiguration config)
- {
- ClientCacheConfig.AddOrUpdate(client, config);
- return config;
- }
-
- /// <summary>
- /// Explicitly set the cache node configuration for the current client
- /// </summary>
- /// <param name="client"></param>
- /// <param name="nodeConfig">The cache node configuration</param>
- /// <returns>The config instance</returns>
- public static CacheNodeConfiguration SetCacheConfiguration(this FBMClientFactory client, CacheNodeConfiguration nodeConfig)
- {
- ClientCacheConfig.AddOrUpdate(client, nodeConfig);
- return nodeConfig;
- }
+ /// <param name="factory">The FBM client factory instance to use</param>
+ /// <returns>The new cluster client instance</returns>
+ public static VNCacheClusterClient ToClusterClient(this CacheClientConfiguration config, FBMClientFactory factory) => new(config, factory);
/// <summary>
/// Waits for the client to disconnect from the server while observing
@@ -396,6 +166,8 @@ namespace VNLib.Data.Caching.Extensions
/// <returns>A task that complets when the connecion has been closed successfully</returns>
public static async Task WaitForExitAsync(this FBMClient client, CancellationToken token = default)
{
+ ArgumentNullException.ThrowIfNull(client);
+
client.LogDebug("Waiting for cache client to exit");
//Get task for cancellation
@@ -420,88 +192,6 @@ namespace VNLib.Data.Caching.Extensions
}
/// <summary>
- /// Discovers all available nodes for the current client config
- /// </summary>
- /// <param name="client"></param>
- /// <param name="cancellation">A token to cancel the operation</param>
- /// <returns>A task that completes when all nodes have been discovered</returns>
- public static Task DiscoverAvailableNodesAsync(this FBMClientFactory client, CancellationToken cancellation = default)
- {
- //Get stored config
- CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
-
- //Discover all nodes
- return conf.DiscoverNodesAsync(cancellation);
- }
-
- /// <summary>
- /// Connects to a random server from the servers discovered during a cache server discovery
- /// </summary>
- /// <param name="client"></param>
- /// <param name="cancellation">A token to cancel the operation</param>
- /// <returns>The server that the connection was made with</returns>
- /// <exception cref="FBMException"></exception>
- /// <exception cref="FBMServerNegiationException"></exception>
- /// <exception cref="ArgumentException"></exception>
- /// <exception cref="ArgumentNullException"></exception>
- /// <exception cref="SecurityException"></exception>
- /// <exception cref="ObjectDisposedException"></exception>
- public static async Task<CacheNodeAdvertisment> ConnectToRandomCacheAsync(this FBMClientFactory client, CancellationToken cancellation = default)
- {
- //Get stored config
- CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
-
- //Get all available nodes, or at least the initial peers
- CacheNodeAdvertisment[]? adverts = conf.NodeCollection.GetAllNodes() ?? throw new ArgumentException("No cache nodes discovered, cannot connect");
-
- //Select random node from all available nodes
- CacheNodeAdvertisment randomServer = adverts.SelectRandom();
-
- //Connect to the random server
- await ConnectToCacheAsync(client, randomServer, cancellation);
-
- //Return the random server we connected to
- return randomServer;
- }
-
- /// <summary>
- /// Connects to the specified server on the configured cache client
- /// </summary>
- /// <param name="factory"></param>
- /// <param name="server">The server to connect to</param>
- /// <param name="token">A token to cancel the operation</param>
- /// <returns>A task that resolves when the client is connected to the cache server</returns>
- /// <exception cref="FBMException"></exception>
- /// <exception cref="FBMServerNegiationException"></exception>
- /// <exception cref="ArgumentException"></exception>
- /// <exception cref="ArgumentNullException"></exception>
- /// <exception cref="SecurityException"></exception>
- /// <exception cref="ObjectDisposedException"></exception>
- public static async Task<FBMClient> ConnectToCacheAsync(this FBMClientFactory factory, CacheNodeAdvertisment server, CancellationToken token = default)
- {
- _ = factory ?? throw new ArgumentNullException(nameof(factory));
- _ = server ?? throw new ArgumentNullException(nameof(server));
-
- //Get stored config
- CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(factory);
-
- //Create new client
- FBMClient client = factory.CreateClient();
-
- try
- {
- //Connect to server (no server id because client not replication server)
- await ConnectToCacheAsync(client, conf, server, token);
- return client;
- }
- catch
- {
- client.Dispose();
- throw;
- }
- }
-
- /// <summary>
/// Connects to the specified server on the configured cache client
/// </summary>
/// <param name="client"></param>
@@ -517,9 +207,9 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="ObjectDisposedException"></exception>
public static Task ConnectToCacheAsync(this FBMClient client, CacheNodeAdvertisment server, CacheClientConfiguration explicitConfig, CancellationToken token = default)
{
- _ = client ?? throw new ArgumentNullException(nameof(client));
- _ = server ?? throw new ArgumentNullException(nameof(server));
-
+ ArgumentNullException.ThrowIfNull(client);
+ ArgumentNullException.ThrowIfNull(server);
+
//Connect to server (no server id because client not replication server)
return ConnectToCacheAsync(client, explicitConfig, server, token);
}
@@ -546,7 +236,7 @@ namespace VNLib.Data.Caching.Extensions
NegotationRequest req = new(server.ConnectEndpoint, config);
//Exec negotiation
- RestResponse response = await SiteAdapter.Value.ExecuteAsync(req, token);
+ RestResponse response = await CacheSiteAdapter.Instance.ExecuteAsync(req, token);
/*
* JWT will already be veified by the endpoint adapter, so we
@@ -582,6 +272,12 @@ namespace VNLib.Data.Caching.Extensions
Scheme = config.UseTls ? "wss://" : "ws://"
};
+ //if the server is specifying https urls, then attempt to upgrade to wss
+ if (server.ConnectEndpoint.Scheme == Uri.UriSchemeHttps)
+ {
+ uriBuilder.Scheme = "wss://";
+ }
+
//Connect async
await client.ConnectAsync(uriBuilder.Uri, token);
}
@@ -658,7 +354,7 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="CryptographicException"></exception>
public static bool VerifyUpgradeToken(this ICacheAuthManager man, string signature, string token, bool isPeer)
{
- _ = man ?? throw new ArgumentNullException(nameof(man));
+ ArgumentNullException.ThrowIfNull(man);
//get the hash of the token
byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256);
@@ -704,8 +400,15 @@ namespace VNLib.Data.Caching.Extensions
/// <param name="message">The advertisment message to verify</param>
/// <returns>The advertisment message if successfully verified, or null otherwise</returns>
/// <exception cref="FormatException"></exception>
- public static CacheNodeAdvertisment? VerifyPeerAdvertisment(this ICacheAuthManager config, string message)
+ public static CacheNodeAdvertisment? VerifyPeerAdvertisment(this ICacheAuthManager config, string? message)
{
+ ArgumentNullException.ThrowIfNull(config);
+
+ if (string.IsNullOrWhiteSpace(message))
+ {
+ return null;
+ }
+
using JsonWebToken jwt = JsonWebToken.Parse(message);
//Verify the signature
diff --git a/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterClient.cs b/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterClient.cs
new file mode 100644
index 0000000..050e1a3
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterClient.cs
@@ -0,0 +1,78 @@
+/*
+* Copyright (c) 2024 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: VNCacheClusterClient.cs
+*
+* VNCacheClusterClient.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Security;
+using System.Threading;
+using System.Threading.Tasks;
+
+using VNLib.Net.Messaging.FBM;
+using VNLib.Net.Messaging.FBM.Client;
+using VNLib.Data.Caching.Extensions.Clustering;
+
+namespace VNLib.Data.Caching.Extensions
+{
+ /// <summary>
+ /// Manages client connections to cluster nodes with discovery from <see cref="VNCacheClusterManager"/>
+ /// instance.
+ /// </summary>
+ /// <param name="config">The client configuration to use when discovering or connecting to cache nodes</param>
+ /// <param name="factory">The fbm client factory instance</param>
+ public class VNCacheClusterClient(CacheClientConfiguration config, FBMClientFactory factory)
+ : VNCacheClusterManager(config)
+ {
+
+ /// <summary>
+ /// Connects to the specified server on the configured cache client
+ /// </summary>
+ /// <param name="factory"></param>
+ /// <param name="server">The server to connect to</param>
+ /// <param name="token">A token to cancel the operation</param>
+ /// <returns>A task that resolves when the client is connected to the cache server</returns>
+ /// <exception cref="FBMException"></exception>
+ /// <exception cref="FBMServerNegiationException"></exception>
+ /// <exception cref="ArgumentException"></exception>
+ /// <exception cref="ArgumentNullException"></exception>
+ /// <exception cref="SecurityException"></exception>
+ /// <exception cref="ObjectDisposedException"></exception>
+ public async Task<FBMClient> ConnectToCacheAsync(CacheNodeAdvertisment server, CancellationToken token = default)
+ {
+ ArgumentNullException.ThrowIfNull(server);
+
+ FBMClient client = factory.CreateClient();
+
+ try
+ {
+ //Connect to server (no server id because client not replication server)
+ await client.ConnectToCacheAsync(server, config, token);
+ return client;
+ }
+ catch
+ {
+ client.Dispose();
+ throw;
+ }
+ }
+ }
+}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterManager.cs b/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterManager.cs
new file mode 100644
index 0000000..f68968d
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterManager.cs
@@ -0,0 +1,306 @@
+/*
+* Copyright (c) 2024 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: VNCacheClusterManager.cs
+*
+* VNCacheClusterManager.cs is part of VNLib.Data.Caching.Extensions
+* which is part of the larger VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Linq;
+using System.Security;
+using System.Text.Json;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+
+using RestSharp;
+
+using VNLib.Hashing.IdentityUtility;
+using VNLib.Net.Rest.Client.Construction;
+using VNLib.Data.Caching.Extensions.ApiModel;
+using VNLib.Data.Caching.Extensions.Clustering;
+
+namespace VNLib.Data.Caching.Extensions
+{
+
+ /// <summary>
+ /// A VNCache cluster client discovery maanger. Used to simplify the discovery
+ /// of cache nodes
+ /// </summary>
+ /// <param name="config">The client configuration instance</param>
+ public class VNCacheClusterManager(CacheClientConfiguration config)
+ {
+ /// <summary>
+ /// The internal collection of discovered nodes
+ /// </summary>
+ protected NodeDiscoveryCollection NodeCollection { get; } = GetNodeCollection(config);
+
+ /// <summary>
+ /// Gets the collection of discovered nodes within the manager
+ /// </summary>
+ public INodeDiscoveryCollection DiscoveredNodes => NodeCollection;
+
+ /// <summary>
+ /// The underlying <see cref="CacheClientConfiguration"/> instance
+ /// </summary>
+ public CacheClientConfiguration Config => config;
+
+ /// <summary>
+ /// Adds an array of nodes manually to the collection of discovered cluster nodes
+ /// </summary>
+ /// <param name="nodes"></param>
+ public void AddManualNodes(params CacheNodeAdvertisment[] nodes) => AddManualNodes(nodes.AsEnumerable());
+
+ /// <summary>
+ /// Adds an array of nodes manually to the collection of discovered cluster nodes
+ /// </summary>
+ /// <param name="nodes"></param>
+ public void AddManualNodes(IEnumerable<CacheNodeAdvertisment> nodes) => NodeCollection.AddManualNodes(nodes);
+
+ /// <summary>
+ /// Removes an array of nodes manually from the collection of discovered cluster nodes
+ /// </summary>
+ /// <param name="nodes"></param>
+ public void RemoveManualNodes(params CacheNodeAdvertisment[] nodes) => RemoveManualNodes(nodes.AsEnumerable());
+
+ /// <summary>
+ /// Removes an array of nodes manually from the collection of discovered cluster nodes
+ /// </summary>
+ /// <param name="nodes"></param>
+ public void RemoveManualNodes(IEnumerable<CacheNodeAdvertisment> nodes) => NodeCollection.RemoveManualNodes(nodes);
+
+ /// <summary>
+ /// Resolves the initial well-known cache nodes into their advertisments
+ /// </summary>
+ /// <param name="cancellation">A token to cancel the operation</param>
+ /// <returns>An array of resolved nodes</returns>
+ public async Task<CacheNodeAdvertisment[]> ResolveWellKnownAsync(CancellationToken cancellation)
+ {
+ //Make sure at least one node defined
+ if (config?.WellKnownNodes == null || config.WellKnownNodes.Length == 0)
+ {
+ throw new ArgumentException("There must be at least one cache node defined in the client configuration");
+ }
+
+ Task<CacheNodeAdvertisment?>[] initialAdds = new Task<CacheNodeAdvertisment?>[config.WellKnownNodes.Length];
+
+ //Discover initial advertisments from well-known addresses
+ for (int i = 0; i < config.WellKnownNodes.Length; i++)
+ {
+ initialAdds[i] = DiscoverNodeConfigAsync(config.WellKnownNodes[i], cancellation);
+ }
+
+ //Wait for all initial adds to complete
+ await Task.WhenAll(initialAdds);
+
+ //Get the initial advertisments that arent null
+ return initialAdds.Select(static x => x.Result!).Where(static s => s != null).ToArray();
+ }
+
+ /// <summary>
+ /// Discovers ALL possible cache nodes itteritivley, first by collecting the configuration
+ /// from the initial peers.
+ /// This will make connections to all discoverable servers
+ /// </summary>
+ /// <param name="cancellation">A token to cancel the operation</param>
+ /// <returns></returns>
+ /// <exception cref="ArgumentException"></exception>
+ /// <exception cref="CacheDiscoveryFailureException"></exception>
+ /// <remarks>
+ /// This method simply combines the <see cref="ResolveWellKnownAsync"/> and <see cref="DiscoverNodesAsync"/>
+ /// methods into a single operation
+ /// </remarks>
+ public async Task DiscoverNodesAsync(CancellationToken cancellation)
+ {
+ //Make sure at least one node defined
+ if (config?.WellKnownNodes == null || config.WellKnownNodes.Length == 0)
+ {
+ throw new ArgumentException("There must be at least one cache node defined in the client configuration");
+ }
+
+ /*
+ * Connect to well-known nodes from the client configuration to discovery its layout.
+ *
+ */
+ CacheNodeAdvertisment[] initialPeers = await ResolveWellKnownAsync(cancellation);
+
+ if (initialPeers.Length == 0)
+ {
+ throw new CacheDiscoveryFailureException("There must be at least one available cache node to continue discovery");
+ }
+
+ await DiscoverNodesAsync(initialPeers, cancellation);
+ }
+
+ /// <summary>
+ /// Discovers ALL possible cache nodes itteritivley from the current collection of initial peers.
+ /// This will make connections to all discoverable servers and update the client configuration, with all
+ /// discovered peers
+ /// </summary>
+ /// <param name="initialPeers">Accepts an array of initial peers to override the endpoint discovery process</param>
+ /// <param name="cancellation">A token to cancel the operation</param>
+ /// <returns>A task that completes when all nodes have been discovered</returns>
+ /// <exception cref="ArgumentException"></exception>
+ /// <exception cref="CacheDiscoveryFailureException"></exception>
+ public async Task DiscoverNodesAsync(CacheNodeAdvertisment[] initialPeers, CancellationToken cancellation)
+ {
+ //Make sure at least one node defined
+ ArgumentNullException.ThrowIfNull(initialPeers);
+ ArgumentOutOfRangeException.ThrowIfZero(initialPeers.Length);
+
+ //Get the discovery enumerator with the initial peers
+ using INodeDiscoveryEnumerator enumerator = NodeCollection.BeginDiscovery(initialPeers);
+
+ //Start the discovery process
+ await DiscoverNodesAsync(enumerator, config, config.ErrorHandler, cancellation);
+
+ //Commit discovered nodes to stored node collection
+ NodeCollection.CompleteDiscovery(enumerator);
+ }
+
+ private static async Task DiscoverNodesAsync(
+ INodeDiscoveryEnumerator enumerator,
+ CacheClientConfiguration config,
+ ICacheDiscoveryErrorHandler? errHandler,
+ CancellationToken cancellation
+ )
+ {
+ //Loop through servers
+ while (enumerator.MoveNext())
+ {
+ //Make sure the node has a discovery endpoint
+ if (enumerator.Current.DiscoveryEndpoint == null)
+ {
+ //Skip this node
+ continue;
+ }
+
+ /*
+ * We are allowed to save nodes that do not have a discovery endpoint, but we cannot
+ * discover nodes from them we can only use them as cache
+ */
+
+ //add a random delay to avoid spamming servers
+ await Task.Delay((int)Random.Shared.NextInt64(100, 500), cancellation);
+
+ try
+ {
+ //Discover nodes from the current node
+ CacheNodeAdvertisment[]? nodes = await GetCacheNodesAsync(enumerator.Current, config, cancellation);
+
+ if (nodes != null)
+ {
+ //Add nodes to the collection
+ enumerator.OnPeerDiscoveryComplete(nodes);
+ }
+ }
+ //Catch exceptions when an error handler is defined
+ catch (Exception ex) when (errHandler != null)
+ {
+ //Handle the error
+ errHandler.OnDiscoveryError(enumerator.Current, ex);
+ }
+ catch (Exception ex)
+ {
+ throw new CacheDiscoveryFailureException($"Failed to discovery peer node {enumerator.Current?.NodeId}, cannot continue", ex);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Contacts the given server's discovery endpoint to discover a list of available
+ /// servers we can connect to
+ /// </summary>
+ /// <param name="advert">An advertisment of a server to discover other nodes from</param>
+ /// <param name="cancellationToken">A token to cancel the operationS</param>
+ /// <param name="config">The cache configuration object</param>
+ /// <returns>The list of active servers</returns>
+ /// <exception cref="SecurityException"></exception>
+ /// <exception cref="ArgumentException"></exception>
+ /// <exception cref="ArgumentNullException"></exception>
+ public static async Task<CacheNodeAdvertisment[]?> GetCacheNodesAsync(CacheNodeAdvertisment advert, CacheClientConfiguration config, CancellationToken cancellationToken = default)
+ {
+ ArgumentNullException.ThrowIfNull(advert);
+ ArgumentNullException.ThrowIfNull(config);
+ ArgumentNullException.ThrowIfNull(advert.DiscoveryEndpoint, nameof(advert.DiscoveryEndpoint));
+
+ DiscoveryRequest req = new (advert.DiscoveryEndpoint, config);
+
+ //Site adapter verifies response messages so we dont need to check on the response
+ byte[] data = await CacheSiteAdapter.Instance.ExecuteAsync(req, cancellationToken).AsBytes()
+ ?? throw new InvalidOperationException($"No data returned from node {advert.NodeId}");
+
+ //Response is jwt
+ using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data);
+
+ using JsonDocument doc = responseJwt.GetPayload();
+ return doc.RootElement.GetProperty("peers").Deserialize<CacheNodeAdvertisment[]>();
+ }
+
+
+ /*
+ * This method will connect to a given well-known (cache config endpoint) and discover the
+ * servers configuration (endpoint config)
+ *
+ * This function exists so clients only need a single endpoint to connect to, and the server
+ * will return it's signed configuration data (including cluster network information)
+ */
+ private async Task<CacheNodeAdvertisment?> DiscoverNodeConfigAsync(Uri serverUri, CancellationToken cancellation)
+ {
+ try
+ {
+ GetConfigRequest req = new (serverUri, config);
+
+ //Site adapter verifies response messages so we dont need to check on the response
+ byte[] data = await CacheSiteAdapter.Instance.ExecuteAsync(req, cancellation).AsBytes()
+ ?? throw new CacheDiscoveryFailureException($"No data returned from desired cache node");
+
+ //Response is jwt
+ using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data);
+
+ //The entire payload is just the single serialzed advertisment
+ using JsonDocument doc = responseJwt.GetPayload();
+
+ return doc.RootElement.GetProperty("sub").Deserialize<CacheNodeAdvertisment>();
+ }
+ //Bypass cdfe when error handler is null (avoid nesting)`
+ catch (CacheDiscoveryFailureException) when (config.ErrorHandler == null)
+ {
+ throw;
+ }
+ //Catch exceptions when an error handler is defined
+ catch (Exception ex) when (config.ErrorHandler != null)
+ {
+ //Handle the error
+ config.ErrorHandler.OnDiscoveryError(serverUri, ex);
+ return null;
+ }
+ catch (Exception ex)
+ {
+ throw new CacheDiscoveryFailureException("Failed to discover node configuration", ex);
+ }
+ }
+
+ private static NodeDiscoveryCollection GetNodeCollection(CacheClientConfiguration config)
+ {
+ return config is CacheNodeConfiguration cnc ? new (cnc.NodeIdRef!) : new (null);
+ }
+ }
+}
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheTable.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheTable.cs
index 8270f2f..789448d 100644
--- a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheTable.cs
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheTable.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Data.Caching.ObjectCache
@@ -66,12 +66,8 @@ namespace VNLib.Data.Caching.ObjectCache
/// <exception cref="ArgumentException"></exception>
public BlobCacheTable(uint tableSize, uint bucketSize, ICacheMemoryManagerFactory factory, IPersistantCacheStore? persistantCache)
{
- _ = factory ?? throw new ArgumentNullException(nameof(factory));
-
- if(tableSize == 0)
- {
- throw new ArgumentException("Cache table must have atleast 1 bucket");
- }
+ ArgumentNullException.ThrowIfNull(factory);
+ ArgumentOutOfRangeException.ThrowIfZero(tableSize);
//Init bucket table
_tableSize = tableSize;
@@ -106,10 +102,7 @@ namespace VNLib.Data.Caching.ObjectCache
private uint FastGetBucketIndexFromId(ReadOnlySpan<char> objectId)
{
- if (objectId.Length < 4)
- {
- throw new ArgumentException("Object id must be larger than 3 characters");
- }
+ ArgumentOutOfRangeException.ThrowIfLessThan(objectId.Length, 4, nameof(objectId));
Span<byte> buffer = stackalloc byte[4];
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/ICacheEntryMemoryManager.cs b/lib/VNLib.Data.Caching.ObjectCache/src/ICacheEntryMemoryManager.cs
index dffbfa2..dd57931 100644
--- a/lib/VNLib.Data.Caching.ObjectCache/src/ICacheEntryMemoryManager.cs
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/ICacheEntryMemoryManager.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Data.Caching.ObjectCache
@@ -57,9 +57,9 @@ namespace VNLib.Data.Caching
/// Pins the handle to the specified offset and returns a
/// <see cref="MemoryHandle"/> to the pinned memory block.
/// </summary>
- /// <param name="handle"></param>
- /// <param name="offset"></param>
- /// <returns></returns>
+ /// <param name="handle">A referrence to the existing handle</param>
+ /// <param name="offset">The number of bytes from the base to offset the returned pointer</param>
+ /// <returns>A memory handle pointing to the first byte in the segment given by the offset</returns>
MemoryHandle PinHandle(object handle, int offset);
/// <summary>
diff --git a/lib/VNLib.Data.Caching/src/ClientExtensions.cs b/lib/VNLib.Data.Caching/src/ClientExtensions.cs
index f0eee06..e0aa744 100644
--- a/lib/VNLib.Data.Caching/src/ClientExtensions.cs
+++ b/lib/VNLib.Data.Caching/src/ClientExtensions.cs
@@ -451,7 +451,7 @@ namespace VNLib.Data.Caching
/// <param name="client"></param>
/// <param name="change">The instance to store change event data to</param>
/// <param name="cancellationToken">A token to cancel the deuque operation</param>
- /// <returns>A <see cref="WaitForChangeResult"/> that contains information about the modified element</returns>
+ /// <returns>A task that completes when a change has occured</returns>
/// <exception cref="InvalidResponseException"></exception>
/// <exception cref="InvalidOperationException"></exception>
public static async Task WaitForChangeAsync(this FBMClient client, WaitForChangeResult change, CancellationToken cancellationToken = default)
diff --git a/plugins/ObjectCacheServer/Taskfile.yaml b/plugins/ObjectCacheServer/Taskfile.yaml
new file mode 100644
index 0000000..a339359
--- /dev/null
+++ b/plugins/ObjectCacheServer/Taskfile.yaml
@@ -0,0 +1,88 @@
+# https://taskfile.dev
+
+#Called by the vnbuild system to produce builds for my website
+#https://www.vaughnnugent.com/resources/software
+
+#This taskfile is called from the root of a project that is being built
+#and the purpose of this taskfile is to package up the output of a build
+#from the solution file, and package it up into a tgz files for distribution
+
+version: '3'
+
+vars:
+ TARGET: '{{.USER_WORKING_DIR}}/bin'
+ RELEASE_DIR: "./bin/release/{{.TARGET_FRAMEWORK}}/publish"
+ SOURCE_OUT: "{{.USER_WORKING_DIR}}/bin/source"
+
+includes:
+ ci:
+ taskfile: server/taskfile.yaml
+ dir: server/ #must execute from the server directory
+ optional: true
+
+tasks:
+
+ #called by ci to build the output
+ build:
+ cmds:
+ - task: ci:build
+
+ #when build succeeds, archive the output into a tgz
+ postbuild_success:
+ dir: '{{.USER_WORKING_DIR}}'
+ cmds:
+ #pack up source code
+ - task: packsource
+
+ #run post in debug mode
+ - task: postbuild
+ vars: { BUILD_MODE: debug }
+
+ #remove uncessary files from the release dir
+ - powershell -Command "Get-ChildItem -Recurse '{{.RELEASE_DIR}}/' -Include *.pdb,*.xml | Remove-Item"
+
+ #run post in release mode
+ - task: postbuild
+ vars: { BUILD_MODE: release }
+
+ - task: ci:postbuild_success
+
+
+ postbuild_failed:
+ dir: '{{.USER_WORKING_DIR}}'
+ cmds:
+ - echo "postbuild failed {{.PROJECT_NAME}}"
+
+
+ postbuild:
+ dir: '{{.USER_WORKING_DIR}}'
+ internal: true
+ vars:
+ #the build output directory
+ BUILD_OUT: "{{.USER_WORKING_DIR}}/bin/{{.BUILD_MODE}}/{{.TARGET_FRAMEWORK}}/publish"
+
+ cmds:
+
+ #copy license and readme to target
+ - cd .. && powershell -Command "Copy-Item -Path ./build.readme.md -Destination '{{.BUILD_OUT}}/readme.md'"
+
+ #tar outputs
+ - cd "{{.BUILD_OUT}}" && tar -czf "{{.TARGET}}/{{.BUILD_MODE}}.tgz" .
+
+ packsource:
+ dir: '{{.USER_WORKING_DIR}}'
+ internal: true
+ cmds:
+ #copy source code to target
+ - powershell -Command "Get-ChildItem -Include *.cs,*.csproj -Recurse | Where { \$_.FullName -notlike '*\obj\*' -and \$_.FullName -notlike '*\bin\*' } | Resolve-Path -Relative | tar --files-from - -czf '{{.TARGET}}/src.tgz'"
+
+
+#Remove the output dirs on clean
+ clean:
+ dir: '{{.USER_WORKING_DIR}}'
+ cmds:
+ - for: [ 'bin/', 'obj/' ]
+ cmd: powershell Remove-Item -Recurse '{{.ITEM}}'
+ ignore_error: true
+
+ - task: ci:clean
diff --git a/plugins/ObjectCacheServer/server/config/config.json b/plugins/ObjectCacheServer/server/config/config.json
new file mode 100644
index 0000000..1f8a382
--- /dev/null
+++ b/plugins/ObjectCacheServer/server/config/config.json
@@ -0,0 +1,104 @@
+{
+
+ //Host application config, config is loaded as a read-only DOM that is available
+ //to the host and loaded child plugins, all elements are available to plugins via the 'HostConfig' property
+
+ "http": {
+ //The defaut HTTP version to being requests with (does not support http/2 yet)
+ "default_version": "HTTP/1.1",
+ //The maxium size (in bytes) of response messges that will be compressed
+ "compression_limit": 10000,
+ //Minium response size (in bytes) to compress
+ "compression_minimum": 2048,
+ //The size of the buffer to use when parsing multipart/form data uploads
+ "multipart_max_buf_size": 1024,
+ //The maxium ammount of data (in bytes) allows for mulitpart/form data file uploads
+ "multipart_max_size": 0,
+ //Absolute maximum size (in bytes) of the request entity body (exludes headers)
+ "max_entity_size": 10240,
+ //Keepalive ms for HTTP1.1 keepalive connections
+ "keepalive_ms": 100000,
+ //The buffer size to use when parsing headers (also the maxium request header size allowed)
+ "header_buf_size": 8128,
+ //The maxium number of headers allowed in an HTTP request message
+ "max_request_header_count": 50,
+ //The maxium number of allowed network connections, before 503s will be issued automatically and connections closed
+ "max_connections": 5000,
+ //The size in bytes of the buffer to use when writing response messages
+ "response_buf_size": 4096,
+ //time (in ms) to wait for a response from an active connection in recv mode, before dropping it
+ "recv_timeout_ms": 5000,
+ //Time in ms to wait for the client to accept transport data before terminating the connection
+ "send_timeout_ms": 60000,
+ //The size (in bytes) of the buffer used to store all response header data
+ "response_header_buf_size": 16384,
+ //Max number of file uploads allowed per request
+ "max_uploads_per_request": 1
+ },
+
+ //Maxium ammount of time a request is allowed to be processed (includes loading or waiting for sessions) before operations will be cancelled and a 503 returned
+ "max_execution_time_ms": 20000,
+
+ "virtual_hosts": [
+ {
+ "interface": {
+ "address": "0.0.0.0",
+ "port": 2557
+ },
+
+ //Collection of "trusted" servers to allow proxy header support from
+ "downstream_servers": [],
+
+ //The hostname to listen for, "*" as wildcard, and "[system]" as the default hostname for the current machine
+ "hostname": "*",
+ "path": "root/", //Point to some place we can read nothing from
+
+ "deny_extensions": [ ],
+ "default_files": [ ],
+ "error_files": [],
+ "cache_default_sec": 864000,
+
+ "DISABLED ssl": {}
+ }
+ ],
+
+
+ //Defines the directory where plugin's are to be loaded from
+ "plugins": {
+ //Hot-reload creates collectable assemblies that allow full re-load support in the host application, should only be used for development purposes!
+ "hot_reload": false,
+ "path": "plugins/",
+ "config_dir": "config/",
+ "assets": "plugins/assets/"
+ },
+
+ "sys_log": {
+ "path": "data/logs/sys-log.txt",
+ "flush_sec": 5,
+ "retained_files": 31,
+ "file_size_limit": 10485760,
+ "interval": "infinite"
+ },
+
+ "app_log": {
+ "path": "data/logs/app-log.txt",
+ "flush_sec": 5,
+ "retained_files": 31,
+ "file_size_limit": 10485760,
+ "interval": "infinite"
+ },
+
+ //HASHICORP VAULT
+ "hashicorp_vault": {
+ "url": "",
+ "token": "",
+ "trust_cert": false
+ },
+
+ "secrets": {
+ //Special key used by the loading library for access to the PasswordHashing library to pepper password hashes
+ "cache_private_key": "",
+ "client_public_key": ""
+ }
+}
+
diff --git a/plugins/ObjectCacheServer/server/container/Dockerfile b/plugins/ObjectCacheServer/server/container/Dockerfile
new file mode 100644
index 0000000..6c466d4
--- /dev/null
+++ b/plugins/ObjectCacheServer/server/container/Dockerfile
@@ -0,0 +1,82 @@
+#Copyright (c) Vaughn Nugent
+#Licensed under the GNU AGPL V3.0
+
+#use plain alpine latest to build native libraries in
+FROM alpine:3.19 as native-cont
+
+#install public libs and build tools
+RUN apk update && apk add build-base cmake npm
+#most universal way to use Task is from NPM
+RUN npm install -g @go-task/cli
+
+WORKDIR /build
+
+#include local artifacts
+COPY app/ .
+
+#build internal libraries and copy the libraries to the /lib output directory
+RUN mkdir out/
+RUN task build-libs
+
+#APP CONTAINER
+#move into a clean dotnet apline lean image
+FROM mcr.microsoft.com/dotnet/runtime:8.0.2-alpine3.19-amd64 as app-cont
+
+LABEL name="vnuge/vncache"
+LABEL maintainer="Vaughn Nugent <vnpublic@proton.me>"
+LABEL description="A simple clustered network data caching service"
+
+#copy local artifacts again in run container
+COPY app/ /app
+
+#pull compiled libs from build container
+COPY --from=native-cont /build/out /app/lib
+
+RUN apk update && apk add --no-cache gettext icu-libs dumb-init
+
+#workdir
+WORKDIR /app
+
+#default to 2557 for cache port
+EXPOSE 2557/tcp
+
+VOLUME /app/ssl
+#expose an assets directory for custom assets install
+VOLUME /app/usr/assets
+
+#disable dotnet invariant culture on alpine
+ENV DOTNET_SYSTEM_GLOBALIZATION_INVARIANT=0
+
+#add helper/required libraries
+#ENV VNLIB_SHARED_HEAP_FILE_PATH=/app/lib/libvn_rpmalloc.so not ready yet, still need to debug
+
+#cache varables
+ENV MAX_ENTRIES=10000
+ENV CACHE_BUCKETS=100
+ENV CACHE_MAX_MESSAGE=20480
+ENV MAX_CONCURRENT_CONNECTIONS=1000
+
+ENV VERIFY_IP=true
+ENV MAX_PEER_NODES=10
+ENV DISCOVERY_INTERVAL=360
+ENV CACHE_CONNECT_PATH="/cache"
+ENV DISCOVER_PATH="/discover"
+ENV KNOWN_PEERS=[]
+
+#HC Vault
+ENV HC_VAULT_ADDR=""
+ENV HC_VAULT_TOKEN=""
+ENV HC_VAULT_TRUST_CERT=false
+
+#SECRETS
+ENV CACHE_PRIV_KEY=""
+ENV CLIENT_PUB_KEY=""
+
+#HTTP/PROXY Config
+ENV HTTP_DOWNSTREAM_SERVERS=[]
+ENV HTTP_MAX_CONNS=5000
+
+#run the init script within dumb-init
+ENTRYPOINT ["dumb-init", "--"]
+CMD ["ash", "./run.sh"]
+
diff --git a/plugins/ObjectCacheServer/server/container/Taskfile.yaml b/plugins/ObjectCacheServer/server/container/Taskfile.yaml
new file mode 100644
index 0000000..10ee86b
--- /dev/null
+++ b/plugins/ObjectCacheServer/server/container/Taskfile.yaml
@@ -0,0 +1,80 @@
+# https://taskfile.dev
+
+#inlcuded by the ci main taskfile to produce containerized builds, and also
+#be included by the container itself to run build tasks inside the container
+
+version: "3"
+
+vars:
+ INCLUDE_FILES: "Dockerfile, docker-compose.yaml"
+
+includes:
+ install:
+ taskfile: ../install.taskfile.yaml
+ optional: true #not needed for inside container build
+
+tasks:
+ #called from inside the container to build native libraries
+ build-libs:
+ vars:
+ OUT_DIR: "{{.USER_WORKING_DIR}}/out"
+
+ #build stage generates the following libraries
+ generates:
+ - "{{.USER_WORKING_DIR}}/out/libvn_rpmalloc.so"
+
+ cmds:
+ #build rpmalloc library
+ - cd lib/vnlib_rpmalloc/ && task && cp build/libvn_rpmalloc.so {{.OUT_DIR}}/libvn_rpmalloc.so
+
+ #called from ci pipline to build the package
+ build:
+ cmds:
+ # clean up the run.sh script to remove windows line endings in my wsl default instance
+ - cmd: wsl dos2unix ./run.sh
+ platforms: [ windows/amd64 ]
+
+ #init build image
+ - task: setup-container-image
+
+ #remove the default config file as it's not needed in the container
+ - powershell -Command "rm -Force -Recurse build/app/config/"
+
+ #install rpmalloc
+ - task: install-rpmalloc-lib
+
+ postbuild_success:
+ cmds:
+ #tar up the build directory and move it to the output bin directory
+ - cmd: cd build/ && tar -czf '{{ .BINARY_DIR }}/{{.PACKAGE_FILE_NAME}}' .
+ #clean up all the build files after build succeeds
+ - task: clean
+
+ clean:
+ ignore_error: true
+ cmds:
+ - cmd: powershell -Command "rm -Recurse -Force ./build"
+
+ install-rpmalloc-lib:
+ internal: true
+ cmds:
+ #install compressor plugin
+ - task: install:install
+ vars:
+ PROJECT_NAME: 'vnlib_rpmalloc'
+ MODULE_NAME: "VNLib.Core"
+ FILE_NAME: "src.tgz"
+ DIR: './build/app/lib/vnlib_rpmalloc'
+
+ setup-container-image:
+ internal: true
+ cmds:
+ #make build directory
+ - powershell -Command "mkdir build, build/app, build/app/config-templates/, build/app/static/ -Force"
+ #copy the existing linux-x64 build to the build folder, this will be the container base
+ - powershell -Command "cp -Recurse -Force ../build/linux-x64/* build/app/"
+ #copy local scripts and config data into the build folder
+ - powershell -Command "cp -Force run.sh, Taskfile.yaml build/app/"
+ - powershell -Command "cp -Force Dockerfile, docker-compose.yaml build/"
+ - powershell -Command "cp -Force static/* build/app/static/"
+ - powershell -Command "cp -Force config-templates/* build/app/config-templates/"
diff --git a/plugins/ObjectCacheServer/server/container/config-templates/ObjectCacheServer-template.json b/plugins/ObjectCacheServer/server/container/config-templates/ObjectCacheServer-template.json
new file mode 100644
index 0000000..765c3d7
--- /dev/null
+++ b/plugins/ObjectCacheServer/server/container/config-templates/ObjectCacheServer-template.json
@@ -0,0 +1,54 @@
+{
+ "debug": false,
+
+ //enables cache server cluster node data
+ "cluster": {
+ //Delay to re-discover peers
+ "discovery_interval_sec": ${DISCOVERY_INTERVAL},
+
+ //The maxium number of peers to connect to
+ "max_peers": ${MAX_PEER_NODES},
+
+ //Max ev queue depth before LRU eviction
+ "max_queue_depth": 10000,
+
+ //Time between queue purge
+ "queue_purge_interval_sec": 360000,
+
+ //Forces strict ip address verification on upgrades (best to leave on)
+ "verify_ip": ${VERIFY_IP},
+
+ //The cache websocket endpoint path
+ "connect_path": "${CACHE_CONNECT_PATH}",
+
+ //Optional to allow nodes to discover nodes we adverties
+ "discovery_path": "${DISCOVER_PATH}",
+
+ //Optionally change the well-known path (clients must know this)
+ "well_known_path": null,
+
+ //The maxium number of connections to this node
+ "max_concurrent_connections": ${MAX_CONCURRENT_CONNECTIONS}
+ },
+
+ //Cache configuration object, FBM protocol variables
+ "cache": {
+
+ //Max number of cache entires to be stored
+ "max_cache": ${MAX_ENTRIES},
+
+ //the number of cache buckets to distribute load
+ "buckets": ${CACHE_BUCKETS},
+
+ //FBM buffer config
+ "buffer_recv_max": ${CACHE_MAX_MESSAGE}, //Up to 100Kb transfer buffer
+ "buffer_recv_min": 8192, //min of 8k transfer buffer
+ "buffer_header_max": 2048, //2k max header buffer size
+ "buffer_header_min": 128, //128 byte min request header buffer size
+ "max_message_size": ${CACHE_MAX_MESSAGE} //Absolute maxium message size allowed, also the maxium size of cache entires
+ },
+
+ //Known peers array, must point to well-known endpoint for discovery
+ "known_peers": ${KNOWN_PEERS}
+
+} \ No newline at end of file
diff --git a/plugins/ObjectCacheServer/server/container/config-templates/config-template.json b/plugins/ObjectCacheServer/server/container/config-templates/config-template.json
new file mode 100644
index 0000000..6362432
--- /dev/null
+++ b/plugins/ObjectCacheServer/server/container/config-templates/config-template.json
@@ -0,0 +1,105 @@
+{
+
+ //Host application config, config is loaded as a read-only DOM that is available
+ //to the host and loaded child plugins, all elements are available to plugins via the 'HostConfig' property
+
+ "http": {
+ //The defaut HTTP version to being requests with (does not support http/2 yet)
+ "default_version": "HTTP/1.1",
+ //The maxium size (in bytes) of response messges that will be compressed
+ "compression_limit": 10000,
+ //Minium response size (in bytes) to compress
+ "compression_minimum": 2048,
+ //The size of the buffer to use when parsing multipart/form data uploads
+ "multipart_max_buf_size": 1024,
+ //The maxium ammount of data (in bytes) allows for mulitpart/form data file uploads
+ "multipart_max_size": 0,
+ //Absolute maximum size (in bytes) of the request entity body (exludes headers)
+ "max_entity_size": 10240,
+ //Keepalive ms for HTTP1.1 keepalive connections
+ "keepalive_ms": 100000,
+ //The buffer size to use when parsing headers (also the maxium request header size allowed)
+ "header_buf_size": 8128,
+ //The maxium number of headers allowed in an HTTP request message
+ "max_request_header_count": 50,
+ //The maxium number of allowed network connections, before 503s will be issued automatically and connections closed
+ "max_connections": ${HTTP_MAX_CONNS},
+ //The size in bytes of the buffer to use when writing response messages
+ "response_buf_size": 4096,
+ //time (in ms) to wait for a response from an active connection in recv mode, before dropping it
+ "recv_timeout_ms": 5000,
+ //Time in ms to wait for the client to accept transport data before terminating the connection
+ "send_timeout_ms": 60000,
+ //The size (in bytes) of the buffer used to store all response header data
+ "response_header_buf_size": 16384,
+ //Max number of file uploads allowed per request
+ "max_uploads_per_request": 1
+ },
+
+ //Maxium ammount of time a request is allowed to be processed (includes loading or waiting for sessions) before operations will be cancelled and a 503 returned
+ "max_execution_time_ms": 20000,
+
+ "virtual_hosts": [
+ {
+ "interface": {
+ "address": "0.0.0.0",
+ "port": 2557
+ },
+
+ //Collection of "trusted" servers to allow proxy header support from
+ "downstream_servers": ${HTTP_DOWNSTREAM_SERVERS},
+
+ //The hostname to listen for, "*" as wildcard, and "[system]" as the default hostname for the current machine
+ "hostname": "*",
+ "path": "root/",
+
+ "deny_extensions": [ ],
+ "default_files": [ ],
+ "error_files": [],
+ "cache_default_sec": 864000,
+
+ //Disabled until well-tested
+ //"ssl": ${SSL_JSON}
+ }
+ ],
+
+
+ //Defines the directory where plugin's are to be loaded from
+ "plugins": {
+ //Hot-reload creates collectable assemblies that allow full re-load support in the host application, should only be used for development purposes!
+ "hot_reload": false,
+ "path": "plugins/",
+ "config_dir": "config/",
+ "assets": "plugins/assets/"
+ },
+
+ "sys_log": {
+ "path": "data/logs/sys-log.txt",
+ "flush_sec": 5,
+ "retained_files": 31,
+ "file_size_limit": 10485760,
+ "interval": "infinite"
+ },
+
+ "app_log": {
+ "path": "data/logs/app-log.txt",
+ "flush_sec": 5,
+ "retained_files": 31,
+ "file_size_limit": 10485760,
+ "interval": "infinite"
+ },
+
+ //HASHICORP VAULT
+ "hashicorp_vault": {
+ "url": "${HC_VAULT_ADDR}",
+ "token": "${HC_VAULT_TOKEN}",
+ "trust_cert": ${HC_VAULT_TRUST_CERT}
+ },
+
+ "secrets": {
+ //Special key used by the loading library for access to the PasswordHashing library to pepper password hashes
+ "cache_private_key": "${CACHE_PRIV_KEY}",
+ "client_public_key": "${CLIENT_PUB_KEY}"
+ }
+}
+
diff --git a/plugins/ObjectCacheServer/server/container/docker-compose.yaml b/plugins/ObjectCacheServer/server/container/docker-compose.yaml
new file mode 100644
index 0000000..c1b61fa
--- /dev/null
+++ b/plugins/ObjectCacheServer/server/container/docker-compose.yaml
@@ -0,0 +1,45 @@
+#Copyright (c) Vaughn Nugent
+#Licensed under the GNU AGPLv3
+
+version: '3.6'
+
+services:
+ vncache:
+ image: vnuge/vncache
+ container_name: vncache
+ restart: unless-stopped
+ hostname: vncache-server
+ volumes:
+ - ./assets:/app/usr/assets:ro #optional if assets are required
+ - ./ssl:/app/ssl:ro #optional only if SSL is enabled (currently not a feature)
+ ports:
+ - 2557:2557
+ environment:
+ #System memory consumption is calculated as follows:
+ # MAX_ENTIRES x CACHE_BUCKETS x CACHE_MAX_MESSAGE = max memory consumption
+
+ MAX_CONCURRENT_CONNECTIONS: "1000" #max number of concurrent connections
+ MAX_ENTRIES: "10000" #max number of cache entries per bucket
+ CACHE_BUCKETS: "100" #number of cache buckets for load balancing
+ CACHE_MAX_MESSAGE: "20480" #20KB
+ VERIFY_IP: "true" #verfies the IP address of clients during negotiation (recommended)
+ MAX_PEER_NODES: "10" #max number of other peer nodes this node shoud connect to
+ DISCOVERY_INTERVAL: "360" #time (in seconds) between peer node discovery
+ KNOWN_PEERS: '[]' #array of known peer nodes in the cluster
+
+ #SECRETS (must be JWK formatted keys)
+ CACHE_PRIV_KEY: "" #REQUIRED local private key used to identify and sign messages to clients and other nodes
+ CLIENT_PUB_KEY: "" #REQUIRED used to verify client messages
+
+ #HC vault
+ #HC_VAULT_ADDR: ""
+ #HC_VAULT_TOKEN: ""
+ #HC_VAULT_TRUST_CERT: "false"
+
+ #HTTP
+ #HTTP_DOWNSTREAM_SERVERS: '[]'
+ #SSL_JSON: '{"cert": "ssl/cert.pem", "privkey":"ssl/priv.pem"}'
+ HTTP_MAX_CONNS: "5000"
+
+ SERVER_ARGS: "--input-off"
+
diff --git a/plugins/ObjectCacheServer/server/container/run.sh b/plugins/ObjectCacheServer/server/container/run.sh
new file mode 100644
index 0000000..2c2636c
--- /dev/null
+++ b/plugins/ObjectCacheServer/server/container/run.sh
@@ -0,0 +1,15 @@
+#! /bin/sh
+
+#this script will be invoked by dumb-init in the container on statup and is located at /app
+
+rm -rf config && mkdir config
+
+#substitude all -template files in the config-templates dir and write them to the config dir
+for file in config-templates/*-template.json; do
+ envsubst < $file > config/$(basename $file -template.json).json
+done
+
+cp usr/assets/* plugins/assets/ -rf
+
+#start the server
+dotnet webserver/VNLib.WebServer.dll --config config/config.json $SERVER_ARGS \ No newline at end of file
diff --git a/plugins/ObjectCacheServer/server/install.ps1 b/plugins/ObjectCacheServer/server/install.ps1
new file mode 100644
index 0000000..4c42c18
--- /dev/null
+++ b/plugins/ObjectCacheServer/server/install.ps1
@@ -0,0 +1,26 @@
+param([String] $BaseUrl, [String] $ModuleName, [String] $ProjectName, [String]$FileName)
+
+#get the latest file
+Invoke-WebRequest "$BaseUrl/$ModuleName/@latest" -OutFile latest.txt
+#read the file into a variable
+$latest = Get-Content latest.txt
+
+#download the latest version
+Invoke-WebRequest "$BaseUrl/$ModuleName/$latest/$ProjectName/$FileName" -OutFile $FileName
+
+#download latest sha256
+Invoke-WebRequest "$BaseUrl/$ModuleName/$latest/$ProjectName/$FileName.sha256" -OutFile "$FileName.sha256"
+
+#verify the file
+$hash = (Get-FileHash $FileName -Algorithm SHA256).Hash
+
+#read the sha256 file
+$sha256 = Get-Content "$FileName.sha256"
+
+#compare the hashes
+if ($hash -eq $sha256) {
+ Write-Host "Hashes match, file is valid" -ForegroundColor Blue
+} else {
+ throw "Hashes do not match, file is invalid"
+}
+
diff --git a/plugins/ObjectCacheServer/server/install.taskfile.yaml b/plugins/ObjectCacheServer/server/install.taskfile.yaml
new file mode 100644
index 0000000..37baf12
--- /dev/null
+++ b/plugins/ObjectCacheServer/server/install.taskfile.yaml
@@ -0,0 +1,20 @@
+# https://taskfile.dev
+
+#Called by the vnbuild system to produce builds for my website
+#https://www.vaughnnugent.com/resources/software
+
+version: "3"
+
+tasks:
+
+ install:
+ internal: true
+ cmds:
+ #make the plugin directory
+ - cmd: powershell -Command "mkdir {{.DIR}} -Force"
+ ignore_error: true
+ - cmd: powershell -Command "pwd"
+ - cd {{.DIR}} && powershell "{{.SCRIPT_DIR}}/install.ps1" -BaseUrl {{.BUILDS_URL}} -ModuleName {{.MODULE_NAME}} -ProjectName {{.PROJECT_NAME}} -FileName {{.FILE_NAME}}
+ - cd {{.DIR}} && tar -xzf {{.FILE_NAME}}
+ #remove the archive file
+ - cd {{.DIR}} && powershell -Command "rm {{.FILE_NAME}}" \ No newline at end of file
diff --git a/plugins/ObjectCacheServer/server/taskfile.yaml b/plugins/ObjectCacheServer/server/taskfile.yaml
new file mode 100644
index 0000000..38eae79
--- /dev/null
+++ b/plugins/ObjectCacheServer/server/taskfile.yaml
@@ -0,0 +1,193 @@
+# https://taskfile.dev
+
+#Inlcuded taskfile for object cache server that is used to produce
+#ci builds for standalone caching servers
+
+version: "3"
+
+vars:
+ BUILDS_URL: https://www.vaughnnugent.com/public/resources/software/builds
+ SCRIPT_DIR: '{{.TASKFILE_DIR}}'
+ BINARY_DIR: '{{.PROJECT_DIR}}/bin' #binary dir is not available for dotnet plugis
+
+includes:
+ install:
+ taskfile: install.taskfile.yaml
+ optional: true
+
+ container:
+ dir: container #always run from the container directory
+ taskfile: container/Taskfile.yaml
+ optional: true
+ vars:
+ BUILDS_URL: '{{.BUILDS_URL}}'
+ PACKAGE_FILE_NAME: "vncache-alpine3.19-oci.tgz" #the name of the output package file
+
+tasks:
+# CLIENT-SIDE TASKS
+ default:
+ desc: "Runs the VNCache server"
+ cmds:
+ - task: run
+
+ run:
+ desc: "Runs the VNCache server"
+ silent: true
+ env:
+ #server should detect the file extension and load the correct library
+ VNLIB_SHARED_HEAP_FILE_PATH: lib/libvn_rpmalloc
+
+ cmds:
+ - cmd: dotnet webserver/VNLib.WebServer.dll --config config/config.json --input-off --inline-scheduler {{.ARGS}}
+ #setup sever environment
+
+
+ setup-debian:
+ desc: "Performs initial setup on Debian x64 based machines"
+ silent: true
+ cmds:
+ - apt update
+ - apt install -y dotnet-runtime-8.0 gcc cmake
+ - task: setup
+ - echo "Setup complete"
+
+ setup-fedora:
+ desc: "Performs initial setup on Fedora/Redhat x64 (dnf) based machines"
+ silent: true
+ cmds:
+ - dnf update
+ - dnf install -y dotnet-runtime-8.0 gcc cmake
+ - task: setup
+ - echo "Setup complete"
+
+ setup-alpine:
+ desc: "Performs initial setup on Alpine x64 based machines"
+ silent: true
+ cmds:
+ - apk update
+ - apk add --no-cache dotnet8-runtime gcc cmake
+ - task: setup
+ - echo "Setup complete"
+
+ setup:
+ cmds:
+ #build rpmalloc lib
+ - task: build-rpmalloc
+
+ build-rpmalloc:
+ internal: true
+ dir: 'lib/'
+ vars:
+ RPMALLOC_DIR: 'vnlib_rpmalloc'
+ cmds:
+ #build rpmalloc library
+ - cmd: cd vnlib_rpmalloc/ && task
+
+ - cmd: cp vnlib_rpmalloc/build/libvn_rpmalloc.so libvn_rpmalloc.so
+ platforms: [ linux ]
+
+ - cmd: cp vnlib_rpmalloc/build/libvn_rpmalloc.dylib libvn_rpmalloc.dylib
+ platforms: [ darwin ]
+
+ - cmd: powershell -Command "cp vnlib_rpmalloc/build/Release/vnlib_rpmalloc.dll libvn_rpmalloc.dll"
+ platforms: [ windows/amd64 ]
+
+# CI BUILD TASKS
+ build:
+ desc: "CI ONLY! DO NOT RUN"
+ cmds:
+ - task: install-plugins
+ - task: install-webserver
+
+ #run container build last
+ - task: container:build
+
+ install-webserver:
+ internal: true
+ cmds:
+ - for: [ win-x64, linux-x64, osx-x64, linux-arm64 ]
+ task: create-env
+ vars:
+ TARGET_OS: '{{.ITEM}}'
+
+ install-plugins:
+ internal: true
+ cmds:
+ - cmd: powershell -Command "mkdir lib -Force"
+ ignore_error: true
+
+ #copy the object-cache plugin output to the local plugins directory
+ - cmd: powershell -Command "cp -Recurse -Force {{.PROJECT_DIR}}/bin/Release/net8.0/publish/ plugins/{{.PROJECT_NAME}}/"
+
+ #download rpmalloc
+ - task: install:install
+ vars:
+ PROJECT_NAME: 'vnlib_rpmalloc'
+ MODULE_NAME: "VNLib.Core"
+ FILE_NAME: "src.tgz"
+ DIR: './lib/vnlib_rpmalloc'
+
+ postbuild_success:
+ desc: "CI ONLY! DO NOT RUN"
+ cmds:
+ - for: [ win-x64, linux-x64, osx-x64, linux-arm64 ]
+ task: pack
+ vars:
+ TARGET_OS: '{{.ITEM}}'
+
+ #cleanup unnecessary build files that clog up the pipeline
+ - for: [ build, plugins, lib ]
+ cmd: powershell -Command "rm -Recurse '{{.ITEM}}'"
+ ignore_error: true
+
+ - task: container:postbuild_success
+
+ build-container:
+ internal: true
+ cmds:
+ - task: container:build
+
+ #Creates a new webserver build environment for an operating system configuration
+ create-env:
+ internal: true
+ vars:
+ BUILD_DIR: './build/{{.TARGET_OS}}'
+ cmds:
+ #create dir for env
+ - cmd: powershell -Command "mkdir {{.BUILD_DIR}} -Force"
+ ignore_error: true
+
+ #copy build files
+ - for: [ plugins, lib, config, taskfile.yaml ]
+ cmd: powershell -Command "cp -Recurse -Force {{.ITEM}} {{.BUILD_DIR}}"
+
+ - task: get-webserver
+ vars:
+ TARGET_OS: '{{.TARGET_OS}}'
+ BUILD_DIR: '{{.BUILD_DIR}}'
+
+ #fetches a copy of (the desired os version) VNLib.WebServer project and installs it into the build directory
+ get-webserver:
+ internal: true
+ cmds:
+ - task: install:install
+ vars:
+ PROJECT_NAME: 'VNLib.Webserver'
+ MODULE_NAME: "VNLib.Webserver"
+ FILE_NAME: "{{.TARGET_OS}}-release.tgz"
+ DIR: '{{.BUILD_DIR}}/webserver'
+
+ pack:
+ internal: true
+ cmds:
+ - cd build/{{.TARGET_OS}} && tar -czf '{{ .BINARY_DIR }}/{{ .TARGET_OS }}-release.tgz' .
+
+ clean:
+ desc: "CI ONLY! DO NOT RUN"
+ ignore_error: true
+ cmds:
+ - for: [ build/, bin/, plugins/, lib/]
+ cmd: powershell -Command "rm -Recurse -Force '{{.ITEM}}'"
+
+ - task: container:clean
+
diff --git a/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs b/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs
index 40f4c29..6f733ed 100644
--- a/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs
+++ b/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Plugins.Extensions.VNCache
@@ -24,6 +24,7 @@
using System;
+using System.Linq;
using System.Buffers;
using System.Text.Json;
using System.Collections.Generic;
@@ -32,9 +33,10 @@ using System.Runtime.CompilerServices;
using VNLib.Plugins;
using VNLib.Utils;
using VNLib.Utils.Memory;
+using VNLib.Utils.Memory.Diagnostics;
+using VNLib.Utils.Logging;
using VNLib.Utils.Extensions;
using VNLib.Plugins.Extensions.Loading;
-
/*
* How bucket local memory works:
*
@@ -53,6 +55,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server
{
private readonly LinkedList<BucketLocalManager> _managers = new ();
private readonly bool _zeroAll;
+ private readonly bool _enableHeapTracking;
+ private readonly ILogProvider _statsLogger;
///<inheritdoc/>
public ICacheEntryMemoryManager CreateForBucket(uint bucketId)
@@ -60,6 +64,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//Init a new heap for the individual bucket
IUnmangedHeap localHeap = MemoryUtil.InitializeNewHeapForProcess();
+ if (_enableHeapTracking)
+ {
+#pragma warning disable CA2000 // Dispose objects before losing scope
+ localHeap = new TrackedHeapWrapper(localHeap, true);
+#pragma warning restore CA2000 // Dispose objects before losing scope
+ }
+
BucketLocalManager manager = new (localHeap, bucketId, _zeroAll);
_managers.AddLast(manager);
@@ -74,11 +85,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server
if (config != null)
{
//Try to get the zero all flag
- if (config.TryGetValue("zero_all", out JsonElement zeroEl))
- {
- _zeroAll = zeroEl.GetBoolean();
- }
+ _zeroAll = config.TryGetValue("zero_all", out JsonElement zeroEl) && zeroEl.GetBoolean();
+
+ //Get the heap tracking flag
+ _enableHeapTracking = config.TryGetValue("diag_mem", out JsonElement trackEl) && trackEl.GetBoolean();
}
+
+ _statsLogger = plugin.Log.CreateScope("Cache MemStats");
}
protected override void Free()
@@ -90,6 +103,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server
}
}
+ public void LogHeapStats()
+ {
+ //If tracking is not enabled, the heap instances stored by the managers will not be tracked, and the cast in the code below will fail
+ if (!_enableHeapTracking)
+ {
+ return;
+ }
+
+ string[] statsPerHeap = _managers.Select(hm =>
+ {
+ HeapStatistics stats = (hm.Heap as TrackedHeapWrapper)!.GetCurrentStats();
+ return $"\tBucket {hm.BucketId}: Current {stats.AllocatedBytes / 1024}kB, Blocks {stats.AllocatedBlocks}, Max size {stats.MaxHeapSize / 1024}kB";
+
+ }).ToArray();
+
+ _statsLogger.Debug("Memory statistics for cache memory manager: {hm}\n{stats}", GetHashCode(), statsPerHeap);
+ }
+
/*
* Buckets are mutually exclusive, so we can use a single heap for each bucket
* to get a little more performance on memory operations
@@ -104,7 +135,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
///<inheritdoc/>
public void FreeHandle(object handle)
{
- _ = handle ?? throw new ArgumentNullException(nameof(handle));
+ ArgumentNullException.ThrowIfNull(handle);
MemoryHandle<byte> _handle = Unsafe.As<object, MemoryHandle<byte>>(ref handle);
//Free the handle
@@ -114,7 +145,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
///<inheritdoc/>
public uint GetHandleSize(object handle)
{
- _ = handle ?? throw new ArgumentNullException(nameof(handle));
+ ArgumentNullException.ThrowIfNull(handle);
MemoryHandle<byte> _handle = Unsafe.As<object, MemoryHandle<byte>>(ref handle);
return (uint)_handle.Length;
@@ -123,7 +154,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
///<inheritdoc/>
public Span<byte> GetSpan(object handle, uint offset, uint length)
{
- _ = handle ?? throw new ArgumentNullException(nameof(handle));
+ ArgumentNullException.ThrowIfNull(handle);
MemoryHandle<byte> _handle = Unsafe.As<object, MemoryHandle<byte>>(ref handle);
return _handle.GetOffsetSpan(offset, checked((int)length));
@@ -132,7 +163,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
///<inheritdoc/>
public MemoryHandle PinHandle(object handle, int offset)
{
- _ = handle ?? throw new ArgumentNullException(nameof(handle));
+ ArgumentNullException.ThrowIfNull(handle);
MemoryHandle<byte> _handle = Unsafe.As<object, MemoryHandle<byte>>(ref handle);
//Pin the handle
@@ -142,7 +173,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
///<inheritdoc/>
public void ResizeHandle(object handle, uint newSize)
{
- _ = handle ?? throw new ArgumentNullException(nameof(handle));
+ ArgumentNullException.ThrowIfNull(handle);
MemoryHandle<byte> _handle = Unsafe.As<object, MemoryHandle<byte>>(ref handle);
//Resize the handle
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs
index 6942828..aef0255 100644
--- a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs
+++ b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -45,39 +45,33 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
internal sealed class CacheListenerPubQueue : ICacheListenerEventQueue<IPeerEventQueue>, IAsyncBackgroundWork
{
- private const int MAX_LOCAL_QUEUE_ITEMS = 10000;
- private const string LOG_SCOPE_NAME = "QUEUE";
-
private readonly AsyncQueue<ChangeEvent> _listenerQueue;
private readonly ILogProvider _logProvider;
- private readonly ICacheEventQueueManager _queueManager;
+ private readonly PeerEventQueueManager _queueManager;
- public CacheListenerPubQueue(PluginBase plugin)
+ public CacheListenerPubQueue(PluginBase plugin, PeerEventQueueManager queueMan)
{
- _queueManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>();
- _logProvider = plugin.Log.CreateScope(LOG_SCOPE_NAME);
+ _queueManager = queueMan;
+ _logProvider = plugin.Log.CreateScope(CacheConstants.LogScopes.CacheListenerPubQueue);
//Init local queue to store published events
- _listenerQueue = new(new BoundedChannelOptions(MAX_LOCAL_QUEUE_ITEMS)
+ _listenerQueue = new(new BoundedChannelOptions(CacheConstants.CacheListenerChangeQueueSize)
{
AllowSynchronousContinuations = true,
FullMode = BoundedChannelFullMode.DropOldest,
- SingleReader = true,
+ SingleReader = true, //Always a singe thread reading events
SingleWriter = false,
});
}
///<inheritdoc/>
- async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
+ async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider _, CancellationToken exitToken)
{
const int accumulatorSize = 64;
- //Create scope
- pluginLog = pluginLog.CreateScope(LOG_SCOPE_NAME);
-
try
{
- pluginLog.Debug("Change queue worker listening for local cache changes");
+ _logProvider.Debug("Change queue worker listening for local cache changes");
//Accumulator for events
ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize];
@@ -105,15 +99,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
catch (OperationCanceledException)
{
//Normal exit
- pluginLog.Debug("Change queue listener worker exited");
+ _logProvider.Debug("Change queue listener worker exited");
}
}
///<inheritdoc/>
- public bool IsEnabled([NotNullWhen(true)] IPeerEventQueue? userState)
- {
- return userState is IPeerEventQueue;
- }
+ public bool IsEnabled([NotNullWhen(true)] IPeerEventQueue? userState) => userState is not null;
///<inheritdoc/>
public void PublishEvent(ChangeEvent changeEvent)
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs b/plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs
index bd15d24..c404cc5 100644
--- a/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs
+++ b/plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs
@@ -1,12 +1,12 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
-* File: CacheConfiguration.cs
+* File: CacheMemoryConfiguration.cs
*
-* CacheConfiguration.cs is part of ObjectCacheServer which is part of the larger
-* VNLib collection of libraries and utilities.
+* CacheMemoryConfiguration.cs is part of ObjectCacheServer which
+* is part of the larger VNLib collection of libraries and utilities.
*
* ObjectCacheServer is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
@@ -26,7 +26,7 @@ using System.Text.Json.Serialization;
namespace VNLib.Data.Caching.ObjectCache.Server.Cache
{
- internal sealed class CacheConfiguration
+ internal sealed class CacheMemoryConfiguration
{
[JsonPropertyName("buffer_recv_max")]
public int MaxRecvBufferSize { get; set; } = 1000 * 1024;
@@ -36,6 +36,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
[JsonPropertyName("buffer_header_max")]
public int MaxHeaderBufferSize { get; set; } = 2 * 1024;
+
[JsonPropertyName("buffer_header_min")]
public int MinHeaderBufferSize { get; set; } = 128;
@@ -49,5 +50,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
[JsonPropertyName("buckets")]
public uint BucketCount { get; set; } = 10;
+
+
+ [JsonPropertyName("memory_lib_path")]
+ public string? ExternLibPath { get; set; }
}
}
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs
deleted file mode 100644
index 75abe37..0000000
--- a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
-* Copyright (c) 2024 Vaughn Nugent
-*
-* Library: VNLib
-* Package: ObjectCacheServer
-* File: CacheStore.cs
-*
-* CacheStore.cs is part of ObjectCacheServer which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* ObjectCacheServer is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* ObjectCacheServer is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
-using VNLib.Utils.Logging;
-using VNLib.Net.Messaging.FBM;
-using VNLib.Plugins;
-using VNLib.Plugins.Extensions.Loading;
-
-namespace VNLib.Data.Caching.ObjectCache.Server.Cache
-{
- /*
- * Implements the blob cache store, which is an abstraction around the blob cache listener.
- * This allows for publishing local events (say from other nodes) to keep caches in sync.
- */
-
- [ConfigurationName("cache")]
- internal sealed class CacheStore : ICacheStore, IDisposable
- {
- /// <summary>
- /// Gets the underlying cache listener
- /// </summary>
- public BlobCacheListener<IPeerEventQueue> Listener { get; }
-
-
- public CacheStore(PluginBase plugin, IConfigScope config)
- {
- //Init cache
- Listener = InitializeCache((ObjectCacheServerEntry)plugin, config);
- }
-
- ///<inheritdoc/>
- ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, ObjectDataGet<T> bodyData, T state, CancellationToken token)
- {
- return Listener.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token);
- }
-
- ///<inheritdoc/>
- void ICacheStore.Clear()
- {
- throw new NotImplementedException();
- }
-
- ///<inheritdoc/>
- ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token)
- {
- return Listener.Cache.DeleteObjectAsync(id, token);
- }
-
- private static BlobCacheListener<IPeerEventQueue> InitializeCache(ObjectCacheServerEntry plugin, IConfigScope config)
- {
- const string CacheConfigTemplate =
-@"
-Cache Configuration:
- Max memory: {max} Mb
- Buckets: {bc}
- Entries per-bucket: {mc}
-";
-
- //Deserialize the cache config
- CacheConfiguration cacheConf = config.Deserialze<CacheConfiguration>();
-
- if (cacheConf.MaxCacheEntries < 2)
- {
- throw new ArgumentException("You must configure a 'max_cache' size larger than 1 item");
- }
-
- //Suggestion
- if (cacheConf.MaxCacheEntries < 200)
- {
- plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache");
- }
-
- //calculate the max memory usage
- ulong maxByteSize = cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize;
-
- //Log the cache config
- plugin.Log.Information(CacheConfigTemplate,
- maxByteSize / (1024 * 1000),
- cacheConf.BucketCount,
- cacheConf.MaxCacheEntries
- );
-
- //Get the event listener
- ICacheListenerEventQueue<IPeerEventQueue> queue = plugin.GetOrCreateSingleton<CacheListenerPubQueue>();
-
- //Get the memory manager
- ICacheMemoryManagerFactory manager = plugin.GetOrCreateSingleton<BucketLocalManagerFactory>();
-
- //Load the blob cache table system
- IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, manager, cacheConf);
-
- FallbackFBMMemoryManager fbmMemManager = new(plugin.ListenerHeap);
-
- //Endpoint only allows for a single reader
- return new(bc, queue, plugin.Log, fbmMemManager);
- }
-
- /*
- * Cleaned up by the plugin on exit
- */
- public void Dispose()
- {
- Listener.Dispose();
- }
- }
-}
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs
index b7bf83f..8f196b0 100644
--- a/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs
+++ b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs
@@ -29,6 +29,7 @@ using System.Text.Json;
using VNLib.Utils.Resources;
using VNLib.Plugins;
using VNLib.Plugins.Extensions.Loading;
+using VNLib.Utils.Extensions;
namespace VNLib.Data.Caching.ObjectCache.Server.Cache
{
@@ -49,7 +50,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
/// <param name="cacheConf">The cache configuration object</param>
/// <returns>The loaded <see cref="IBlobCacheTable"/> implementation</returns>
/// <exception cref="FileNotFoundException"></exception>
- public static IBlobCacheTable LoadMemoryCacheSystem(this PluginBase plugin, IConfigScope config, ICacheMemoryManagerFactory heap, CacheConfiguration cacheConf)
+ public static IBlobCacheTable LoadMemoryCacheSystem(this PluginBase plugin, IConfigScope config, ICacheMemoryManagerFactory heap, CacheMemoryConfiguration cacheConf)
{
#pragma warning disable CA2000 // Dispose objects before losing scope
@@ -94,10 +95,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
if(initMethod != null)
{
//Itterate all buckets
- foreach (IBlobCacheBucket bucket in table)
- {
- initMethod.Invoke(bucket.Id);
- }
+ table.ForEach(bucket => initMethod(bucket.Id));
}
}
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs b/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs
index e3c613d..4b76a9b 100644
--- a/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs
+++ b/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs
@@ -1,11 +1,11 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
-* File: CacheEventQueueManager.cs
+* File: PeerEventQueueManager.cs
*
-* CacheEventQueueManager.cs is part of ObjectCacheServer which is
+* PeerEventQueueManager.cs is part of ObjectCacheServer which is
* part of the larger VNLib collection of libraries and utilities.
*
* ObjectCacheServer is free software: you can redistribute it and/or modify
@@ -38,41 +38,37 @@ using VNLib.Plugins.Extensions.Loading.Events;
namespace VNLib.Data.Caching.ObjectCache.Server.Cache
{
- internal sealed class CacheEventQueueManager : ICacheEventQueueManager, IDisposable, IIntervalScheduleable
+ internal sealed class PeerEventQueueManager : ICacheEventQueueManager, IIntervalScheduleable
{
private readonly int MaxQueueDepth;
- private readonly object SubLock;
- private readonly LinkedList<NodeQueue> Subscribers;
+ private readonly object SubLock = new();
+ private readonly LinkedList<PeerEventListenerQueue> Subscribers = [];
- private readonly object StoreLock;
- private readonly Dictionary<string, NodeQueue> QueueStore;
+ private readonly object StoreLock = new();
+ private readonly Dictionary<string, PeerEventListenerQueue> QueueStore = new(StringComparer.OrdinalIgnoreCase);
-
- public CacheEventQueueManager(PluginBase plugin)
+ public PeerEventQueueManager(PluginBase plugin, ServerClusterConfig config)
{
- //Get node config
- NodeConfig config = plugin.GetOrCreateSingleton<NodeConfig>();
-
- //Get max queue depth
MaxQueueDepth = config.MaxQueueDepth;
/*
- * Schedule purge interval to clean up stale queues
- */
+ * Schedule purge interval to clean up stale queues
+ */
plugin.ScheduleInterval(this, config.EventQueuePurgeInterval);
-
- SubLock = new();
- Subscribers = new();
-
- StoreLock = new();
- QueueStore = new(StringComparer.OrdinalIgnoreCase);
+
+ //Cleanup disposeables on unload
+ _ = plugin.RegisterForUnload(() =>
+ {
+ QueueStore.Clear();
+ Subscribers.Clear();
+ });
}
///<inheritdoc/>
public IPeerEventQueue Subscribe(ICachePeer peer)
{
- NodeQueue? nq;
+ PeerEventListenerQueue? nq;
bool isNew = false;
@@ -82,13 +78,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
//Try to recover the queue for the node
if (!QueueStore.TryGetValue(peer.NodeId, out nq))
{
- //Create new queue
+ //Create new queue since an existing queue was not found
nq = new(peer.NodeId, MaxQueueDepth);
QueueStore.Add(peer.NodeId, nq);
isNew = true;
}
- //Increment listener count
+ //Increment listener count since a new listener has attached
nq.Listeners++;
}
@@ -109,11 +105,20 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
///<inheritdoc/>
public void Unsubscribe(ICachePeer peer)
{
+ /*
+ * The reason I am not purging queues that no longer have listeners
+ * now is because it is possible that a listener needed to detach because of
+ * a network issue and will be reconnecting shortly. If the node doesnt
+ * come back before the next purge interval, it's events will be purged.
+ *
+ * Point is: there is a reason for the garbage collection style purging
+ */
+
//Detach a listener for a node
lock (StoreLock)
{
//Get the queue and decrement the listener count
- NodeQueue nq = QueueStore[peer.NodeId];
+ PeerEventListenerQueue nq = QueueStore[peer.NodeId];
nq.Listeners--;
}
}
@@ -125,7 +130,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
lock (SubLock)
{
//Loop through ll the fast way
- LinkedListNode<NodeQueue>? q = Subscribers.First;
+ LinkedListNode<PeerEventListenerQueue>? q = Subscribers.First;
while (q != null)
{
@@ -145,7 +150,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
lock (SubLock)
{
//Loop through ll the fast way
- LinkedListNode<NodeQueue>? q = Subscribers.First;
+ LinkedListNode<PeerEventListenerQueue>? q = Subscribers.First;
while (q != null)
{
@@ -167,9 +172,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
lock (StoreLock)
{
//Get all stale queues (queues without listeners)
- NodeQueue[] staleQueues = QueueStore.Values.Where(static nq => nq.Listeners == 0).ToArray();
+ PeerEventListenerQueue[] staleQueues = QueueStore.Values.Where(static nq => nq.Listeners == 0).ToArray();
- foreach (NodeQueue nq in staleQueues)
+ foreach (PeerEventListenerQueue nq in staleQueues)
{
//Remove from store
QueueStore.Remove(nq.NodeId);
@@ -191,54 +196,39 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
return Task.CompletedTask;
}
- void IDisposable.Dispose()
- {
- QueueStore.Clear();
- Subscribers.Clear();
- }
/*
* Holds queues for each node and keeps track of the number of listeners
* attached to the queue
+ *
+ * The role of this class is to store change events for a given peer node,
+ * and return them when the peer requests them. It also keeps track of the
+ * number of active listeners (server connections) to the queue.
*/
- private sealed class NodeQueue : IPeerEventQueue
+ private sealed class PeerEventListenerQueue(string nodeId, int maxDepth) : IPeerEventQueue
{
public int Listeners;
- public string NodeId { get; }
-
- public AsyncQueue<ChangeEvent> Queue { get; }
+ public string NodeId => nodeId;
- public NodeQueue(string nodeId, int maxDepth)
+ /*
+ * Create a bounded channel that acts as a lru and evicts
+ * the oldest item when the queue is full
+ *
+ * There will also only ever be a single thread writing events
+ * to the queue
+ */
+ private readonly AsyncQueue<ChangeEvent> Queue = new(new BoundedChannelOptions(maxDepth)
{
- NodeId = nodeId;
-
- /*
- * Create a bounded channel that acts as a lru and evicts
- * the oldest item when the queue is full
- *
- * There will also only ever be a single thread writing events
- * to the queue
- */
-
- BoundedChannelOptions queueOptions = new(maxDepth)
- {
- AllowSynchronousContinuations = true,
- SingleReader = false,
- SingleWriter = true,
- //Drop oldest item in queue if full
- FullMode = BoundedChannelFullMode.DropOldest,
- };
-
- //Init queue/channel
- Queue = new(queueOptions);
- }
+ AllowSynchronousContinuations = true,
+ SingleReader = false,
+ SingleWriter = true,
+ //Drop oldest item in queue if full
+ FullMode = BoundedChannelFullMode.DropOldest,
+ });
- public void PublishChange(ChangeEvent change)
- {
- Queue.TryEnque(change);
- }
+ public void PublishChange(ChangeEvent change) => Queue.TryEnque(change);
public void PublishChanges(Span<ChangeEvent> changes)
{
@@ -249,16 +239,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
}
///<inheritdoc/>
- public ValueTask<ChangeEvent> DequeueAsync(CancellationToken cancellation)
- {
- return Queue.DequeueAsync(cancellation);
- }
+ public ValueTask<ChangeEvent> DequeueAsync(CancellationToken cancellation) => Queue.DequeueAsync(cancellation);
///<inheritdoc/>
- public bool TryDequeue(out ChangeEvent change)
- {
- return Queue.TryDequeue(out change);
- }
+ public bool TryDequeue(out ChangeEvent change) => Queue.TryDequeue(out change);
}
}
}
diff --git a/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs b/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs
index 5fc700b..5be0776 100644
--- a/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs
+++ b/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -34,16 +34,10 @@ using VNLib.Data.Caching.Extensions;
namespace VNLib.Data.Caching.ObjectCache.Server
{
- sealed record class CacheAuthKeyStore : ICacheAuthManager
+ sealed class CacheAuthKeyStore(PluginBase plugin) : ICacheAuthManager
{
- private readonly IAsyncLazy<ReadOnlyJsonWebKey> _clientPub;
- private readonly IAsyncLazy<ReadOnlyJsonWebKey> _cachePriv;
-
- public CacheAuthKeyStore(PluginBase plugin)
- {
- _clientPub = plugin.GetSecretAsync("client_public_key").ToLazy(r => r.GetJsonWebKey());
- _cachePriv = plugin.GetSecretAsync("cache_private_key").ToLazy(r => r.GetJsonWebKey());
- }
+ private readonly IAsyncLazy<ReadOnlyJsonWebKey> _clientPub = plugin.Secrets().GetSecretAsync("client_public_key").ToLazy(r => r.GetJsonWebKey());
+ private readonly IAsyncLazy<ReadOnlyJsonWebKey> _cachePriv = plugin.Secrets().GetSecretAsync("cache_private_key").ToLazy(r => r.GetJsonWebKey());
///<inheritdoc/>
public IReadOnlyDictionary<string, string?> GetJwtHeader()
diff --git a/plugins/ObjectCacheServer/src/CacheConstants.cs b/plugins/ObjectCacheServer/src/CacheConstants.cs
new file mode 100644
index 0000000..85f737d
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/CacheConstants.cs
@@ -0,0 +1,107 @@
+/*
+* Copyright (c) 2024 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: CacheConstants.cs
+*
+* CacheConstants.cs is part of ObjectCacheServer which is
+* part of the larger VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+
+namespace VNLib.Data.Caching.ObjectCache.Server
+{
+ internal static class CacheConstants
+ {
+ /// <summary>
+ /// The default path for the VNCache well known endpoint (aka discovery endpoint)
+ /// </summary>
+ public const string DefaultWellKnownPath = "/.well-known/vncache";
+
+ /// <summary>
+ /// The maximum size of buffers for FBM messages sent between servers.
+ /// </summary>
+ public const int MaxSyncMessageSize = 12 * 1024;
+
+ /// <summary>
+ /// The maximum size of the change queue for the cache listener
+ /// </summary>
+ public const int CacheListenerChangeQueueSize = 10000;
+
+ /// <summary>
+ /// The time a client authorization token is valid for
+ /// </summary>
+ public static readonly TimeSpan ClientAuthTokenExpiration = TimeSpan.FromSeconds(30);
+
+ public static class LogScopes
+ {
+ /// <summary>
+ /// The log scope for the cache listener
+ /// </summary>
+ public const string BlobCacheListener = "CacheListener";
+
+ /// <summary>
+ /// The peer discovery log scope
+ /// </summary>
+ public const string PeerDiscovery = "DISC";
+
+ /// <summary>
+ /// The log scope for the replication FBM client debug log (if debugging is enabled)
+ /// </summary>
+ public const string ReplicationFbmDebug = "REPL-CLNT";
+
+ /// <summary>
+ /// The log scope for cache replication events
+ /// </summary>
+ public const string RepliactionManager = "REPL-MGR";
+
+ /// <summary>
+ /// The log scope for the cache listener change event queue
+ /// </summary>
+ public const string CacheListenerPubQueue = "QUEUE";
+
+ /// <summary>
+ /// The log scope for the cache connection websocket endpoint
+ /// </summary>
+ public const string ConnectionEndpoint = "CONEP";
+ }
+
+ public static class Delays
+ {
+ /// <summary>
+ /// The amount of startup delay before starting an initial peer discovery
+ /// </summary>
+ public static readonly TimeSpan InitialDiscovery = TimeSpan.FromSeconds(15);
+
+ /// <summary>
+ /// The amount of time to wait before retrying a failed resolve
+ /// of a well-known peers
+ /// </summary>
+ public static readonly TimeSpan WellKnownResolveFailed = TimeSpan.FromSeconds(20);
+
+ /// <summary>
+ /// The amount of time to wait when getting the value of a changed item from the cache
+ /// </summary>
+ /// <remarks>
+ /// When an item change was detected from another peer, the cache will wait this
+ /// amount of time to get the new value from the cache before timing out.
+ /// </remarks>
+ public static readonly TimeSpan CacheSyncGetItemTimeout = TimeSpan.FromSeconds(10);
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
index a240dde..92f0352 100644
--- a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -36,7 +36,6 @@ using VNLib.Net.Messaging.FBM;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.Extensions.Clustering;
-using VNLib.Data.Caching.ObjectCache.Server.Cache;
namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
@@ -55,43 +54,36 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork
{
- private const string LOG_SCOPE_NAME = "REPL";
-
- private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10);
- private const int MAX_MESSAGE_SIZE = 12 * 1024;
-
private readonly PluginBase _plugin;
private readonly ILogProvider _log;
- private readonly NodeConfig _nodeConfig;
- private readonly ICacheStore _cacheStore;
- private readonly ICachePeerAdapter _peerAdapter;
private readonly FBMClientFactory _clientFactory;
-
+ private readonly ObjectCacheSystemState _sysState;
+
private readonly bool _isDebug;
private int _openConnections;
public CacheNodeReplicationMaanger(PluginBase plugin)
{
- //Load the node config
- _nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>();
- _cacheStore = plugin.GetOrCreateSingleton<CacheStore>();
- _peerAdapter = plugin.GetOrCreateSingleton<PeerDiscoveryManager>();
+ _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>();
//Init fbm config with fixed message size
FBMClientConfig clientConfig = FBMDataCacheExtensions.GetDefaultConfig(
- (plugin as ObjectCacheServerEntry)!.ListenerHeap,
- MAX_MESSAGE_SIZE,
- debugLog: plugin.IsDebug() ? plugin.Log : null
+ _sysState.SharedCacheHeap,
+ CacheConstants.MaxSyncMessageSize,
+ debugLog: plugin.IsDebug() ? plugin.Log.CreateScope(CacheConstants.LogScopes.ReplicationFbmDebug) : null
);
//Init ws fallback factory and client factory
- FBMFallbackClientWsFactory wsFactory = new();
- _clientFactory = new(in clientConfig, wsFactory);
+ _clientFactory = new(
+ ref clientConfig,
+ new FBMFallbackClientWsFactory(),
+ (int)_sysState.ClusterConfig.MaxPeerConnections
+ );
_plugin = plugin;
_isDebug = plugin.IsDebug();
- _log = plugin.Log.CreateScope(LOG_SCOPE_NAME);
+ _log = plugin.Log.CreateScope(CacheConstants.LogScopes.RepliactionManager);
}
public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
@@ -103,7 +95,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
while (true)
{
//Get all new peers
- CacheNodeAdvertisment[] peers = _peerAdapter.GetNewPeers();
+ CacheNodeAdvertisment[] peers = _sysState.PeerDiscovery.GetNewPeers();
if (peers.Length == 0 && _isDebug)
{
@@ -111,7 +103,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
}
//Make sure we don't exceed the max connections
- if(_openConnections >= _nodeConfig.MaxPeerConnections)
+ if(_openConnections >= _sysState.ClusterConfig.MaxPeerConnections)
{
if (_isDebug)
{
@@ -148,15 +140,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
_log.Information("Node replication worker exited");
}
+ /*
+ * This method is called when a new peer has connected (or discovered) to establish a
+ * replication connection.
+ */
private async Task OnNewPeerDoWorkAsync(CacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken)
{
- _ = newPeer ?? throw new ArgumentNullException(nameof(newPeer));
-
- //Setup client
+ ArgumentNullException.ThrowIfNull(newPeer);
+
FBMClient client = _clientFactory.CreateClient();
- //Add peer to monitor
- _peerAdapter.OnPeerListenerAttached(newPeer);
+ /*
+ * Notify discovery that we will be listening to this peer
+ *
+ * This exists so when a new discovery happens, the work loop will produce
+ * the difference of new peers to existing peers, and we can connect to them.
+ * Avoiding infinite connections to the same peer.
+ */
+ _sysState.PeerDiscovery.OnPeerListenerAttached(newPeer);
Interlocked.Increment(ref _openConnections);
@@ -165,12 +166,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId);
//Connect to the server
- await client.ConnectToCacheAsync(newPeer, _nodeConfig.Config, exitToken);
+ await client.ConnectToCacheAsync(newPeer, _sysState.NodeConfig, exitToken);
log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId);
//Start worker tasks
- List<Task> workerTasks = new();
+ List<Task> workerTasks = [];
for (int i = 0; i < Environment.ProcessorCount; i++)
{
@@ -187,6 +188,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//Disconnect client gracefully
await client.DisconnectAsync(CancellationToken.None);
}
+ catch(FBMServerNegiationException fbm)
+ {
+ log.Error("Failed to negotiate buffer configuration, check your cache memory configuration. Error:{err}", fbm.Message);
+ }
catch (InvalidResponseException ie)
{
//See if the plugin is unloading
@@ -218,7 +223,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
}
catch (Exception ex)
{
- log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex);
+ //Avoid call stacks unless debug or higher logging levels
+ if (log.IsEnabled(LogLevel.Debug))
+ {
+ log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex);
+ }
+ else
+ {
+ log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex.Message);
+ }
}
finally
{
@@ -226,8 +239,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
client.Dispose();
- //Notify monitor of disconnect
- _peerAdapter.OnPeerListenerDetatched(newPeer);
+ //Notify monitor of disconnect to make it available again later
+ _sysState.PeerDiscovery.OnPeerListenerDetatched(newPeer);
}
}
@@ -259,7 +272,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
return;
case "deleted":
//Delete the object from the store
- await _cacheStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None);
+ await _sysState.InternalStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None);
break;
case "modified":
//Reload the record from the store
@@ -287,7 +300,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId);
//Make request
- using FBMResponse response = await client.SendAsync(modRequest, GetItemTimeout, cancellation);
+ using FBMResponse response = await client.SendAsync(modRequest, CacheConstants.Delays.CacheSyncGetItemTimeout, cancellation);
response.ThrowIfNotSet();
@@ -297,7 +310,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal))
{
//Update the record
- await _cacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation);
+ await _sysState.InternalStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation);
log.Debug("Updated object {id}", objectId);
}
else
diff --git a/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs
index c49a54b..c3fbd8e 100644
--- a/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -29,7 +29,6 @@ using System.Collections.Generic;
using VNLib.Utils;
using VNLib.Utils.Extensions;
-using VNLib.Plugins;
namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
@@ -37,12 +36,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
internal sealed class CachePeerMonitor : VnDisposeable, IPeerMonitor
{
- private readonly LinkedList<ICachePeer> peers = new();
+ private readonly List<ICachePeer> peers = new();
private readonly ManualResetEvent newPeerTrigger = new (false);
- public CachePeerMonitor(PluginBase plugin)
- { }
-
/// <summary>
/// Waits for new peers to connect to the server
/// </summary>
@@ -70,7 +66,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//When a peer is connected we can add it to the list so the replication manager can see it
lock(peers)
{
- peers.AddLast(peer);
+ peers.Add(peer);
}
//Trigger monitor when change occurs
@@ -92,6 +88,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
protected override void Free()
{
+ peers.Clear();
newPeerTrigger.Dispose();
}
}
diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
index 6475f9c..b9a220d 100644
--- a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -24,14 +24,11 @@
using System;
using System.Linq;
-using System.Net.Http;
using System.Threading;
-using System.Net.Sockets;
using System.Threading.Tasks;
using System.Collections.Generic;
using VNLib.Utils.Logging;
-using VNLib.Plugins;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.Extensions;
using VNLib.Data.Caching.Extensions.Clustering;
@@ -43,54 +40,19 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
* This class is responsible for resolving and discovering peer nodes in the cluster network.
*/
- internal sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter
+ internal sealed class PeerDiscoveryManager(
+ CacheNodeConfiguration config,
+ ServerClusterConfig clusterConf,
+ CachePeerMonitor Monitor,
+ ILogProvider Log,
+ bool IsDebug,
+ bool HasWellKnown
+ )
+ : IAsyncBackgroundWork, ICachePeerAdapter
{
- private const string LOG_SCOPE_NAME = "DISC";
- /*
- * The initial discovery delay. This allows for the server to initialize before
- * starting the discovery process. This will probably be a shorter delay
- * than a usual discovery interval.
- */
- private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(15);
- private static readonly TimeSpan WhenWellKnownResolveFailed = TimeSpan.FromSeconds(20);
-
-
- private readonly List<CacheNodeAdvertisment> _connectedPeers;
- private readonly NodeConfig Config;
- private readonly CachePeerMonitor Monitor;
- private readonly ILogProvider Log;
- private readonly bool IsDebug;
- private readonly bool HasWellKnown;
-
- public PeerDiscoveryManager(PluginBase plugin)
- {
- //Get config
- Config = plugin.GetOrCreateSingleton<NodeConfig>();
-
- //Get the known peers array from config, its allowed to be null for master nodes
- IConfigScope? config = plugin.TryGetConfig("known_peers");
- string[] kownPeers = config?.Deserialze<string[]>() ?? Array.Empty<string>();
-
- //Add known peers to the monitor
- Config.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s)));
-
- HasWellKnown = kownPeers.Length > 0;
-
- //Get the peer monitor
- Monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
-
- _connectedPeers = new();
-
- //Create scoped logger
- Log = plugin.Log.CreateScope(LOG_SCOPE_NAME);
-
- Log.Information("Inital peer nodes: {nodes}", kownPeers);
- //Setup discovery error handler
- Config.Config.WithErrorHandler(new ErrorHandler(Log));
-
- IsDebug = plugin.IsDebug();
- }
+ private readonly List<CacheNodeAdvertisment> _connectedPeers = [];
+ private readonly VNCacheClusterManager clusterMan = new(config);
async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
{
@@ -103,12 +65,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//Start the change listener
Task watcher = WatchForPeersAsync(exitToken);
- Log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", InitialDelay);
+ Log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", CacheConstants.Delays.InitialDiscovery);
try
- {
- //Wait for the initial delay
- await Task.Delay(InitialDelay, exitToken);
+ {
+ await Task.Delay(CacheConstants.Delays.InitialDiscovery, exitToken);
Log.Debug("Begining discovery loop");
@@ -123,26 +84,32 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
Log.Debug("Begining node discovery");
}
- //Resolve all known peers
- CacheNodeAdvertisment[] wellKnown = await Config.Config.ResolveWellKnownAsync(exitToken);
- wellKnownFailed = wellKnown.Length == 0;
+ /*
+ * On every loop we will need to resolve well-known servers incase they go down
+ * or change. There probably should be some more advanced logic and caching here.
+ *
+ * Node may not have any well-known nodes, so we need to check for that.
+ */
+ CacheNodeAdvertisment[] wellKnown = HasWellKnown ?
+ await clusterMan.ResolveWellKnownAsync(exitToken) :
+ Array.Empty<CacheNodeAdvertisment>();
//Use the monitor to get the initial peers
IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds();
- //Combine well-known with new connected peers
+ //Combine well-known peers that are currently connected to this server
CacheNodeAdvertisment[] allAds = ads.Union(wellKnown).ToArray();
if (allAds.Length > 0)
{
- //Discover all known nodes
- await Config.Config.DiscoverNodesAsync(allAds, exitToken);
+ //Build the discovery map from all the known nodes to find all known nodes in the entire cluster
+ await clusterMan.DiscoverNodesAsync(allAds, exitToken);
}
//Log the discovered nodes if verbose logging is enabled
if (IsDebug)
{
- CacheNodeAdvertisment[] found = Config.Config.NodeCollection.GetAllNodes();
+ CacheNodeAdvertisment[] found = clusterMan.DiscoveredNodes.GetAllNodes();
Log.Debug("Discovered {count} nodes\n\t{nodes}", found.Length, found.Select(static s => s.NodeId));
}
@@ -168,16 +135,16 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
if (IsDebug)
{
- Log.Debug("Well known node discovery failed, waiting for {idel} before retrying", WhenWellKnownResolveFailed);
+ Log.Debug("Well known node discovery failed, waiting for {idel} before retrying", CacheConstants.Delays.WellKnownResolveFailed);
}
//Wait for shorter duration
- await Task.Delay(WhenWellKnownResolveFailed, exitToken);
+ await Task.Delay(CacheConstants.Delays.WellKnownResolveFailed, exitToken);
}
else
{
//Delay the next discovery
- await Task.Delay(Config.DiscoveryInterval, exitToken);
+ await Task.Delay(clusterConf.DiscoveryInterval, exitToken);
}
}
}
@@ -188,7 +155,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
}
finally
{
-
+ Monitor.Dispose();
}
//Wait for the watcher to exit
@@ -197,10 +164,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
private IEnumerable<CacheNodeAdvertisment> GetMonitorAds()
{
+ string selfId = (clusterMan.Config as CacheNodeConfiguration)!.NodeId;
return Monitor.GetAllPeers()
.Where(static p => p.Advertisment != null)
//Without us
- .Where(n => n.NodeId != Config.Config.NodeId)
+ .Where(n => !string.Equals(n.NodeId, selfId, StringComparison.OrdinalIgnoreCase))
.Select(static p => p.Advertisment!);
}
@@ -222,7 +190,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//Use the monitor to get the initial peers
IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds();
- ((NodeDiscoveryCollection)Config.Config.NodeCollection).AddManualNodes(ads);
+ clusterMan.AddManualNodes(ads);
}
}
catch (OperationCanceledException)
@@ -239,7 +207,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
lock (_connectedPeers)
{
//Get all discovered peers
- CacheNodeAdvertisment[] peers = Config.Config.NodeCollection.GetAllNodes();
+ CacheNodeAdvertisment[] peers = clusterMan.DiscoveredNodes.GetAllNodes();
//Get the difference between the discovered peers and the connected peers
return peers.Except(_connectedPeers).ToArray();
@@ -265,31 +233,5 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
_connectedPeers.Remove(peer);
}
}
-
-
- private sealed record class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
- {
- public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
- {
-
- if (ex is HttpRequestException hre)
- {
- if (hre.InnerException is SocketException se)
- {
- //traisnport failed
- Logger.Warn("Failed to connect to server {serv} because {err}", errorNode, se.Message);
- }
- else
- {
- Logger.Error("Failed to connect to node {n}\n{err}", errorNode, hre);
- }
- }
- else
- {
- Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex);
- }
-
- }
- }
}
}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs
index d1591f8..99433e1 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -45,7 +45,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
public bool IsPeer { get; set; }
}
- internal sealed class CacheNegotationManager
+ internal sealed class CacheNegotationManager(PluginBase plugin)
{
/*
* Cache keys are centralized and may be shared between all cache server nodes. This means
@@ -62,23 +62,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
* client could use trial and error to find the servers buffer configuration.
*/
- private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
+ private readonly string AudienceLocalServerId = Guid.NewGuid().ToString("N");
- private readonly string AudienceLocalServerId;
- private readonly NodeConfig _nodeConfig;
- private readonly CacheConfiguration _cacheConfig;
+ private readonly ObjectCacheSystemState _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>();
- public CacheNegotationManager(PluginBase plugin)
- {
- //Get node configuration
- _nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>();
-
- //Get the cache store configuration
- _cacheConfig = plugin.GetConfigForType<CacheStore>().Deserialze<CacheConfiguration>();
-
- AudienceLocalServerId = Guid.NewGuid().ToString("N");
- }
-
+ private CacheMemoryConfiguration CacheConfig => _sysState.MemoryConfiguration;
public bool IsClientNegotiationValid(string authToken, out ClientNegotiationState state)
{
@@ -88,12 +76,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
using JsonWebToken jwt = JsonWebToken.Parse(authToken);
//verify signature for client
- if (_nodeConfig.KeyStore.VerifyJwt(jwt, false))
+ if (_sysState.KeyStore.VerifyJwt(jwt, false))
{
//Validated as normal client
}
//May be signed by a cache server
- else if (_nodeConfig.KeyStore.VerifyJwt(jwt, true))
+ else if (_sysState.KeyStore.VerifyJwt(jwt, true))
{
//Set peer and verified flag since the another cache server signed the request
state.IsPeer = true;
@@ -117,16 +105,16 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
return true;
}
- public JsonWebToken ConfirmCLientNegotiation(ClientNegotiationState state, IPAddress clientIp, DateTimeOffset now)
+ public JsonWebToken ConfirmClientNegotiation(ClientNegotiationState state, IPAddress clientIp, DateTimeOffset now)
{
//Verified, now we can create an auth message with a short expiration
JsonWebToken auth = new();
- auth.WriteHeader(_nodeConfig.KeyStore.GetJwtHeader());
+ auth.WriteHeader(_sysState.KeyStore.GetJwtHeader());
auth.InitPayloadClaim()
.AddClaim("aud", AudienceLocalServerId)
.AddClaim("iat", now.ToUnixTimeSeconds())
- .AddClaim("exp", now.Add(AuthTokenExpiration).ToUnixTimeSeconds())
+ .AddClaim("exp", now.Add(CacheConstants.ClientAuthTokenExpiration).ToUnixTimeSeconds())
.AddClaim("nonce", RandomHash.GetRandomBase32(8))
.AddClaim("chl", state.Challenge!)
//Set the ispeer flag if the request was signed by a cache server
@@ -136,24 +124,29 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
//Set ip address
.AddClaim("ip", clientIp.ToString())
//Add negotiaion args
- .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, _cacheConfig.MaxHeaderBufferSize)
- .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, _cacheConfig.MaxRecvBufferSize)
- .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, _cacheConfig.MaxMessageSize)
+ .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, CacheConfig.MaxHeaderBufferSize)
+ .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, CacheConfig.MaxRecvBufferSize)
+ .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, CacheConfig.MaxMessageSize)
.CommitClaims();
//Sign the auth message from our private key
- _nodeConfig.KeyStore.SignJwt(auth);
+ _sysState.KeyStore.SignJwt(auth);
return auth;
}
- public bool ValidateUpgrade(string upgradeToken, string tokenSignature, DateTimeOffset now, IPAddress connectionIp, ref string? nodeId, ref bool isPeer)
+ public bool ValidateUpgrade(string? upgradeToken, string? tokenSignature, DateTimeOffset now, IPAddress connectionIp, ref string? nodeId, ref bool isPeer)
{
+ if(string.IsNullOrWhiteSpace(upgradeToken) || string.IsNullOrWhiteSpace(tokenSignature))
+ {
+ return false;
+ }
+
//Parse jwt
using JsonWebToken jwt = JsonWebToken.Parse(upgradeToken);
//verify signature against the cache public key, since this server must have signed it
- if (!_nodeConfig.KeyStore.VerifyCachePeer(jwt))
+ if (!_sysState.KeyStore.VerifyCachePeer(jwt))
{
return false;
}
@@ -175,7 +168,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
//Check node ip address matches if required
- if (_nodeConfig.VerifyIp)
+ if (_sysState.ClusterConfig.VerifyIp)
{
if (!doc.RootElement.TryGetProperty("ip", out JsonElement ipEl))
{
@@ -201,7 +194,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
//Verify token signature against a fellow cache public key
- return _nodeConfig.KeyStore.VerifyUpgradeToken(tokenSignature, upgradeToken, isPeer);
+ return _sysState.KeyStore.VerifyUpgradeToken(tokenSignature, upgradeToken, isPeer);
}
}
}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
index 816e6c3..8368d3a 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -53,13 +53,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
internal sealed class ConnectEndpoint : ResourceEndpointBase
{
- internal const string LOG_SCOPE_NAME = "CONEP";
+ private readonly ObjectCacheSystemState _sysState;
-
- private readonly ICacheEventQueueManager PubSubManager;
- private readonly IPeerMonitor Peers;
- private readonly BlobCacheListener<IPeerEventQueue> Store;
- private readonly NodeConfig NodeConfiguration;
+ private PeerEventQueueManager PubSubManager => _sysState.PeerEventQueue;
+ private CachePeerMonitor Peers => _sysState.PeerMonitor;
+ private BlobCacheListener<IPeerEventQueue> Listener => _sysState.Listener;
+ private ServerClusterConfig ClusterConfiguration => _sysState.ClusterConfig;
+
private readonly CacheNegotationManager AuthManager;
private uint _connectedClients;
@@ -72,7 +72,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
/// <summary>
/// The cache store configuration
/// </summary>
- public CacheConfiguration CacheConfig { get; }
+ public CacheMemoryConfiguration CacheConfig => _sysState.MemoryConfiguration;
//Loosen up protection settings
///<inheritdoc/>
@@ -83,24 +83,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
public ConnectEndpoint(PluginBase plugin)
{
- //Get node configuration
- NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>();
+ _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>();
//Init from config and create a new log scope
- InitPathAndLog(NodeConfiguration.ConnectPath, plugin.Log.CreateScope(LOG_SCOPE_NAME));
-
- //Setup pub/sub manager
- PubSubManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>();
-
- //Get peer monitor
- Peers = plugin.GetOrCreateSingleton<CachePeerMonitor>();
-
- //Init the cache store
- Store = plugin.GetOrCreateSingleton<CacheStore>().Listener;
-
- //Get the cache store configuration
- CacheConfig = plugin.GetConfigForType<CacheStore>().Deserialze<CacheConfiguration>();
-
+ InitPathAndLog(ClusterConfiguration.ConnectPath, plugin.Log.CreateScope(CacheConstants.LogScopes.ConnectionEndpoint));
+
//Get the auth manager
AuthManager = plugin.GetOrCreateSingleton<CacheNegotationManager>();
}
@@ -127,6 +114,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
//Parse jwt from authoriation
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
+
if (string.IsNullOrWhiteSpace(jwtAuth))
{
return VirtualClose(entity, HttpStatusCode.Forbidden);
@@ -149,26 +137,34 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
//Verified, now we can create an auth message with a short expiration
- using JsonWebToken auth = AuthManager.ConfirmCLientNegotiation(state, entity.TrustedRemoteIp, entity.RequestedTimeUtc);
+ using JsonWebToken auth = AuthManager.ConfirmClientNegotiation(state, entity.TrustedRemoteIp, entity.RequestedTimeUtc);
- //Close response
+ //Close response by sending a copy of the signed token
entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer);
return VfReturnType.VirtualSkip;
}
protected override VfReturnType WebsocketRequested(HttpEntity entity)
{
+ /*
+ * Check to see if any more connections are allowed,
+ * otherwise deny the connection
+ *
+ * This is done here to prevent the server from being overloaded
+ * on a new connection. It would be ideal to not grant new tokens
+ * but malicious clients could cache a bunch of tokens and use them
+ * later, exhausting resources.
+ */
+ if(_connectedClients >= ClusterConfiguration.MaxConcurrentConnections)
+ {
+ return VirtualClose(entity, HttpStatusCode.ServiceUnavailable);
+ }
+
//Parse jwt from authorization
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
string? clientSignature = entity.Server.Headers[FBMDataCacheExtensions.X_UPGRADE_SIG_HEADER];
string? optionalDiscovery = entity.Server.Headers[FBMDataCacheExtensions.X_NODE_DISCOVERY_HEADER];
- //Not null
- if (string.IsNullOrWhiteSpace(jwtAuth) || string.IsNullOrWhiteSpace(clientSignature))
- {
- return VfReturnType.Forbidden;
- }
-
string? nodeId = null;
bool isPeer = false;
@@ -178,17 +174,17 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
return VirtualClose(entity, HttpStatusCode.Unauthorized);
}
- CacheNodeAdvertisment? discoveryAd = null;
-
/*
* If the client is a peer server, it may offer a signed advertisment
* that this node will have the duty of making available to other peers
* if it is valid
*/
- if (isPeer && !string.IsNullOrWhiteSpace(optionalDiscovery))
+ CacheNodeAdvertisment? discoveryAd = null;
+
+ if (isPeer)
{
- discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(optionalDiscovery);
+ discoveryAd = _sysState.KeyStore.VerifyPeerAdvertisment(optionalDiscovery);
}
WsUserState state;
@@ -196,11 +192,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
try
{
//Get query config suggestions from the client
- string recvBufCmd = entity.QueryArgs[FBMClient.REQ_RECV_BUF_QUERY_ARG];
- string maxHeaderCharCmd = entity.QueryArgs[FBMClient.REQ_HEAD_BUF_QUERY_ARG];
- string maxMessageSizeCmd = entity.QueryArgs[FBMClient.REQ_MAX_MESS_QUERY_ARG];
+ string? recvBufCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_RECV_BUF_QUERY_ARG);
+ string? maxHeaderCharCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_HEAD_BUF_QUERY_ARG);
+ string? maxMessageSizeCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_MAX_MESS_QUERY_ARG);
- //Parse recv buffer size
int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : CacheConfig.MinRecvBufferSize;
int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : CacheConfig.MinHeaderBufferSize;
int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : CacheConfig.MaxMessageSize;
@@ -253,9 +248,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
Peers.OnPeerConnected(state);
//Register plugin exit token to cancel the connected socket
- CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll);
-
- //Inc connected count
+ await using CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll);
+
Interlocked.Increment(ref _connectedClients);
try
@@ -280,7 +274,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
try
{
//Begin listening for messages with a queue
- await Store.ListenAsync(wss, queue, args);
+ await Listener.ListenAsync(wss, queue, args);
}
finally
{
@@ -291,7 +285,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
else
{
//Begin listening for messages without a queue
- await Store.ListenAsync(wss, null!, args);
+ await Listener.ListenAsync(wss, null!, args);
}
}
catch (OperationCanceledException)
@@ -303,15 +297,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
catch (Exception ex)
{
- Log.Debug(ex);
+ //If debug logging is enabled print a more detailed error message
+ Log.Error("An error occured on websocket connection: node {con} -> {error}", state.NodeId, ex.Message);
+ Log.Debug("Websocket connection error: node {con}\n{error}", state.NodeId, ex);
}
-
- //Dec connected count
+
Interlocked.Decrement(ref _connectedClients);
- //Unregister the token
- reg.Unregister();
-
//Notify monitor of disconnect
Peers.OnPeerDisconnected(state);
diff --git a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
index 7d376b8..8038b70 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -40,25 +40,27 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
internal sealed class PeerDiscoveryEndpoint : ResourceEndpointBase
{
- private readonly IPeerMonitor PeerMonitor;
- private readonly NodeConfig Config;
+ private readonly ObjectCacheSystemState _sysState;
+
+ private CacheAuthKeyStore KeyStore => _sysState.KeyStore;
+
+ private CachePeerMonitor PeerMonitor => _sysState.PeerMonitor;
- //Loosen up protection settings
///<inheritdoc/>
protected override ProtectionSettings EndpointProtectionSettings { get; } = new()
{
- DisableSessionsRequired = true
+ /*
+ * Sessions will not be used or required for this endpoint.
+ * We should also assume the session system is not even loaded
+ */
+ DisableSessionsRequired = true
};
public PeerDiscoveryEndpoint(PluginBase plugin)
{
- //Get the peer monitor
- PeerMonitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
+ _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>();
- //Get the node config
- Config = plugin.GetOrCreateSingleton<NodeConfig>();
-
- InitPathAndLog(Config.DiscoveryPath, plugin.Log);
+ InitPathAndLog(_sysState.ClusterConfig.DiscoveryPath!, plugin.Log);
}
protected override VfReturnType Get(HttpEntity entity)
@@ -68,36 +70,41 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
if(string.IsNullOrWhiteSpace(authToken))
{
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
+ return VirtualClose(entity, HttpStatusCode.Unauthorized);
}
string subject = string.Empty;
string challenge = string.Empty;
- //Parse auth token
- using(JsonWebToken jwt = JsonWebToken.Parse(authToken))
+ try
{
+ //Parse auth token
+ using JsonWebToken jwt = JsonWebToken.Parse(authToken);
+
//try to verify against cache node first
- if (!Config.KeyStore.VerifyJwt(jwt, true))
+ if (!KeyStore.VerifyJwt(jwt, true))
{
//failed...
//try to verify against client key
- if (!Config.KeyStore.VerifyJwt(jwt, false))
+ if (!KeyStore.VerifyJwt(jwt, false))
{
//invalid token
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
+ return VirtualClose(entity, HttpStatusCode.Unauthorized);
}
}
using JsonDocument payload = jwt.GetPayload();
//Get client info to pass back
- subject = payload.RootElement.TryGetProperty("sub", out JsonElement subEl) ? subEl.GetString() ?? string.Empty : string.Empty;
+ subject = payload.RootElement.TryGetProperty("sub", out JsonElement subEl) ? subEl.GetString() ?? string.Empty : string.Empty;
challenge = payload.RootElement.GetProperty("chl").GetString() ?? string.Empty;
}
+ catch (FormatException)
+ {
+ //If tokens are invalid format, let the client know instead of a server error
+ return VfReturnType.BadRequest;
+ }
//Valid key, get peer list to send to client
CacheNodeAdvertisment[] peers = PeerMonitor.GetAllPeers()
@@ -109,10 +116,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
using JsonWebToken response = new();
//set header from cache config
- response.WriteHeader(Config.KeyStore.GetJwtHeader());
+ response.WriteHeader(KeyStore.GetJwtHeader());
response.InitPayloadClaim()
- .AddClaim("iss", Config.Config.NodeId)
+ .AddClaim("iss", _sysState.NodeConfig.NodeId)
//Audience is the requestor id
.AddClaim("sub", subject)
.AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds())
@@ -122,10 +129,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
.AddClaim("chl", challenge)
.CommitClaims();
- //Sign the response
- Config.KeyStore.SignJwt(response);
-
- //Send response to client
+
+ KeyStore.SignJwt(response);
+
entity.CloseResponse(HttpStatusCode.OK, Net.Http.ContentType.Text, response.DataBuffer);
return VfReturnType.VirtualSkip;
}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs
index 87a471b..18855e3 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -59,13 +59,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
public WellKnownEndpoint(PluginBase plugin)
{
//Get the node config
- NodeConfig nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>();
+ ObjectCacheSystemState conf = plugin.GetOrCreateSingleton<ObjectCacheSystemState>();
//serialize the config, discovery may not be enabled
- _advertisment = nodeConfig.Config.Advertisment;
- _keyStore = nodeConfig.KeyStore;
+ _advertisment = conf.NodeConfig.Advertisment;
+ _keyStore = conf.KeyStore;
- InitPathAndLog(nodeConfig.WellKnownPath, plugin.Log);
+ InitPathAndLog(conf.ClusterConfig.WellKnownPath, plugin.Log);
}
protected override VfReturnType Get(HttpEntity entity)
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
index aada787..42bd0c7 100644
--- a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
+++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -23,13 +23,11 @@
*/
using System;
-using System.Threading;
using System.Collections.Generic;
using VNLib.Plugins;
-using VNLib.Utils.Memory;
+using VNLib.Utils;
using VNLib.Utils.Logging;
-using VNLib.Utils.Memory.Diagnostics;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Extensions.Loading.Routing;
@@ -43,38 +41,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server
{
public override string PluginName => "ObjectCache.Service";
- private readonly Lazy<IUnmangedHeap> _cacheHeap;
-
- internal IUnmangedHeap ListenerHeap => _cacheHeap.Value;
-
- public ObjectCacheServerEntry()
- {
- //Init heap
- _cacheHeap = new Lazy<IUnmangedHeap>(InitializeHeap, LazyThreadSafetyMode.PublicationOnly);
- }
-
- internal IUnmangedHeap InitializeHeap()
- {
- //Create default heap
- IUnmangedHeap _heap = MemoryUtil.InitializeNewHeapForProcess();
- try
- {
- //If the plugin is in debug mode enable heap tracking
- return this.IsDebug() ? new TrackedHeapWrapper(_heap, true) : _heap;
- }
- catch
- {
- _heap.Dispose();
- throw;
- }
- }
+ ObjectCacheSystemState? sysState;
protected override void OnLoad()
{
try
{
- //Get the node configuration first
- NodeConfig config = this.GetOrCreateSingleton<NodeConfig>();
+ //Initialize the cache node builder
+ sysState = this.GetOrCreateSingleton<ObjectCacheSystemState>();
+ sysState.Initialize();
//Route well-known endpoint
this.Route<WellKnownEndpoint>();
@@ -85,8 +60,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//We must initialize the replication manager
_ = this.GetOrCreateSingleton<CacheNodeReplicationMaanger>();
- //Setup discovery endpoint
- if(!string.IsNullOrWhiteSpace(config.DiscoveryPath))
+ //Setup discovery endpoint only if the user enabled clustering
+ if(!string.IsNullOrWhiteSpace(sysState.ClusterConfig.DiscoveryPath))
{
this.Route<PeerDiscoveryEndpoint>();
}
@@ -101,18 +76,34 @@ namespace VNLib.Data.Caching.ObjectCache.Server
protected override void OnUnLoad()
{
- //dispose heap if initialized
- if(_cacheHeap.IsValueCreated)
- {
- _cacheHeap.Value.Dispose();
- }
-
Log.Information("Plugin unloaded");
}
protected override void ProcessHostCommand(string cmd)
{
- throw new NotImplementedException();
+ if(string.IsNullOrWhiteSpace(cmd))
+ {
+ return;
+ }
+
+ ArgumentList al = new(cmd.Split(" "));
+
+ if(al.Count == 0)
+ {
+ Log.Warn("Invalid command");
+ return;
+ }
+
+ switch (al[0].ToLower(null))
+ {
+ case "memstats":
+ sysState?.LogMemoryStats();
+ break;
+
+ default:
+ Log.Warn("Invalid command");
+ break;
+ }
}
}
}
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs
new file mode 100644
index 0000000..cd5bf1b
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs
@@ -0,0 +1,331 @@
+/*
+* Copyright (c) 2024 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: ObjectCacheSystemState.cs
+*
+* ObjectCacheSystemState.cs is part of ObjectCacheServer which is
+* part of the larger VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Linq;
+using System.Net.Http;
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+
+using VNLib.Utils.Logging;
+using VNLib.Utils.Memory;
+using VNLib.Utils.Memory.Diagnostics;
+using VNLib.Net.Messaging.FBM;
+using VNLib.Plugins;
+using VNLib.Plugins.Extensions.Loading;
+
+using VNLib.Data.Caching.Extensions.Clustering;
+using VNLib.Data.Caching.ObjectCache.Server.Cache;
+using VNLib.Data.Caching.ObjectCache.Server.Clustering;
+
+namespace VNLib.Data.Caching.ObjectCache.Server
+{
+ /*
+ * The purpose of this class is to manage the state of the entire cache server.
+ * All configuration and state should be creatd and managed by this class. To make it
+ * easier to manage.
+ */
+ [ConfigurationName("cache")]
+ internal sealed class ObjectCacheSystemState(PluginBase plugin, IConfigScope config) : IDisposable
+ {
+ public BlobCacheListener<IPeerEventQueue> Listener { get; private set; } = null!;
+
+ public ICacheStore InternalStore { get; private set; } = null!;
+
+ /// <summary>
+ /// Used for miscellaneous shared memory allocations (like the cache listener)
+ /// </summary>
+ public IUnmangedHeap SharedCacheHeap { get; private set; } = null!;
+
+ /// <summary>
+ /// The plugin-wide, shared node configuration
+ /// </summary>
+ public ServerClusterConfig ClusterConfig { get; } = plugin.GetOrCreateSingleton<ServerClusterConfig>();
+
+ /// <summary>
+ /// The system wide cache authenticator
+ /// </summary>
+ public CacheAuthKeyStore KeyStore { get; } = new(plugin);
+
+ /// <summary>
+ /// The system cache node configuration
+ /// </summary>
+ public CacheNodeConfiguration NodeConfig { get; private set; }
+
+ /// <summary>
+ /// The peer discovery manager
+ /// </summary>
+ public PeerDiscoveryManager PeerDiscovery { get; private set; } = null!;
+
+ /// <summary>
+ /// System wide peer monitor
+ /// </summary>
+ public CachePeerMonitor PeerMonitor { get; } = new();
+
+ public CacheMemoryConfiguration MemoryConfiguration { get; } = config.Deserialze<CacheMemoryConfiguration>();
+
+ /// <summary>
+ /// The system wide peer event queue manager
+ /// </summary>
+ public PeerEventQueueManager PeerEventQueue { get; private set; }
+
+ private ICacheMemoryManagerFactory _cacheMemManager;
+
+ void IDisposable.Dispose()
+ {
+ SharedCacheHeap.Dispose();
+ Listener.Dispose();
+ }
+
+ /// <summary>
+ /// Initializes the cache node state
+ /// </summary>
+ public void Initialize()
+ {
+ CacheMemoryConfiguration cacheConf = MemoryConfiguration;
+
+ ArgumentOutOfRangeException.ThrowIfLessThan(cacheConf.MaxCacheEntries, 2u);
+
+ //Suggestion
+ if (cacheConf.MaxCacheEntries < 200)
+ {
+ plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache");
+ }
+
+ LogMemConfiguration();
+
+ PeerEventQueue = new(plugin, ClusterConfig);
+
+ //If the plugin is in debug mode enable heap tracking
+ SharedCacheHeap = plugin.IsDebug() ?
+ new TrackedHeapWrapper(MemoryUtil.InitializeNewHeapForProcess(), true)
+ : MemoryUtil.InitializeNewHeapForProcess();
+
+ //Load node configuration first
+ (NodeConfig = ClusterConfig.BuildNodeConfig())
+ .WithAuthenticator(KeyStore); //Also pass the key store to the node config
+
+ ConfigurePeerDiscovery();
+
+ ConfigureCacheListener();
+ }
+
+ private void ConfigurePeerDiscovery()
+ {
+ //Get the known peers array from config, its allowed to be null for master nodes
+ IConfigScope? config = plugin.TryGetConfig("known_peers");
+ string[] kownPeers = config?.Deserialze<string[]>() ?? [];
+
+ ILogProvider discLogger = plugin.Log.CreateScope(CacheConstants.LogScopes.PeerDiscovery);
+
+ //Allow just origin nodes to be used as known peers
+ IEnumerable<Uri> peerUris = kownPeers.Select(static p =>
+ {
+ Uri bUri = new(p, UriKind.Absolute);
+ return bUri.LocalPath == "/" ? new Uri(bUri, CacheConstants.DefaultWellKnownPath) : bUri;
+ });
+
+ NodeConfig.WithInitialPeers(peerUris)
+ .WithErrorHandler(new ErrorHandler(discLogger));
+
+ discLogger.Information("Inital peer nodes: {nodes}", kownPeers);
+
+ PeerDiscovery = new PeerDiscoveryManager(
+ NodeConfig,
+ ClusterConfig,
+ PeerMonitor,
+ discLogger,
+ plugin.IsDebug(),
+ kownPeers.Length > 0
+ );
+
+ //Discovery manager needs to be scheduled for background work to run the discovery loop
+ _ = plugin.ObserveWork(PeerDiscovery, 10);
+ }
+
+ private void ConfigureCacheListener()
+ {
+ /*
+ * Allow loading external managed dll for a bucket-local memory manager
+ */
+ ICacheMemoryManagerFactory manager;
+
+ if (string.IsNullOrWhiteSpace(MemoryConfiguration.ExternLibPath))
+ {
+ //Get the memory manager
+ manager = plugin.GetOrCreateSingleton<BucketLocalManagerFactory>();
+ }
+ else
+ {
+ manager = plugin.CreateServiceExternal<ICacheMemoryManagerFactory>(MemoryConfiguration.ExternLibPath);
+ }
+
+ _cacheMemManager = manager;
+
+ CacheListenerPubQueue queue = new(plugin, PeerEventQueue);
+
+ //Must register background worker to listen for changes
+ _ = plugin.ObserveWork(queue, 150);
+
+ //Endpoint only allows for a single reader
+ Listener = new(
+ plugin.LoadMemoryCacheSystem(config, manager, MemoryConfiguration),
+ queue,
+ plugin.Log.CreateScope(CacheConstants.LogScopes.BlobCacheListener),
+ new SharedHeapFBMMemoryManager(SharedCacheHeap)
+ );
+
+ InternalStore = new CacheStore(Listener.Cache);
+ }
+
+ private void LogMemConfiguration()
+ {
+ const string CacheConfigTemplate =
+@"
+Cache Configuration:
+ Max memory: {max} Mb
+ Buckets: {bc}
+ Entries per-bucket: {mc}
+ HeapTracking: {ht}
+";
+
+ CacheMemoryConfiguration cacheConf = MemoryConfiguration;
+
+ //calculate the max memory usage
+ ulong maxByteSize = cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize;
+
+ //Log the cache config
+ plugin.Log.Information(
+ CacheConfigTemplate,
+ maxByteSize / (1024 * 1000),
+ cacheConf.BucketCount,
+ cacheConf.MaxCacheEntries,
+ plugin.IsDebug()
+ );
+ }
+
+ public void LogMemoryStats()
+ {
+ if(SharedCacheHeap is TrackedHeapWrapper thw)
+ {
+ const string shStatTemplate =
+@" VNCache shared heap stats:
+ Current: {cur}kB
+ Blocks: {blks}
+ Max size: {max}kB
+";
+ HeapStatistics stats = thw.GetCurrentStats();
+ plugin.Log.Debug(
+ shStatTemplate,
+ stats.AllocatedBytes / 1024,
+ stats.AllocatedBlocks,
+ stats.MaxHeapSize / 1024
+ );
+
+ }
+
+ //Also print logs for the bucket local managers if they are enabled
+ if(_cacheMemManager is BucketLocalManagerFactory blmf)
+ {
+ blmf.LogHeapStats();
+ }
+ }
+
+ private sealed class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
+ {
+ public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
+ => LogError(ex, errorNode.NodeId, errorNode.ConnectEndpoint);
+
+ public void OnDiscoveryError(Uri errorAddress, Exception ex)
+ => LogError(ex, null, errorAddress);
+
+ private void LogError(Exception ex, string? nodId, Uri? connectAddress)
+ {
+ //For logging purposes, use the node id if its available, otherwise use the address
+ if(nodId == null && connectAddress != null)
+ {
+ nodId = connectAddress.ToString();
+ }
+
+ if (ex is HttpRequestException hre)
+ {
+ if (hre.InnerException is SocketException se)
+ {
+ //transport failed
+ Logger.Warn("Failed to connect to server {serv} because {err}", nodId, se.Message);
+ }
+ else
+ {
+ Logger.Error("Failed to connect to node {n}\n{err}", nodId, hre);
+ }
+ }
+ if (ex is OperationCanceledException)
+ {
+ Logger.Error("Failed to discover nodes from nodeid {nid}, because the operation was canceled");
+ }
+ else if (ex is TimeoutException)
+ {
+ //Only log exception stack when in debug logging mode
+ Logger.Warn("Failed to discover nodes from nodeid {nid}, because a timeout occured", nodId);
+ }
+ else
+ {
+ //Only log exception stack when in debug logging mode
+ if (Logger.IsEnabled(LogLevel.Debug))
+ {
+ Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", nodId, ex);
+ }
+ else
+ {
+ Logger.Error("Failed to discover nodes from nodeid {nid}, with error: {err}", nodId, ex.Message);
+ }
+ }
+ }
+ }
+
+ internal sealed class CacheStore(IBlobCacheTable table) : ICacheStore
+ {
+
+ ///<inheritdoc/>
+ ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, ObjectDataGet<T> bodyData, T state, CancellationToken token)
+ {
+ return table.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token);
+ }
+
+ ///<inheritdoc/>
+ void ICacheStore.Clear()
+ {
+ throw new NotImplementedException();
+ }
+
+ ///<inheritdoc/>
+ ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token)
+ {
+ return table.DeleteObjectAsync(id, token);
+ }
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/NodeConfig.cs b/plugins/ObjectCacheServer/src/ServerClusterConfig.cs
index 3a2e10e..8e098cd 100644
--- a/plugins/ObjectCacheServer/src/NodeConfig.cs
+++ b/plugins/ObjectCacheServer/src/ServerClusterConfig.cs
@@ -1,11 +1,11 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
-* File: NodeConfig.cs
+* File: ServerClusterConfig.cs
*
-* NodeConfig.cs is part of ObjectCacheServer which is part of the larger
+* ServerClusterConfig.cs is part of ObjectCacheServer which is part of the larger
* VNLib collection of libraries and utilities.
*
* ObjectCacheServer is free software: you can redistribute it and/or modify
@@ -34,40 +34,56 @@ using VNLib.Utils.Extensions;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.Extensions.Clustering;
-
namespace VNLib.Data.Caching.ObjectCache.Server
{
[ConfigurationName("cluster")]
- internal sealed class NodeConfig
+ internal sealed class ServerClusterConfig(PluginBase plugin, IConfigScope config)
{
- //Default path for the well known endpoint
- const string DefaultPath = "/.well-known/vncache";
-
- public CacheNodeConfiguration Config { get; }
-
- public CacheAuthKeyStore KeyStore { get; }
-
- public TimeSpan DiscoveryInterval { get; }
+ public TimeSpan DiscoveryInterval { get; } = config.GetRequiredProperty("discovery_interval_sec", p => p.GetTimeSpan(TimeParseType.Seconds));
- public TimeSpan EventQueuePurgeInterval { get; }
+ public TimeSpan EventQueuePurgeInterval { get; } = config.GetRequiredProperty("queue_purge_interval_sec", p => p.GetTimeSpan(TimeParseType.Seconds));
- public int MaxQueueDepth { get; }
+ public int MaxQueueDepth { get; } = (int)config.GetRequiredProperty("max_queue_depth", p => p.GetUInt32());
- public string? DiscoveryPath { get; }
+ public string? DiscoveryPath { get; } = config.GetValueOrDefault("discovery_path", p => p.GetString(), null);
- public string ConnectPath { get; }
+ public string ConnectPath { get; } = config.GetRequiredProperty("connect_path", p => p.GetString()!);
- public string WellKnownPath { get; }
+ public string WellKnownPath { get; } = config.GetValueOrDefault("well_known_path", p => p.GetString()!, CacheConstants.DefaultWellKnownPath)
+ ?? CacheConstants.DefaultWellKnownPath;
- public bool VerifyIp { get; }
+ public bool VerifyIp { get; } = config.GetRequiredProperty("verify_ip", p => p.GetBoolean());
/// <summary>
/// The maximum number of peer connections to allow
/// </summary>
- public uint MaxPeerConnections { get; } = 10;
+ public uint MaxPeerConnections { get; } = config.GetValueOrDefault("max_peers", p => p.GetUInt32(), 10u);
+
+ /// <summary>
+ /// The maxium number of concurrent client connections to allow
+ /// before rejecting new connections
+ /// </summary>
+ public uint MaxConcurrentConnections { get; } = config.GetValueOrDefault("max_concurrent_connections", p => p.GetUInt32(), 100u);
+
+ const string CacheConfigTemplate =
+@"
+Cluster Configuration:
+ Node Id: {id}
+ TlsEndabled: {tls}
+ Verify Ip: {vi}
+ Well-Known: {wk}
+ Cache Endpoint: {ep}
+ Discovery Endpoint: {dep}
+ Discovery Interval: {di}
+ Max Peer Connections: {mpc}
+ Max Queue Depth: {mqd}
+ Event Queue Purge Interval: {eqpi}
+";
+
+ internal CacheNodeConfiguration BuildNodeConfig()
+ {
+ CacheNodeConfiguration conf = new();
- public NodeConfig(PluginBase plugin, IConfigScope config)
- {
//Get the port of the primary webserver
int port;
bool usingTls;
@@ -88,81 +104,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//Server id is just dns name for now
string nodeId = $"{hostname}:{port}";
-
- //Init key store
- KeyStore = new(plugin);
-
-
- DiscoveryInterval = config["discovery_interval_sec"].GetTimeSpan(TimeParseType.Seconds);
-
- //Get the event queue purge interval
- EventQueuePurgeInterval = config["queue_purge_interval_sec"].GetTimeSpan(TimeParseType.Seconds);
-
- //Get the max queue depth
- MaxQueueDepth = (int)config["max_queue_depth"].GetUInt32();
-
-
- //Get the connect path
- ConnectPath = config["connect_path"].GetString() ?? throw new KeyNotFoundException("Missing required key 'connect_path' in cluster config");
-
- //Get the verify ip setting
- VerifyIp = config["verify_ip"].GetBoolean();
Uri connectEp = BuildUri(usingTls, hostname, port, ConnectPath);
Uri? discoveryEp = null;
-
- Config = new();
-
- //Setup cache node config
- Config.WithCacheEndpoint(connectEp)
- .WithNodeId(nodeId)
- .WithAuthenticator(KeyStore)
- .WithTls(usingTls);
+
+
+ conf.WithCacheEndpoint(connectEp)
+ .WithNodeId(nodeId)
+ .WithTls(usingTls);
//Get the discovery path (optional)
- if (config.TryGetValue("discovery_path", out JsonElement discoveryPathEl))
+ if (!string.IsNullOrWhiteSpace(DiscoveryPath))
{
- DiscoveryPath = discoveryPathEl.GetString();
-
- //Enable advertisment if a discovery path is present
- if (!string.IsNullOrEmpty(DiscoveryPath))
- {
- //Build the discovery endpoint, it must be an absolute uri
- discoveryEp = BuildUri(usingTls, hostname, port, DiscoveryPath);
- Config.EnableAdvertisment(discoveryEp);
- }
+ //Build the discovery endpoint, it must be an absolute uri
+ discoveryEp = BuildUri(usingTls, hostname, port, DiscoveryPath);
+ conf.EnableAdvertisment(discoveryEp);
}
- //Allow custom well-known path
- if(config.TryGetValue("well_known_path", out JsonElement wkEl))
- {
- WellKnownPath = wkEl.GetString() ?? DefaultPath;
- }
- //Default if not set
- WellKnownPath ??= DefaultPath;
-
- //Get the max peer connections
- if (config.TryGetValue("max_peers", out JsonElement maxPeerEl))
- {
- MaxPeerConnections = maxPeerEl.GetUInt32();
- }
-
- const string CacheConfigTemplate =
-@"
-Cluster Configuration:
- Node Id: {id}
- TlsEndabled: {tls}
- Verify Ip: {vi}
- Well-Known: {wk}
- Cache Endpoint: {ep}
- Discovery Endpoint: {dep}
- Discovery Interval: {di}
- Max Peer Connections: {mpc}
- Max Queue Depth: {mqd}
- Event Queue Purge Interval: {eqpi}
-";
-
- //log the config
+ //print the cluster configuration to the log
plugin.Log.Information(CacheConfigTemplate,
nodeId,
usingTls,
@@ -175,6 +134,8 @@ Cluster Configuration:
MaxQueueDepth,
EventQueuePurgeInterval
);
+
+ return conf;
}
private static Uri BuildUri(bool tls, string host, int port, string path)
diff --git a/plugins/VNLib.Data.Caching.Providers.Redis/src/VNLib.Data.Caching.Providers.Redis.csproj b/plugins/VNLib.Data.Caching.Providers.Redis/src/VNLib.Data.Caching.Providers.Redis.csproj
index c71ee72..c6da1e6 100644
--- a/plugins/VNLib.Data.Caching.Providers.Redis/src/VNLib.Data.Caching.Providers.Redis.csproj
+++ b/plugins/VNLib.Data.Caching.Providers.Redis/src/VNLib.Data.Caching.Providers.Redis.csproj
@@ -39,7 +39,7 @@
</ItemGroup>
<ItemGroup>
- <PackageReference Include="StackExchange.Redis" Version="2.7.17" />
+ <PackageReference Include="StackExchange.Redis" Version="2.7.27" />
</ItemGroup>
<ItemGroup>
diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs
index c9cd746..e9dcbc5 100644
--- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs
+++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs
@@ -46,7 +46,7 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering
* it in the app domain.
*/
- public static IClusterNodeIndex CreateIndex(CacheClientConfiguration config)
+ public static IClusterNodeIndex CreateIndex(VNCacheClusterManager cluster)
{
/* TEMPORARY:
* Named semaphores are only supported on Windows, which allowed synchronized communication between
@@ -75,7 +75,7 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering
if (remoteIndex == null)
{
//Create a new index and store it in the app domain
- IClusterNodeIndex index = new LocalHandler(config);
+ IClusterNodeIndex index = new LocalHandler(cluster);
AppDomain.CurrentDomain.SetData(APP_DOMAIN_KEY, index);
return index;
}
@@ -92,7 +92,7 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering
}
else
{
- return new LocalHandler(config);
+ return new LocalHandler(cluster);
}
}
@@ -114,15 +114,15 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering
* Unless VNLib.Core supports a new way to safley share types across ALCs, this is my solution.
*/
- sealed class LocalHandler(CacheClientConfiguration Config) : IClusterNodeIndex, IIntervalScheduleable
+ sealed class LocalHandler(VNCacheClusterManager cluster) : IClusterNodeIndex, IIntervalScheduleable
{
private Task _currentUpdate = Task.CompletedTask;
///<inheritdoc/>
public CacheNodeAdvertisment? GetNextNode()
{
- //Get all nodes
- CacheNodeAdvertisment[] ads = Config.NodeCollection.GetAllNodes();
+ //Get all discovered nodes
+ CacheNodeAdvertisment[] ads = cluster.DiscoveredNodes.GetAllNodes();
//Just get a random node from the collection for now
return ads.Length > 0 ? ads.SelectRandom() : null;
}
@@ -134,7 +134,7 @@ namespace VNLib.Data.Caching.Providers.VNCache.Clustering
public Task OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken)
{
//Run discovery operation and update the task
- _currentUpdate = Config.DiscoverNodesAsync(cancellationToken);
+ _currentUpdate = cluster.DiscoverNodesAsync(cancellationToken);
return Task.CompletedTask;
}
diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
index 73783dc..07fc9ee 100644
--- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
+++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
@@ -59,7 +59,7 @@ namespace VNLib.Data.Caching.Providers.VNCache
private readonly VnCacheClientConfig _config;
private readonly IClusterNodeIndex _index;
- private readonly FBMClientFactory _clientFactory;
+ private readonly VNCacheClusterClient _cluster;
private readonly TimeSpan _initNodeDelay;
private bool _isConnected;
@@ -83,9 +83,8 @@ namespace VNLib.Data.Caching.Providers.VNCache
{
ILogProvider scoped = plugin.Log.CreateScope(LOG_NAME);
- //Set authenticator and error handler
- _clientFactory.GetCacheConfiguration()
- .WithAuthenticator(new AuthManager(plugin))
+ //When in plugin context, we can use plugin local secrets and a log-based error handler
+ _cluster.Config.WithAuthenticator(new AuthManager(plugin))
.WithErrorHandler(new DiscoveryErrHAndler(scoped));
//Only the master index is schedulable
@@ -116,17 +115,20 @@ namespace VNLib.Data.Caching.Providers.VNCache
//Init the client with default settings
FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(BufferHeap, (int)config.MaxBlobSize, config.RequestTimeout, debugLog);
-
- FBMFallbackClientWsFactory wsFactory = new();
- _clientFactory = new(in conf, wsFactory);
-
- //Add the configuration to the client
- _clientFactory.GetCacheConfiguration()
+
+ FBMClientFactory clientFactory = new(
+ in conf,
+ new FBMFallbackClientWsFactory(),
+ 10
+ );
+
+ _cluster = (new CacheClientConfiguration())
.WithTls(config.UseTls)
- .WithInitialPeers(config.GetInitialNodeUris());
+ .WithInitialPeers(config.GetInitialNodeUris())
+ .ToClusterClient(clientFactory);
//Init index
- _index = ClusterNodeIndex.CreateIndex(_clientFactory.GetCacheConfiguration());
+ _index = ClusterNodeIndex.CreateIndex(_cluster);
}
/*
@@ -216,7 +218,7 @@ namespace VNLib.Data.Caching.Providers.VNCache
pluginLog.Debug("Connecting to {node}", node);
//Connect to the node and save new client
- _client = await _clientFactory.ConnectToCacheAsync(node, exitToken);
+ _client = await _cluster.ConnectToCacheAsync(node, exitToken);
if (pluginLog.IsEnabled(LogLevel.Debug))
{
@@ -256,6 +258,12 @@ namespace VNLib.Data.Caching.Providers.VNCache
pluginLog.Verbose("Stack trace: {re}", he);
await Task.Delay(1000, exitToken);
}
+ catch(HttpRequestException hre) when (hre.InnerException is SocketException se)
+ {
+ pluginLog.Warn("Failed to establish a TCP connection to server {server} {reason}", node.NodeId, se.Message);
+ pluginLog.Verbose("Stack trace: {re}", se);
+ await Task.Delay(1000, exitToken);
+ }
finally
{
_isConnected = false;
@@ -327,22 +335,11 @@ namespace VNLib.Data.Caching.Providers.VNCache
///<inheritdoc/>
public override object GetUnderlyingStore() => _client ?? throw new InvalidOperationException("The client is not currently connected");
- private sealed class AuthManager : ICacheAuthManager
+ private sealed class AuthManager(PluginBase plugin) : ICacheAuthManager
{
- private IAsyncLazy<ReadOnlyJsonWebKey> _sigKey;
- private IAsyncLazy<ReadOnlyJsonWebKey> _verKey;
-
- public AuthManager(PluginBase plugin)
- {
- //Lazy load keys
-
- //Get the signing key
- _sigKey = plugin.GetSecretAsync("client_private_key").ToLazy(static r => r.GetJsonWebKey());
-
- //Lazy load cache public key
- _verKey = plugin.GetSecretAsync("cache_public_key").ToLazy(static r => r.GetJsonWebKey());
- }
+ private IAsyncLazy<ReadOnlyJsonWebKey> _sigKey = plugin.GetSecretAsync("client_private_key").ToLazy(static r => r.GetJsonWebKey());
+ private IAsyncLazy<ReadOnlyJsonWebKey> _verKey = plugin.GetSecretAsync("cache_public_key").ToLazy(static r => r.GetJsonWebKey());
public async Task AwaitLazyKeyLoad()
{
@@ -413,9 +410,42 @@ namespace VNLib.Data.Caching.Providers.VNCache
private sealed record class DiscoveryErrHAndler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
{
- public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
+ public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
+ => OnDiscoveryError(errorNode, ex);
+
+ public void OnDiscoveryError(Uri errorAddress, Exception ex)
+ => OnDiscoveryError(ex, null, errorAddress);
+
+ public void OnDiscoveryError(Exception ex, CacheNodeAdvertisment? errorNode, Uri? address)
{
- Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", errorNode?.NodeId, ex);
+ string node = errorNode?.NodeId ?? address?.ToString() ?? "unknown";
+
+ if(ex is HttpRequestException he)
+ {
+ if(he.InnerException is SocketException se)
+ {
+ LogErrorException(se);
+ return;
+ }
+
+ LogErrorException(he);
+ return;
+ }
+
+ LogErrorException(ex);
+ return;
+
+ void LogErrorException(Exception ex)
+ {
+ if(Logger.IsEnabled(LogLevel.Debug))
+ {
+ Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", node, ex);
+ }
+ else
+ {
+ Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", node, ex.Message);
+ }
+ }
}
}
}
diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs
index 383c979..0d6cd34 100644
--- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs
+++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Data.Caching.Providers.VNCache
@@ -33,6 +33,8 @@ namespace VNLib.Data.Caching.Providers.VNCache
/// </summary>
public class VnCacheClientConfig : VNCacheConfig
{
+ const string DefaultWellKnownEndpoint = "/.well-known/vncache";
+
/// <summary>
/// The broker server address
/// </summary>
@@ -88,7 +90,13 @@ namespace VNLib.Data.Caching.Providers.VNCache
public Uri[] GetInitialNodeUris()
{
_ = InitialNodes ?? throw new InvalidOperationException("Initial nodes have not been set");
- return InitialNodes.Select(static x => new Uri(x, UriKind.Absolute)).ToArray();
+ return InitialNodes.Select(static x =>
+ {
+ //Append a default well known endpoint if the path is just a root
+ Uri ur = new (x, UriKind.Absolute);
+ return ur.LocalPath == "/" ? new Uri(ur, DefaultWellKnownEndpoint) : ur;
+ })
+ .ToArray();
}
///<inheritdoc/>