diff options
Diffstat (limited to 'plugins/ObjectCacheServer')
15 files changed, 381 insertions, 247 deletions
diff --git a/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs b/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs index 944aa4b..6f733ed 100644 --- a/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs +++ b/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs @@ -94,6 +94,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server _statsLogger = plugin.Log.CreateScope("Cache MemStats"); } + protected override void Free() + { + //Free heaps on exit + foreach (BucketLocalManager manager in _managers) + { + manager.Heap.Dispose(); + } + } + public void LogHeapStats() { //If tracking is not enabled, the heap instances stored by the managers will not be tracked, and the cast in the code below will fail @@ -109,16 +118,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server }).ToArray(); - _statsLogger.Debug("Priting memory statistics for cache memory manager: {hm}\n{stats}", GetHashCode(), statsPerHeap); - } - - protected override void Free() - { - //Free heaps on exit - foreach (BucketLocalManager manager in _managers) - { - manager.Heap.Dispose(); - } + _statsLogger.Debug("Memory statistics for cache memory manager: {hm}\n{stats}", GetHashCode(), statsPerHeap); } /* diff --git a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs index 16fda39..aef0255 100644 --- a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs +++ b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs @@ -45,39 +45,33 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache internal sealed class CacheListenerPubQueue : ICacheListenerEventQueue<IPeerEventQueue>, IAsyncBackgroundWork { - private const int MAX_LOCAL_QUEUE_ITEMS = 10000; - private const string LOG_SCOPE_NAME = "QUEUE"; - private readonly AsyncQueue<ChangeEvent> _listenerQueue; private readonly ILogProvider _logProvider; private readonly PeerEventQueueManager _queueManager; - public CacheListenerPubQueue(PluginBase plugin) + public CacheListenerPubQueue(PluginBase plugin, PeerEventQueueManager queueMan) { - _queueManager = plugin.GetOrCreateSingleton<PeerEventQueueManager>(); - _logProvider = plugin.Log.CreateScope(LOG_SCOPE_NAME); + _queueManager = queueMan; + _logProvider = plugin.Log.CreateScope(CacheConstants.LogScopes.CacheListenerPubQueue); //Init local queue to store published events - _listenerQueue = new(new BoundedChannelOptions(MAX_LOCAL_QUEUE_ITEMS) + _listenerQueue = new(new BoundedChannelOptions(CacheConstants.CacheListenerChangeQueueSize) { AllowSynchronousContinuations = true, FullMode = BoundedChannelFullMode.DropOldest, - SingleReader = true, + SingleReader = true, //Always a singe thread reading events SingleWriter = false, }); } ///<inheritdoc/> - async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) + async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider _, CancellationToken exitToken) { const int accumulatorSize = 64; - //Create scope - pluginLog = pluginLog.CreateScope(LOG_SCOPE_NAME); - try { - pluginLog.Debug("Change queue worker listening for local cache changes"); + _logProvider.Debug("Change queue worker listening for local cache changes"); //Accumulator for events ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize]; @@ -105,7 +99,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache catch (OperationCanceledException) { //Normal exit - pluginLog.Debug("Change queue listener worker exited"); + _logProvider.Debug("Change queue listener worker exited"); } } diff --git a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs deleted file mode 100644 index 81f4843..0000000 --- a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs +++ /dev/null @@ -1,61 +0,0 @@ -/* -* Copyright (c) 2024 Vaughn Nugent -* -* Library: VNLib -* Package: ObjectCacheServer -* File: CacheStore.cs -* -* CacheStore.cs is part of ObjectCacheServer which is part of the larger -* VNLib collection of libraries and utilities. -* -* ObjectCacheServer is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* ObjectCacheServer is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Threading; -using System.Threading.Tasks; - -using VNLib.Plugins.Extensions.Loading; - -namespace VNLib.Data.Caching.ObjectCache.Server.Cache -{ - - /* - * Implements the blob cache store, which is an abstraction around the blob cache listener. - * This allows for publishing local events (say from other nodes) to keep caches in sync. - */ - - [ConfigurationName("cache")] - internal sealed class CacheStore(IBlobCacheTable table) : ICacheStore - { - - ///<inheritdoc/> - ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, ObjectDataGet<T> bodyData, T state, CancellationToken token) - { - return table.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token); - } - - ///<inheritdoc/> - void ICacheStore.Clear() - { - throw new NotImplementedException(); - } - - ///<inheritdoc/> - ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token) - { - return table.DeleteObjectAsync(id, token); - } - } -} diff --git a/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs b/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs index 12cf37a..4b76a9b 100644 --- a/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs +++ b/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs @@ -48,7 +48,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache private readonly object StoreLock = new(); private readonly Dictionary<string, PeerEventListenerQueue> QueueStore = new(StringComparer.OrdinalIgnoreCase); - public PeerEventQueueManager(PluginBase plugin, NodeConfig config) + public PeerEventQueueManager(PluginBase plugin, ServerClusterConfig config) { MaxQueueDepth = config.MaxQueueDepth; diff --git a/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs b/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs index 5fc700b..5be0776 100644 --- a/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs +++ b/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -34,16 +34,10 @@ using VNLib.Data.Caching.Extensions; namespace VNLib.Data.Caching.ObjectCache.Server { - sealed record class CacheAuthKeyStore : ICacheAuthManager + sealed class CacheAuthKeyStore(PluginBase plugin) : ICacheAuthManager { - private readonly IAsyncLazy<ReadOnlyJsonWebKey> _clientPub; - private readonly IAsyncLazy<ReadOnlyJsonWebKey> _cachePriv; - - public CacheAuthKeyStore(PluginBase plugin) - { - _clientPub = plugin.GetSecretAsync("client_public_key").ToLazy(r => r.GetJsonWebKey()); - _cachePriv = plugin.GetSecretAsync("cache_private_key").ToLazy(r => r.GetJsonWebKey()); - } + private readonly IAsyncLazy<ReadOnlyJsonWebKey> _clientPub = plugin.Secrets().GetSecretAsync("client_public_key").ToLazy(r => r.GetJsonWebKey()); + private readonly IAsyncLazy<ReadOnlyJsonWebKey> _cachePriv = plugin.Secrets().GetSecretAsync("cache_private_key").ToLazy(r => r.GetJsonWebKey()); ///<inheritdoc/> public IReadOnlyDictionary<string, string?> GetJwtHeader() diff --git a/plugins/ObjectCacheServer/src/CacheConstants.cs b/plugins/ObjectCacheServer/src/CacheConstants.cs new file mode 100644 index 0000000..85f737d --- /dev/null +++ b/plugins/ObjectCacheServer/src/CacheConstants.cs @@ -0,0 +1,107 @@ +/* +* Copyright (c) 2024 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: CacheConstants.cs +* +* CacheConstants.cs is part of ObjectCacheServer which is +* part of the larger VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; + +namespace VNLib.Data.Caching.ObjectCache.Server +{ + internal static class CacheConstants + { + /// <summary> + /// The default path for the VNCache well known endpoint (aka discovery endpoint) + /// </summary> + public const string DefaultWellKnownPath = "/.well-known/vncache"; + + /// <summary> + /// The maximum size of buffers for FBM messages sent between servers. + /// </summary> + public const int MaxSyncMessageSize = 12 * 1024; + + /// <summary> + /// The maximum size of the change queue for the cache listener + /// </summary> + public const int CacheListenerChangeQueueSize = 10000; + + /// <summary> + /// The time a client authorization token is valid for + /// </summary> + public static readonly TimeSpan ClientAuthTokenExpiration = TimeSpan.FromSeconds(30); + + public static class LogScopes + { + /// <summary> + /// The log scope for the cache listener + /// </summary> + public const string BlobCacheListener = "CacheListener"; + + /// <summary> + /// The peer discovery log scope + /// </summary> + public const string PeerDiscovery = "DISC"; + + /// <summary> + /// The log scope for the replication FBM client debug log (if debugging is enabled) + /// </summary> + public const string ReplicationFbmDebug = "REPL-CLNT"; + + /// <summary> + /// The log scope for cache replication events + /// </summary> + public const string RepliactionManager = "REPL-MGR"; + + /// <summary> + /// The log scope for the cache listener change event queue + /// </summary> + public const string CacheListenerPubQueue = "QUEUE"; + + /// <summary> + /// The log scope for the cache connection websocket endpoint + /// </summary> + public const string ConnectionEndpoint = "CONEP"; + } + + public static class Delays + { + /// <summary> + /// The amount of startup delay before starting an initial peer discovery + /// </summary> + public static readonly TimeSpan InitialDiscovery = TimeSpan.FromSeconds(15); + + /// <summary> + /// The amount of time to wait before retrying a failed resolve + /// of a well-known peers + /// </summary> + public static readonly TimeSpan WellKnownResolveFailed = TimeSpan.FromSeconds(20); + + /// <summary> + /// The amount of time to wait when getting the value of a changed item from the cache + /// </summary> + /// <remarks> + /// When an item change was detected from another peer, the cache will wait this + /// amount of time to get the new value from the cache before timing out. + /// </remarks> + public static readonly TimeSpan CacheSyncGetItemTimeout = TimeSpan.FromSeconds(10); + } + } +} diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs index 0a1bb4d..92f0352 100644 --- a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs +++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs @@ -54,12 +54,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork { - private const string LOG_SCOPE_NAME = "REPL"; - private const string FBM_LOG_SCOPE_NAME = "REPL-CLNT"; - - private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10); - private const int MAX_MESSAGE_SIZE = 12 * 1024; - private readonly PluginBase _plugin; private readonly ILogProvider _log; private readonly FBMClientFactory _clientFactory; @@ -76,20 +70,20 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering //Init fbm config with fixed message size FBMClientConfig clientConfig = FBMDataCacheExtensions.GetDefaultConfig( _sysState.SharedCacheHeap, - MAX_MESSAGE_SIZE, - debugLog: plugin.IsDebug() ? plugin.Log.CreateScope(FBM_LOG_SCOPE_NAME) : null + CacheConstants.MaxSyncMessageSize, + debugLog: plugin.IsDebug() ? plugin.Log.CreateScope(CacheConstants.LogScopes.ReplicationFbmDebug) : null ); //Init ws fallback factory and client factory _clientFactory = new( ref clientConfig, new FBMFallbackClientWsFactory(), - (int)_sysState.Configuration.MaxPeerConnections + (int)_sysState.ClusterConfig.MaxPeerConnections ); _plugin = plugin; _isDebug = plugin.IsDebug(); - _log = plugin.Log.CreateScope(LOG_SCOPE_NAME); + _log = plugin.Log.CreateScope(CacheConstants.LogScopes.RepliactionManager); } public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) @@ -109,7 +103,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering } //Make sure we don't exceed the max connections - if(_openConnections >= _sysState.Configuration.MaxPeerConnections) + if(_openConnections >= _sysState.ClusterConfig.MaxPeerConnections) { if (_isDebug) { @@ -146,14 +140,23 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering _log.Information("Node replication worker exited"); } + /* + * This method is called when a new peer has connected (or discovered) to establish a + * replication connection. + */ private async Task OnNewPeerDoWorkAsync(CacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken) { ArgumentNullException.ThrowIfNull(newPeer); - - //Setup client + FBMClient client = _clientFactory.CreateClient(); - //Add peer to monitor + /* + * Notify discovery that we will be listening to this peer + * + * This exists so when a new discovery happens, the work loop will produce + * the difference of new peers to existing peers, and we can connect to them. + * Avoiding infinite connections to the same peer. + */ _sysState.PeerDiscovery.OnPeerListenerAttached(newPeer); Interlocked.Increment(ref _openConnections); @@ -163,7 +166,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId); //Connect to the server - await client.ConnectToCacheAsync(newPeer, _sysState.Configuration.Config, exitToken); + await client.ConnectToCacheAsync(newPeer, _sysState.NodeConfig, exitToken); log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId); @@ -220,7 +223,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering } catch (Exception ex) { - log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex); + //Avoid call stacks unless debug or higher logging levels + if (log.IsEnabled(LogLevel.Debug)) + { + log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex); + } + else + { + log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex.Message); + } } finally { @@ -228,7 +239,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering client.Dispose(); - //Notify monitor of disconnect + //Notify monitor of disconnect to make it available again later _sysState.PeerDiscovery.OnPeerListenerDetatched(newPeer); } } @@ -289,7 +300,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId); //Make request - using FBMResponse response = await client.SendAsync(modRequest, GetItemTimeout, cancellation); + using FBMResponse response = await client.SendAsync(modRequest, CacheConstants.Delays.CacheSyncGetItemTimeout, cancellation); response.ThrowIfNotSet(); diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs index f22e1dd..b8ee9c8 100644 --- a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs +++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs @@ -40,21 +40,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering * This class is responsible for resolving and discovering peer nodes in the cluster network. */ - internal sealed class PeerDiscoveryManager(NodeConfig config, ILogProvider Log, bool IsDebug, bool HasWellKnown) : IAsyncBackgroundWork, ICachePeerAdapter + internal sealed class PeerDiscoveryManager(CacheNodeConfiguration config, ServerClusterConfig clusterConf, ILogProvider Log, bool IsDebug, bool HasWellKnown) + : IAsyncBackgroundWork, ICachePeerAdapter { - internal const string LOG_SCOPE_NAME = "DISC"; - - /* - * The initial discovery delay. This allows for the server to initialize before - * starting the discovery process. This will probably be a shorter delay - * than a usual discovery interval. - */ - private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(15); - private static readonly TimeSpan WhenWellKnownResolveFailed = TimeSpan.FromSeconds(20); private readonly List<CacheNodeAdvertisment> _connectedPeers = []; private readonly CachePeerMonitor Monitor = new(); - private readonly VNCacheClusterManager clusterMan = new(config.Config); + private readonly VNCacheClusterManager clusterMan = new(config); async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) { @@ -67,12 +59,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering //Start the change listener Task watcher = WatchForPeersAsync(exitToken); - Log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", InitialDelay); + Log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", CacheConstants.Delays.InitialDiscovery); try - { - //Wait for the initial delay - await Task.Delay(InitialDelay, exitToken); + { + await Task.Delay(CacheConstants.Delays.InitialDiscovery, exitToken); Log.Debug("Begining discovery loop"); @@ -87,19 +78,22 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering Log.Debug("Begining node discovery"); } - //Resolve all known peers + /* + * On every loop we will need to resolve well-known servers incase they go down + * or change. There probably should be some more advanced logic and caching here. + */ CacheNodeAdvertisment[] wellKnown = await clusterMan.ResolveWellKnownAsync(exitToken); wellKnownFailed = wellKnown.Length == 0; //Use the monitor to get the initial peers IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds(); - //Combine well-known with new connected peers + //Combine well-known peers that are currently connected to this server CacheNodeAdvertisment[] allAds = ads.Union(wellKnown).ToArray(); if (allAds.Length > 0) { - //Discover all known nodes + //Build the discovery map from all the known nodes to find all known nodes in the entire cluster await clusterMan.DiscoverNodesAsync(allAds, exitToken); } @@ -132,16 +126,16 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering { if (IsDebug) { - Log.Debug("Well known node discovery failed, waiting for {idel} before retrying", WhenWellKnownResolveFailed); + Log.Debug("Well known node discovery failed, waiting for {idel} before retrying", CacheConstants.Delays.WellKnownResolveFailed); } //Wait for shorter duration - await Task.Delay(WhenWellKnownResolveFailed, exitToken); + await Task.Delay(CacheConstants.Delays.WellKnownResolveFailed, exitToken); } else { //Delay the next discovery - await Task.Delay(config.DiscoveryInterval, exitToken); + await Task.Delay(clusterConf.DiscoveryInterval, exitToken); } } } diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs index 48f4448..99433e1 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs @@ -62,14 +62,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints * client could use trial and error to find the servers buffer configuration. */ - private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30); - private readonly string AudienceLocalServerId = Guid.NewGuid().ToString("N"); private readonly ObjectCacheSystemState _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>(); - private NodeConfig NodeConfig => _sysState.Configuration; - private CacheMemoryConfiguration CacheConfig => _sysState.MemoryConfiguration; public bool IsClientNegotiationValid(string authToken, out ClientNegotiationState state) @@ -80,12 +76,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints using JsonWebToken jwt = JsonWebToken.Parse(authToken); //verify signature for client - if (NodeConfig.KeyStore.VerifyJwt(jwt, false)) + if (_sysState.KeyStore.VerifyJwt(jwt, false)) { //Validated as normal client } //May be signed by a cache server - else if (NodeConfig.KeyStore.VerifyJwt(jwt, true)) + else if (_sysState.KeyStore.VerifyJwt(jwt, true)) { //Set peer and verified flag since the another cache server signed the request state.IsPeer = true; @@ -114,11 +110,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints //Verified, now we can create an auth message with a short expiration JsonWebToken auth = new(); - auth.WriteHeader(NodeConfig.KeyStore.GetJwtHeader()); + auth.WriteHeader(_sysState.KeyStore.GetJwtHeader()); auth.InitPayloadClaim() .AddClaim("aud", AudienceLocalServerId) .AddClaim("iat", now.ToUnixTimeSeconds()) - .AddClaim("exp", now.Add(AuthTokenExpiration).ToUnixTimeSeconds()) + .AddClaim("exp", now.Add(CacheConstants.ClientAuthTokenExpiration).ToUnixTimeSeconds()) .AddClaim("nonce", RandomHash.GetRandomBase32(8)) .AddClaim("chl", state.Challenge!) //Set the ispeer flag if the request was signed by a cache server @@ -134,7 +130,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints .CommitClaims(); //Sign the auth message from our private key - NodeConfig.KeyStore.SignJwt(auth); + _sysState.KeyStore.SignJwt(auth); return auth; } @@ -150,7 +146,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints using JsonWebToken jwt = JsonWebToken.Parse(upgradeToken); //verify signature against the cache public key, since this server must have signed it - if (!NodeConfig.KeyStore.VerifyCachePeer(jwt)) + if (!_sysState.KeyStore.VerifyCachePeer(jwt)) { return false; } @@ -172,7 +168,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } //Check node ip address matches if required - if (NodeConfig.VerifyIp) + if (_sysState.ClusterConfig.VerifyIp) { if (!doc.RootElement.TryGetProperty("ip", out JsonElement ipEl)) { @@ -198,7 +194,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } //Verify token signature against a fellow cache public key - return NodeConfig.KeyStore.VerifyUpgradeToken(tokenSignature, upgradeToken, isPeer); + return _sysState.KeyStore.VerifyUpgradeToken(tokenSignature, upgradeToken, isPeer); } } } diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs index d6b733c..8368d3a 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs @@ -53,15 +53,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints internal sealed class ConnectEndpoint : ResourceEndpointBase { - internal const string LOG_SCOPE_NAME = "CONEP"; - - private readonly ObjectCacheSystemState _sysState; private PeerEventQueueManager PubSubManager => _sysState.PeerEventQueue; private CachePeerMonitor Peers => _sysState.PeerMonitor; private BlobCacheListener<IPeerEventQueue> Listener => _sysState.Listener; - private NodeConfig NodeConfiguration => _sysState.Configuration; + private ServerClusterConfig ClusterConfiguration => _sysState.ClusterConfig; private readonly CacheNegotationManager AuthManager; @@ -89,7 +86,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>(); //Init from config and create a new log scope - InitPathAndLog(NodeConfiguration.ConnectPath, plugin.Log.CreateScope(LOG_SCOPE_NAME)); + InitPathAndLog(ClusterConfiguration.ConnectPath, plugin.Log.CreateScope(CacheConstants.LogScopes.ConnectionEndpoint)); //Get the auth manager AuthManager = plugin.GetOrCreateSingleton<CacheNegotationManager>(); @@ -158,7 +155,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints * but malicious clients could cache a bunch of tokens and use them * later, exhausting resources. */ - if(_connectedClients >= NodeConfiguration.MaxConcurrentConnections) + if(_connectedClients >= ClusterConfiguration.MaxConcurrentConnections) { return VirtualClose(entity, HttpStatusCode.ServiceUnavailable); } @@ -187,7 +184,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints if (isPeer) { - discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(optionalDiscovery); + discoveryAd = _sysState.KeyStore.VerifyPeerAdvertisment(optionalDiscovery); } WsUserState state; diff --git a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs index 56fe8cd..8038b70 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs @@ -40,13 +40,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { internal sealed class PeerDiscoveryEndpoint : ResourceEndpointBase { - private readonly ObjectCacheSystemState SysState; + private readonly ObjectCacheSystemState _sysState; - private CacheAuthKeyStore KeyStore => SysState.Configuration.KeyStore; + private CacheAuthKeyStore KeyStore => _sysState.KeyStore; - private CachePeerMonitor PeerMonitor => SysState.PeerMonitor; - - private CacheNodeConfiguration NodeConfig => SysState.Configuration.Config; + private CachePeerMonitor PeerMonitor => _sysState.PeerMonitor; ///<inheritdoc/> protected override ProtectionSettings EndpointProtectionSettings { get; } = new() @@ -60,9 +58,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints public PeerDiscoveryEndpoint(PluginBase plugin) { - SysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>(); + _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>(); - InitPathAndLog(SysState.Configuration.DiscoveryPath!, plugin.Log); + InitPathAndLog(_sysState.ClusterConfig.DiscoveryPath!, plugin.Log); } protected override VfReturnType Get(HttpEntity entity) @@ -121,7 +119,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints response.WriteHeader(KeyStore.GetJwtHeader()); response.InitPayloadClaim() - .AddClaim("iss", NodeConfig.NodeId) + .AddClaim("iss", _sysState.NodeConfig.NodeId) //Audience is the requestor id .AddClaim("sub", subject) .AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds()) diff --git a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs index 04380c5..18855e3 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs @@ -59,13 +59,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints public WellKnownEndpoint(PluginBase plugin) { //Get the node config - NodeConfig nodeConfig = plugin.GetOrCreateSingleton<ObjectCacheSystemState>().Configuration; + ObjectCacheSystemState conf = plugin.GetOrCreateSingleton<ObjectCacheSystemState>(); //serialize the config, discovery may not be enabled - _advertisment = nodeConfig.Config.Advertisment; - _keyStore = nodeConfig.KeyStore; + _advertisment = conf.NodeConfig.Advertisment; + _keyStore = conf.KeyStore; - InitPathAndLog(nodeConfig.WellKnownPath, plugin.Log); + InitPathAndLog(conf.ClusterConfig.WellKnownPath, plugin.Log); } protected override VfReturnType Get(HttpEntity entity) diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs index b970cee..42bd0c7 100644 --- a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs +++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs @@ -26,6 +26,7 @@ using System; using System.Collections.Generic; using VNLib.Plugins; +using VNLib.Utils; using VNLib.Utils.Logging; using VNLib.Plugins.Extensions.Loading; using VNLib.Plugins.Extensions.Loading.Routing; @@ -39,15 +40,16 @@ namespace VNLib.Data.Caching.ObjectCache.Server public sealed class ObjectCacheServerEntry : PluginBase { public override string PluginName => "ObjectCache.Service"; - + + ObjectCacheSystemState? sysState; protected override void OnLoad() { try { //Initialize the cache node builder - ObjectCacheSystemState builder = this.GetOrCreateSingleton<ObjectCacheSystemState>(); - builder.Initialize(); + sysState = this.GetOrCreateSingleton<ObjectCacheSystemState>(); + sysState.Initialize(); //Route well-known endpoint this.Route<WellKnownEndpoint>(); @@ -58,8 +60,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server //We must initialize the replication manager _ = this.GetOrCreateSingleton<CacheNodeReplicationMaanger>(); - //Setup discovery endpoint - if(!string.IsNullOrWhiteSpace(builder.Configuration.DiscoveryPath)) + //Setup discovery endpoint only if the user enabled clustering + if(!string.IsNullOrWhiteSpace(sysState.ClusterConfig.DiscoveryPath)) { this.Route<PeerDiscoveryEndpoint>(); } @@ -79,7 +81,29 @@ namespace VNLib.Data.Caching.ObjectCache.Server protected override void ProcessHostCommand(string cmd) { - throw new NotImplementedException(); + if(string.IsNullOrWhiteSpace(cmd)) + { + return; + } + + ArgumentList al = new(cmd.Split(" ")); + + if(al.Count == 0) + { + Log.Warn("Invalid command"); + return; + } + + switch (al[0].ToLower(null)) + { + case "memstats": + sysState?.LogMemoryStats(); + break; + + default: + Log.Warn("Invalid command"); + break; + } } } } diff --git a/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs index 6183956..970e832 100644 --- a/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs +++ b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs @@ -37,14 +37,19 @@ using VNLib.Plugins.Extensions.Loading; using VNLib.Data.Caching.Extensions.Clustering; using VNLib.Data.Caching.ObjectCache.Server.Cache; using VNLib.Data.Caching.ObjectCache.Server.Clustering; +using System.Threading.Tasks; +using System.Threading; namespace VNLib.Data.Caching.ObjectCache.Server { + /* + * The purpose of this class is to manage the state of the entire cache server. + * All configuration and state should be creatd and managed by this class. To make it + * easier to manage. + */ [ConfigurationName("cache")] internal sealed class ObjectCacheSystemState(PluginBase plugin, IConfigScope config) : IDisposable { - const string LISTENER_LOG_SCOPE = "CacheListener"; - public BlobCacheListener<IPeerEventQueue> Listener { get; private set; } = null!; public ICacheStore InternalStore { get; private set; } = null!; @@ -57,7 +62,17 @@ namespace VNLib.Data.Caching.ObjectCache.Server /// <summary> /// The plugin-wide, shared node configuration /// </summary> - public NodeConfig Configuration { get; } = plugin.GetOrCreateSingleton<NodeConfig>(); + public ServerClusterConfig ClusterConfig { get; } = plugin.GetOrCreateSingleton<ServerClusterConfig>(); + + /// <summary> + /// The system wide cache authenticator + /// </summary> + public CacheAuthKeyStore KeyStore { get; } = new(plugin); + + /// <summary> + /// The system cache node configuration + /// </summary> + public CacheNodeConfiguration NodeConfig { get; private set; } /// <summary> /// The peer discovery manager @@ -76,6 +91,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server /// </summary> public PeerEventQueueManager PeerEventQueue { get; private set; } + private ICacheMemoryManagerFactory _cacheMemManager; + void IDisposable.Dispose() { SharedCacheHeap.Dispose(); @@ -104,11 +121,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server new TrackedHeapWrapper(MemoryUtil.InitializeNewHeapForProcess(), true) : MemoryUtil.InitializeNewHeapForProcess(); + //Load node configuration first + (NodeConfig = ClusterConfig.BuildNodeConfig()) + .WithAuthenticator(KeyStore); //Also pass the key store to the node config + ConfigurePeerDiscovery(); ConfigureCacheListener(); - PeerEventQueue = new(plugin, Configuration); + PeerEventQueue = new(plugin, ClusterConfig); } private void ConfigurePeerDiscovery() @@ -117,15 +138,16 @@ namespace VNLib.Data.Caching.ObjectCache.Server IConfigScope? config = plugin.TryGetConfig("known_peers"); string[] kownPeers = config?.Deserialze<string[]>() ?? []; - ILogProvider discLogger = plugin.Log.CreateScope(PeerDiscoveryManager.LOG_SCOPE_NAME); + ILogProvider discLogger = plugin.Log.CreateScope(CacheConstants.LogScopes.PeerDiscovery); - Configuration.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s))) + NodeConfig.WithInitialPeers(kownPeers.Select(static s => new Uri(s))) .WithErrorHandler(new ErrorHandler(discLogger)); discLogger.Information("Inital peer nodes: {nodes}", kownPeers); PeerDiscovery = new PeerDiscoveryManager( - Configuration, + NodeConfig, + ClusterConfig, discLogger, plugin.IsDebug(), kownPeers.Length > 0 @@ -152,11 +174,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server manager = plugin.CreateServiceExternal<ICacheMemoryManagerFactory>(MemoryConfiguration.ExternLibPath); } + _cacheMemManager = manager; + //Endpoint only allows for a single reader Listener = new( plugin.LoadMemoryCacheSystem(config, manager, MemoryConfiguration), - plugin.GetOrCreateSingleton<CacheListenerPubQueue>(), - plugin.Log.CreateScope(LISTENER_LOG_SCOPE), + new CacheListenerPubQueue(plugin, PeerEventQueue), + plugin.Log.CreateScope(CacheConstants.LogScopes.BlobCacheListener), new SharedHeapFBMMemoryManager(SharedCacheHeap) ); @@ -189,27 +213,105 @@ Cache Configuration: ); } + public void LogMemoryStats() + { + if(SharedCacheHeap is TrackedHeapWrapper thw) + { + const string shStatTemplate = +@" VNCache shared heap stats: + Current: {cur}kB + Blocks: {blks} + Max size: {max}kB +"; + HeapStatistics stats = thw.GetCurrentStats(); + plugin.Log.Debug( + shStatTemplate, + stats.AllocatedBytes / 1024, + stats.AllocatedBlocks, + stats.MaxHeapSize / 1024 + ); + + } + + //Also print logs for the bucket local managers if they are enabled + if(_cacheMemManager is BucketLocalManagerFactory blmf) + { + blmf.LogHeapStats(); + } + } + private sealed class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler { public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex) + => LogError(ex, errorNode.NodeId, errorNode.ConnectEndpoint); + + public void OnDiscoveryError(Uri errorAddress, Exception ex) + => LogError(ex, null, errorAddress); + + private void LogError(Exception ex, string? nodId, Uri? connectAddress) { + //For logging purposes, use the node id if its available, otherwise use the address + if(nodId == null && connectAddress != null) + { + nodId = connectAddress.ToString(); + } + if (ex is HttpRequestException hre) { if (hre.InnerException is SocketException se) { //transport failed - Logger.Warn("Failed to connect to server {serv} because {err}", errorNode, se.Message); + Logger.Warn("Failed to connect to server {serv} because {err}", nodId, se.Message); } else { - Logger.Error("Failed to connect to node {n}\n{err}", errorNode, hre); + Logger.Error("Failed to connect to node {n}\n{err}", nodId, hre); } } + if (ex is OperationCanceledException) + { + Logger.Error("Failed to discover nodes from nodeid {nid}, because the operation was canceled"); + } + else if (ex is TimeoutException) + { + //Only log exception stack when in debug logging mode + Logger.Warn("Failed to discover nodes from nodeid {nid}, because a timeout occured", nodId); + } else { - Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex); + //Only log exception stack when in debug logging mode + if (Logger.IsEnabled(LogLevel.Debug)) + { + Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", nodId, ex); + } + else + { + Logger.Error("Failed to discover nodes from nodeid {nid}, with error: {err}", nodId, ex.Message); + } } } } + + internal sealed class CacheStore(IBlobCacheTable table) : ICacheStore + { + + ///<inheritdoc/> + ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, ObjectDataGet<T> bodyData, T state, CancellationToken token) + { + return table.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token); + } + + ///<inheritdoc/> + void ICacheStore.Clear() + { + throw new NotImplementedException(); + } + + ///<inheritdoc/> + ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token) + { + return table.DeleteObjectAsync(id, token); + } + } } } diff --git a/plugins/ObjectCacheServer/src/NodeConfig.cs b/plugins/ObjectCacheServer/src/ServerClusterConfig.cs index 4dd9f4a..8f81ba6 100644 --- a/plugins/ObjectCacheServer/src/NodeConfig.cs +++ b/plugins/ObjectCacheServer/src/ServerClusterConfig.cs @@ -3,9 +3,9 @@ * * Library: VNLib * Package: ObjectCacheServer -* File: NodeConfig.cs +* File: ServerClusterConfig.cs * -* NodeConfig.cs is part of ObjectCacheServer which is part of the larger +* ServerClusterConfig.cs is part of ObjectCacheServer which is part of the larger * VNLib collection of libraries and utilities. * * ObjectCacheServer is free software: you can redistribute it and/or modify @@ -34,37 +34,30 @@ using VNLib.Utils.Extensions; using VNLib.Plugins.Extensions.Loading; using VNLib.Data.Caching.Extensions.Clustering; - namespace VNLib.Data.Caching.ObjectCache.Server { [ConfigurationName("cluster")] - internal sealed class NodeConfig + internal sealed class ServerClusterConfig(PluginBase plugin, IConfigScope config) { - //Default path for the well known endpoint - const string DefaultPath = "/.well-known/vncache"; - - public CacheNodeConfiguration Config { get; } - - public CacheAuthKeyStore KeyStore { get; } + public TimeSpan DiscoveryInterval { get; } = config.GetRequiredProperty("discovery_interval_sec", p => p.GetTimeSpan(TimeParseType.Seconds)); - public TimeSpan DiscoveryInterval { get; } + public TimeSpan EventQueuePurgeInterval { get; } = config.GetRequiredProperty("queue_purge_interval_sec", p => p.GetTimeSpan(TimeParseType.Seconds)); - public TimeSpan EventQueuePurgeInterval { get; } + public int MaxQueueDepth { get; } = (int)config.GetRequiredProperty("max_queue_depth", p => p.GetUInt32()); - public int MaxQueueDepth { get; } + public string? DiscoveryPath { get; } = config.GetValueOrDefault(CacheConfigTemplate, p => p.GetString(), null); - public string? DiscoveryPath { get; } + public string ConnectPath { get; } = config.GetRequiredProperty("connect_path", p => p.GetString()!); - public string ConnectPath { get; } + public string WellKnownPath { get; } = config.GetValueOrDefault("well_known_path", p => p.GetString()!, CacheConstants.DefaultWellKnownPath) + ?? CacheConstants.DefaultWellKnownPath; - public string WellKnownPath { get; } - - public bool VerifyIp { get; } + public bool VerifyIp { get; } = config.GetRequiredProperty("verify_ip", p => p.GetBoolean()); /// <summary> /// The maximum number of peer connections to allow /// </summary> - public uint MaxPeerConnections { get; } = 10; + public uint MaxPeerConnections { get; } = config.GetValueOrDefault("max_peers", p => p.GetUInt32(), 10u); /// <summary> /// The maxium number of concurrent client connections to allow @@ -72,8 +65,25 @@ namespace VNLib.Data.Caching.ObjectCache.Server /// </summary> public uint MaxConcurrentConnections { get; } - public NodeConfig(PluginBase plugin, IConfigScope config) - { + const string CacheConfigTemplate = +@" +Cluster Configuration: + Node Id: {id} + TlsEndabled: {tls} + Verify Ip: {vi} + Well-Known: {wk} + Cache Endpoint: {ep} + Discovery Endpoint: {dep} + Discovery Interval: {di} + Max Peer Connections: {mpc} + Max Queue Depth: {mqd} + Event Queue Purge Interval: {eqpi} +"; + + internal CacheNodeConfiguration BuildNodeConfig() + { + CacheNodeConfiguration conf = new(); + //Get the port of the primary webserver int port; bool usingTls; @@ -94,58 +104,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server //Server id is just dns name for now string nodeId = $"{hostname}:{port}"; - - //Init key store - KeyStore = new(plugin); - - DiscoveryInterval = config.GetRequiredProperty("discovery_interval_sec", p => p.GetTimeSpan(TimeParseType.Seconds)); - EventQueuePurgeInterval = config.GetRequiredProperty("queue_purge_interval_sec", p => p.GetTimeSpan(TimeParseType.Seconds)); - MaxQueueDepth = (int)config.GetRequiredProperty("max_queue_depth", p => p.GetUInt32()); - ConnectPath = config.GetRequiredProperty("connect_path", p => p.GetString()!); - VerifyIp = config.GetRequiredProperty("verify_ip", p => p.GetBoolean()); - WellKnownPath = config.GetValueOrDefault("well_known_path", p => p.GetString()!, DefaultPath); - MaxPeerConnections = config.GetValueOrDefault("max_peers", p => p.GetUInt32(), 10u); Uri connectEp = BuildUri(usingTls, hostname, port, ConnectPath); Uri? discoveryEp = null; - - //Setup cache node config - (Config = new()) - .WithCacheEndpoint(connectEp) + + + conf.WithCacheEndpoint(connectEp) .WithNodeId(nodeId) - .WithAuthenticator(KeyStore) .WithTls(usingTls); //Get the discovery path (optional) - if (config.TryGetValue("discovery_path", out JsonElement discoveryPathEl)) + if (!string.IsNullOrWhiteSpace(DiscoveryPath)) { - DiscoveryPath = discoveryPathEl.GetString(); - - //Enable advertisment if a discovery path is present - if (!string.IsNullOrEmpty(DiscoveryPath)) - { - //Build the discovery endpoint, it must be an absolute uri - discoveryEp = BuildUri(usingTls, hostname, port, DiscoveryPath); - Config.EnableAdvertisment(discoveryEp); - } + //Build the discovery endpoint, it must be an absolute uri + discoveryEp = BuildUri(usingTls, hostname, port, DiscoveryPath); + conf.EnableAdvertisment(discoveryEp); } - const string CacheConfigTemplate = -@" -Cluster Configuration: - Node Id: {id} - TlsEndabled: {tls} - Verify Ip: {vi} - Well-Known: {wk} - Cache Endpoint: {ep} - Discovery Endpoint: {dep} - Discovery Interval: {di} - Max Peer Connections: {mpc} - Max Queue Depth: {mqd} - Event Queue Purge Interval: {eqpi} -"; - - //log the config + //print the cluster configuration to the log plugin.Log.Information(CacheConfigTemplate, nodeId, usingTls, @@ -158,6 +134,8 @@ Cluster Configuration: MaxQueueDepth, EventQueuePurgeInterval ); + + return conf; } private static Uri BuildUri(bool tls, string host, int port, string path) |