diff options
Diffstat (limited to 'plugins')
12 files changed, 273 insertions, 179 deletions
diff --git a/plugins/ObjectCacheServer/src/CacheConfiguration.cs b/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs index f7adeb3..86e1f5a 100644 --- a/plugins/ObjectCacheServer/src/CacheConfiguration.cs +++ b/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs @@ -24,7 +24,7 @@ using System.Text.Json.Serialization; -namespace VNLib.Data.Caching.ObjectCache.Server +namespace VNLib.Data.Caching.ObjectCache.Server.Cache { internal sealed class CacheConfiguration { @@ -42,7 +42,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server [JsonPropertyName("max_message_size")] public int MaxMessageSize { get; set; } = 1000 * 1024; - + [JsonPropertyName("change_queue_max_depth")] public int MaxEventQueueDepth { get; set; } = 10 * 1000; diff --git a/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs b/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs index 3827121..ad0eb5a 100644 --- a/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs +++ b/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs @@ -37,31 +37,31 @@ using VNLib.Plugins.Extensions.Loading; using VNLib.Plugins.Extensions.Loading.Events; -namespace VNLib.Data.Caching.ObjectCache.Server +namespace VNLib.Data.Caching.ObjectCache.Server.Cache { - - [ConfigurationName("event_manager")] internal sealed class CacheEventQueueManager : ICacheEventQueueManager, IDisposable, IIntervalScheduleable { private readonly int MaxQueueDepth; private readonly object SubLock; - private readonly LinkedList<NodeQueue> Subscribers; + private readonly LinkedList<NodeQueue> Subscribers; private readonly object StoreLock; private readonly Dictionary<string, NodeQueue> QueueStore; - - public CacheEventQueueManager(PluginBase plugin, IConfigScope config) + + public CacheEventQueueManager(PluginBase plugin) { - //Get purge interval - TimeSpan purgeInterval = config["purge_interval_sec"].GetTimeSpan(TimeParseType.Seconds); + //Get node config + NodeConfig config = plugin.GetOrCreateSingleton<NodeConfig>(); //Get max queue depth - MaxQueueDepth = (int)config["max_depth"].GetUInt32(); + MaxQueueDepth = config.MaxQueueDepth; - //Create purge interval - plugin.ScheduleInterval(this, purgeInterval); + /* + * Schedule purge interval to clean up stale queues + */ + plugin.ScheduleInterval(this, config.EventQueuePurgeInterval); SubLock = new(); Subscribers = new(); @@ -81,7 +81,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server lock (StoreLock) { //Try to recover the queue for the node - if(!QueueStore.TryGetValue(peer.NodeId, out nq)) + if (!QueueStore.TryGetValue(peer.NodeId, out nq)) { //Create new queue nq = new(peer.NodeId, MaxQueueDepth); @@ -163,21 +163,21 @@ namespace VNLib.Data.Caching.ObjectCache.Server public void PurgeStaleSubscribers() { //Enter locks - lock(SubLock) - lock(StoreLock) - { - //Get all stale queues (queues without listeners) - NodeQueue[] staleQueues = QueueStore.Values.Where(static nq => nq.Listeners == 0).ToArray(); - - foreach (NodeQueue nq in staleQueues) + lock (SubLock) + lock (StoreLock) { - //Remove from store - QueueStore.Remove(nq.NodeId); + //Get all stale queues (queues without listeners) + NodeQueue[] staleQueues = QueueStore.Values.Where(static nq => nq.Listeners == 0).ToArray(); + + foreach (NodeQueue nq in staleQueues) + { + //Remove from store + QueueStore.Remove(nq.NodeId); - //remove from subscribers - Subscribers.Remove(nq); + //remove from subscribers + Subscribers.Remove(nq); + } } - } } //Interval to purge stale subscribers @@ -186,7 +186,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server log.Debug("Purging stale peer event queues"); PurgeStaleSubscribers(); - + return Task.CompletedTask; } @@ -241,7 +241,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server public void PublishChanges(Span<ChangeEvent> changes) { - for(int i = 0; i < changes.Length; i++) + for (int i = 0; i < changes.Length; i++) { Queue.TryEnque(changes[i]); } diff --git a/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs index 9c7388e..ba39db6 100644 --- a/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs +++ b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs @@ -26,16 +26,28 @@ using System; using System.Threading; using System.Threading.Tasks; using System.Threading.Channels; +using System.Diagnostics.CodeAnalysis; using VNLib.Utils.Async; using VNLib.Utils.Logging; using VNLib.Plugins; using VNLib.Plugins.Extensions.Loading; -namespace VNLib.Data.Caching.ObjectCache.Server +namespace VNLib.Data.Caching.ObjectCache.Server.Cache { + /* + * Implements the event queue for the cache listener. Captures changes from the cache store + * and publishes them to subscribers. + * + * It also allows clients that are listening for changes to wait for events to + * their individual queues. + */ + internal sealed class CacheListenerPubQueue : ICacheListenerEventQueue, IAsyncBackgroundWork { + private const int MAX_LOCAL_QUEUE_ITEMS = 10000; + private const string LOG_SCOPE_NAME = "QUEUE"; + private readonly AsyncQueue<ChangeEvent> _listenerQueue; private readonly ILogProvider _logProvider; private readonly ICacheEventQueueManager _queueManager; @@ -43,8 +55,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server public CacheListenerPubQueue(PluginBase plugin) { _queueManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>(); - _logProvider = plugin.Log; - _listenerQueue = new AsyncQueue<ChangeEvent>(new BoundedChannelOptions(10000) + _logProvider = plugin.Log.CreateScope(LOG_SCOPE_NAME); + + //Init local queue to store published events + _listenerQueue = new(new BoundedChannelOptions(MAX_LOCAL_QUEUE_ITEMS) { AllowSynchronousContinuations = true, FullMode = BoundedChannelFullMode.DropOldest, @@ -54,12 +68,17 @@ namespace VNLib.Data.Caching.ObjectCache.Server } ///<inheritdoc/> - public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) + async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) { const int accumulatorSize = 64; + //Create scope + pluginLog = pluginLog.CreateScope(LOG_SCOPE_NAME); + try { + pluginLog.Debug("Change queue worker listening for local cache changes"); + //Accumulator for events ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize]; int index = 0; @@ -89,9 +108,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server pluginLog.Debug("Change queue listener worker exited"); } } - + ///<inheritdoc/> - public bool IsEnabled(object userState) + public bool IsEnabled([NotNullWhen(true)] object? userState) { return userState is IPeerEventQueue; } diff --git a/plugins/ObjectCacheServer/src/CacheStore.cs b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs index c1d47f6..f94a3f5 100644 --- a/plugins/ObjectCacheServer/src/CacheStore.cs +++ b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs @@ -30,11 +30,19 @@ using VNLib.Utils.Logging; using VNLib.Plugins; using VNLib.Plugins.Extensions.Loading; -namespace VNLib.Data.Caching.ObjectCache.Server +namespace VNLib.Data.Caching.ObjectCache.Server.Cache { + /* + * Implements the blob cache store, which is an abstraction around the blob cache listener. + * This allows for publishing local events (say from other nodes) to keep caches in sync. + */ + [ConfigurationName("cache")] internal sealed class CacheStore : ICacheStore, IDisposable { + /// <summary> + /// Gets the underlying cache listener + /// </summary> public BlobCacheListener Listener { get; } @@ -44,16 +52,19 @@ namespace VNLib.Data.Caching.ObjectCache.Server Listener = InitializeCache((ObjectCacheServerEntry)plugin, config); } + ///<inheritdoc/> ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token) { return Listener.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token); } + ///<inheritdoc/> void ICacheStore.Clear() { throw new NotImplementedException(); } + ///<inheritdoc/> ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token) { return Listener.Cache.DeleteObjectAsync(id, token); @@ -81,14 +92,14 @@ Cache Configuration: if (cacheConf.MaxCacheEntries < 200) { plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache"); - } + } //calculate the max memory usage - ulong maxByteSize = ((ulong)cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize); + ulong maxByteSize = cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize; //Log the cache config - plugin.Log.Information(CacheConfigTemplate, - maxByteSize / (ulong)(1024 * 1000), + plugin.Log.Information(CacheConfigTemplate, + maxByteSize / (1024 * 1000), cacheConf.BucketCount, cacheConf.MaxCacheEntries ); @@ -103,6 +114,9 @@ Cache Configuration: return new(bc, queue, plugin.Log, plugin.CacheHeap); } + /* + * Cleaned up by the plugin on exit + */ public void Dispose() { Listener.Dispose(); diff --git a/plugins/ObjectCacheServer/src/CacheSystemUtil.cs b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs index f8aedae..2071d2b 100644 --- a/plugins/ObjectCacheServer/src/CacheSystemUtil.cs +++ b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs @@ -33,7 +33,7 @@ using VNLib.Plugins; using VNLib.Utils.Memory; using VNLib.Plugins.Extensions.Loading; -namespace VNLib.Data.Caching.ObjectCache.Server +namespace VNLib.Data.Caching.ObjectCache.Server.Cache { internal static class CacheSystemUtil { @@ -136,9 +136,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server //Return the return new(loader); } - - private sealed class RuntimeBlobCacheTable : IBlobCacheTable + + private sealed class RuntimeBlobCacheTable : IBlobCacheTable { private readonly IBlobCacheTable _table; @@ -173,7 +173,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server internal sealed class PersistantCacheManager : IPersistantCacheStore { const string INITIALIZE_METHOD_NAME = "OnInitializeForBucket"; - + /* * Our referrence can be technically unloaded, but so will diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs index ffdd4f4..5a04737 100644 --- a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs +++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs @@ -36,6 +36,7 @@ using VNLib.Net.Messaging.FBM; using VNLib.Net.Messaging.FBM.Client; using VNLib.Plugins.Extensions.Loading; using VNLib.Data.Caching.Extensions.Clustering; +using VNLib.Data.Caching.ObjectCache.Server.Cache; namespace VNLib.Data.Caching.ObjectCache.Server.Clustering { diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs index f132cab..65cc009 100644 --- a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs +++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs @@ -31,9 +31,9 @@ using System.Threading.Tasks; using System.Collections.Generic; using VNLib.Utils.Logging; -using VNLib.Data.Caching.Extensions; using VNLib.Plugins; using VNLib.Plugins.Extensions.Loading; +using VNLib.Data.Caching.Extensions; using VNLib.Data.Caching.Extensions.Clustering; namespace VNLib.Data.Caching.ObjectCache.Server.Clustering @@ -46,38 +46,48 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering internal sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter { private const string LOG_SCOPE_NAME = "DISC"; + /* + * The initial discovery delay. This allows for the server to initialize before + * starting the discovery process. This will probably be a shorter delay + * than a usual discovery interval. + */ private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(15); + private static readonly TimeSpan WhenWellKnownResolveFailed = TimeSpan.FromSeconds(20); + private readonly List<CacheNodeAdvertisment> _connectedPeers; - private readonly NodeConfig _config; - private readonly CachePeerMonitor _monitor; + private readonly NodeConfig Config; + private readonly CachePeerMonitor Monitor; + private readonly ILogProvider Log; private readonly bool IsDebug; - private readonly ILogProvider _log; + private readonly bool HasWellKnown; public PeerDiscoveryManager(PluginBase plugin) { //Get config - _config = plugin.GetOrCreateSingleton<NodeConfig>(); + Config = plugin.GetOrCreateSingleton<NodeConfig>(); //Get the known peers array from config, its allowed to be null for master nodes IConfigScope? config = plugin.TryGetConfig("known_peers"); string[] kownPeers = config?.Deserialze<string[]>() ?? Array.Empty<string>(); //Add known peers to the monitor - _config.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s))); + Config.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s))); - plugin.Log.Information("Inital peer nodes: {nodes}", kownPeers); + HasWellKnown = kownPeers.Length > 0; //Get the peer monitor - _monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>(); + Monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>(); _connectedPeers = new(); //Create scoped logger - _log = plugin.Log.CreateScope(LOG_SCOPE_NAME); + Log = plugin.Log.CreateScope(LOG_SCOPE_NAME); + + Log.Information("Inital peer nodes: {nodes}", kownPeers); //Setup discovery error handler - _config.Config.WithErrorHandler(new ErrorHandler(_log)); + Config.Config.WithErrorHandler(new ErrorHandler(Log)); IsDebug = plugin.IsDebug(); } @@ -93,33 +103,29 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering //Start the change listener Task watcher = WatchForPeersAsync(exitToken); - _log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", InitialDelay); + Log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", InitialDelay); try { //Wait for the initial delay await Task.Delay(InitialDelay, exitToken); - _log.Debug("Begining discovery loop"); - - /* - * To avoid connecting to ourself, we add ourselves to the connected list - * and it should never get removed. This is because the monitor will never - * report our own advertisment. - */ - _connectedPeers.Add(_config.Config.Advertisment); + Log.Debug("Begining discovery loop"); while (true) { + bool wellKnownFailed = false; + try { if (IsDebug) { - _log.Debug("Begining node discovery"); + Log.Debug("Begining node discovery"); } //Resolve all known peers - CacheNodeAdvertisment[] wellKnown = await _config.Config.ResolveWellKnownAsync(exitToken); + CacheNodeAdvertisment[] wellKnown = await Config.Config.ResolveWellKnownAsync(exitToken); + wellKnownFailed = wellKnown.Length == 0; //Use the monitor to get the initial peers IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds(); @@ -130,15 +136,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering if (allAds.Length > 0) { //Discover all kown nodes - await _config.Config.DiscoverNodesAsync(allAds, exitToken); + await Config.Config.DiscoverNodesAsync(allAds, exitToken); } //Log the discovered nodes if verbose logging is enabled if (IsDebug) { - CacheNodeAdvertisment[] found = _config.Config.NodeCollection.GetAllNodes(); + CacheNodeAdvertisment[] found = Config.Config.NodeCollection.GetAllNodes(); - _log.Debug("Discovered {count} nodes\n\t{nodes}", found.Length, found.Select(static s => s.NodeId)); + Log.Debug("Discovered {count} nodes\n\t{nodes}", found.Length, found.Select(static s => s.NodeId)); } } catch(OperationCanceledException) @@ -147,17 +153,38 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering } catch (Exception ex) { - _log.Error(ex, "Failed to discover new peer nodes"); + Log.Error(ex, "Failed to discover new peer nodes"); } - //Delay the next discovery - await Task.Delay(_config.DiscoveryInterval, exitToken); + /* + * If we have well known nodes and the discovery failed, we wait for a shorter + * duration before retrying. This is to avoid spamming the network with requests + * if the well known nodes are down. But if we don't have any well known nodes + * we cannot continue. + * + * This only matters if we are exepcted to have well known nodes. + */ + if(HasWellKnown && wellKnownFailed) + { + if (IsDebug) + { + Log.Debug("Well known node discovery failed, waiting for {idel} before retrying", WhenWellKnownResolveFailed); + } + + //Wait for shorter duration + await Task.Delay(WhenWellKnownResolveFailed, exitToken); + } + else + { + //Delay the next discovery + await Task.Delay(Config.DiscoveryInterval, exitToken); + } } } catch (OperationCanceledException) { //Normal exit - _log.Information("Node discovery worker exiting"); + Log.Information("Node discovery worker exiting on plugin exit"); } finally { @@ -170,10 +197,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering private IEnumerable<CacheNodeAdvertisment> GetMonitorAds() { - return _monitor.GetAllPeers() + return Monitor.GetAllPeers() .Where(static p => p.Advertisment != null) //Without us - .Where(n => n.NodeId != _config.Config.NodeId) + .Where(n => n.NodeId != Config.Config.NodeId) .Select(static p => p.Advertisment!); } @@ -182,26 +209,26 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering { try { - _log.Debug("Discovery worker waiting for new peers to connect"); + Log.Debug("Discovery worker waiting for new peers to connect"); while (true) { //Wait for changes, then get new peers - await _monitor.WaitForChangeAsync().WaitAsync(cancellation); + await Monitor.WaitForChangeAsync().WaitAsync(cancellation); - _log.Verbose("New peers connected"); + Log.Verbose("New peers connected"); //Use the monitor to get the initial peers IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds(); - ((NodeDiscoveryCollection)_config.Config.NodeCollection).AddManualNodes(ads); + ((NodeDiscoveryCollection)Config.Config.NodeCollection).AddManualNodes(ads); } } catch (OperationCanceledException) { //Normal ext - _log.Debug("Connected peer listener exited"); + Log.Debug("Connected peer listener exited"); } } @@ -212,7 +239,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering lock (_connectedPeers) { //Get all discovered peers - CacheNodeAdvertisment[] peers = _config.Config.NodeCollection.GetAllNodes(); + CacheNodeAdvertisment[] peers = Config.Config.NodeCollection.GetAllNodes(); //Get the difference between the discovered peers and the connected peers return peers.Except(_connectedPeers).ToArray(); diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs index 5e794f8..d232fd8 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs @@ -45,13 +45,13 @@ using VNLib.Plugins.Extensions.Loading; using VNLib.Plugins.Essentials.Endpoints; using VNLib.Plugins.Essentials.Extensions; using VNLib.Plugins.Extensions.Loading.Routing; -using VNLib.Data.Caching.ObjectCache.Server.Distribution; using VNLib.Data.Caching.Extensions.Clustering; +using VNLib.Data.Caching.ObjectCache.Server.Cache; +using VNLib.Data.Caching.ObjectCache.Server.Clustering; namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { - [ConfigurationName("connect_endpoint")] internal sealed class ConnectEndpoint : ResourceEndpointBase { private const string LOG_SCOPE_NAME = "CONEP"; @@ -62,10 +62,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints private readonly NodeConfig NodeConfiguration; private readonly ICacheEventQueueManager PubSubManager; private readonly IPeerMonitor Peers; - private readonly BlobCacheListener Store; - - private readonly bool VerifyIp; + private readonly string AudienceLocalServerId; private uint _connectedClients; @@ -88,21 +86,17 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints DisableSessionsRequired = true }; - public ConnectEndpoint(PluginBase plugin, IConfigScope config) + public ConnectEndpoint(PluginBase plugin) { - string? path = config["path"].GetString(); - - InitPathAndLog(path, plugin.Log.CreateScope(LOG_SCOPE_NAME)); + //Get node configuration + NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>(); - //Check for ip-verification flag - VerifyIp = config.TryGetValue("verify_ip", out JsonElement vIpEl) && vIpEl.GetBoolean(); + //Init from config and create a new log scope + InitPathAndLog(NodeConfiguration.ConnectPath, plugin.Log.CreateScope(LOG_SCOPE_NAME)); //Setup pub/sub manager PubSubManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>(); - //Get node configuration - NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>(); - //Get peer monitor Peers = plugin.GetOrCreateSingleton<CachePeerMonitor>(); @@ -186,7 +180,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } } - Log.Debug("Received negotiation request from node {node}", nodeId); + if (isPeer) + { + Log.Debug("Received negotiation request from peer node {node}", nodeId); + } + else + { + Log.Debug("Received negotiation request from client {client}", entity.TrustedRemoteIp.ToString()); + } //Verified, now we can create an auth message with a short expiration using JsonWebToken auth = new(); @@ -256,7 +257,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints //Verify audience, expiration - if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl) || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase)) + if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl) + || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase)) { entity.CloseResponse(HttpStatusCode.Unauthorized); return VfReturnType.VirtualSkip; @@ -270,7 +272,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } //Check node ip address matches if required - if (VerifyIp) + if (NodeConfiguration.VerifyIp) { if (!doc.RootElement.TryGetProperty("ip", out JsonElement ipEl)) { @@ -318,6 +320,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } } + WsUserState state; + try { //Get query config suggestions from the client @@ -340,7 +344,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints int maxMessageSizeClamp = Math.Clamp(maxMessageSize, CacheConfig.MinRecvBufferSize, CacheConfig.MaxRecvBufferSize); //Init new ws state object and clamp the suggested buffer sizes - WsUserState state = new() + state = new() { RecvBufferSize = Math.Clamp(recvBufSize, CacheConfig.MinRecvBufferSize, CacheConfig.MaxRecvBufferSize), MaxHeaderBufferSize = Math.Clamp(maxHeadBufSize, CacheConfig.MinHeaderBufferSize, CacheConfig.MaxHeaderBufferSize), @@ -356,20 +360,18 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints NodeId = nodeId, Advertisment = discoveryAd }; - - Log.Debug("Client recv buffer suggestion {recv}, header buffer size {head}, response buffer size {r}", recvBufCmd, maxHeaderCharCmd, state.MaxResponseBufferSize); - - //Print state message to console - Log.Verbose("Client buffer state {state}", state); - - //Accept socket and pass state object - entity.AcceptWebSocket(WebsocketAcceptedAsync, state); - return VfReturnType.VirtualSkip; } catch (KeyNotFoundException) { return VfReturnType.BadRequest; } + + //Print state message to console + Log.Debug("Client buffer state {state}", state); + + //Accept socket and pass state object + entity.AcceptWebSocket(WebsocketAcceptedAsync, state); + return VfReturnType.VirtualSkip; } private async Task WebsocketAcceptedAsync(WebSocketSession wss) diff --git a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs index 77d59dd..adb83e0 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs @@ -27,19 +27,17 @@ using System.Net; using System.Linq; using System.Text.Json; -using VNLib.Hashing; using VNLib.Hashing.IdentityUtility; using VNLib.Plugins; using VNLib.Plugins.Essentials; using VNLib.Plugins.Essentials.Endpoints; using VNLib.Plugins.Essentials.Extensions; using VNLib.Plugins.Extensions.Loading; -using VNLib.Data.Caching.ObjectCache.Server.Distribution; using VNLib.Data.Caching.Extensions.Clustering; +using VNLib.Data.Caching.ObjectCache.Server.Clustering; namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { - [ConfigurationName("discovery_endpoint")] internal sealed class PeerDiscoveryEndpoint : ResourceEndpointBase { private readonly IPeerMonitor PeerMonitor; @@ -53,17 +51,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints DisableSessionsRequired = true }; - public PeerDiscoveryEndpoint(PluginBase plugin, IConfigScope config) + public PeerDiscoveryEndpoint(PluginBase plugin) { - string? path = config["path"].GetString(); - - InitPathAndLog(path, plugin.Log); - //Get the peer monitor PeerMonitor = plugin.GetOrCreateSingleton<CachePeerMonitor>(); //Get the node config Config = plugin.GetOrCreateSingleton<NodeConfig>(); + + InitPathAndLog(Config.DiscoveryPath, plugin.Log); } protected override VfReturnType Get(HttpEntity entity) @@ -100,7 +96,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints using JsonDocument payload = jwt.GetPayload(); //Get client info to pass back - subject = payload.RootElement.GetProperty("sub").GetString() ?? string.Empty; + subject = payload.RootElement.TryGetProperty("sub", out JsonElement subEl) ? subEl.GetString() ?? string.Empty : string.Empty; challenge = payload.RootElement.GetProperty("chl").GetString() ?? string.Empty; } diff --git a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs index 99c7f19..3fcc471 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs @@ -24,10 +24,7 @@ using System; using System.Net; -using System.Text.Json; -using VNLib.Data.Caching.Extensions; -using VNLib.Data.Caching.Extensions.Clustering; using VNLib.Hashing; using VNLib.Hashing.IdentityUtility; using VNLib.Plugins; @@ -35,6 +32,8 @@ using VNLib.Plugins.Essentials; using VNLib.Plugins.Essentials.Endpoints; using VNLib.Plugins.Essentials.Extensions; using VNLib.Plugins.Extensions.Loading; +using VNLib.Data.Caching.Extensions; +using VNLib.Data.Caching.Extensions.Clustering; namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { @@ -44,12 +43,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints * the network. Clients need to know the endpoint layout to be able to * connect and discover other nodes. */ - - [ConfigurationName("well_known", Required = false)] internal sealed class WellKnownEndpoint : ResourceEndpointBase - { - //Default path for the well known endpoint - const string DefaultPath = "/.well-known/vncache"; + { //Store serialized advertisment private readonly CacheNodeAdvertisment _advertisment; @@ -62,10 +57,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints DisableSessionsRequired = true, }; - public WellKnownEndpoint(PluginBase plugin):this(plugin, null) - { } - - public WellKnownEndpoint(PluginBase plugin, IConfigScope? config) + public WellKnownEndpoint(PluginBase plugin) { //Get the node config NodeConfig nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>(); @@ -74,16 +66,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints _advertisment = nodeConfig.Config.Advertisment; _keyStore = nodeConfig.KeyStore; - //Default to the well known path - string path = DefaultPath; - - //See if the user configured a path - if(config != null && config.TryGetValue("path", out JsonElement pathEl)) - { - path = pathEl.GetString() ?? DefaultPath; - } - - InitPathAndLog(path, plugin.Log); + InitPathAndLog(nodeConfig.WellKnownPath, plugin.Log); } protected override VfReturnType Get(HttpEntity entity) diff --git a/plugins/ObjectCacheServer/src/NodeConfig.cs b/plugins/ObjectCacheServer/src/NodeConfig.cs index a6c5be9..3a2e10e 100644 --- a/plugins/ObjectCacheServer/src/NodeConfig.cs +++ b/plugins/ObjectCacheServer/src/NodeConfig.cs @@ -26,29 +26,22 @@ using System; using System.Net; using System.Linq; using System.Text.Json; +using System.Collections.Generic; using VNLib.Plugins; using VNLib.Utils.Logging; using VNLib.Utils.Extensions; using VNLib.Plugins.Extensions.Loading; -using VNLib.Data.Caching.ObjectCache.Server.Endpoints; using VNLib.Data.Caching.Extensions.Clustering; + namespace VNLib.Data.Caching.ObjectCache.Server { [ConfigurationName("cluster")] internal sealed class NodeConfig { - const string CacheConfigTemplate = -@" -Cluster Configuration: - Node Id: {id} - TlsEndabled: {tls} - Cache Endpoint: {ep} - Discovery Endpoint: {dep} - Discovery Interval: {di} - Max Peers: {mpc} -"; + //Default path for the well known endpoint + const string DefaultPath = "/.well-known/vncache"; public CacheNodeConfiguration Config { get; } @@ -56,16 +49,25 @@ Cluster Configuration: public TimeSpan DiscoveryInterval { get; } + public TimeSpan EventQueuePurgeInterval { get; } + + public int MaxQueueDepth { get; } + + public string? DiscoveryPath { get; } + + public string ConnectPath { get; } + + public string WellKnownPath { get; } + + public bool VerifyIp { get; } + /// <summary> /// The maximum number of peer connections to allow /// </summary> public uint MaxPeerConnections { get; } = 10; public NodeConfig(PluginBase plugin, IConfigScope config) - { - - Config = new(); - + { //Get the port of the primary webserver int port; bool usingTls; @@ -86,56 +88,104 @@ Cluster Configuration: //Server id is just dns name for now string nodeId = $"{hostname}:{port}"; - - //The endpoint to advertise to cache clients that allows cache connections - Uri cacheEndpoint = GetEndpointUri<ConnectEndpoint>(plugin, usingTls, port, hostname); - + //Init key store KeyStore = new(plugin); + + DiscoveryInterval = config["discovery_interval_sec"].GetTimeSpan(TimeParseType.Seconds); + + //Get the event queue purge interval + EventQueuePurgeInterval = config["queue_purge_interval_sec"].GetTimeSpan(TimeParseType.Seconds); + + //Get the max queue depth + MaxQueueDepth = (int)config["max_queue_depth"].GetUInt32(); + + + //Get the connect path + ConnectPath = config["connect_path"].GetString() ?? throw new KeyNotFoundException("Missing required key 'connect_path' in cluster config"); + + //Get the verify ip setting + VerifyIp = config["verify_ip"].GetBoolean(); + + Uri connectEp = BuildUri(usingTls, hostname, port, ConnectPath); + Uri? discoveryEp = null; + + Config = new(); + //Setup cache node config - Config.WithCacheEndpoint(cacheEndpoint) + Config.WithCacheEndpoint(connectEp) .WithNodeId(nodeId) .WithAuthenticator(KeyStore) .WithTls(usingTls); - //Check if advertising is enabled - if(plugin.HasConfigForType<PeerDiscoveryEndpoint>()) + //Get the discovery path (optional) + if (config.TryGetValue("discovery_path", out JsonElement discoveryPathEl)) { - //Get the the broadcast endpoint - Uri discoveryEndpoint = GetEndpointUri<PeerDiscoveryEndpoint>(plugin, usingTls, port, hostname); - - //Enable advertising - Config.EnableAdvertisment(discoveryEndpoint); + DiscoveryPath = discoveryPathEl.GetString(); + + //Enable advertisment if a discovery path is present + if (!string.IsNullOrEmpty(DiscoveryPath)) + { + //Build the discovery endpoint, it must be an absolute uri + discoveryEp = BuildUri(usingTls, hostname, port, DiscoveryPath); + Config.EnableAdvertisment(discoveryEp); + } } - - DiscoveryInterval = config["discovery_interval_sec"].GetTimeSpan(TimeParseType.Seconds); + //Allow custom well-known path + if(config.TryGetValue("well_known_path", out JsonElement wkEl)) + { + WellKnownPath = wkEl.GetString() ?? DefaultPath; + } + //Default if not set + WellKnownPath ??= DefaultPath; //Get the max peer connections - if(config.TryGetValue("max_peers", out JsonElement maxPeerEl)) + if (config.TryGetValue("max_peers", out JsonElement maxPeerEl)) { MaxPeerConnections = maxPeerEl.GetUInt32(); } + const string CacheConfigTemplate = +@" +Cluster Configuration: + Node Id: {id} + TlsEndabled: {tls} + Verify Ip: {vi} + Well-Known: {wk} + Cache Endpoint: {ep} + Discovery Endpoint: {dep} + Discovery Interval: {di} + Max Peer Connections: {mpc} + Max Queue Depth: {mqd} + Event Queue Purge Interval: {eqpi} +"; + //log the config plugin.Log.Information(CacheConfigTemplate, nodeId, usingTls, - cacheEndpoint, - Config.DiscoveryEndpoint, + VerifyIp, + WellKnownPath, + connectEp, + discoveryEp, DiscoveryInterval, - MaxPeerConnections + MaxPeerConnections, + MaxQueueDepth, + EventQueuePurgeInterval ); } - private static Uri GetEndpointUri<T>(PluginBase plugin, bool usingTls, int port, string hostName) where T: IEndpoint + private static Uri BuildUri(bool tls, string host, int port, string path) { - //Get the cache endpoint config - IConfigScope cacheEpConfig = plugin.GetConfigForType<T>(); - - //The endpoint to advertise to cache clients that allows cache connections - return new UriBuilder(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, hostName, port, cacheEpConfig["path"].GetString()).Uri; + return new UriBuilder + { + Scheme = tls ? "https" : "http", + Host = host, + Port = port, + Path = path + }.Uri; } } } diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs index 1ddf49b..a566390 100644 --- a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs +++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs @@ -33,7 +33,7 @@ using VNLib.Utils.Memory.Diagnostics; using VNLib.Plugins.Extensions.Loading; using VNLib.Plugins.Extensions.Loading.Routing; using VNLib.Data.Caching.ObjectCache.Server.Endpoints; -using VNLib.Data.Caching.ObjectCache.Server.Distribution; +using VNLib.Data.Caching.ObjectCache.Server.Clustering; namespace VNLib.Data.Caching.ObjectCache.Server { @@ -71,11 +71,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server } } - protected override void OnLoad() { try { + //Get the node configuration first + NodeConfig config = this.GetOrCreateSingleton<NodeConfig>(); + //Route well-known endpoint this.Route<WellKnownEndpoint>(); @@ -86,7 +88,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server _ = this.GetOrCreateSingleton<CacheNodeReplicationMaanger>(); //Setup discovery endpoint - if(this.HasConfigForType<PeerDiscoveryEndpoint>()) + if(!string.IsNullOrWhiteSpace(config.DiscoveryPath)) { this.Route<PeerDiscoveryEndpoint>(); } |