diff options
Diffstat (limited to 'plugins/ObjectCacheServer/src')
16 files changed, 468 insertions, 447 deletions
diff --git a/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs b/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs index e7fa3e1..944aa4b 100644 --- a/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs +++ b/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs @@ -33,11 +33,10 @@ using System.Runtime.CompilerServices; using VNLib.Plugins; using VNLib.Utils; using VNLib.Utils.Memory; -using VNLib.Utils.Extensions; -using VNLib.Plugins.Extensions.Loading; using VNLib.Utils.Memory.Diagnostics; using VNLib.Utils.Logging; - +using VNLib.Utils.Extensions; +using VNLib.Plugins.Extensions.Loading; /* * How bucket local memory works: * diff --git a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs index 6942828..16fda39 100644 --- a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs +++ b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -50,11 +50,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache private readonly AsyncQueue<ChangeEvent> _listenerQueue; private readonly ILogProvider _logProvider; - private readonly ICacheEventQueueManager _queueManager; + private readonly PeerEventQueueManager _queueManager; public CacheListenerPubQueue(PluginBase plugin) { - _queueManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>(); + _queueManager = plugin.GetOrCreateSingleton<PeerEventQueueManager>(); _logProvider = plugin.Log.CreateScope(LOG_SCOPE_NAME); //Init local queue to store published events @@ -110,10 +110,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache } ///<inheritdoc/> - public bool IsEnabled([NotNullWhen(true)] IPeerEventQueue? userState) - { - return userState is IPeerEventQueue; - } + public bool IsEnabled([NotNullWhen(true)] IPeerEventQueue? userState) => userState is not null; ///<inheritdoc/> public void PublishEvent(ChangeEvent changeEvent) diff --git a/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs b/plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs index bd15d24..c404cc5 100644 --- a/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs +++ b/plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs @@ -1,12 +1,12 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer -* File: CacheConfiguration.cs +* File: CacheMemoryConfiguration.cs * -* CacheConfiguration.cs is part of ObjectCacheServer which is part of the larger -* VNLib collection of libraries and utilities. +* CacheMemoryConfiguration.cs is part of ObjectCacheServer which +* is part of the larger VNLib collection of libraries and utilities. * * ObjectCacheServer is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as @@ -26,7 +26,7 @@ using System.Text.Json.Serialization; namespace VNLib.Data.Caching.ObjectCache.Server.Cache { - internal sealed class CacheConfiguration + internal sealed class CacheMemoryConfiguration { [JsonPropertyName("buffer_recv_max")] public int MaxRecvBufferSize { get; set; } = 1000 * 1024; @@ -36,6 +36,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache [JsonPropertyName("buffer_header_max")] public int MaxHeaderBufferSize { get; set; } = 2 * 1024; + [JsonPropertyName("buffer_header_min")] public int MinHeaderBufferSize { get; set; } = 128; @@ -49,5 +50,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache [JsonPropertyName("buckets")] public uint BucketCount { get; set; } = 10; + + + [JsonPropertyName("memory_lib_path")] + public string? ExternLibPath { get; set; } } } diff --git a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs index 86df849..81f4843 100644 --- a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs +++ b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs @@ -26,32 +26,24 @@ using System; using System.Threading; using System.Threading.Tasks; -using VNLib.Utils.Logging; -using VNLib.Net.Messaging.FBM; -using VNLib.Plugins; using VNLib.Plugins.Extensions.Loading; namespace VNLib.Data.Caching.ObjectCache.Server.Cache { + /* * Implements the blob cache store, which is an abstraction around the blob cache listener. * This allows for publishing local events (say from other nodes) to keep caches in sync. */ [ConfigurationName("cache")] - internal sealed class CacheStore(PluginBase plugin, IConfigScope config) : ICacheStore, IDisposable + internal sealed class CacheStore(IBlobCacheTable table) : ICacheStore { - /// <summary> - /// Gets the underlying cache listener - /// </summary> - public BlobCacheListener<IPeerEventQueue> Listener { get; } = InitializeCache((ObjectCacheServerEntry)plugin, config); - - ///<inheritdoc/> ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, ObjectDataGet<T> bodyData, T state, CancellationToken token) { - return Listener.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token); + return table.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token); } ///<inheritdoc/> @@ -63,64 +55,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache ///<inheritdoc/> ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token) { - return Listener.Cache.DeleteObjectAsync(id, token); - } - - private static BlobCacheListener<IPeerEventQueue> InitializeCache(ObjectCacheServerEntry plugin, IConfigScope config) - { - const string CacheConfigTemplate = -@" -Cache Configuration: - Max memory: {max} Mb - Buckets: {bc} - Entries per-bucket: {mc} -"; - - //Deserialize the cache config - CacheConfiguration cacheConf = config.Deserialze<CacheConfiguration>(); - - if (cacheConf.MaxCacheEntries < 2) - { - throw new ArgumentException("You must configure a 'max_cache' size larger than 1 item"); - } - - //Suggestion - if (cacheConf.MaxCacheEntries < 200) - { - plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache"); - } - - //calculate the max memory usage - ulong maxByteSize = cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize; - - //Log the cache config - plugin.Log.Information(CacheConfigTemplate, - maxByteSize / (1024 * 1000), - cacheConf.BucketCount, - cacheConf.MaxCacheEntries - ); - - //Get the event listener - ICacheListenerEventQueue<IPeerEventQueue> queue = plugin.GetOrCreateSingleton<CacheListenerPubQueue>(); - - //Get the memory manager - ICacheMemoryManagerFactory manager = plugin.GetOrCreateSingleton<BucketLocalManagerFactory>(); - - //Load the blob cache table system - IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, manager, cacheConf); - - FallbackFBMMemoryManager fbmMemManager = new(plugin.ListenerHeap); - - //Endpoint only allows for a single reader - return new(bc, queue, plugin.Log, fbmMemManager); - } - - /* - * Cleaned up by the plugin on exit - */ - public void Dispose() - { - Listener.Dispose(); + return table.DeleteObjectAsync(id, token); } } } diff --git a/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs index b7bf83f..8f196b0 100644 --- a/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs +++ b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs @@ -29,6 +29,7 @@ using System.Text.Json; using VNLib.Utils.Resources; using VNLib.Plugins; using VNLib.Plugins.Extensions.Loading; +using VNLib.Utils.Extensions; namespace VNLib.Data.Caching.ObjectCache.Server.Cache { @@ -49,7 +50,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache /// <param name="cacheConf">The cache configuration object</param> /// <returns>The loaded <see cref="IBlobCacheTable"/> implementation</returns> /// <exception cref="FileNotFoundException"></exception> - public static IBlobCacheTable LoadMemoryCacheSystem(this PluginBase plugin, IConfigScope config, ICacheMemoryManagerFactory heap, CacheConfiguration cacheConf) + public static IBlobCacheTable LoadMemoryCacheSystem(this PluginBase plugin, IConfigScope config, ICacheMemoryManagerFactory heap, CacheMemoryConfiguration cacheConf) { #pragma warning disable CA2000 // Dispose objects before losing scope @@ -94,10 +95,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache if(initMethod != null) { //Itterate all buckets - foreach (IBlobCacheBucket bucket in table) - { - initMethod.Invoke(bucket.Id); - } + table.ForEach(bucket => initMethod(bucket.Id)); } } diff --git a/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs b/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs index e3c613d..12cf37a 100644 --- a/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs +++ b/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs @@ -1,11 +1,11 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer -* File: CacheEventQueueManager.cs +* File: PeerEventQueueManager.cs * -* CacheEventQueueManager.cs is part of ObjectCacheServer which is +* PeerEventQueueManager.cs is part of ObjectCacheServer which is * part of the larger VNLib collection of libraries and utilities. * * ObjectCacheServer is free software: you can redistribute it and/or modify @@ -38,41 +38,37 @@ using VNLib.Plugins.Extensions.Loading.Events; namespace VNLib.Data.Caching.ObjectCache.Server.Cache { - internal sealed class CacheEventQueueManager : ICacheEventQueueManager, IDisposable, IIntervalScheduleable + internal sealed class PeerEventQueueManager : ICacheEventQueueManager, IIntervalScheduleable { private readonly int MaxQueueDepth; - private readonly object SubLock; - private readonly LinkedList<NodeQueue> Subscribers; + private readonly object SubLock = new(); + private readonly LinkedList<PeerEventListenerQueue> Subscribers = []; - private readonly object StoreLock; - private readonly Dictionary<string, NodeQueue> QueueStore; + private readonly object StoreLock = new(); + private readonly Dictionary<string, PeerEventListenerQueue> QueueStore = new(StringComparer.OrdinalIgnoreCase); - - public CacheEventQueueManager(PluginBase plugin) + public PeerEventQueueManager(PluginBase plugin, NodeConfig config) { - //Get node config - NodeConfig config = plugin.GetOrCreateSingleton<NodeConfig>(); - - //Get max queue depth MaxQueueDepth = config.MaxQueueDepth; /* - * Schedule purge interval to clean up stale queues - */ + * Schedule purge interval to clean up stale queues + */ plugin.ScheduleInterval(this, config.EventQueuePurgeInterval); - - SubLock = new(); - Subscribers = new(); - - StoreLock = new(); - QueueStore = new(StringComparer.OrdinalIgnoreCase); + + //Cleanup disposeables on unload + _ = plugin.RegisterForUnload(() => + { + QueueStore.Clear(); + Subscribers.Clear(); + }); } ///<inheritdoc/> public IPeerEventQueue Subscribe(ICachePeer peer) { - NodeQueue? nq; + PeerEventListenerQueue? nq; bool isNew = false; @@ -82,13 +78,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache //Try to recover the queue for the node if (!QueueStore.TryGetValue(peer.NodeId, out nq)) { - //Create new queue + //Create new queue since an existing queue was not found nq = new(peer.NodeId, MaxQueueDepth); QueueStore.Add(peer.NodeId, nq); isNew = true; } - //Increment listener count + //Increment listener count since a new listener has attached nq.Listeners++; } @@ -109,11 +105,20 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache ///<inheritdoc/> public void Unsubscribe(ICachePeer peer) { + /* + * The reason I am not purging queues that no longer have listeners + * now is because it is possible that a listener needed to detach because of + * a network issue and will be reconnecting shortly. If the node doesnt + * come back before the next purge interval, it's events will be purged. + * + * Point is: there is a reason for the garbage collection style purging + */ + //Detach a listener for a node lock (StoreLock) { //Get the queue and decrement the listener count - NodeQueue nq = QueueStore[peer.NodeId]; + PeerEventListenerQueue nq = QueueStore[peer.NodeId]; nq.Listeners--; } } @@ -125,7 +130,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache lock (SubLock) { //Loop through ll the fast way - LinkedListNode<NodeQueue>? q = Subscribers.First; + LinkedListNode<PeerEventListenerQueue>? q = Subscribers.First; while (q != null) { @@ -145,7 +150,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache lock (SubLock) { //Loop through ll the fast way - LinkedListNode<NodeQueue>? q = Subscribers.First; + LinkedListNode<PeerEventListenerQueue>? q = Subscribers.First; while (q != null) { @@ -167,9 +172,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache lock (StoreLock) { //Get all stale queues (queues without listeners) - NodeQueue[] staleQueues = QueueStore.Values.Where(static nq => nq.Listeners == 0).ToArray(); + PeerEventListenerQueue[] staleQueues = QueueStore.Values.Where(static nq => nq.Listeners == 0).ToArray(); - foreach (NodeQueue nq in staleQueues) + foreach (PeerEventListenerQueue nq in staleQueues) { //Remove from store QueueStore.Remove(nq.NodeId); @@ -191,54 +196,39 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache return Task.CompletedTask; } - void IDisposable.Dispose() - { - QueueStore.Clear(); - Subscribers.Clear(); - } /* * Holds queues for each node and keeps track of the number of listeners * attached to the queue + * + * The role of this class is to store change events for a given peer node, + * and return them when the peer requests them. It also keeps track of the + * number of active listeners (server connections) to the queue. */ - private sealed class NodeQueue : IPeerEventQueue + private sealed class PeerEventListenerQueue(string nodeId, int maxDepth) : IPeerEventQueue { public int Listeners; - public string NodeId { get; } - - public AsyncQueue<ChangeEvent> Queue { get; } + public string NodeId => nodeId; - public NodeQueue(string nodeId, int maxDepth) + /* + * Create a bounded channel that acts as a lru and evicts + * the oldest item when the queue is full + * + * There will also only ever be a single thread writing events + * to the queue + */ + private readonly AsyncQueue<ChangeEvent> Queue = new(new BoundedChannelOptions(maxDepth) { - NodeId = nodeId; - - /* - * Create a bounded channel that acts as a lru and evicts - * the oldest item when the queue is full - * - * There will also only ever be a single thread writing events - * to the queue - */ - - BoundedChannelOptions queueOptions = new(maxDepth) - { - AllowSynchronousContinuations = true, - SingleReader = false, - SingleWriter = true, - //Drop oldest item in queue if full - FullMode = BoundedChannelFullMode.DropOldest, - }; - - //Init queue/channel - Queue = new(queueOptions); - } + AllowSynchronousContinuations = true, + SingleReader = false, + SingleWriter = true, + //Drop oldest item in queue if full + FullMode = BoundedChannelFullMode.DropOldest, + }); - public void PublishChange(ChangeEvent change) - { - Queue.TryEnque(change); - } + public void PublishChange(ChangeEvent change) => Queue.TryEnque(change); public void PublishChanges(Span<ChangeEvent> changes) { @@ -249,16 +239,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache } ///<inheritdoc/> - public ValueTask<ChangeEvent> DequeueAsync(CancellationToken cancellation) - { - return Queue.DequeueAsync(cancellation); - } + public ValueTask<ChangeEvent> DequeueAsync(CancellationToken cancellation) => Queue.DequeueAsync(cancellation); ///<inheritdoc/> - public bool TryDequeue(out ChangeEvent change) - { - return Queue.TryDequeue(out change); - } + public bool TryDequeue(out ChangeEvent change) => Queue.TryDequeue(out change); } } } diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs index a240dde..0a1bb4d 100644 --- a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs +++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -36,7 +36,6 @@ using VNLib.Net.Messaging.FBM; using VNLib.Net.Messaging.FBM.Client; using VNLib.Plugins.Extensions.Loading; using VNLib.Data.Caching.Extensions.Clustering; -using VNLib.Data.Caching.ObjectCache.Server.Cache; namespace VNLib.Data.Caching.ObjectCache.Server.Clustering { @@ -56,38 +55,37 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork { private const string LOG_SCOPE_NAME = "REPL"; + private const string FBM_LOG_SCOPE_NAME = "REPL-CLNT"; private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10); private const int MAX_MESSAGE_SIZE = 12 * 1024; private readonly PluginBase _plugin; private readonly ILogProvider _log; - private readonly NodeConfig _nodeConfig; - private readonly ICacheStore _cacheStore; - private readonly ICachePeerAdapter _peerAdapter; private readonly FBMClientFactory _clientFactory; - + private readonly ObjectCacheSystemState _sysState; + private readonly bool _isDebug; private int _openConnections; public CacheNodeReplicationMaanger(PluginBase plugin) { - //Load the node config - _nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>(); - _cacheStore = plugin.GetOrCreateSingleton<CacheStore>(); - _peerAdapter = plugin.GetOrCreateSingleton<PeerDiscoveryManager>(); + _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>(); //Init fbm config with fixed message size FBMClientConfig clientConfig = FBMDataCacheExtensions.GetDefaultConfig( - (plugin as ObjectCacheServerEntry)!.ListenerHeap, + _sysState.SharedCacheHeap, MAX_MESSAGE_SIZE, - debugLog: plugin.IsDebug() ? plugin.Log : null + debugLog: plugin.IsDebug() ? plugin.Log.CreateScope(FBM_LOG_SCOPE_NAME) : null ); //Init ws fallback factory and client factory - FBMFallbackClientWsFactory wsFactory = new(); - _clientFactory = new(in clientConfig, wsFactory); + _clientFactory = new( + ref clientConfig, + new FBMFallbackClientWsFactory(), + (int)_sysState.Configuration.MaxPeerConnections + ); _plugin = plugin; _isDebug = plugin.IsDebug(); @@ -103,7 +101,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering while (true) { //Get all new peers - CacheNodeAdvertisment[] peers = _peerAdapter.GetNewPeers(); + CacheNodeAdvertisment[] peers = _sysState.PeerDiscovery.GetNewPeers(); if (peers.Length == 0 && _isDebug) { @@ -111,7 +109,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering } //Make sure we don't exceed the max connections - if(_openConnections >= _nodeConfig.MaxPeerConnections) + if(_openConnections >= _sysState.Configuration.MaxPeerConnections) { if (_isDebug) { @@ -150,13 +148,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering private async Task OnNewPeerDoWorkAsync(CacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken) { - _ = newPeer ?? throw new ArgumentNullException(nameof(newPeer)); + ArgumentNullException.ThrowIfNull(newPeer); //Setup client FBMClient client = _clientFactory.CreateClient(); //Add peer to monitor - _peerAdapter.OnPeerListenerAttached(newPeer); + _sysState.PeerDiscovery.OnPeerListenerAttached(newPeer); Interlocked.Increment(ref _openConnections); @@ -165,12 +163,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId); //Connect to the server - await client.ConnectToCacheAsync(newPeer, _nodeConfig.Config, exitToken); + await client.ConnectToCacheAsync(newPeer, _sysState.Configuration.Config, exitToken); log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId); //Start worker tasks - List<Task> workerTasks = new(); + List<Task> workerTasks = []; for (int i = 0; i < Environment.ProcessorCount; i++) { @@ -187,6 +185,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering //Disconnect client gracefully await client.DisconnectAsync(CancellationToken.None); } + catch(FBMServerNegiationException fbm) + { + log.Error("Failed to negotiate buffer configuration, check your cache memory configuration. Error:{err}", fbm.Message); + } catch (InvalidResponseException ie) { //See if the plugin is unloading @@ -227,7 +229,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering client.Dispose(); //Notify monitor of disconnect - _peerAdapter.OnPeerListenerDetatched(newPeer); + _sysState.PeerDiscovery.OnPeerListenerDetatched(newPeer); } } @@ -259,7 +261,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering return; case "deleted": //Delete the object from the store - await _cacheStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None); + await _sysState.InternalStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None); break; case "modified": //Reload the record from the store @@ -297,7 +299,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal)) { //Update the record - await _cacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation); + await _sysState.InternalStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation); log.Debug("Updated object {id}", objectId); } else diff --git a/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs index c49a54b..c3fbd8e 100644 --- a/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs +++ b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -29,7 +29,6 @@ using System.Collections.Generic; using VNLib.Utils; using VNLib.Utils.Extensions; -using VNLib.Plugins; namespace VNLib.Data.Caching.ObjectCache.Server.Clustering { @@ -37,12 +36,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering internal sealed class CachePeerMonitor : VnDisposeable, IPeerMonitor { - private readonly LinkedList<ICachePeer> peers = new(); + private readonly List<ICachePeer> peers = new(); private readonly ManualResetEvent newPeerTrigger = new (false); - public CachePeerMonitor(PluginBase plugin) - { } - /// <summary> /// Waits for new peers to connect to the server /// </summary> @@ -70,7 +66,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering //When a peer is connected we can add it to the list so the replication manager can see it lock(peers) { - peers.AddLast(peer); + peers.Add(peer); } //Trigger monitor when change occurs @@ -92,6 +88,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering protected override void Free() { + peers.Clear(); newPeerTrigger.Dispose(); } } diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs index 6475f9c..f22e1dd 100644 --- a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs +++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -24,14 +24,11 @@ using System; using System.Linq; -using System.Net.Http; using System.Threading; -using System.Net.Sockets; using System.Threading.Tasks; using System.Collections.Generic; using VNLib.Utils.Logging; -using VNLib.Plugins; using VNLib.Plugins.Extensions.Loading; using VNLib.Data.Caching.Extensions; using VNLib.Data.Caching.Extensions.Clustering; @@ -43,9 +40,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering * This class is responsible for resolving and discovering peer nodes in the cluster network. */ - internal sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter + internal sealed class PeerDiscoveryManager(NodeConfig config, ILogProvider Log, bool IsDebug, bool HasWellKnown) : IAsyncBackgroundWork, ICachePeerAdapter { - private const string LOG_SCOPE_NAME = "DISC"; + internal const string LOG_SCOPE_NAME = "DISC"; + /* * The initial discovery delay. This allows for the server to initialize before * starting the discovery process. This will probably be a shorter delay @@ -54,43 +52,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(15); private static readonly TimeSpan WhenWellKnownResolveFailed = TimeSpan.FromSeconds(20); - - private readonly List<CacheNodeAdvertisment> _connectedPeers; - private readonly NodeConfig Config; - private readonly CachePeerMonitor Monitor; - private readonly ILogProvider Log; - private readonly bool IsDebug; - private readonly bool HasWellKnown; - - public PeerDiscoveryManager(PluginBase plugin) - { - //Get config - Config = plugin.GetOrCreateSingleton<NodeConfig>(); - - //Get the known peers array from config, its allowed to be null for master nodes - IConfigScope? config = plugin.TryGetConfig("known_peers"); - string[] kownPeers = config?.Deserialze<string[]>() ?? Array.Empty<string>(); - - //Add known peers to the monitor - Config.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s))); - - HasWellKnown = kownPeers.Length > 0; - - //Get the peer monitor - Monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>(); - - _connectedPeers = new(); - - //Create scoped logger - Log = plugin.Log.CreateScope(LOG_SCOPE_NAME); - - Log.Information("Inital peer nodes: {nodes}", kownPeers); - - //Setup discovery error handler - Config.Config.WithErrorHandler(new ErrorHandler(Log)); - - IsDebug = plugin.IsDebug(); - } + private readonly List<CacheNodeAdvertisment> _connectedPeers = []; + private readonly CachePeerMonitor Monitor = new(); + private readonly VNCacheClusterManager clusterMan = new(config.Config); async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) { @@ -124,7 +88,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering } //Resolve all known peers - CacheNodeAdvertisment[] wellKnown = await Config.Config.ResolveWellKnownAsync(exitToken); + CacheNodeAdvertisment[] wellKnown = await clusterMan.ResolveWellKnownAsync(exitToken); wellKnownFailed = wellKnown.Length == 0; //Use the monitor to get the initial peers @@ -136,13 +100,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering if (allAds.Length > 0) { //Discover all known nodes - await Config.Config.DiscoverNodesAsync(allAds, exitToken); + await clusterMan.DiscoverNodesAsync(allAds, exitToken); } //Log the discovered nodes if verbose logging is enabled if (IsDebug) { - CacheNodeAdvertisment[] found = Config.Config.NodeCollection.GetAllNodes(); + CacheNodeAdvertisment[] found = clusterMan.DiscoveredNodes.GetAllNodes(); Log.Debug("Discovered {count} nodes\n\t{nodes}", found.Length, found.Select(static s => s.NodeId)); } @@ -177,7 +141,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering else { //Delay the next discovery - await Task.Delay(Config.DiscoveryInterval, exitToken); + await Task.Delay(config.DiscoveryInterval, exitToken); } } } @@ -188,7 +152,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering } finally { - + Monitor.Dispose(); } //Wait for the watcher to exit @@ -197,10 +161,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering private IEnumerable<CacheNodeAdvertisment> GetMonitorAds() { + string selfId = (clusterMan.Config as CacheNodeConfiguration)!.NodeId; return Monitor.GetAllPeers() .Where(static p => p.Advertisment != null) //Without us - .Where(n => n.NodeId != Config.Config.NodeId) + .Where(n => !string.Equals(n.NodeId, selfId, StringComparison.OrdinalIgnoreCase)) .Select(static p => p.Advertisment!); } @@ -222,7 +187,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering //Use the monitor to get the initial peers IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds(); - ((NodeDiscoveryCollection)Config.Config.NodeCollection).AddManualNodes(ads); + clusterMan.AddManualNodes(ads); } } catch (OperationCanceledException) @@ -239,7 +204,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering lock (_connectedPeers) { //Get all discovered peers - CacheNodeAdvertisment[] peers = Config.Config.NodeCollection.GetAllNodes(); + CacheNodeAdvertisment[] peers = clusterMan.DiscoveredNodes.GetAllNodes(); //Get the difference between the discovered peers and the connected peers return peers.Except(_connectedPeers).ToArray(); @@ -265,31 +230,5 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering _connectedPeers.Remove(peer); } } - - - private sealed record class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler - { - public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex) - { - - if (ex is HttpRequestException hre) - { - if (hre.InnerException is SocketException se) - { - //traisnport failed - Logger.Warn("Failed to connect to server {serv} because {err}", errorNode, se.Message); - } - else - { - Logger.Error("Failed to connect to node {n}\n{err}", errorNode, hre); - } - } - else - { - Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex); - } - - } - } } } diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs index d1591f8..48f4448 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -45,7 +45,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints public bool IsPeer { get; set; } } - internal sealed class CacheNegotationManager + internal sealed class CacheNegotationManager(PluginBase plugin) { /* * Cache keys are centralized and may be shared between all cache server nodes. This means @@ -64,21 +64,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30); - private readonly string AudienceLocalServerId; - private readonly NodeConfig _nodeConfig; - private readonly CacheConfiguration _cacheConfig; + private readonly string AudienceLocalServerId = Guid.NewGuid().ToString("N"); - public CacheNegotationManager(PluginBase plugin) - { - //Get node configuration - _nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>(); + private readonly ObjectCacheSystemState _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>(); - //Get the cache store configuration - _cacheConfig = plugin.GetConfigForType<CacheStore>().Deserialze<CacheConfiguration>(); + private NodeConfig NodeConfig => _sysState.Configuration; - AudienceLocalServerId = Guid.NewGuid().ToString("N"); - } - + private CacheMemoryConfiguration CacheConfig => _sysState.MemoryConfiguration; public bool IsClientNegotiationValid(string authToken, out ClientNegotiationState state) { @@ -88,12 +80,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints using JsonWebToken jwt = JsonWebToken.Parse(authToken); //verify signature for client - if (_nodeConfig.KeyStore.VerifyJwt(jwt, false)) + if (NodeConfig.KeyStore.VerifyJwt(jwt, false)) { //Validated as normal client } //May be signed by a cache server - else if (_nodeConfig.KeyStore.VerifyJwt(jwt, true)) + else if (NodeConfig.KeyStore.VerifyJwt(jwt, true)) { //Set peer and verified flag since the another cache server signed the request state.IsPeer = true; @@ -117,12 +109,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints return true; } - public JsonWebToken ConfirmCLientNegotiation(ClientNegotiationState state, IPAddress clientIp, DateTimeOffset now) + public JsonWebToken ConfirmClientNegotiation(ClientNegotiationState state, IPAddress clientIp, DateTimeOffset now) { //Verified, now we can create an auth message with a short expiration JsonWebToken auth = new(); - auth.WriteHeader(_nodeConfig.KeyStore.GetJwtHeader()); + auth.WriteHeader(NodeConfig.KeyStore.GetJwtHeader()); auth.InitPayloadClaim() .AddClaim("aud", AudienceLocalServerId) .AddClaim("iat", now.ToUnixTimeSeconds()) @@ -136,24 +128,29 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints //Set ip address .AddClaim("ip", clientIp.ToString()) //Add negotiaion args - .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, _cacheConfig.MaxHeaderBufferSize) - .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, _cacheConfig.MaxRecvBufferSize) - .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, _cacheConfig.MaxMessageSize) + .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, CacheConfig.MaxHeaderBufferSize) + .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, CacheConfig.MaxRecvBufferSize) + .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, CacheConfig.MaxMessageSize) .CommitClaims(); //Sign the auth message from our private key - _nodeConfig.KeyStore.SignJwt(auth); + NodeConfig.KeyStore.SignJwt(auth); return auth; } - public bool ValidateUpgrade(string upgradeToken, string tokenSignature, DateTimeOffset now, IPAddress connectionIp, ref string? nodeId, ref bool isPeer) + public bool ValidateUpgrade(string? upgradeToken, string? tokenSignature, DateTimeOffset now, IPAddress connectionIp, ref string? nodeId, ref bool isPeer) { + if(string.IsNullOrWhiteSpace(upgradeToken) || string.IsNullOrWhiteSpace(tokenSignature)) + { + return false; + } + //Parse jwt using JsonWebToken jwt = JsonWebToken.Parse(upgradeToken); //verify signature against the cache public key, since this server must have signed it - if (!_nodeConfig.KeyStore.VerifyCachePeer(jwt)) + if (!NodeConfig.KeyStore.VerifyCachePeer(jwt)) { return false; } @@ -175,7 +172,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } //Check node ip address matches if required - if (_nodeConfig.VerifyIp) + if (NodeConfig.VerifyIp) { if (!doc.RootElement.TryGetProperty("ip", out JsonElement ipEl)) { @@ -201,7 +198,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } //Verify token signature against a fellow cache public key - return _nodeConfig.KeyStore.VerifyUpgradeToken(tokenSignature, upgradeToken, isPeer); + return NodeConfig.KeyStore.VerifyUpgradeToken(tokenSignature, upgradeToken, isPeer); } } } diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs index 816e6c3..d6b733c 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -55,11 +55,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { internal const string LOG_SCOPE_NAME = "CONEP"; - - private readonly ICacheEventQueueManager PubSubManager; - private readonly IPeerMonitor Peers; - private readonly BlobCacheListener<IPeerEventQueue> Store; - private readonly NodeConfig NodeConfiguration; + + private readonly ObjectCacheSystemState _sysState; + + private PeerEventQueueManager PubSubManager => _sysState.PeerEventQueue; + private CachePeerMonitor Peers => _sysState.PeerMonitor; + private BlobCacheListener<IPeerEventQueue> Listener => _sysState.Listener; + private NodeConfig NodeConfiguration => _sysState.Configuration; + private readonly CacheNegotationManager AuthManager; private uint _connectedClients; @@ -72,7 +75,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints /// <summary> /// The cache store configuration /// </summary> - public CacheConfiguration CacheConfig { get; } + public CacheMemoryConfiguration CacheConfig => _sysState.MemoryConfiguration; //Loosen up protection settings ///<inheritdoc/> @@ -83,24 +86,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints public ConnectEndpoint(PluginBase plugin) { - //Get node configuration - NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>(); + _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>(); //Init from config and create a new log scope InitPathAndLog(NodeConfiguration.ConnectPath, plugin.Log.CreateScope(LOG_SCOPE_NAME)); - - //Setup pub/sub manager - PubSubManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>(); - - //Get peer monitor - Peers = plugin.GetOrCreateSingleton<CachePeerMonitor>(); - - //Init the cache store - Store = plugin.GetOrCreateSingleton<CacheStore>().Listener; - - //Get the cache store configuration - CacheConfig = plugin.GetConfigForType<CacheStore>().Deserialze<CacheConfiguration>(); - + //Get the auth manager AuthManager = plugin.GetOrCreateSingleton<CacheNegotationManager>(); } @@ -127,6 +117,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { //Parse jwt from authoriation string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; + if (string.IsNullOrWhiteSpace(jwtAuth)) { return VirtualClose(entity, HttpStatusCode.Forbidden); @@ -149,26 +140,34 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } //Verified, now we can create an auth message with a short expiration - using JsonWebToken auth = AuthManager.ConfirmCLientNegotiation(state, entity.TrustedRemoteIp, entity.RequestedTimeUtc); + using JsonWebToken auth = AuthManager.ConfirmClientNegotiation(state, entity.TrustedRemoteIp, entity.RequestedTimeUtc); - //Close response + //Close response by sending a copy of the signed token entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer); return VfReturnType.VirtualSkip; } protected override VfReturnType WebsocketRequested(HttpEntity entity) { + /* + * Check to see if any more connections are allowed, + * otherwise deny the connection + * + * This is done here to prevent the server from being overloaded + * on a new connection. It would be ideal to not grant new tokens + * but malicious clients could cache a bunch of tokens and use them + * later, exhausting resources. + */ + if(_connectedClients >= NodeConfiguration.MaxConcurrentConnections) + { + return VirtualClose(entity, HttpStatusCode.ServiceUnavailable); + } + //Parse jwt from authorization string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; string? clientSignature = entity.Server.Headers[FBMDataCacheExtensions.X_UPGRADE_SIG_HEADER]; string? optionalDiscovery = entity.Server.Headers[FBMDataCacheExtensions.X_NODE_DISCOVERY_HEADER]; - //Not null - if (string.IsNullOrWhiteSpace(jwtAuth) || string.IsNullOrWhiteSpace(clientSignature)) - { - return VfReturnType.Forbidden; - } - string? nodeId = null; bool isPeer = false; @@ -178,15 +177,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints return VirtualClose(entity, HttpStatusCode.Unauthorized); } - CacheNodeAdvertisment? discoveryAd = null; - /* * If the client is a peer server, it may offer a signed advertisment * that this node will have the duty of making available to other peers * if it is valid */ - if (isPeer && !string.IsNullOrWhiteSpace(optionalDiscovery)) + CacheNodeAdvertisment? discoveryAd = null; + + if (isPeer) { discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(optionalDiscovery); } @@ -196,11 +195,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints try { //Get query config suggestions from the client - string recvBufCmd = entity.QueryArgs[FBMClient.REQ_RECV_BUF_QUERY_ARG]; - string maxHeaderCharCmd = entity.QueryArgs[FBMClient.REQ_HEAD_BUF_QUERY_ARG]; - string maxMessageSizeCmd = entity.QueryArgs[FBMClient.REQ_MAX_MESS_QUERY_ARG]; + string? recvBufCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_RECV_BUF_QUERY_ARG); + string? maxHeaderCharCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_HEAD_BUF_QUERY_ARG); + string? maxMessageSizeCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_MAX_MESS_QUERY_ARG); - //Parse recv buffer size int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : CacheConfig.MinRecvBufferSize; int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : CacheConfig.MinHeaderBufferSize; int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : CacheConfig.MaxMessageSize; @@ -253,9 +251,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints Peers.OnPeerConnected(state); //Register plugin exit token to cancel the connected socket - CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll); - - //Inc connected count + await using CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll); + Interlocked.Increment(ref _connectedClients); try @@ -280,7 +277,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints try { //Begin listening for messages with a queue - await Store.ListenAsync(wss, queue, args); + await Listener.ListenAsync(wss, queue, args); } finally { @@ -291,7 +288,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints else { //Begin listening for messages without a queue - await Store.ListenAsync(wss, null!, args); + await Listener.ListenAsync(wss, null!, args); } } catch (OperationCanceledException) @@ -303,15 +300,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } catch (Exception ex) { - Log.Debug(ex); + //If debug logging is enabled print a more detailed error message + Log.Error("An error occured on websocket connection: node {con} -> {error}", state.NodeId, ex.Message); + Log.Debug("Websocket connection error: node {con}\n{error}", state.NodeId, ex); } - - //Dec connected count + Interlocked.Decrement(ref _connectedClients); - //Unregister the token - reg.Unregister(); - //Notify monitor of disconnect Peers.OnPeerDisconnected(state); diff --git a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs index 7d376b8..56fe8cd 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -40,25 +40,29 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { internal sealed class PeerDiscoveryEndpoint : ResourceEndpointBase { - private readonly IPeerMonitor PeerMonitor; - private readonly NodeConfig Config; + private readonly ObjectCacheSystemState SysState; + + private CacheAuthKeyStore KeyStore => SysState.Configuration.KeyStore; + + private CachePeerMonitor PeerMonitor => SysState.PeerMonitor; + + private CacheNodeConfiguration NodeConfig => SysState.Configuration.Config; - //Loosen up protection settings ///<inheritdoc/> protected override ProtectionSettings EndpointProtectionSettings { get; } = new() { - DisableSessionsRequired = true + /* + * Sessions will not be used or required for this endpoint. + * We should also assume the session system is not even loaded + */ + DisableSessionsRequired = true }; public PeerDiscoveryEndpoint(PluginBase plugin) { - //Get the peer monitor - PeerMonitor = plugin.GetOrCreateSingleton<CachePeerMonitor>(); - - //Get the node config - Config = plugin.GetOrCreateSingleton<NodeConfig>(); + SysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>(); - InitPathAndLog(Config.DiscoveryPath, plugin.Log); + InitPathAndLog(SysState.Configuration.DiscoveryPath!, plugin.Log); } protected override VfReturnType Get(HttpEntity entity) @@ -68,36 +72,41 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints if(string.IsNullOrWhiteSpace(authToken)) { - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; + return VirtualClose(entity, HttpStatusCode.Unauthorized); } string subject = string.Empty; string challenge = string.Empty; - //Parse auth token - using(JsonWebToken jwt = JsonWebToken.Parse(authToken)) + try { + //Parse auth token + using JsonWebToken jwt = JsonWebToken.Parse(authToken); + //try to verify against cache node first - if (!Config.KeyStore.VerifyJwt(jwt, true)) + if (!KeyStore.VerifyJwt(jwt, true)) { //failed... //try to verify against client key - if (!Config.KeyStore.VerifyJwt(jwt, false)) + if (!KeyStore.VerifyJwt(jwt, false)) { //invalid token - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; + return VirtualClose(entity, HttpStatusCode.Unauthorized); } } using JsonDocument payload = jwt.GetPayload(); //Get client info to pass back - subject = payload.RootElement.TryGetProperty("sub", out JsonElement subEl) ? subEl.GetString() ?? string.Empty : string.Empty; + subject = payload.RootElement.TryGetProperty("sub", out JsonElement subEl) ? subEl.GetString() ?? string.Empty : string.Empty; challenge = payload.RootElement.GetProperty("chl").GetString() ?? string.Empty; } + catch (FormatException) + { + //If tokens are invalid format, let the client know instead of a server error + return VfReturnType.BadRequest; + } //Valid key, get peer list to send to client CacheNodeAdvertisment[] peers = PeerMonitor.GetAllPeers() @@ -109,10 +118,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints using JsonWebToken response = new(); //set header from cache config - response.WriteHeader(Config.KeyStore.GetJwtHeader()); + response.WriteHeader(KeyStore.GetJwtHeader()); response.InitPayloadClaim() - .AddClaim("iss", Config.Config.NodeId) + .AddClaim("iss", NodeConfig.NodeId) //Audience is the requestor id .AddClaim("sub", subject) .AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds()) @@ -122,10 +131,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints .AddClaim("chl", challenge) .CommitClaims(); - //Sign the response - Config.KeyStore.SignJwt(response); - - //Send response to client + + KeyStore.SignJwt(response); + entity.CloseResponse(HttpStatusCode.OK, Net.Http.ContentType.Text, response.DataBuffer); return VfReturnType.VirtualSkip; } diff --git a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs index 87a471b..04380c5 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -59,7 +59,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints public WellKnownEndpoint(PluginBase plugin) { //Get the node config - NodeConfig nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>(); + NodeConfig nodeConfig = plugin.GetOrCreateSingleton<ObjectCacheSystemState>().Configuration; //serialize the config, discovery may not be enabled _advertisment = nodeConfig.Config.Advertisment; diff --git a/plugins/ObjectCacheServer/src/NodeConfig.cs b/plugins/ObjectCacheServer/src/NodeConfig.cs index 3a2e10e..4dd9f4a 100644 --- a/plugins/ObjectCacheServer/src/NodeConfig.cs +++ b/plugins/ObjectCacheServer/src/NodeConfig.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -66,6 +66,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server /// </summary> public uint MaxPeerConnections { get; } = 10; + /// <summary> + /// The maxium number of concurrent client connections to allow + /// before rejecting new connections + /// </summary> + public uint MaxConcurrentConnections { get; } + public NodeConfig(PluginBase plugin, IConfigScope config) { //Get the port of the primary webserver @@ -92,32 +98,23 @@ namespace VNLib.Data.Caching.ObjectCache.Server //Init key store KeyStore = new(plugin); - - DiscoveryInterval = config["discovery_interval_sec"].GetTimeSpan(TimeParseType.Seconds); - - //Get the event queue purge interval - EventQueuePurgeInterval = config["queue_purge_interval_sec"].GetTimeSpan(TimeParseType.Seconds); - - //Get the max queue depth - MaxQueueDepth = (int)config["max_queue_depth"].GetUInt32(); - - - //Get the connect path - ConnectPath = config["connect_path"].GetString() ?? throw new KeyNotFoundException("Missing required key 'connect_path' in cluster config"); - - //Get the verify ip setting - VerifyIp = config["verify_ip"].GetBoolean(); + DiscoveryInterval = config.GetRequiredProperty("discovery_interval_sec", p => p.GetTimeSpan(TimeParseType.Seconds)); + EventQueuePurgeInterval = config.GetRequiredProperty("queue_purge_interval_sec", p => p.GetTimeSpan(TimeParseType.Seconds)); + MaxQueueDepth = (int)config.GetRequiredProperty("max_queue_depth", p => p.GetUInt32()); + ConnectPath = config.GetRequiredProperty("connect_path", p => p.GetString()!); + VerifyIp = config.GetRequiredProperty("verify_ip", p => p.GetBoolean()); + WellKnownPath = config.GetValueOrDefault("well_known_path", p => p.GetString()!, DefaultPath); + MaxPeerConnections = config.GetValueOrDefault("max_peers", p => p.GetUInt32(), 10u); Uri connectEp = BuildUri(usingTls, hostname, port, ConnectPath); Uri? discoveryEp = null; - Config = new(); - //Setup cache node config - Config.WithCacheEndpoint(connectEp) - .WithNodeId(nodeId) - .WithAuthenticator(KeyStore) - .WithTls(usingTls); + (Config = new()) + .WithCacheEndpoint(connectEp) + .WithNodeId(nodeId) + .WithAuthenticator(KeyStore) + .WithTls(usingTls); //Get the discovery path (optional) if (config.TryGetValue("discovery_path", out JsonElement discoveryPathEl)) @@ -133,20 +130,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server } } - //Allow custom well-known path - if(config.TryGetValue("well_known_path", out JsonElement wkEl)) - { - WellKnownPath = wkEl.GetString() ?? DefaultPath; - } - //Default if not set - WellKnownPath ??= DefaultPath; - - //Get the max peer connections - if (config.TryGetValue("max_peers", out JsonElement maxPeerEl)) - { - MaxPeerConnections = maxPeerEl.GetUInt32(); - } - const string CacheConfigTemplate = @" Cluster Configuration: diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs index aada787..b970cee 100644 --- a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs +++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -23,13 +23,10 @@ */ using System; -using System.Threading; using System.Collections.Generic; using VNLib.Plugins; -using VNLib.Utils.Memory; using VNLib.Utils.Logging; -using VNLib.Utils.Memory.Diagnostics; using VNLib.Plugins.Extensions.Loading; using VNLib.Plugins.Extensions.Loading.Routing; @@ -42,39 +39,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server public sealed class ObjectCacheServerEntry : PluginBase { public override string PluginName => "ObjectCache.Service"; - - private readonly Lazy<IUnmangedHeap> _cacheHeap; - - internal IUnmangedHeap ListenerHeap => _cacheHeap.Value; - - public ObjectCacheServerEntry() - { - //Init heap - _cacheHeap = new Lazy<IUnmangedHeap>(InitializeHeap, LazyThreadSafetyMode.PublicationOnly); - } - - internal IUnmangedHeap InitializeHeap() - { - //Create default heap - IUnmangedHeap _heap = MemoryUtil.InitializeNewHeapForProcess(); - try - { - //If the plugin is in debug mode enable heap tracking - return this.IsDebug() ? new TrackedHeapWrapper(_heap, true) : _heap; - } - catch - { - _heap.Dispose(); - throw; - } - } + protected override void OnLoad() { try { - //Get the node configuration first - NodeConfig config = this.GetOrCreateSingleton<NodeConfig>(); + //Initialize the cache node builder + ObjectCacheSystemState builder = this.GetOrCreateSingleton<ObjectCacheSystemState>(); + builder.Initialize(); //Route well-known endpoint this.Route<WellKnownEndpoint>(); @@ -86,7 +59,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server _ = this.GetOrCreateSingleton<CacheNodeReplicationMaanger>(); //Setup discovery endpoint - if(!string.IsNullOrWhiteSpace(config.DiscoveryPath)) + if(!string.IsNullOrWhiteSpace(builder.Configuration.DiscoveryPath)) { this.Route<PeerDiscoveryEndpoint>(); } @@ -101,12 +74,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server protected override void OnUnLoad() { - //dispose heap if initialized - if(_cacheHeap.IsValueCreated) - { - _cacheHeap.Value.Dispose(); - } - Log.Information("Plugin unloaded"); } diff --git a/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs new file mode 100644 index 0000000..6183956 --- /dev/null +++ b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs @@ -0,0 +1,215 @@ +/* +* Copyright (c) 2024 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: ObjectCacheSystemState.cs +* +* ObjectCacheSystemState.cs is part of ObjectCacheServer which is +* part of the larger VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Net.Http; +using System.Net.Sockets; + +using VNLib.Utils.Logging; +using VNLib.Utils.Memory; +using VNLib.Utils.Memory.Diagnostics; +using VNLib.Net.Messaging.FBM; +using VNLib.Plugins; +using VNLib.Plugins.Extensions.Loading; + +using VNLib.Data.Caching.Extensions.Clustering; +using VNLib.Data.Caching.ObjectCache.Server.Cache; +using VNLib.Data.Caching.ObjectCache.Server.Clustering; + +namespace VNLib.Data.Caching.ObjectCache.Server +{ + [ConfigurationName("cache")] + internal sealed class ObjectCacheSystemState(PluginBase plugin, IConfigScope config) : IDisposable + { + const string LISTENER_LOG_SCOPE = "CacheListener"; + + public BlobCacheListener<IPeerEventQueue> Listener { get; private set; } = null!; + + public ICacheStore InternalStore { get; private set; } = null!; + + /// <summary> + /// Used for miscellaneous shared memory allocations (like the cache listener) + /// </summary> + public IUnmangedHeap SharedCacheHeap { get; private set; } = null!; + + /// <summary> + /// The plugin-wide, shared node configuration + /// </summary> + public NodeConfig Configuration { get; } = plugin.GetOrCreateSingleton<NodeConfig>(); + + /// <summary> + /// The peer discovery manager + /// </summary> + public PeerDiscoveryManager PeerDiscovery { get; private set; } = null!; + + /// <summary> + /// System wide peer monitor + /// </summary> + public CachePeerMonitor PeerMonitor { get; } = new(); + + public CacheMemoryConfiguration MemoryConfiguration { get; } = config.Deserialze<CacheMemoryConfiguration>(); + + /// <summary> + /// The system wide peer event queue manager + /// </summary> + public PeerEventQueueManager PeerEventQueue { get; private set; } + + void IDisposable.Dispose() + { + SharedCacheHeap.Dispose(); + Listener.Dispose(); + } + + /// <summary> + /// Initializes the cache node state + /// </summary> + public void Initialize() + { + CacheMemoryConfiguration cacheConf = MemoryConfiguration; + + ArgumentOutOfRangeException.ThrowIfLessThan(cacheConf.MaxCacheEntries, 2u); + + //Suggestion + if (cacheConf.MaxCacheEntries < 200) + { + plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache"); + } + + LogMemConfiguration(); + + //If the plugin is in debug mode enable heap tracking + SharedCacheHeap = plugin.IsDebug() ? + new TrackedHeapWrapper(MemoryUtil.InitializeNewHeapForProcess(), true) + : MemoryUtil.InitializeNewHeapForProcess(); + + ConfigurePeerDiscovery(); + + ConfigureCacheListener(); + + PeerEventQueue = new(plugin, Configuration); + } + + private void ConfigurePeerDiscovery() + { + //Get the known peers array from config, its allowed to be null for master nodes + IConfigScope? config = plugin.TryGetConfig("known_peers"); + string[] kownPeers = config?.Deserialze<string[]>() ?? []; + + ILogProvider discLogger = plugin.Log.CreateScope(PeerDiscoveryManager.LOG_SCOPE_NAME); + + Configuration.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s))) + .WithErrorHandler(new ErrorHandler(discLogger)); + + discLogger.Information("Inital peer nodes: {nodes}", kownPeers); + + PeerDiscovery = new PeerDiscoveryManager( + Configuration, + discLogger, + plugin.IsDebug(), + kownPeers.Length > 0 + ); + + //Discovery manager needs to be scheduled for background work to run the discovery loop + _ = plugin.ObserveWork(PeerDiscovery, 10); + } + + private void ConfigureCacheListener() + { + /* + * Allow loading external managed dll for a bucket-local memory manager + */ + ICacheMemoryManagerFactory manager; + + if (string.IsNullOrWhiteSpace(MemoryConfiguration.ExternLibPath)) + { + //Get the memory manager + manager = plugin.GetOrCreateSingleton<BucketLocalManagerFactory>(); + } + else + { + manager = plugin.CreateServiceExternal<ICacheMemoryManagerFactory>(MemoryConfiguration.ExternLibPath); + } + + //Endpoint only allows for a single reader + Listener = new( + plugin.LoadMemoryCacheSystem(config, manager, MemoryConfiguration), + plugin.GetOrCreateSingleton<CacheListenerPubQueue>(), + plugin.Log.CreateScope(LISTENER_LOG_SCOPE), + new SharedHeapFBMMemoryManager(SharedCacheHeap) + ); + + InternalStore = new CacheStore(Listener.Cache); + } + + private void LogMemConfiguration() + { + const string CacheConfigTemplate = +@" +Cache Configuration: + Max memory: {max} Mb + Buckets: {bc} + Entries per-bucket: {mc} + HeapTracking: {ht} +"; + + CacheMemoryConfiguration cacheConf = MemoryConfiguration; + + //calculate the max memory usage + ulong maxByteSize = cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize; + + //Log the cache config + plugin.Log.Information( + CacheConfigTemplate, + maxByteSize / (1024 * 1000), + cacheConf.BucketCount, + cacheConf.MaxCacheEntries, + plugin.IsDebug() + ); + } + + private sealed class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler + { + public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex) + { + if (ex is HttpRequestException hre) + { + if (hre.InnerException is SocketException se) + { + //transport failed + Logger.Warn("Failed to connect to server {serv} because {err}", errorNode, se.Message); + } + else + { + Logger.Error("Failed to connect to node {n}\n{err}", errorNode, hre); + } + } + else + { + Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex); + } + } + } + } +} |