aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2024-03-09 19:13:21 -0500
committerLibravatar vnugent <public@vaughnnugent.com>2024-03-09 19:13:21 -0500
commit5d4192880654fd6e00e587814169415b42621327 (patch)
treef35e2e41e346c5067f0195e7b0f7197e9729e940
parenta4b3504bb891829074d1efde0433eae010862181 (diff)
chore: #2 Minor fixes and polish before release
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/Clustering/ICacheDiscoveryErrorHandler.cs9
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterManager.cs8
-rw-r--r--plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs20
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs22
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheStore.cs61
-rw-r--r--plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs2
-rw-r--r--plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs14
-rw-r--r--plugins/ObjectCacheServer/src/CacheConstants.cs107
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs47
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs36
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs20
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs11
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs14
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs8
-rw-r--r--plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs36
-rw-r--r--plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs126
-rw-r--r--plugins/ObjectCacheServer/src/ServerClusterConfig.cs (renamed from plugins/ObjectCacheServer/src/NodeConfig.cs)104
-rw-r--r--plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs37
18 files changed, 431 insertions, 251 deletions
diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/ICacheDiscoveryErrorHandler.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/ICacheDiscoveryErrorHandler.cs
index 984ce3d..f27f1fb 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/Clustering/ICacheDiscoveryErrorHandler.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/ICacheDiscoveryErrorHandler.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Data.Caching.Extensions
@@ -37,5 +37,12 @@ namespace VNLib.Data.Caching.Extensions.Clustering
/// <param name="errorNode">The node that the error occured on</param>
/// <param name="ex">The exception that caused the invocation</param>
void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex);
+
+ /// <summary>
+ /// Invoked when an error occurs during the discovery process
+ /// </summary>
+ /// <param name="errorAddress">The server address that failed to connect</param>
+ /// <param name="ex">The exception that caused the invocation</param>
+ void OnDiscoveryError(Uri errorAddress, Exception ex);
}
}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterManager.cs b/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterManager.cs
index 4a1a6bd..f68968d 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterManager.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterManager.cs
@@ -93,6 +93,12 @@ namespace VNLib.Data.Caching.Extensions
/// <returns>An array of resolved nodes</returns>
public async Task<CacheNodeAdvertisment[]> ResolveWellKnownAsync(CancellationToken cancellation)
{
+ //Make sure at least one node defined
+ if (config?.WellKnownNodes == null || config.WellKnownNodes.Length == 0)
+ {
+ throw new ArgumentException("There must be at least one cache node defined in the client configuration");
+ }
+
Task<CacheNodeAdvertisment?>[] initialAdds = new Task<CacheNodeAdvertisment?>[config.WellKnownNodes.Length];
//Discover initial advertisments from well-known addresses
@@ -283,7 +289,7 @@ namespace VNLib.Data.Caching.Extensions
catch (Exception ex) when (config.ErrorHandler != null)
{
//Handle the error
- config.ErrorHandler.OnDiscoveryError(null!, ex);
+ config.ErrorHandler.OnDiscoveryError(serverUri, ex);
return null;
}
catch (Exception ex)
diff --git a/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs b/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs
index 944aa4b..6f733ed 100644
--- a/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs
+++ b/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs
@@ -94,6 +94,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server
_statsLogger = plugin.Log.CreateScope("Cache MemStats");
}
+ protected override void Free()
+ {
+ //Free heaps on exit
+ foreach (BucketLocalManager manager in _managers)
+ {
+ manager.Heap.Dispose();
+ }
+ }
+
public void LogHeapStats()
{
//If tracking is not enabled, the heap instances stored by the managers will not be tracked, and the cast in the code below will fail
@@ -109,16 +118,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
}).ToArray();
- _statsLogger.Debug("Priting memory statistics for cache memory manager: {hm}\n{stats}", GetHashCode(), statsPerHeap);
- }
-
- protected override void Free()
- {
- //Free heaps on exit
- foreach (BucketLocalManager manager in _managers)
- {
- manager.Heap.Dispose();
- }
+ _statsLogger.Debug("Memory statistics for cache memory manager: {hm}\n{stats}", GetHashCode(), statsPerHeap);
}
/*
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs
index 16fda39..aef0255 100644
--- a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs
+++ b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs
@@ -45,39 +45,33 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
internal sealed class CacheListenerPubQueue : ICacheListenerEventQueue<IPeerEventQueue>, IAsyncBackgroundWork
{
- private const int MAX_LOCAL_QUEUE_ITEMS = 10000;
- private const string LOG_SCOPE_NAME = "QUEUE";
-
private readonly AsyncQueue<ChangeEvent> _listenerQueue;
private readonly ILogProvider _logProvider;
private readonly PeerEventQueueManager _queueManager;
- public CacheListenerPubQueue(PluginBase plugin)
+ public CacheListenerPubQueue(PluginBase plugin, PeerEventQueueManager queueMan)
{
- _queueManager = plugin.GetOrCreateSingleton<PeerEventQueueManager>();
- _logProvider = plugin.Log.CreateScope(LOG_SCOPE_NAME);
+ _queueManager = queueMan;
+ _logProvider = plugin.Log.CreateScope(CacheConstants.LogScopes.CacheListenerPubQueue);
//Init local queue to store published events
- _listenerQueue = new(new BoundedChannelOptions(MAX_LOCAL_QUEUE_ITEMS)
+ _listenerQueue = new(new BoundedChannelOptions(CacheConstants.CacheListenerChangeQueueSize)
{
AllowSynchronousContinuations = true,
FullMode = BoundedChannelFullMode.DropOldest,
- SingleReader = true,
+ SingleReader = true, //Always a singe thread reading events
SingleWriter = false,
});
}
///<inheritdoc/>
- async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
+ async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider _, CancellationToken exitToken)
{
const int accumulatorSize = 64;
- //Create scope
- pluginLog = pluginLog.CreateScope(LOG_SCOPE_NAME);
-
try
{
- pluginLog.Debug("Change queue worker listening for local cache changes");
+ _logProvider.Debug("Change queue worker listening for local cache changes");
//Accumulator for events
ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize];
@@ -105,7 +99,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
catch (OperationCanceledException)
{
//Normal exit
- pluginLog.Debug("Change queue listener worker exited");
+ _logProvider.Debug("Change queue listener worker exited");
}
}
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs
deleted file mode 100644
index 81f4843..0000000
--- a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
-* Copyright (c) 2024 Vaughn Nugent
-*
-* Library: VNLib
-* Package: ObjectCacheServer
-* File: CacheStore.cs
-*
-* CacheStore.cs is part of ObjectCacheServer which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* ObjectCacheServer is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* ObjectCacheServer is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
-using VNLib.Plugins.Extensions.Loading;
-
-namespace VNLib.Data.Caching.ObjectCache.Server.Cache
-{
-
- /*
- * Implements the blob cache store, which is an abstraction around the blob cache listener.
- * This allows for publishing local events (say from other nodes) to keep caches in sync.
- */
-
- [ConfigurationName("cache")]
- internal sealed class CacheStore(IBlobCacheTable table) : ICacheStore
- {
-
- ///<inheritdoc/>
- ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, ObjectDataGet<T> bodyData, T state, CancellationToken token)
- {
- return table.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token);
- }
-
- ///<inheritdoc/>
- void ICacheStore.Clear()
- {
- throw new NotImplementedException();
- }
-
- ///<inheritdoc/>
- ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token)
- {
- return table.DeleteObjectAsync(id, token);
- }
- }
-}
diff --git a/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs b/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs
index 12cf37a..4b76a9b 100644
--- a/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs
+++ b/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs
@@ -48,7 +48,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
private readonly object StoreLock = new();
private readonly Dictionary<string, PeerEventListenerQueue> QueueStore = new(StringComparer.OrdinalIgnoreCase);
- public PeerEventQueueManager(PluginBase plugin, NodeConfig config)
+ public PeerEventQueueManager(PluginBase plugin, ServerClusterConfig config)
{
MaxQueueDepth = config.MaxQueueDepth;
diff --git a/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs b/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs
index 5fc700b..5be0776 100644
--- a/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs
+++ b/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -34,16 +34,10 @@ using VNLib.Data.Caching.Extensions;
namespace VNLib.Data.Caching.ObjectCache.Server
{
- sealed record class CacheAuthKeyStore : ICacheAuthManager
+ sealed class CacheAuthKeyStore(PluginBase plugin) : ICacheAuthManager
{
- private readonly IAsyncLazy<ReadOnlyJsonWebKey> _clientPub;
- private readonly IAsyncLazy<ReadOnlyJsonWebKey> _cachePriv;
-
- public CacheAuthKeyStore(PluginBase plugin)
- {
- _clientPub = plugin.GetSecretAsync("client_public_key").ToLazy(r => r.GetJsonWebKey());
- _cachePriv = plugin.GetSecretAsync("cache_private_key").ToLazy(r => r.GetJsonWebKey());
- }
+ private readonly IAsyncLazy<ReadOnlyJsonWebKey> _clientPub = plugin.Secrets().GetSecretAsync("client_public_key").ToLazy(r => r.GetJsonWebKey());
+ private readonly IAsyncLazy<ReadOnlyJsonWebKey> _cachePriv = plugin.Secrets().GetSecretAsync("cache_private_key").ToLazy(r => r.GetJsonWebKey());
///<inheritdoc/>
public IReadOnlyDictionary<string, string?> GetJwtHeader()
diff --git a/plugins/ObjectCacheServer/src/CacheConstants.cs b/plugins/ObjectCacheServer/src/CacheConstants.cs
new file mode 100644
index 0000000..85f737d
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/CacheConstants.cs
@@ -0,0 +1,107 @@
+/*
+* Copyright (c) 2024 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: CacheConstants.cs
+*
+* CacheConstants.cs is part of ObjectCacheServer which is
+* part of the larger VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+
+namespace VNLib.Data.Caching.ObjectCache.Server
+{
+ internal static class CacheConstants
+ {
+ /// <summary>
+ /// The default path for the VNCache well known endpoint (aka discovery endpoint)
+ /// </summary>
+ public const string DefaultWellKnownPath = "/.well-known/vncache";
+
+ /// <summary>
+ /// The maximum size of buffers for FBM messages sent between servers.
+ /// </summary>
+ public const int MaxSyncMessageSize = 12 * 1024;
+
+ /// <summary>
+ /// The maximum size of the change queue for the cache listener
+ /// </summary>
+ public const int CacheListenerChangeQueueSize = 10000;
+
+ /// <summary>
+ /// The time a client authorization token is valid for
+ /// </summary>
+ public static readonly TimeSpan ClientAuthTokenExpiration = TimeSpan.FromSeconds(30);
+
+ public static class LogScopes
+ {
+ /// <summary>
+ /// The log scope for the cache listener
+ /// </summary>
+ public const string BlobCacheListener = "CacheListener";
+
+ /// <summary>
+ /// The peer discovery log scope
+ /// </summary>
+ public const string PeerDiscovery = "DISC";
+
+ /// <summary>
+ /// The log scope for the replication FBM client debug log (if debugging is enabled)
+ /// </summary>
+ public const string ReplicationFbmDebug = "REPL-CLNT";
+
+ /// <summary>
+ /// The log scope for cache replication events
+ /// </summary>
+ public const string RepliactionManager = "REPL-MGR";
+
+ /// <summary>
+ /// The log scope for the cache listener change event queue
+ /// </summary>
+ public const string CacheListenerPubQueue = "QUEUE";
+
+ /// <summary>
+ /// The log scope for the cache connection websocket endpoint
+ /// </summary>
+ public const string ConnectionEndpoint = "CONEP";
+ }
+
+ public static class Delays
+ {
+ /// <summary>
+ /// The amount of startup delay before starting an initial peer discovery
+ /// </summary>
+ public static readonly TimeSpan InitialDiscovery = TimeSpan.FromSeconds(15);
+
+ /// <summary>
+ /// The amount of time to wait before retrying a failed resolve
+ /// of a well-known peers
+ /// </summary>
+ public static readonly TimeSpan WellKnownResolveFailed = TimeSpan.FromSeconds(20);
+
+ /// <summary>
+ /// The amount of time to wait when getting the value of a changed item from the cache
+ /// </summary>
+ /// <remarks>
+ /// When an item change was detected from another peer, the cache will wait this
+ /// amount of time to get the new value from the cache before timing out.
+ /// </remarks>
+ public static readonly TimeSpan CacheSyncGetItemTimeout = TimeSpan.FromSeconds(10);
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
index 0a1bb4d..92f0352 100644
--- a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
@@ -54,12 +54,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork
{
- private const string LOG_SCOPE_NAME = "REPL";
- private const string FBM_LOG_SCOPE_NAME = "REPL-CLNT";
-
- private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10);
- private const int MAX_MESSAGE_SIZE = 12 * 1024;
-
private readonly PluginBase _plugin;
private readonly ILogProvider _log;
private readonly FBMClientFactory _clientFactory;
@@ -76,20 +70,20 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//Init fbm config with fixed message size
FBMClientConfig clientConfig = FBMDataCacheExtensions.GetDefaultConfig(
_sysState.SharedCacheHeap,
- MAX_MESSAGE_SIZE,
- debugLog: plugin.IsDebug() ? plugin.Log.CreateScope(FBM_LOG_SCOPE_NAME) : null
+ CacheConstants.MaxSyncMessageSize,
+ debugLog: plugin.IsDebug() ? plugin.Log.CreateScope(CacheConstants.LogScopes.ReplicationFbmDebug) : null
);
//Init ws fallback factory and client factory
_clientFactory = new(
ref clientConfig,
new FBMFallbackClientWsFactory(),
- (int)_sysState.Configuration.MaxPeerConnections
+ (int)_sysState.ClusterConfig.MaxPeerConnections
);
_plugin = plugin;
_isDebug = plugin.IsDebug();
- _log = plugin.Log.CreateScope(LOG_SCOPE_NAME);
+ _log = plugin.Log.CreateScope(CacheConstants.LogScopes.RepliactionManager);
}
public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
@@ -109,7 +103,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
}
//Make sure we don't exceed the max connections
- if(_openConnections >= _sysState.Configuration.MaxPeerConnections)
+ if(_openConnections >= _sysState.ClusterConfig.MaxPeerConnections)
{
if (_isDebug)
{
@@ -146,14 +140,23 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
_log.Information("Node replication worker exited");
}
+ /*
+ * This method is called when a new peer has connected (or discovered) to establish a
+ * replication connection.
+ */
private async Task OnNewPeerDoWorkAsync(CacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken)
{
ArgumentNullException.ThrowIfNull(newPeer);
-
- //Setup client
+
FBMClient client = _clientFactory.CreateClient();
- //Add peer to monitor
+ /*
+ * Notify discovery that we will be listening to this peer
+ *
+ * This exists so when a new discovery happens, the work loop will produce
+ * the difference of new peers to existing peers, and we can connect to them.
+ * Avoiding infinite connections to the same peer.
+ */
_sysState.PeerDiscovery.OnPeerListenerAttached(newPeer);
Interlocked.Increment(ref _openConnections);
@@ -163,7 +166,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId);
//Connect to the server
- await client.ConnectToCacheAsync(newPeer, _sysState.Configuration.Config, exitToken);
+ await client.ConnectToCacheAsync(newPeer, _sysState.NodeConfig, exitToken);
log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId);
@@ -220,7 +223,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
}
catch (Exception ex)
{
- log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex);
+ //Avoid call stacks unless debug or higher logging levels
+ if (log.IsEnabled(LogLevel.Debug))
+ {
+ log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex);
+ }
+ else
+ {
+ log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex.Message);
+ }
}
finally
{
@@ -228,7 +239,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
client.Dispose();
- //Notify monitor of disconnect
+ //Notify monitor of disconnect to make it available again later
_sysState.PeerDiscovery.OnPeerListenerDetatched(newPeer);
}
}
@@ -289,7 +300,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId);
//Make request
- using FBMResponse response = await client.SendAsync(modRequest, GetItemTimeout, cancellation);
+ using FBMResponse response = await client.SendAsync(modRequest, CacheConstants.Delays.CacheSyncGetItemTimeout, cancellation);
response.ThrowIfNotSet();
diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
index f22e1dd..b8ee9c8 100644
--- a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
@@ -40,21 +40,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
* This class is responsible for resolving and discovering peer nodes in the cluster network.
*/
- internal sealed class PeerDiscoveryManager(NodeConfig config, ILogProvider Log, bool IsDebug, bool HasWellKnown) : IAsyncBackgroundWork, ICachePeerAdapter
+ internal sealed class PeerDiscoveryManager(CacheNodeConfiguration config, ServerClusterConfig clusterConf, ILogProvider Log, bool IsDebug, bool HasWellKnown)
+ : IAsyncBackgroundWork, ICachePeerAdapter
{
- internal const string LOG_SCOPE_NAME = "DISC";
-
- /*
- * The initial discovery delay. This allows for the server to initialize before
- * starting the discovery process. This will probably be a shorter delay
- * than a usual discovery interval.
- */
- private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(15);
- private static readonly TimeSpan WhenWellKnownResolveFailed = TimeSpan.FromSeconds(20);
private readonly List<CacheNodeAdvertisment> _connectedPeers = [];
private readonly CachePeerMonitor Monitor = new();
- private readonly VNCacheClusterManager clusterMan = new(config.Config);
+ private readonly VNCacheClusterManager clusterMan = new(config);
async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
{
@@ -67,12 +59,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//Start the change listener
Task watcher = WatchForPeersAsync(exitToken);
- Log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", InitialDelay);
+ Log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", CacheConstants.Delays.InitialDiscovery);
try
- {
- //Wait for the initial delay
- await Task.Delay(InitialDelay, exitToken);
+ {
+ await Task.Delay(CacheConstants.Delays.InitialDiscovery, exitToken);
Log.Debug("Begining discovery loop");
@@ -87,19 +78,22 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
Log.Debug("Begining node discovery");
}
- //Resolve all known peers
+ /*
+ * On every loop we will need to resolve well-known servers incase they go down
+ * or change. There probably should be some more advanced logic and caching here.
+ */
CacheNodeAdvertisment[] wellKnown = await clusterMan.ResolveWellKnownAsync(exitToken);
wellKnownFailed = wellKnown.Length == 0;
//Use the monitor to get the initial peers
IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds();
- //Combine well-known with new connected peers
+ //Combine well-known peers that are currently connected to this server
CacheNodeAdvertisment[] allAds = ads.Union(wellKnown).ToArray();
if (allAds.Length > 0)
{
- //Discover all known nodes
+ //Build the discovery map from all the known nodes to find all known nodes in the entire cluster
await clusterMan.DiscoverNodesAsync(allAds, exitToken);
}
@@ -132,16 +126,16 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
if (IsDebug)
{
- Log.Debug("Well known node discovery failed, waiting for {idel} before retrying", WhenWellKnownResolveFailed);
+ Log.Debug("Well known node discovery failed, waiting for {idel} before retrying", CacheConstants.Delays.WellKnownResolveFailed);
}
//Wait for shorter duration
- await Task.Delay(WhenWellKnownResolveFailed, exitToken);
+ await Task.Delay(CacheConstants.Delays.WellKnownResolveFailed, exitToken);
}
else
{
//Delay the next discovery
- await Task.Delay(config.DiscoveryInterval, exitToken);
+ await Task.Delay(clusterConf.DiscoveryInterval, exitToken);
}
}
}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs
index 48f4448..99433e1 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs
@@ -62,14 +62,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
* client could use trial and error to find the servers buffer configuration.
*/
- private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
-
private readonly string AudienceLocalServerId = Guid.NewGuid().ToString("N");
private readonly ObjectCacheSystemState _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>();
- private NodeConfig NodeConfig => _sysState.Configuration;
-
private CacheMemoryConfiguration CacheConfig => _sysState.MemoryConfiguration;
public bool IsClientNegotiationValid(string authToken, out ClientNegotiationState state)
@@ -80,12 +76,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
using JsonWebToken jwt = JsonWebToken.Parse(authToken);
//verify signature for client
- if (NodeConfig.KeyStore.VerifyJwt(jwt, false))
+ if (_sysState.KeyStore.VerifyJwt(jwt, false))
{
//Validated as normal client
}
//May be signed by a cache server
- else if (NodeConfig.KeyStore.VerifyJwt(jwt, true))
+ else if (_sysState.KeyStore.VerifyJwt(jwt, true))
{
//Set peer and verified flag since the another cache server signed the request
state.IsPeer = true;
@@ -114,11 +110,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
//Verified, now we can create an auth message with a short expiration
JsonWebToken auth = new();
- auth.WriteHeader(NodeConfig.KeyStore.GetJwtHeader());
+ auth.WriteHeader(_sysState.KeyStore.GetJwtHeader());
auth.InitPayloadClaim()
.AddClaim("aud", AudienceLocalServerId)
.AddClaim("iat", now.ToUnixTimeSeconds())
- .AddClaim("exp", now.Add(AuthTokenExpiration).ToUnixTimeSeconds())
+ .AddClaim("exp", now.Add(CacheConstants.ClientAuthTokenExpiration).ToUnixTimeSeconds())
.AddClaim("nonce", RandomHash.GetRandomBase32(8))
.AddClaim("chl", state.Challenge!)
//Set the ispeer flag if the request was signed by a cache server
@@ -134,7 +130,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
.CommitClaims();
//Sign the auth message from our private key
- NodeConfig.KeyStore.SignJwt(auth);
+ _sysState.KeyStore.SignJwt(auth);
return auth;
}
@@ -150,7 +146,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
using JsonWebToken jwt = JsonWebToken.Parse(upgradeToken);
//verify signature against the cache public key, since this server must have signed it
- if (!NodeConfig.KeyStore.VerifyCachePeer(jwt))
+ if (!_sysState.KeyStore.VerifyCachePeer(jwt))
{
return false;
}
@@ -172,7 +168,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
//Check node ip address matches if required
- if (NodeConfig.VerifyIp)
+ if (_sysState.ClusterConfig.VerifyIp)
{
if (!doc.RootElement.TryGetProperty("ip", out JsonElement ipEl))
{
@@ -198,7 +194,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
//Verify token signature against a fellow cache public key
- return NodeConfig.KeyStore.VerifyUpgradeToken(tokenSignature, upgradeToken, isPeer);
+ return _sysState.KeyStore.VerifyUpgradeToken(tokenSignature, upgradeToken, isPeer);
}
}
}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
index d6b733c..8368d3a 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
@@ -53,15 +53,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
internal sealed class ConnectEndpoint : ResourceEndpointBase
{
- internal const string LOG_SCOPE_NAME = "CONEP";
-
-
private readonly ObjectCacheSystemState _sysState;
private PeerEventQueueManager PubSubManager => _sysState.PeerEventQueue;
private CachePeerMonitor Peers => _sysState.PeerMonitor;
private BlobCacheListener<IPeerEventQueue> Listener => _sysState.Listener;
- private NodeConfig NodeConfiguration => _sysState.Configuration;
+ private ServerClusterConfig ClusterConfiguration => _sysState.ClusterConfig;
private readonly CacheNegotationManager AuthManager;
@@ -89,7 +86,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
_sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>();
//Init from config and create a new log scope
- InitPathAndLog(NodeConfiguration.ConnectPath, plugin.Log.CreateScope(LOG_SCOPE_NAME));
+ InitPathAndLog(ClusterConfiguration.ConnectPath, plugin.Log.CreateScope(CacheConstants.LogScopes.ConnectionEndpoint));
//Get the auth manager
AuthManager = plugin.GetOrCreateSingleton<CacheNegotationManager>();
@@ -158,7 +155,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
* but malicious clients could cache a bunch of tokens and use them
* later, exhausting resources.
*/
- if(_connectedClients >= NodeConfiguration.MaxConcurrentConnections)
+ if(_connectedClients >= ClusterConfiguration.MaxConcurrentConnections)
{
return VirtualClose(entity, HttpStatusCode.ServiceUnavailable);
}
@@ -187,7 +184,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
if (isPeer)
{
- discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(optionalDiscovery);
+ discoveryAd = _sysState.KeyStore.VerifyPeerAdvertisment(optionalDiscovery);
}
WsUserState state;
diff --git a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
index 56fe8cd..8038b70 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
@@ -40,13 +40,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
internal sealed class PeerDiscoveryEndpoint : ResourceEndpointBase
{
- private readonly ObjectCacheSystemState SysState;
+ private readonly ObjectCacheSystemState _sysState;
- private CacheAuthKeyStore KeyStore => SysState.Configuration.KeyStore;
+ private CacheAuthKeyStore KeyStore => _sysState.KeyStore;
- private CachePeerMonitor PeerMonitor => SysState.PeerMonitor;
-
- private CacheNodeConfiguration NodeConfig => SysState.Configuration.Config;
+ private CachePeerMonitor PeerMonitor => _sysState.PeerMonitor;
///<inheritdoc/>
protected override ProtectionSettings EndpointProtectionSettings { get; } = new()
@@ -60,9 +58,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
public PeerDiscoveryEndpoint(PluginBase plugin)
{
- SysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>();
+ _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>();
- InitPathAndLog(SysState.Configuration.DiscoveryPath!, plugin.Log);
+ InitPathAndLog(_sysState.ClusterConfig.DiscoveryPath!, plugin.Log);
}
protected override VfReturnType Get(HttpEntity entity)
@@ -121,7 +119,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
response.WriteHeader(KeyStore.GetJwtHeader());
response.InitPayloadClaim()
- .AddClaim("iss", NodeConfig.NodeId)
+ .AddClaim("iss", _sysState.NodeConfig.NodeId)
//Audience is the requestor id
.AddClaim("sub", subject)
.AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds())
diff --git a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs
index 04380c5..18855e3 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs
@@ -59,13 +59,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
public WellKnownEndpoint(PluginBase plugin)
{
//Get the node config
- NodeConfig nodeConfig = plugin.GetOrCreateSingleton<ObjectCacheSystemState>().Configuration;
+ ObjectCacheSystemState conf = plugin.GetOrCreateSingleton<ObjectCacheSystemState>();
//serialize the config, discovery may not be enabled
- _advertisment = nodeConfig.Config.Advertisment;
- _keyStore = nodeConfig.KeyStore;
+ _advertisment = conf.NodeConfig.Advertisment;
+ _keyStore = conf.KeyStore;
- InitPathAndLog(nodeConfig.WellKnownPath, plugin.Log);
+ InitPathAndLog(conf.ClusterConfig.WellKnownPath, plugin.Log);
}
protected override VfReturnType Get(HttpEntity entity)
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
index b970cee..42bd0c7 100644
--- a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
+++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
@@ -26,6 +26,7 @@ using System;
using System.Collections.Generic;
using VNLib.Plugins;
+using VNLib.Utils;
using VNLib.Utils.Logging;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Extensions.Loading.Routing;
@@ -39,15 +40,16 @@ namespace VNLib.Data.Caching.ObjectCache.Server
public sealed class ObjectCacheServerEntry : PluginBase
{
public override string PluginName => "ObjectCache.Service";
-
+
+ ObjectCacheSystemState? sysState;
protected override void OnLoad()
{
try
{
//Initialize the cache node builder
- ObjectCacheSystemState builder = this.GetOrCreateSingleton<ObjectCacheSystemState>();
- builder.Initialize();
+ sysState = this.GetOrCreateSingleton<ObjectCacheSystemState>();
+ sysState.Initialize();
//Route well-known endpoint
this.Route<WellKnownEndpoint>();
@@ -58,8 +60,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//We must initialize the replication manager
_ = this.GetOrCreateSingleton<CacheNodeReplicationMaanger>();
- //Setup discovery endpoint
- if(!string.IsNullOrWhiteSpace(builder.Configuration.DiscoveryPath))
+ //Setup discovery endpoint only if the user enabled clustering
+ if(!string.IsNullOrWhiteSpace(sysState.ClusterConfig.DiscoveryPath))
{
this.Route<PeerDiscoveryEndpoint>();
}
@@ -79,7 +81,29 @@ namespace VNLib.Data.Caching.ObjectCache.Server
protected override void ProcessHostCommand(string cmd)
{
- throw new NotImplementedException();
+ if(string.IsNullOrWhiteSpace(cmd))
+ {
+ return;
+ }
+
+ ArgumentList al = new(cmd.Split(" "));
+
+ if(al.Count == 0)
+ {
+ Log.Warn("Invalid command");
+ return;
+ }
+
+ switch (al[0].ToLower(null))
+ {
+ case "memstats":
+ sysState?.LogMemoryStats();
+ break;
+
+ default:
+ Log.Warn("Invalid command");
+ break;
+ }
}
}
}
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs
index 6183956..970e832 100644
--- a/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs
+++ b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs
@@ -37,14 +37,19 @@ using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.Extensions.Clustering;
using VNLib.Data.Caching.ObjectCache.Server.Cache;
using VNLib.Data.Caching.ObjectCache.Server.Clustering;
+using System.Threading.Tasks;
+using System.Threading;
namespace VNLib.Data.Caching.ObjectCache.Server
{
+ /*
+ * The purpose of this class is to manage the state of the entire cache server.
+ * All configuration and state should be creatd and managed by this class. To make it
+ * easier to manage.
+ */
[ConfigurationName("cache")]
internal sealed class ObjectCacheSystemState(PluginBase plugin, IConfigScope config) : IDisposable
{
- const string LISTENER_LOG_SCOPE = "CacheListener";
-
public BlobCacheListener<IPeerEventQueue> Listener { get; private set; } = null!;
public ICacheStore InternalStore { get; private set; } = null!;
@@ -57,7 +62,17 @@ namespace VNLib.Data.Caching.ObjectCache.Server
/// <summary>
/// The plugin-wide, shared node configuration
/// </summary>
- public NodeConfig Configuration { get; } = plugin.GetOrCreateSingleton<NodeConfig>();
+ public ServerClusterConfig ClusterConfig { get; } = plugin.GetOrCreateSingleton<ServerClusterConfig>();
+
+ /// <summary>
+ /// The system wide cache authenticator
+ /// </summary>
+ public CacheAuthKeyStore KeyStore { get; } = new(plugin);
+
+ /// <summary>
+ /// The system cache node configuration
+ /// </summary>
+ public CacheNodeConfiguration NodeConfig { get; private set; }
/// <summary>
/// The peer discovery manager
@@ -76,6 +91,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server
/// </summary>
public PeerEventQueueManager PeerEventQueue { get; private set; }
+ private ICacheMemoryManagerFactory _cacheMemManager;
+
void IDisposable.Dispose()
{
SharedCacheHeap.Dispose();
@@ -104,11 +121,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server
new TrackedHeapWrapper(MemoryUtil.InitializeNewHeapForProcess(), true)
: MemoryUtil.InitializeNewHeapForProcess();
+ //Load node configuration first
+ (NodeConfig = ClusterConfig.BuildNodeConfig())
+ .WithAuthenticator(KeyStore); //Also pass the key store to the node config
+
ConfigurePeerDiscovery();
ConfigureCacheListener();
- PeerEventQueue = new(plugin, Configuration);
+ PeerEventQueue = new(plugin, ClusterConfig);
}
private void ConfigurePeerDiscovery()
@@ -117,15 +138,16 @@ namespace VNLib.Data.Caching.ObjectCache.Server
IConfigScope? config = plugin.TryGetConfig("known_peers");
string[] kownPeers = config?.Deserialze<string[]>() ?? [];
- ILogProvider discLogger = plugin.Log.CreateScope(PeerDiscoveryManager.LOG_SCOPE_NAME);
+ ILogProvider discLogger = plugin.Log.CreateScope(CacheConstants.LogScopes.PeerDiscovery);
- Configuration.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s)))
+ NodeConfig.WithInitialPeers(kownPeers.Select(static s => new Uri(s)))
.WithErrorHandler(new ErrorHandler(discLogger));
discLogger.Information("Inital peer nodes: {nodes}", kownPeers);
PeerDiscovery = new PeerDiscoveryManager(
- Configuration,
+ NodeConfig,
+ ClusterConfig,
discLogger,
plugin.IsDebug(),
kownPeers.Length > 0
@@ -152,11 +174,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server
manager = plugin.CreateServiceExternal<ICacheMemoryManagerFactory>(MemoryConfiguration.ExternLibPath);
}
+ _cacheMemManager = manager;
+
//Endpoint only allows for a single reader
Listener = new(
plugin.LoadMemoryCacheSystem(config, manager, MemoryConfiguration),
- plugin.GetOrCreateSingleton<CacheListenerPubQueue>(),
- plugin.Log.CreateScope(LISTENER_LOG_SCOPE),
+ new CacheListenerPubQueue(plugin, PeerEventQueue),
+ plugin.Log.CreateScope(CacheConstants.LogScopes.BlobCacheListener),
new SharedHeapFBMMemoryManager(SharedCacheHeap)
);
@@ -189,27 +213,105 @@ Cache Configuration:
);
}
+ public void LogMemoryStats()
+ {
+ if(SharedCacheHeap is TrackedHeapWrapper thw)
+ {
+ const string shStatTemplate =
+@" VNCache shared heap stats:
+ Current: {cur}kB
+ Blocks: {blks}
+ Max size: {max}kB
+";
+ HeapStatistics stats = thw.GetCurrentStats();
+ plugin.Log.Debug(
+ shStatTemplate,
+ stats.AllocatedBytes / 1024,
+ stats.AllocatedBlocks,
+ stats.MaxHeapSize / 1024
+ );
+
+ }
+
+ //Also print logs for the bucket local managers if they are enabled
+ if(_cacheMemManager is BucketLocalManagerFactory blmf)
+ {
+ blmf.LogHeapStats();
+ }
+ }
+
private sealed class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
{
public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
+ => LogError(ex, errorNode.NodeId, errorNode.ConnectEndpoint);
+
+ public void OnDiscoveryError(Uri errorAddress, Exception ex)
+ => LogError(ex, null, errorAddress);
+
+ private void LogError(Exception ex, string? nodId, Uri? connectAddress)
{
+ //For logging purposes, use the node id if its available, otherwise use the address
+ if(nodId == null && connectAddress != null)
+ {
+ nodId = connectAddress.ToString();
+ }
+
if (ex is HttpRequestException hre)
{
if (hre.InnerException is SocketException se)
{
//transport failed
- Logger.Warn("Failed to connect to server {serv} because {err}", errorNode, se.Message);
+ Logger.Warn("Failed to connect to server {serv} because {err}", nodId, se.Message);
}
else
{
- Logger.Error("Failed to connect to node {n}\n{err}", errorNode, hre);
+ Logger.Error("Failed to connect to node {n}\n{err}", nodId, hre);
}
}
+ if (ex is OperationCanceledException)
+ {
+ Logger.Error("Failed to discover nodes from nodeid {nid}, because the operation was canceled");
+ }
+ else if (ex is TimeoutException)
+ {
+ //Only log exception stack when in debug logging mode
+ Logger.Warn("Failed to discover nodes from nodeid {nid}, because a timeout occured", nodId);
+ }
else
{
- Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex);
+ //Only log exception stack when in debug logging mode
+ if (Logger.IsEnabled(LogLevel.Debug))
+ {
+ Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", nodId, ex);
+ }
+ else
+ {
+ Logger.Error("Failed to discover nodes from nodeid {nid}, with error: {err}", nodId, ex.Message);
+ }
}
}
}
+
+ internal sealed class CacheStore(IBlobCacheTable table) : ICacheStore
+ {
+
+ ///<inheritdoc/>
+ ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, ObjectDataGet<T> bodyData, T state, CancellationToken token)
+ {
+ return table.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token);
+ }
+
+ ///<inheritdoc/>
+ void ICacheStore.Clear()
+ {
+ throw new NotImplementedException();
+ }
+
+ ///<inheritdoc/>
+ ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token)
+ {
+ return table.DeleteObjectAsync(id, token);
+ }
+ }
}
}
diff --git a/plugins/ObjectCacheServer/src/NodeConfig.cs b/plugins/ObjectCacheServer/src/ServerClusterConfig.cs
index 4dd9f4a..8f81ba6 100644
--- a/plugins/ObjectCacheServer/src/NodeConfig.cs
+++ b/plugins/ObjectCacheServer/src/ServerClusterConfig.cs
@@ -3,9 +3,9 @@
*
* Library: VNLib
* Package: ObjectCacheServer
-* File: NodeConfig.cs
+* File: ServerClusterConfig.cs
*
-* NodeConfig.cs is part of ObjectCacheServer which is part of the larger
+* ServerClusterConfig.cs is part of ObjectCacheServer which is part of the larger
* VNLib collection of libraries and utilities.
*
* ObjectCacheServer is free software: you can redistribute it and/or modify
@@ -34,37 +34,30 @@ using VNLib.Utils.Extensions;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.Extensions.Clustering;
-
namespace VNLib.Data.Caching.ObjectCache.Server
{
[ConfigurationName("cluster")]
- internal sealed class NodeConfig
+ internal sealed class ServerClusterConfig(PluginBase plugin, IConfigScope config)
{
- //Default path for the well known endpoint
- const string DefaultPath = "/.well-known/vncache";
-
- public CacheNodeConfiguration Config { get; }
-
- public CacheAuthKeyStore KeyStore { get; }
+ public TimeSpan DiscoveryInterval { get; } = config.GetRequiredProperty("discovery_interval_sec", p => p.GetTimeSpan(TimeParseType.Seconds));
- public TimeSpan DiscoveryInterval { get; }
+ public TimeSpan EventQueuePurgeInterval { get; } = config.GetRequiredProperty("queue_purge_interval_sec", p => p.GetTimeSpan(TimeParseType.Seconds));
- public TimeSpan EventQueuePurgeInterval { get; }
+ public int MaxQueueDepth { get; } = (int)config.GetRequiredProperty("max_queue_depth", p => p.GetUInt32());
- public int MaxQueueDepth { get; }
+ public string? DiscoveryPath { get; } = config.GetValueOrDefault(CacheConfigTemplate, p => p.GetString(), null);
- public string? DiscoveryPath { get; }
+ public string ConnectPath { get; } = config.GetRequiredProperty("connect_path", p => p.GetString()!);
- public string ConnectPath { get; }
+ public string WellKnownPath { get; } = config.GetValueOrDefault("well_known_path", p => p.GetString()!, CacheConstants.DefaultWellKnownPath)
+ ?? CacheConstants.DefaultWellKnownPath;
- public string WellKnownPath { get; }
-
- public bool VerifyIp { get; }
+ public bool VerifyIp { get; } = config.GetRequiredProperty("verify_ip", p => p.GetBoolean());
/// <summary>
/// The maximum number of peer connections to allow
/// </summary>
- public uint MaxPeerConnections { get; } = 10;
+ public uint MaxPeerConnections { get; } = config.GetValueOrDefault("max_peers", p => p.GetUInt32(), 10u);
/// <summary>
/// The maxium number of concurrent client connections to allow
@@ -72,8 +65,25 @@ namespace VNLib.Data.Caching.ObjectCache.Server
/// </summary>
public uint MaxConcurrentConnections { get; }
- public NodeConfig(PluginBase plugin, IConfigScope config)
- {
+ const string CacheConfigTemplate =
+@"
+Cluster Configuration:
+ Node Id: {id}
+ TlsEndabled: {tls}
+ Verify Ip: {vi}
+ Well-Known: {wk}
+ Cache Endpoint: {ep}
+ Discovery Endpoint: {dep}
+ Discovery Interval: {di}
+ Max Peer Connections: {mpc}
+ Max Queue Depth: {mqd}
+ Event Queue Purge Interval: {eqpi}
+";
+
+ internal CacheNodeConfiguration BuildNodeConfig()
+ {
+ CacheNodeConfiguration conf = new();
+
//Get the port of the primary webserver
int port;
bool usingTls;
@@ -94,58 +104,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//Server id is just dns name for now
string nodeId = $"{hostname}:{port}";
-
- //Init key store
- KeyStore = new(plugin);
-
- DiscoveryInterval = config.GetRequiredProperty("discovery_interval_sec", p => p.GetTimeSpan(TimeParseType.Seconds));
- EventQueuePurgeInterval = config.GetRequiredProperty("queue_purge_interval_sec", p => p.GetTimeSpan(TimeParseType.Seconds));
- MaxQueueDepth = (int)config.GetRequiredProperty("max_queue_depth", p => p.GetUInt32());
- ConnectPath = config.GetRequiredProperty("connect_path", p => p.GetString()!);
- VerifyIp = config.GetRequiredProperty("verify_ip", p => p.GetBoolean());
- WellKnownPath = config.GetValueOrDefault("well_known_path", p => p.GetString()!, DefaultPath);
- MaxPeerConnections = config.GetValueOrDefault("max_peers", p => p.GetUInt32(), 10u);
Uri connectEp = BuildUri(usingTls, hostname, port, ConnectPath);
Uri? discoveryEp = null;
-
- //Setup cache node config
- (Config = new())
- .WithCacheEndpoint(connectEp)
+
+
+ conf.WithCacheEndpoint(connectEp)
.WithNodeId(nodeId)
- .WithAuthenticator(KeyStore)
.WithTls(usingTls);
//Get the discovery path (optional)
- if (config.TryGetValue("discovery_path", out JsonElement discoveryPathEl))
+ if (!string.IsNullOrWhiteSpace(DiscoveryPath))
{
- DiscoveryPath = discoveryPathEl.GetString();
-
- //Enable advertisment if a discovery path is present
- if (!string.IsNullOrEmpty(DiscoveryPath))
- {
- //Build the discovery endpoint, it must be an absolute uri
- discoveryEp = BuildUri(usingTls, hostname, port, DiscoveryPath);
- Config.EnableAdvertisment(discoveryEp);
- }
+ //Build the discovery endpoint, it must be an absolute uri
+ discoveryEp = BuildUri(usingTls, hostname, port, DiscoveryPath);
+ conf.EnableAdvertisment(discoveryEp);
}
- const string CacheConfigTemplate =
-@"
-Cluster Configuration:
- Node Id: {id}
- TlsEndabled: {tls}
- Verify Ip: {vi}
- Well-Known: {wk}
- Cache Endpoint: {ep}
- Discovery Endpoint: {dep}
- Discovery Interval: {di}
- Max Peer Connections: {mpc}
- Max Queue Depth: {mqd}
- Event Queue Purge Interval: {eqpi}
-";
-
- //log the config
+ //print the cluster configuration to the log
plugin.Log.Information(CacheConfigTemplate,
nodeId,
usingTls,
@@ -158,6 +134,8 @@ Cluster Configuration:
MaxQueueDepth,
EventQueuePurgeInterval
);
+
+ return conf;
}
private static Uri BuildUri(bool tls, string host, int port, string path)
diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
index a8f86f9..e87d430 100644
--- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
+++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
@@ -404,9 +404,42 @@ namespace VNLib.Data.Caching.Providers.VNCache
private sealed record class DiscoveryErrHAndler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
{
- public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
+ public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
+ => OnDiscoveryError(errorNode, ex);
+
+ public void OnDiscoveryError(Uri errorAddress, Exception ex)
+ => OnDiscoveryError(ex, null, errorAddress);
+
+ public void OnDiscoveryError(Exception ex, CacheNodeAdvertisment? errorNode, Uri? address)
{
- Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", errorNode?.NodeId, ex);
+ string node = errorNode?.NodeId ?? address?.ToString() ?? "unknown";
+
+ if(ex is HttpRequestException he)
+ {
+ if(he.InnerException is SocketException se)
+ {
+ LogErrorException(se);
+ return;
+ }
+
+ LogErrorException(he);
+ return;
+ }
+
+ LogErrorException(ex);
+ return;
+
+ void LogErrorException(Exception ex)
+ {
+ if(Logger.IsEnabled(LogLevel.Debug))
+ {
+ Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", node, ex);
+ }
+ else
+ {
+ Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", node, ex.Message);
+ }
+ }
}
}
}