aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-07-15 13:06:00 -0400
committerLibravatar vnugent <public@vaughnnugent.com>2023-07-15 13:06:00 -0400
commit8b4fb26473256da5eaa89f3e9d2ac5d44f1e9b88 (patch)
tree6ff979b6110b9e6c61ff9f22bb0dbdd2094e08cf
parent2f674e79d42e7d36225fa9ac7ecefbc5bc62d325 (diff)
Latest working draft
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ApiModel/ServiceEndpoints.cs9
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs6
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs3
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/Clustering/ClusterNodeIndex.cs75
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/Clustering/IClusterNodeIndex.cs49
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs94
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs12
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs (renamed from plugins/ObjectCacheServer/src/CacheConfiguration.cs)4
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs (renamed from plugins/ObjectCacheServer/src/CacheEventQueueManager.cs)52
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs (renamed from plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs)31
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheStore.cs (renamed from plugins/ObjectCacheServer/src/CacheStore.cs)24
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs (renamed from plugins/ObjectCacheServer/src/CacheSystemUtil.cs)8
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs1
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs99
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs56
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs14
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs27
-rw-r--r--plugins/ObjectCacheServer/src/NodeConfig.cs128
-rw-r--r--plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs8
19 files changed, 478 insertions, 222 deletions
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ApiModel/ServiceEndpoints.cs b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/ServiceEndpoints.cs
index c0de4a3..413bd3c 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/ApiModel/ServiceEndpoints.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/ServiceEndpoints.cs
@@ -89,6 +89,15 @@ namespace VNLib.Data.Caching.Extensions.ApiModel
}
}
+ /*
+ * These methods define consitend endpoint definitions that must match the server
+ * endpoints.
+ *
+ * Most requests will send an authorization header with a signed JWT that includes
+ * a random challenge that the server will return in the response.
+ *
+ * All responses will be signed JWTs that must be verified before continuing
+ */
private static string BuildDiscoveryAuthToken(ICacheConnectionRequest request)
{
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs
index 0fe0663..d69c6bb 100644
--- a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs
@@ -160,21 +160,21 @@ namespace VNLib.Data.Caching.ObjectCache
}
//Determine if the queue is enabled for the user
- if(!EventQueue.IsEnabled(userState!))
+ if(!EventQueue.IsEnabled(userState))
{
context.CloseResponse(ResponseCodes.NotFound);
return;
}
//try to deq without awaiting
- if (EventQueue.TryDequeue(userState!, out ChangeEvent? change))
+ if (EventQueue.TryDequeue(userState, out ChangeEvent? change))
{
SetResponse(change, context);
}
else
{
//Wait for a new message to process
- ChangeEvent ev = await EventQueue.DequeueAsync(userState!, exitToken);
+ ChangeEvent ev = await EventQueue.DequeueAsync(userState, exitToken);
//Set the response
SetResponse(ev, context);
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs b/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs
index 06be4fa..439de94 100644
--- a/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs
@@ -25,6 +25,7 @@
using System.Threading;
using System.Threading.Tasks;
+using System.Diagnostics.CodeAnalysis;
namespace VNLib.Data.Caching.ObjectCache
{
@@ -38,7 +39,7 @@ namespace VNLib.Data.Caching.ObjectCache
/// </summary>
/// <param name="userState">The unique state of the connection</param>
/// <returns>True if event queuing is enabled</returns>
- bool IsEnabled(object userState);
+ bool IsEnabled([NotNullWhen(true)] object? userState);
/// <summary>
/// Attempts to dequeue a single event from the queue without blocking
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/Clustering/ClusterNodeIndex.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/Clustering/ClusterNodeIndex.cs
new file mode 100644
index 0000000..487a4f9
--- /dev/null
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/Clustering/ClusterNodeIndex.cs
@@ -0,0 +1,75 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Plugins.Extensions.VNCache
+* File: ClusterNodeIndex.cs
+*
+* ClusterNodeIndex.cs is part of VNLib.Plugins.Extensions.VNCache which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Threading;
+using System.Threading.Tasks;
+
+using VNLib.Utils.Logging;
+using VNLib.Data.Caching.Extensions;
+using VNLib.Data.Caching.Extensions.Clustering;
+using VNLib.Plugins.Extensions.Loading.Events;
+
+namespace VNLib.Plugins.Extensions.VNCache.Clustering
+{
+ internal sealed class ClusterNodeIndex : IClusterNodeIndex, IIntervalScheduleable
+ {
+ private readonly CacheClientConfiguration _config;
+ private Task _currentUpdate;
+
+
+ public ClusterNodeIndex(CacheClientConfiguration config)
+ {
+ _config = config;
+ _currentUpdate = Task.CompletedTask;
+ }
+
+ ///<inheritdoc/>
+ public CacheNodeAdvertisment? GetNextNode()
+ {
+ //Get all nodes
+ CacheNodeAdvertisment[] ads = _config.NodeCollection.GetAllNodes();
+ //Just get a random node from the collection for now
+ return ads.Length > 0 ? ads.SelectRandom() : null;
+ }
+
+ ///<inheritdoc/>
+ public Task WaitForDiscoveryAsync(CancellationToken cancellationToken)
+ {
+ return _currentUpdate.WaitAsync(cancellationToken);
+ }
+
+ /// <summary>
+ /// Runs the discovery process and updates the current update task
+ /// </summary>
+ /// <param name="log"></param>
+ /// <param name="cancellationToken">A token to cancel the operation</param>
+ /// <returns>A task that completes when the discovery process is complete</returns>
+ public Task OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken)
+ {
+ //Run discovery operation and update the task
+ _currentUpdate = _config.DiscoverNodesAsync(cancellationToken);
+ return Task.CompletedTask;
+ }
+ }
+} \ No newline at end of file
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/Clustering/IClusterNodeIndex.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/Clustering/IClusterNodeIndex.cs
new file mode 100644
index 0000000..ffbfa0d
--- /dev/null
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/Clustering/IClusterNodeIndex.cs
@@ -0,0 +1,49 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Plugins.Extensions.VNCache
+* File: IClusterNodeIndex.cs
+*
+* IClusterNodeIndex.cs is part of VNLib.Plugins.Extensions.VNCache which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Threading;
+using System.Threading.Tasks;
+
+using VNLib.Data.Caching.Extensions.Clustering;
+
+namespace VNLib.Plugins.Extensions.VNCache.Clustering
+{
+ internal interface IClusterNodeIndex
+ {
+ /// <summary>
+ /// Gets the next available node using the configured balancing policy
+ /// or null if no nodes are available
+ /// </summary>
+ /// <returns>The next available node to connect to if any are available</returns>
+ CacheNodeAdvertisment? GetNextNode();
+
+ /// <summary>
+ /// Waits for the discovery process to complete. This is just incase a
+ /// connection wants to happen while a long discovery is processing.
+ /// </summary>
+ /// <param name="cancellationToken">A token to cancel the operation</param>
+ /// <returns>A task that resolves when the discovery process completes</returns>
+ Task WaitForDiscoveryAsync(CancellationToken cancellationToken);
+ }
+} \ No newline at end of file
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
index aa3d88f..679c91d 100644
--- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
@@ -41,6 +41,8 @@ using VNLib.Data.Caching.ObjectCache;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.Extensions.Clustering;
+using VNLib.Plugins.Extensions.Loading.Events;
+using VNLib.Plugins.Extensions.VNCache.Clustering;
namespace VNLib.Plugins.Extensions.VNCache
{
@@ -51,14 +53,18 @@ namespace VNLib.Plugins.Extensions.VNCache
TimeSpan RefreshInterval { get; }
}
-
/// <summary>
/// A base class that manages
/// </summary>
[ConfigurationName(VNCacheExtensions.CACHE_CONFIG_KEY)]
internal class VnCacheClient : IGlobalCacheProvider, IAsyncBackgroundWork
{
+ private const string LOG_NAME = "CLIENT";
+ private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(10);
+ private static readonly TimeSpan NoNodeDelay = TimeSpan.FromSeconds(10);
+
private readonly VnCacheClientConfig _config;
+ private readonly ClusterNodeIndex _index;
/// <summary>
/// The internal client
@@ -76,10 +82,23 @@ namespace VNLib.Plugins.Extensions.VNCache
plugin.IsDebug() ? plugin.Log : null
)
{
+ ILogProvider scoped = plugin.Log.CreateScope(LOG_NAME);
+
//Set authenticator and error handler
Client.GetCacheConfiguration()
.WithAuthenticator(new AuthManager(plugin))
- .WithErrorHandler(new DiscoveryErrHAndler(plugin.Log));
+ .WithErrorHandler(new DiscoveryErrHAndler(scoped));
+
+ //Schedule discovery interval
+ plugin.ScheduleInterval(_index, _config.DiscoveryInterval);
+
+ //Run discovery after initial delay if interval is greater than initial delay
+ if(_config.DiscoveryInterval > InitialDelay)
+ {
+ //Run a manual initial load
+ scoped.Information("Running initial discovery in {delay}", InitialDelay);
+ _ = plugin.ObserveWork(() => _index.OnIntervalAsync(scoped, plugin.UnloadToken), (int)InitialDelay.TotalMilliseconds);
+ }
}
public VnCacheClient(VnCacheClientConfig config, ILogProvider? debugLog)
@@ -98,6 +117,9 @@ namespace VNLib.Plugins.Extensions.VNCache
Client.GetCacheConfiguration()
.WithTls(config.UseTls)
.WithInitialPeers(config.GetInitialNodeUris());
+
+ //Init index
+ _index = new ClusterNodeIndex(Client.GetCacheConfiguration());
}
/*
@@ -106,46 +128,50 @@ namespace VNLib.Plugins.Extensions.VNCache
*/
public virtual async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
{
+ //Scope log
+ pluginLog = pluginLog.CreateScope(LOG_NAME);
+
try
{
+ //Initial delay
+ pluginLog.Debug("Worker started, waiting for startup delay");
+ await Task.Delay((int)InitialDelay.TotalMilliseconds + 1000, exitToken);
+
while (true)
{
- //Discover nodes in the network
-
- while (true)
+ //Wait for a discovery to complete
+ await _index.WaitForDiscoveryAsync(exitToken);
+
+ //Get the next node to connect to
+ CacheNodeAdvertisment? node = _index.GetNextNode();
+
+ if (node is null)
{
- try
- {
- pluginLog.Debug("Discovering cluster nodes in network");
- //Get server list
- await Client.DiscoverAvailableNodesAsync(exitToken);
- break;
- }
- catch (HttpRequestException re) when (re.InnerException is SocketException)
- {
- pluginLog.Warn("Broker server is unreachable");
- }
- catch(CacheDiscoveryFailureException ce)
- {
- pluginLog.Warn("Failed to discover cache servers, reason {r}", ce);
- }
- catch (Exception ex)
+ pluginLog.Warn("No nodes available to connect to, trying again in {delay}", NoNodeDelay);
+ await Task.Delay(NoNodeDelay, exitToken);
+
+ //Run another manual discovery if the interval is greater than the delay
+ if (_config.DiscoveryInterval > NoNodeDelay)
{
- pluginLog.Warn("Failed to get server list from broker, reason {r}", ex.Message);
+ pluginLog.Debug("Forcing a manual discovery");
+
+ //We dont need to await this because it is awaited at the top of the loop
+ _ = _index.OnIntervalAsync(pluginLog, exitToken);
}
- //Gen random ms delay
- int randomMsDelay = RandomNumberGenerator.GetInt32(1000, 2000);
- await Task.Delay(randomMsDelay, exitToken);
+ continue;
}
+ //Ready to connect
+
try
{
- pluginLog.Debug("Connecting to random cache server");
+ pluginLog.Debug("Connecting to {node}", node);
- //Connect to a random server
- CacheNodeAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken);
- pluginLog.Debug("Connected to cache server {s}", selected.NodeId);
+ //Connect to the node
+ await Client.ConnectToCacheAsync(node, exitToken);
+
+ pluginLog.Debug("Successfully connected to {s}", node);
//Set connection status flag
IsConnected = true;
@@ -157,7 +183,7 @@ namespace VNLib.Plugins.Extensions.VNCache
}
catch (WebSocketException wse)
{
- pluginLog.Warn("Failed to connect to cache server {reason}", wse.Message);
+ pluginLog.Warn("Failed to connect to cache server {reason}", wse);
continue;
}
catch (HttpRequestException he) when (he.InnerException is SocketException)
@@ -170,6 +196,8 @@ namespace VNLib.Plugins.Extensions.VNCache
{
IsConnected = false;
}
+
+ //Loop again
}
}
catch (OperationCanceledException)
@@ -279,14 +307,14 @@ namespace VNLib.Plugins.Extensions.VNCache
public byte[] SignMessageHash(byte[] hash, HashAlg alg)
{
//try to get the rsa alg for the signing key
- using RSA? rsa = _sigKey.Value.GetRSAPublicKey();
+ using RSA? rsa = _sigKey.Value.GetRSAPrivateKey();
if(rsa != null)
{
return rsa.SignHash(hash, alg.GetAlgName(), RSASignaturePadding.Pkcs1);
}
//try to get the ecdsa alg for the signing key
- using ECDsa? ecdsa = _sigKey.Value.GetECDsaPublicKey();
+ using ECDsa? ecdsa = _sigKey.Value.GetECDsaPrivateKey();
if(ecdsa != null)
{
return ecdsa.SignHash(hash);
@@ -326,7 +354,7 @@ namespace VNLib.Plugins.Extensions.VNCache
{
public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
{
- Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", errorNode.NodeId, ex);
+ Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", errorNode?.NodeId, ex);
}
}
}
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs
index 73fb70f..bfa9d92 100644
--- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs
@@ -53,8 +53,8 @@ namespace VNLib.Plugins.Extensions.VNCache
/// The time (in seconds) to randomly delay polling the broker server
/// for available servers
/// </summary>
- [JsonPropertyName("retry_interval_sec")]
- public int? RetryIntervalSeconds { get; set; }
+ [JsonPropertyName("discovery_interval_Sec")]
+ public int? DiscoveryIntervalSeconds { get; set; }
/// <summary>
/// The maximum time (in seconds) for FBM cache operations are allowed
@@ -71,7 +71,7 @@ namespace VNLib.Plugins.Extensions.VNCache
/// <summary>
/// Retry interval in a timespan
/// </summary>
- internal TimeSpan RetryInterval => TimeSpan.FromSeconds(RetryIntervalSeconds!.Value);
+ internal TimeSpan DiscoveryInterval => TimeSpan.FromSeconds(DiscoveryIntervalSeconds!.Value);
/// <summary>
/// FBM Request timeout
@@ -92,7 +92,7 @@ namespace VNLib.Plugins.Extensions.VNCache
public Uri[] GetInitialNodeUris()
{
_ = InitialNodes ?? throw new InvalidOperationException("Initial nodes have not been set");
- return InitialNodes.Select(x => new Uri(x)).ToArray();
+ return InitialNodes.Select(static x => new Uri(x, UriKind.Absolute)).ToArray();
}
void IOnConfigValidation.Validate()
@@ -102,7 +102,7 @@ namespace VNLib.Plugins.Extensions.VNCache
throw new ArgumentException("Your maxium message size should be a reasonable value greater than 0", "max_message_size");
}
- if (!RetryIntervalSeconds.HasValue || RetryIntervalSeconds.Value < 1)
+ if (!DiscoveryIntervalSeconds.HasValue || DiscoveryIntervalSeconds.Value < 1)
{
throw new ArgumentException("You must specify a retry interval period greater than 0", "retry_interval_sec");
}
@@ -128,7 +128,7 @@ namespace VNLib.Plugins.Extensions.VNCache
}
//Verify http connection
- if(peer.Scheme != Uri.UriSchemeHttp || peer.Scheme != Uri.UriSchemeHttps)
+ if(peer.Scheme != Uri.UriSchemeHttp && peer.Scheme != Uri.UriSchemeHttps)
{
throw new ArgumentException("You must specify an HTTP or HTTPS URI for each initial node", "initial_nodes");
}
diff --git a/plugins/ObjectCacheServer/src/CacheConfiguration.cs b/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs
index f7adeb3..86e1f5a 100644
--- a/plugins/ObjectCacheServer/src/CacheConfiguration.cs
+++ b/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs
@@ -24,7 +24,7 @@
using System.Text.Json.Serialization;
-namespace VNLib.Data.Caching.ObjectCache.Server
+namespace VNLib.Data.Caching.ObjectCache.Server.Cache
{
internal sealed class CacheConfiguration
{
@@ -42,7 +42,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
[JsonPropertyName("max_message_size")]
public int MaxMessageSize { get; set; } = 1000 * 1024;
-
+
[JsonPropertyName("change_queue_max_depth")]
public int MaxEventQueueDepth { get; set; } = 10 * 1000;
diff --git a/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs b/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs
index 3827121..ad0eb5a 100644
--- a/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs
+++ b/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs
@@ -37,31 +37,31 @@ using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Extensions.Loading.Events;
-namespace VNLib.Data.Caching.ObjectCache.Server
+namespace VNLib.Data.Caching.ObjectCache.Server.Cache
{
-
- [ConfigurationName("event_manager")]
internal sealed class CacheEventQueueManager : ICacheEventQueueManager, IDisposable, IIntervalScheduleable
{
private readonly int MaxQueueDepth;
private readonly object SubLock;
- private readonly LinkedList<NodeQueue> Subscribers;
+ private readonly LinkedList<NodeQueue> Subscribers;
private readonly object StoreLock;
private readonly Dictionary<string, NodeQueue> QueueStore;
-
- public CacheEventQueueManager(PluginBase plugin, IConfigScope config)
+
+ public CacheEventQueueManager(PluginBase plugin)
{
- //Get purge interval
- TimeSpan purgeInterval = config["purge_interval_sec"].GetTimeSpan(TimeParseType.Seconds);
+ //Get node config
+ NodeConfig config = plugin.GetOrCreateSingleton<NodeConfig>();
//Get max queue depth
- MaxQueueDepth = (int)config["max_depth"].GetUInt32();
+ MaxQueueDepth = config.MaxQueueDepth;
- //Create purge interval
- plugin.ScheduleInterval(this, purgeInterval);
+ /*
+ * Schedule purge interval to clean up stale queues
+ */
+ plugin.ScheduleInterval(this, config.EventQueuePurgeInterval);
SubLock = new();
Subscribers = new();
@@ -81,7 +81,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
lock (StoreLock)
{
//Try to recover the queue for the node
- if(!QueueStore.TryGetValue(peer.NodeId, out nq))
+ if (!QueueStore.TryGetValue(peer.NodeId, out nq))
{
//Create new queue
nq = new(peer.NodeId, MaxQueueDepth);
@@ -163,21 +163,21 @@ namespace VNLib.Data.Caching.ObjectCache.Server
public void PurgeStaleSubscribers()
{
//Enter locks
- lock(SubLock)
- lock(StoreLock)
- {
- //Get all stale queues (queues without listeners)
- NodeQueue[] staleQueues = QueueStore.Values.Where(static nq => nq.Listeners == 0).ToArray();
-
- foreach (NodeQueue nq in staleQueues)
+ lock (SubLock)
+ lock (StoreLock)
{
- //Remove from store
- QueueStore.Remove(nq.NodeId);
+ //Get all stale queues (queues without listeners)
+ NodeQueue[] staleQueues = QueueStore.Values.Where(static nq => nq.Listeners == 0).ToArray();
+
+ foreach (NodeQueue nq in staleQueues)
+ {
+ //Remove from store
+ QueueStore.Remove(nq.NodeId);
- //remove from subscribers
- Subscribers.Remove(nq);
+ //remove from subscribers
+ Subscribers.Remove(nq);
+ }
}
- }
}
//Interval to purge stale subscribers
@@ -186,7 +186,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
log.Debug("Purging stale peer event queues");
PurgeStaleSubscribers();
-
+
return Task.CompletedTask;
}
@@ -241,7 +241,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
public void PublishChanges(Span<ChangeEvent> changes)
{
- for(int i = 0; i < changes.Length; i++)
+ for (int i = 0; i < changes.Length; i++)
{
Queue.TryEnque(changes[i]);
}
diff --git a/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs
index 9c7388e..ba39db6 100644
--- a/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs
+++ b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs
@@ -26,16 +26,28 @@ using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Channels;
+using System.Diagnostics.CodeAnalysis;
using VNLib.Utils.Async;
using VNLib.Utils.Logging;
using VNLib.Plugins;
using VNLib.Plugins.Extensions.Loading;
-namespace VNLib.Data.Caching.ObjectCache.Server
+namespace VNLib.Data.Caching.ObjectCache.Server.Cache
{
+ /*
+ * Implements the event queue for the cache listener. Captures changes from the cache store
+ * and publishes them to subscribers.
+ *
+ * It also allows clients that are listening for changes to wait for events to
+ * their individual queues.
+ */
+
internal sealed class CacheListenerPubQueue : ICacheListenerEventQueue, IAsyncBackgroundWork
{
+ private const int MAX_LOCAL_QUEUE_ITEMS = 10000;
+ private const string LOG_SCOPE_NAME = "QUEUE";
+
private readonly AsyncQueue<ChangeEvent> _listenerQueue;
private readonly ILogProvider _logProvider;
private readonly ICacheEventQueueManager _queueManager;
@@ -43,8 +55,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server
public CacheListenerPubQueue(PluginBase plugin)
{
_queueManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>();
- _logProvider = plugin.Log;
- _listenerQueue = new AsyncQueue<ChangeEvent>(new BoundedChannelOptions(10000)
+ _logProvider = plugin.Log.CreateScope(LOG_SCOPE_NAME);
+
+ //Init local queue to store published events
+ _listenerQueue = new(new BoundedChannelOptions(MAX_LOCAL_QUEUE_ITEMS)
{
AllowSynchronousContinuations = true,
FullMode = BoundedChannelFullMode.DropOldest,
@@ -54,12 +68,17 @@ namespace VNLib.Data.Caching.ObjectCache.Server
}
///<inheritdoc/>
- public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
+ async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
{
const int accumulatorSize = 64;
+ //Create scope
+ pluginLog = pluginLog.CreateScope(LOG_SCOPE_NAME);
+
try
{
+ pluginLog.Debug("Change queue worker listening for local cache changes");
+
//Accumulator for events
ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize];
int index = 0;
@@ -89,9 +108,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server
pluginLog.Debug("Change queue listener worker exited");
}
}
-
+
///<inheritdoc/>
- public bool IsEnabled(object userState)
+ public bool IsEnabled([NotNullWhen(true)] object? userState)
{
return userState is IPeerEventQueue;
}
diff --git a/plugins/ObjectCacheServer/src/CacheStore.cs b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs
index c1d47f6..f94a3f5 100644
--- a/plugins/ObjectCacheServer/src/CacheStore.cs
+++ b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs
@@ -30,11 +30,19 @@ using VNLib.Utils.Logging;
using VNLib.Plugins;
using VNLib.Plugins.Extensions.Loading;
-namespace VNLib.Data.Caching.ObjectCache.Server
+namespace VNLib.Data.Caching.ObjectCache.Server.Cache
{
+ /*
+ * Implements the blob cache store, which is an abstraction around the blob cache listener.
+ * This allows for publishing local events (say from other nodes) to keep caches in sync.
+ */
+
[ConfigurationName("cache")]
internal sealed class CacheStore : ICacheStore, IDisposable
{
+ /// <summary>
+ /// Gets the underlying cache listener
+ /// </summary>
public BlobCacheListener Listener { get; }
@@ -44,16 +52,19 @@ namespace VNLib.Data.Caching.ObjectCache.Server
Listener = InitializeCache((ObjectCacheServerEntry)plugin, config);
}
+ ///<inheritdoc/>
ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token)
{
return Listener.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token);
}
+ ///<inheritdoc/>
void ICacheStore.Clear()
{
throw new NotImplementedException();
}
+ ///<inheritdoc/>
ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token)
{
return Listener.Cache.DeleteObjectAsync(id, token);
@@ -81,14 +92,14 @@ Cache Configuration:
if (cacheConf.MaxCacheEntries < 200)
{
plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache");
- }
+ }
//calculate the max memory usage
- ulong maxByteSize = ((ulong)cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize);
+ ulong maxByteSize = cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize;
//Log the cache config
- plugin.Log.Information(CacheConfigTemplate,
- maxByteSize / (ulong)(1024 * 1000),
+ plugin.Log.Information(CacheConfigTemplate,
+ maxByteSize / (1024 * 1000),
cacheConf.BucketCount,
cacheConf.MaxCacheEntries
);
@@ -103,6 +114,9 @@ Cache Configuration:
return new(bc, queue, plugin.Log, plugin.CacheHeap);
}
+ /*
+ * Cleaned up by the plugin on exit
+ */
public void Dispose()
{
Listener.Dispose();
diff --git a/plugins/ObjectCacheServer/src/CacheSystemUtil.cs b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs
index f8aedae..2071d2b 100644
--- a/plugins/ObjectCacheServer/src/CacheSystemUtil.cs
+++ b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs
@@ -33,7 +33,7 @@ using VNLib.Plugins;
using VNLib.Utils.Memory;
using VNLib.Plugins.Extensions.Loading;
-namespace VNLib.Data.Caching.ObjectCache.Server
+namespace VNLib.Data.Caching.ObjectCache.Server.Cache
{
internal static class CacheSystemUtil
{
@@ -136,9 +136,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//Return the
return new(loader);
}
-
- private sealed class RuntimeBlobCacheTable : IBlobCacheTable
+
+ private sealed class RuntimeBlobCacheTable : IBlobCacheTable
{
private readonly IBlobCacheTable _table;
@@ -173,7 +173,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
internal sealed class PersistantCacheManager : IPersistantCacheStore
{
const string INITIALIZE_METHOD_NAME = "OnInitializeForBucket";
-
+
/*
* Our referrence can be technically unloaded, but so will
diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
index ffdd4f4..5a04737 100644
--- a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
@@ -36,6 +36,7 @@ using VNLib.Net.Messaging.FBM;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.Extensions.Clustering;
+using VNLib.Data.Caching.ObjectCache.Server.Cache;
namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
index f132cab..65cc009 100644
--- a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
@@ -31,9 +31,9 @@ using System.Threading.Tasks;
using System.Collections.Generic;
using VNLib.Utils.Logging;
-using VNLib.Data.Caching.Extensions;
using VNLib.Plugins;
using VNLib.Plugins.Extensions.Loading;
+using VNLib.Data.Caching.Extensions;
using VNLib.Data.Caching.Extensions.Clustering;
namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
@@ -46,38 +46,48 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
internal sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter
{
private const string LOG_SCOPE_NAME = "DISC";
+ /*
+ * The initial discovery delay. This allows for the server to initialize before
+ * starting the discovery process. This will probably be a shorter delay
+ * than a usual discovery interval.
+ */
private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(15);
+ private static readonly TimeSpan WhenWellKnownResolveFailed = TimeSpan.FromSeconds(20);
+
private readonly List<CacheNodeAdvertisment> _connectedPeers;
- private readonly NodeConfig _config;
- private readonly CachePeerMonitor _monitor;
+ private readonly NodeConfig Config;
+ private readonly CachePeerMonitor Monitor;
+ private readonly ILogProvider Log;
private readonly bool IsDebug;
- private readonly ILogProvider _log;
+ private readonly bool HasWellKnown;
public PeerDiscoveryManager(PluginBase plugin)
{
//Get config
- _config = plugin.GetOrCreateSingleton<NodeConfig>();
+ Config = plugin.GetOrCreateSingleton<NodeConfig>();
//Get the known peers array from config, its allowed to be null for master nodes
IConfigScope? config = plugin.TryGetConfig("known_peers");
string[] kownPeers = config?.Deserialze<string[]>() ?? Array.Empty<string>();
//Add known peers to the monitor
- _config.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s)));
+ Config.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s)));
- plugin.Log.Information("Inital peer nodes: {nodes}", kownPeers);
+ HasWellKnown = kownPeers.Length > 0;
//Get the peer monitor
- _monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
+ Monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
_connectedPeers = new();
//Create scoped logger
- _log = plugin.Log.CreateScope(LOG_SCOPE_NAME);
+ Log = plugin.Log.CreateScope(LOG_SCOPE_NAME);
+
+ Log.Information("Inital peer nodes: {nodes}", kownPeers);
//Setup discovery error handler
- _config.Config.WithErrorHandler(new ErrorHandler(_log));
+ Config.Config.WithErrorHandler(new ErrorHandler(Log));
IsDebug = plugin.IsDebug();
}
@@ -93,33 +103,29 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//Start the change listener
Task watcher = WatchForPeersAsync(exitToken);
- _log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", InitialDelay);
+ Log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", InitialDelay);
try
{
//Wait for the initial delay
await Task.Delay(InitialDelay, exitToken);
- _log.Debug("Begining discovery loop");
-
- /*
- * To avoid connecting to ourself, we add ourselves to the connected list
- * and it should never get removed. This is because the monitor will never
- * report our own advertisment.
- */
- _connectedPeers.Add(_config.Config.Advertisment);
+ Log.Debug("Begining discovery loop");
while (true)
{
+ bool wellKnownFailed = false;
+
try
{
if (IsDebug)
{
- _log.Debug("Begining node discovery");
+ Log.Debug("Begining node discovery");
}
//Resolve all known peers
- CacheNodeAdvertisment[] wellKnown = await _config.Config.ResolveWellKnownAsync(exitToken);
+ CacheNodeAdvertisment[] wellKnown = await Config.Config.ResolveWellKnownAsync(exitToken);
+ wellKnownFailed = wellKnown.Length == 0;
//Use the monitor to get the initial peers
IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds();
@@ -130,15 +136,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
if (allAds.Length > 0)
{
//Discover all kown nodes
- await _config.Config.DiscoverNodesAsync(allAds, exitToken);
+ await Config.Config.DiscoverNodesAsync(allAds, exitToken);
}
//Log the discovered nodes if verbose logging is enabled
if (IsDebug)
{
- CacheNodeAdvertisment[] found = _config.Config.NodeCollection.GetAllNodes();
+ CacheNodeAdvertisment[] found = Config.Config.NodeCollection.GetAllNodes();
- _log.Debug("Discovered {count} nodes\n\t{nodes}", found.Length, found.Select(static s => s.NodeId));
+ Log.Debug("Discovered {count} nodes\n\t{nodes}", found.Length, found.Select(static s => s.NodeId));
}
}
catch(OperationCanceledException)
@@ -147,17 +153,38 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
}
catch (Exception ex)
{
- _log.Error(ex, "Failed to discover new peer nodes");
+ Log.Error(ex, "Failed to discover new peer nodes");
}
- //Delay the next discovery
- await Task.Delay(_config.DiscoveryInterval, exitToken);
+ /*
+ * If we have well known nodes and the discovery failed, we wait for a shorter
+ * duration before retrying. This is to avoid spamming the network with requests
+ * if the well known nodes are down. But if we don't have any well known nodes
+ * we cannot continue.
+ *
+ * This only matters if we are exepcted to have well known nodes.
+ */
+ if(HasWellKnown && wellKnownFailed)
+ {
+ if (IsDebug)
+ {
+ Log.Debug("Well known node discovery failed, waiting for {idel} before retrying", WhenWellKnownResolveFailed);
+ }
+
+ //Wait for shorter duration
+ await Task.Delay(WhenWellKnownResolveFailed, exitToken);
+ }
+ else
+ {
+ //Delay the next discovery
+ await Task.Delay(Config.DiscoveryInterval, exitToken);
+ }
}
}
catch (OperationCanceledException)
{
//Normal exit
- _log.Information("Node discovery worker exiting");
+ Log.Information("Node discovery worker exiting on plugin exit");
}
finally
{
@@ -170,10 +197,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
private IEnumerable<CacheNodeAdvertisment> GetMonitorAds()
{
- return _monitor.GetAllPeers()
+ return Monitor.GetAllPeers()
.Where(static p => p.Advertisment != null)
//Without us
- .Where(n => n.NodeId != _config.Config.NodeId)
+ .Where(n => n.NodeId != Config.Config.NodeId)
.Select(static p => p.Advertisment!);
}
@@ -182,26 +209,26 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
try
{
- _log.Debug("Discovery worker waiting for new peers to connect");
+ Log.Debug("Discovery worker waiting for new peers to connect");
while (true)
{
//Wait for changes, then get new peers
- await _monitor.WaitForChangeAsync().WaitAsync(cancellation);
+ await Monitor.WaitForChangeAsync().WaitAsync(cancellation);
- _log.Verbose("New peers connected");
+ Log.Verbose("New peers connected");
//Use the monitor to get the initial peers
IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds();
- ((NodeDiscoveryCollection)_config.Config.NodeCollection).AddManualNodes(ads);
+ ((NodeDiscoveryCollection)Config.Config.NodeCollection).AddManualNodes(ads);
}
}
catch (OperationCanceledException)
{
//Normal ext
- _log.Debug("Connected peer listener exited");
+ Log.Debug("Connected peer listener exited");
}
}
@@ -212,7 +239,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
lock (_connectedPeers)
{
//Get all discovered peers
- CacheNodeAdvertisment[] peers = _config.Config.NodeCollection.GetAllNodes();
+ CacheNodeAdvertisment[] peers = Config.Config.NodeCollection.GetAllNodes();
//Get the difference between the discovered peers and the connected peers
return peers.Except(_connectedPeers).ToArray();
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
index 5e794f8..d232fd8 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
@@ -45,13 +45,13 @@ using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Essentials.Endpoints;
using VNLib.Plugins.Essentials.Extensions;
using VNLib.Plugins.Extensions.Loading.Routing;
-using VNLib.Data.Caching.ObjectCache.Server.Distribution;
using VNLib.Data.Caching.Extensions.Clustering;
+using VNLib.Data.Caching.ObjectCache.Server.Cache;
+using VNLib.Data.Caching.ObjectCache.Server.Clustering;
namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
- [ConfigurationName("connect_endpoint")]
internal sealed class ConnectEndpoint : ResourceEndpointBase
{
private const string LOG_SCOPE_NAME = "CONEP";
@@ -62,10 +62,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
private readonly NodeConfig NodeConfiguration;
private readonly ICacheEventQueueManager PubSubManager;
private readonly IPeerMonitor Peers;
-
private readonly BlobCacheListener Store;
-
- private readonly bool VerifyIp;
+
private readonly string AudienceLocalServerId;
private uint _connectedClients;
@@ -88,21 +86,17 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
DisableSessionsRequired = true
};
- public ConnectEndpoint(PluginBase plugin, IConfigScope config)
+ public ConnectEndpoint(PluginBase plugin)
{
- string? path = config["path"].GetString();
-
- InitPathAndLog(path, plugin.Log.CreateScope(LOG_SCOPE_NAME));
+ //Get node configuration
+ NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>();
- //Check for ip-verification flag
- VerifyIp = config.TryGetValue("verify_ip", out JsonElement vIpEl) && vIpEl.GetBoolean();
+ //Init from config and create a new log scope
+ InitPathAndLog(NodeConfiguration.ConnectPath, plugin.Log.CreateScope(LOG_SCOPE_NAME));
//Setup pub/sub manager
PubSubManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>();
- //Get node configuration
- NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>();
-
//Get peer monitor
Peers = plugin.GetOrCreateSingleton<CachePeerMonitor>();
@@ -186,7 +180,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
}
- Log.Debug("Received negotiation request from node {node}", nodeId);
+ if (isPeer)
+ {
+ Log.Debug("Received negotiation request from peer node {node}", nodeId);
+ }
+ else
+ {
+ Log.Debug("Received negotiation request from client {client}", entity.TrustedRemoteIp.ToString());
+ }
//Verified, now we can create an auth message with a short expiration
using JsonWebToken auth = new();
@@ -256,7 +257,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
//Verify audience, expiration
- if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl) || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase))
+ if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl)
+ || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
@@ -270,7 +272,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
//Check node ip address matches if required
- if (VerifyIp)
+ if (NodeConfiguration.VerifyIp)
{
if (!doc.RootElement.TryGetProperty("ip", out JsonElement ipEl))
{
@@ -318,6 +320,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
}
+ WsUserState state;
+
try
{
//Get query config suggestions from the client
@@ -340,7 +344,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
int maxMessageSizeClamp = Math.Clamp(maxMessageSize, CacheConfig.MinRecvBufferSize, CacheConfig.MaxRecvBufferSize);
//Init new ws state object and clamp the suggested buffer sizes
- WsUserState state = new()
+ state = new()
{
RecvBufferSize = Math.Clamp(recvBufSize, CacheConfig.MinRecvBufferSize, CacheConfig.MaxRecvBufferSize),
MaxHeaderBufferSize = Math.Clamp(maxHeadBufSize, CacheConfig.MinHeaderBufferSize, CacheConfig.MaxHeaderBufferSize),
@@ -356,20 +360,18 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
NodeId = nodeId,
Advertisment = discoveryAd
};
-
- Log.Debug("Client recv buffer suggestion {recv}, header buffer size {head}, response buffer size {r}", recvBufCmd, maxHeaderCharCmd, state.MaxResponseBufferSize);
-
- //Print state message to console
- Log.Verbose("Client buffer state {state}", state);
-
- //Accept socket and pass state object
- entity.AcceptWebSocket(WebsocketAcceptedAsync, state);
- return VfReturnType.VirtualSkip;
}
catch (KeyNotFoundException)
{
return VfReturnType.BadRequest;
}
+
+ //Print state message to console
+ Log.Debug("Client buffer state {state}", state);
+
+ //Accept socket and pass state object
+ entity.AcceptWebSocket(WebsocketAcceptedAsync, state);
+ return VfReturnType.VirtualSkip;
}
private async Task WebsocketAcceptedAsync(WebSocketSession wss)
diff --git a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
index 77d59dd..adb83e0 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
@@ -27,19 +27,17 @@ using System.Net;
using System.Linq;
using System.Text.Json;
-using VNLib.Hashing;
using VNLib.Hashing.IdentityUtility;
using VNLib.Plugins;
using VNLib.Plugins.Essentials;
using VNLib.Plugins.Essentials.Endpoints;
using VNLib.Plugins.Essentials.Extensions;
using VNLib.Plugins.Extensions.Loading;
-using VNLib.Data.Caching.ObjectCache.Server.Distribution;
using VNLib.Data.Caching.Extensions.Clustering;
+using VNLib.Data.Caching.ObjectCache.Server.Clustering;
namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
- [ConfigurationName("discovery_endpoint")]
internal sealed class PeerDiscoveryEndpoint : ResourceEndpointBase
{
private readonly IPeerMonitor PeerMonitor;
@@ -53,17 +51,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
DisableSessionsRequired = true
};
- public PeerDiscoveryEndpoint(PluginBase plugin, IConfigScope config)
+ public PeerDiscoveryEndpoint(PluginBase plugin)
{
- string? path = config["path"].GetString();
-
- InitPathAndLog(path, plugin.Log);
-
//Get the peer monitor
PeerMonitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
//Get the node config
Config = plugin.GetOrCreateSingleton<NodeConfig>();
+
+ InitPathAndLog(Config.DiscoveryPath, plugin.Log);
}
protected override VfReturnType Get(HttpEntity entity)
@@ -100,7 +96,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
using JsonDocument payload = jwt.GetPayload();
//Get client info to pass back
- subject = payload.RootElement.GetProperty("sub").GetString() ?? string.Empty;
+ subject = payload.RootElement.TryGetProperty("sub", out JsonElement subEl) ? subEl.GetString() ?? string.Empty : string.Empty;
challenge = payload.RootElement.GetProperty("chl").GetString() ?? string.Empty;
}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs
index 99c7f19..3fcc471 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs
@@ -24,10 +24,7 @@
using System;
using System.Net;
-using System.Text.Json;
-using VNLib.Data.Caching.Extensions;
-using VNLib.Data.Caching.Extensions.Clustering;
using VNLib.Hashing;
using VNLib.Hashing.IdentityUtility;
using VNLib.Plugins;
@@ -35,6 +32,8 @@ using VNLib.Plugins.Essentials;
using VNLib.Plugins.Essentials.Endpoints;
using VNLib.Plugins.Essentials.Extensions;
using VNLib.Plugins.Extensions.Loading;
+using VNLib.Data.Caching.Extensions;
+using VNLib.Data.Caching.Extensions.Clustering;
namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
@@ -44,12 +43,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
* the network. Clients need to know the endpoint layout to be able to
* connect and discover other nodes.
*/
-
- [ConfigurationName("well_known", Required = false)]
internal sealed class WellKnownEndpoint : ResourceEndpointBase
- {
- //Default path for the well known endpoint
- const string DefaultPath = "/.well-known/vncache";
+ {
//Store serialized advertisment
private readonly CacheNodeAdvertisment _advertisment;
@@ -62,10 +57,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
DisableSessionsRequired = true,
};
- public WellKnownEndpoint(PluginBase plugin):this(plugin, null)
- { }
-
- public WellKnownEndpoint(PluginBase plugin, IConfigScope? config)
+ public WellKnownEndpoint(PluginBase plugin)
{
//Get the node config
NodeConfig nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>();
@@ -74,16 +66,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
_advertisment = nodeConfig.Config.Advertisment;
_keyStore = nodeConfig.KeyStore;
- //Default to the well known path
- string path = DefaultPath;
-
- //See if the user configured a path
- if(config != null && config.TryGetValue("path", out JsonElement pathEl))
- {
- path = pathEl.GetString() ?? DefaultPath;
- }
-
- InitPathAndLog(path, plugin.Log);
+ InitPathAndLog(nodeConfig.WellKnownPath, plugin.Log);
}
protected override VfReturnType Get(HttpEntity entity)
diff --git a/plugins/ObjectCacheServer/src/NodeConfig.cs b/plugins/ObjectCacheServer/src/NodeConfig.cs
index a6c5be9..3a2e10e 100644
--- a/plugins/ObjectCacheServer/src/NodeConfig.cs
+++ b/plugins/ObjectCacheServer/src/NodeConfig.cs
@@ -26,29 +26,22 @@ using System;
using System.Net;
using System.Linq;
using System.Text.Json;
+using System.Collections.Generic;
using VNLib.Plugins;
using VNLib.Utils.Logging;
using VNLib.Utils.Extensions;
using VNLib.Plugins.Extensions.Loading;
-using VNLib.Data.Caching.ObjectCache.Server.Endpoints;
using VNLib.Data.Caching.Extensions.Clustering;
+
namespace VNLib.Data.Caching.ObjectCache.Server
{
[ConfigurationName("cluster")]
internal sealed class NodeConfig
{
- const string CacheConfigTemplate =
-@"
-Cluster Configuration:
- Node Id: {id}
- TlsEndabled: {tls}
- Cache Endpoint: {ep}
- Discovery Endpoint: {dep}
- Discovery Interval: {di}
- Max Peers: {mpc}
-";
+ //Default path for the well known endpoint
+ const string DefaultPath = "/.well-known/vncache";
public CacheNodeConfiguration Config { get; }
@@ -56,16 +49,25 @@ Cluster Configuration:
public TimeSpan DiscoveryInterval { get; }
+ public TimeSpan EventQueuePurgeInterval { get; }
+
+ public int MaxQueueDepth { get; }
+
+ public string? DiscoveryPath { get; }
+
+ public string ConnectPath { get; }
+
+ public string WellKnownPath { get; }
+
+ public bool VerifyIp { get; }
+
/// <summary>
/// The maximum number of peer connections to allow
/// </summary>
public uint MaxPeerConnections { get; } = 10;
public NodeConfig(PluginBase plugin, IConfigScope config)
- {
-
- Config = new();
-
+ {
//Get the port of the primary webserver
int port;
bool usingTls;
@@ -86,56 +88,104 @@ Cluster Configuration:
//Server id is just dns name for now
string nodeId = $"{hostname}:{port}";
-
- //The endpoint to advertise to cache clients that allows cache connections
- Uri cacheEndpoint = GetEndpointUri<ConnectEndpoint>(plugin, usingTls, port, hostname);
-
+
//Init key store
KeyStore = new(plugin);
+
+ DiscoveryInterval = config["discovery_interval_sec"].GetTimeSpan(TimeParseType.Seconds);
+
+ //Get the event queue purge interval
+ EventQueuePurgeInterval = config["queue_purge_interval_sec"].GetTimeSpan(TimeParseType.Seconds);
+
+ //Get the max queue depth
+ MaxQueueDepth = (int)config["max_queue_depth"].GetUInt32();
+
+
+ //Get the connect path
+ ConnectPath = config["connect_path"].GetString() ?? throw new KeyNotFoundException("Missing required key 'connect_path' in cluster config");
+
+ //Get the verify ip setting
+ VerifyIp = config["verify_ip"].GetBoolean();
+
+ Uri connectEp = BuildUri(usingTls, hostname, port, ConnectPath);
+ Uri? discoveryEp = null;
+
+ Config = new();
+
//Setup cache node config
- Config.WithCacheEndpoint(cacheEndpoint)
+ Config.WithCacheEndpoint(connectEp)
.WithNodeId(nodeId)
.WithAuthenticator(KeyStore)
.WithTls(usingTls);
- //Check if advertising is enabled
- if(plugin.HasConfigForType<PeerDiscoveryEndpoint>())
+ //Get the discovery path (optional)
+ if (config.TryGetValue("discovery_path", out JsonElement discoveryPathEl))
{
- //Get the the broadcast endpoint
- Uri discoveryEndpoint = GetEndpointUri<PeerDiscoveryEndpoint>(plugin, usingTls, port, hostname);
-
- //Enable advertising
- Config.EnableAdvertisment(discoveryEndpoint);
+ DiscoveryPath = discoveryPathEl.GetString();
+
+ //Enable advertisment if a discovery path is present
+ if (!string.IsNullOrEmpty(DiscoveryPath))
+ {
+ //Build the discovery endpoint, it must be an absolute uri
+ discoveryEp = BuildUri(usingTls, hostname, port, DiscoveryPath);
+ Config.EnableAdvertisment(discoveryEp);
+ }
}
-
- DiscoveryInterval = config["discovery_interval_sec"].GetTimeSpan(TimeParseType.Seconds);
+ //Allow custom well-known path
+ if(config.TryGetValue("well_known_path", out JsonElement wkEl))
+ {
+ WellKnownPath = wkEl.GetString() ?? DefaultPath;
+ }
+ //Default if not set
+ WellKnownPath ??= DefaultPath;
//Get the max peer connections
- if(config.TryGetValue("max_peers", out JsonElement maxPeerEl))
+ if (config.TryGetValue("max_peers", out JsonElement maxPeerEl))
{
MaxPeerConnections = maxPeerEl.GetUInt32();
}
+ const string CacheConfigTemplate =
+@"
+Cluster Configuration:
+ Node Id: {id}
+ TlsEndabled: {tls}
+ Verify Ip: {vi}
+ Well-Known: {wk}
+ Cache Endpoint: {ep}
+ Discovery Endpoint: {dep}
+ Discovery Interval: {di}
+ Max Peer Connections: {mpc}
+ Max Queue Depth: {mqd}
+ Event Queue Purge Interval: {eqpi}
+";
+
//log the config
plugin.Log.Information(CacheConfigTemplate,
nodeId,
usingTls,
- cacheEndpoint,
- Config.DiscoveryEndpoint,
+ VerifyIp,
+ WellKnownPath,
+ connectEp,
+ discoveryEp,
DiscoveryInterval,
- MaxPeerConnections
+ MaxPeerConnections,
+ MaxQueueDepth,
+ EventQueuePurgeInterval
);
}
- private static Uri GetEndpointUri<T>(PluginBase plugin, bool usingTls, int port, string hostName) where T: IEndpoint
+ private static Uri BuildUri(bool tls, string host, int port, string path)
{
- //Get the cache endpoint config
- IConfigScope cacheEpConfig = plugin.GetConfigForType<T>();
-
- //The endpoint to advertise to cache clients that allows cache connections
- return new UriBuilder(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, hostName, port, cacheEpConfig["path"].GetString()).Uri;
+ return new UriBuilder
+ {
+ Scheme = tls ? "https" : "http",
+ Host = host,
+ Port = port,
+ Path = path
+ }.Uri;
}
}
}
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
index 1ddf49b..a566390 100644
--- a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
+++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
@@ -33,7 +33,7 @@ using VNLib.Utils.Memory.Diagnostics;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Extensions.Loading.Routing;
using VNLib.Data.Caching.ObjectCache.Server.Endpoints;
-using VNLib.Data.Caching.ObjectCache.Server.Distribution;
+using VNLib.Data.Caching.ObjectCache.Server.Clustering;
namespace VNLib.Data.Caching.ObjectCache.Server
{
@@ -71,11 +71,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server
}
}
-
protected override void OnLoad()
{
try
{
+ //Get the node configuration first
+ NodeConfig config = this.GetOrCreateSingleton<NodeConfig>();
+
//Route well-known endpoint
this.Route<WellKnownEndpoint>();
@@ -86,7 +88,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
_ = this.GetOrCreateSingleton<CacheNodeReplicationMaanger>();
//Setup discovery endpoint
- if(this.HasConfigForType<PeerDiscoveryEndpoint>())
+ if(!string.IsNullOrWhiteSpace(config.DiscoveryPath))
{
this.Route<PeerDiscoveryEndpoint>();
}