diff options
Diffstat (limited to 'lib/VNLib.Plugins.Extensions.VNCache/src')
4 files changed, 191 insertions, 39 deletions
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/Clustering/ClusterNodeIndex.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/Clustering/ClusterNodeIndex.cs new file mode 100644 index 0000000..487a4f9 --- /dev/null +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/Clustering/ClusterNodeIndex.cs @@ -0,0 +1,75 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Plugins.Extensions.VNCache +* File: ClusterNodeIndex.cs +* +* ClusterNodeIndex.cs is part of VNLib.Plugins.Extensions.VNCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Utils.Logging; +using VNLib.Data.Caching.Extensions; +using VNLib.Data.Caching.Extensions.Clustering; +using VNLib.Plugins.Extensions.Loading.Events; + +namespace VNLib.Plugins.Extensions.VNCache.Clustering +{ + internal sealed class ClusterNodeIndex : IClusterNodeIndex, IIntervalScheduleable + { + private readonly CacheClientConfiguration _config; + private Task _currentUpdate; + + + public ClusterNodeIndex(CacheClientConfiguration config) + { + _config = config; + _currentUpdate = Task.CompletedTask; + } + + ///<inheritdoc/> + public CacheNodeAdvertisment? GetNextNode() + { + //Get all nodes + CacheNodeAdvertisment[] ads = _config.NodeCollection.GetAllNodes(); + //Just get a random node from the collection for now + return ads.Length > 0 ? ads.SelectRandom() : null; + } + + ///<inheritdoc/> + public Task WaitForDiscoveryAsync(CancellationToken cancellationToken) + { + return _currentUpdate.WaitAsync(cancellationToken); + } + + /// <summary> + /// Runs the discovery process and updates the current update task + /// </summary> + /// <param name="log"></param> + /// <param name="cancellationToken">A token to cancel the operation</param> + /// <returns>A task that completes when the discovery process is complete</returns> + public Task OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken) + { + //Run discovery operation and update the task + _currentUpdate = _config.DiscoverNodesAsync(cancellationToken); + return Task.CompletedTask; + } + } +}
\ No newline at end of file diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/Clustering/IClusterNodeIndex.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/Clustering/IClusterNodeIndex.cs new file mode 100644 index 0000000..ffbfa0d --- /dev/null +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/Clustering/IClusterNodeIndex.cs @@ -0,0 +1,49 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Plugins.Extensions.VNCache +* File: IClusterNodeIndex.cs +* +* IClusterNodeIndex.cs is part of VNLib.Plugins.Extensions.VNCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Data.Caching.Extensions.Clustering; + +namespace VNLib.Plugins.Extensions.VNCache.Clustering +{ + internal interface IClusterNodeIndex + { + /// <summary> + /// Gets the next available node using the configured balancing policy + /// or null if no nodes are available + /// </summary> + /// <returns>The next available node to connect to if any are available</returns> + CacheNodeAdvertisment? GetNextNode(); + + /// <summary> + /// Waits for the discovery process to complete. This is just incase a + /// connection wants to happen while a long discovery is processing. + /// </summary> + /// <param name="cancellationToken">A token to cancel the operation</param> + /// <returns>A task that resolves when the discovery process completes</returns> + Task WaitForDiscoveryAsync(CancellationToken cancellationToken); + } +}
\ No newline at end of file diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs index aa3d88f..679c91d 100644 --- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs @@ -41,6 +41,8 @@ using VNLib.Data.Caching.ObjectCache; using VNLib.Net.Messaging.FBM.Client; using VNLib.Plugins.Extensions.Loading; using VNLib.Data.Caching.Extensions.Clustering; +using VNLib.Plugins.Extensions.Loading.Events; +using VNLib.Plugins.Extensions.VNCache.Clustering; namespace VNLib.Plugins.Extensions.VNCache { @@ -51,14 +53,18 @@ namespace VNLib.Plugins.Extensions.VNCache TimeSpan RefreshInterval { get; } } - /// <summary> /// A base class that manages /// </summary> [ConfigurationName(VNCacheExtensions.CACHE_CONFIG_KEY)] internal class VnCacheClient : IGlobalCacheProvider, IAsyncBackgroundWork { + private const string LOG_NAME = "CLIENT"; + private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(10); + private static readonly TimeSpan NoNodeDelay = TimeSpan.FromSeconds(10); + private readonly VnCacheClientConfig _config; + private readonly ClusterNodeIndex _index; /// <summary> /// The internal client @@ -76,10 +82,23 @@ namespace VNLib.Plugins.Extensions.VNCache plugin.IsDebug() ? plugin.Log : null ) { + ILogProvider scoped = plugin.Log.CreateScope(LOG_NAME); + //Set authenticator and error handler Client.GetCacheConfiguration() .WithAuthenticator(new AuthManager(plugin)) - .WithErrorHandler(new DiscoveryErrHAndler(plugin.Log)); + .WithErrorHandler(new DiscoveryErrHAndler(scoped)); + + //Schedule discovery interval + plugin.ScheduleInterval(_index, _config.DiscoveryInterval); + + //Run discovery after initial delay if interval is greater than initial delay + if(_config.DiscoveryInterval > InitialDelay) + { + //Run a manual initial load + scoped.Information("Running initial discovery in {delay}", InitialDelay); + _ = plugin.ObserveWork(() => _index.OnIntervalAsync(scoped, plugin.UnloadToken), (int)InitialDelay.TotalMilliseconds); + } } public VnCacheClient(VnCacheClientConfig config, ILogProvider? debugLog) @@ -98,6 +117,9 @@ namespace VNLib.Plugins.Extensions.VNCache Client.GetCacheConfiguration() .WithTls(config.UseTls) .WithInitialPeers(config.GetInitialNodeUris()); + + //Init index + _index = new ClusterNodeIndex(Client.GetCacheConfiguration()); } /* @@ -106,46 +128,50 @@ namespace VNLib.Plugins.Extensions.VNCache */ public virtual async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) { + //Scope log + pluginLog = pluginLog.CreateScope(LOG_NAME); + try { + //Initial delay + pluginLog.Debug("Worker started, waiting for startup delay"); + await Task.Delay((int)InitialDelay.TotalMilliseconds + 1000, exitToken); + while (true) { - //Discover nodes in the network - - while (true) + //Wait for a discovery to complete + await _index.WaitForDiscoveryAsync(exitToken); + + //Get the next node to connect to + CacheNodeAdvertisment? node = _index.GetNextNode(); + + if (node is null) { - try - { - pluginLog.Debug("Discovering cluster nodes in network"); - //Get server list - await Client.DiscoverAvailableNodesAsync(exitToken); - break; - } - catch (HttpRequestException re) when (re.InnerException is SocketException) - { - pluginLog.Warn("Broker server is unreachable"); - } - catch(CacheDiscoveryFailureException ce) - { - pluginLog.Warn("Failed to discover cache servers, reason {r}", ce); - } - catch (Exception ex) + pluginLog.Warn("No nodes available to connect to, trying again in {delay}", NoNodeDelay); + await Task.Delay(NoNodeDelay, exitToken); + + //Run another manual discovery if the interval is greater than the delay + if (_config.DiscoveryInterval > NoNodeDelay) { - pluginLog.Warn("Failed to get server list from broker, reason {r}", ex.Message); + pluginLog.Debug("Forcing a manual discovery"); + + //We dont need to await this because it is awaited at the top of the loop + _ = _index.OnIntervalAsync(pluginLog, exitToken); } - //Gen random ms delay - int randomMsDelay = RandomNumberGenerator.GetInt32(1000, 2000); - await Task.Delay(randomMsDelay, exitToken); + continue; } + //Ready to connect + try { - pluginLog.Debug("Connecting to random cache server"); + pluginLog.Debug("Connecting to {node}", node); - //Connect to a random server - CacheNodeAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken); - pluginLog.Debug("Connected to cache server {s}", selected.NodeId); + //Connect to the node + await Client.ConnectToCacheAsync(node, exitToken); + + pluginLog.Debug("Successfully connected to {s}", node); //Set connection status flag IsConnected = true; @@ -157,7 +183,7 @@ namespace VNLib.Plugins.Extensions.VNCache } catch (WebSocketException wse) { - pluginLog.Warn("Failed to connect to cache server {reason}", wse.Message); + pluginLog.Warn("Failed to connect to cache server {reason}", wse); continue; } catch (HttpRequestException he) when (he.InnerException is SocketException) @@ -170,6 +196,8 @@ namespace VNLib.Plugins.Extensions.VNCache { IsConnected = false; } + + //Loop again } } catch (OperationCanceledException) @@ -279,14 +307,14 @@ namespace VNLib.Plugins.Extensions.VNCache public byte[] SignMessageHash(byte[] hash, HashAlg alg) { //try to get the rsa alg for the signing key - using RSA? rsa = _sigKey.Value.GetRSAPublicKey(); + using RSA? rsa = _sigKey.Value.GetRSAPrivateKey(); if(rsa != null) { return rsa.SignHash(hash, alg.GetAlgName(), RSASignaturePadding.Pkcs1); } //try to get the ecdsa alg for the signing key - using ECDsa? ecdsa = _sigKey.Value.GetECDsaPublicKey(); + using ECDsa? ecdsa = _sigKey.Value.GetECDsaPrivateKey(); if(ecdsa != null) { return ecdsa.SignHash(hash); @@ -326,7 +354,7 @@ namespace VNLib.Plugins.Extensions.VNCache { public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex) { - Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", errorNode.NodeId, ex); + Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", errorNode?.NodeId, ex); } } } diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs index 73fb70f..bfa9d92 100644 --- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs @@ -53,8 +53,8 @@ namespace VNLib.Plugins.Extensions.VNCache /// The time (in seconds) to randomly delay polling the broker server /// for available servers /// </summary> - [JsonPropertyName("retry_interval_sec")] - public int? RetryIntervalSeconds { get; set; } + [JsonPropertyName("discovery_interval_Sec")] + public int? DiscoveryIntervalSeconds { get; set; } /// <summary> /// The maximum time (in seconds) for FBM cache operations are allowed @@ -71,7 +71,7 @@ namespace VNLib.Plugins.Extensions.VNCache /// <summary> /// Retry interval in a timespan /// </summary> - internal TimeSpan RetryInterval => TimeSpan.FromSeconds(RetryIntervalSeconds!.Value); + internal TimeSpan DiscoveryInterval => TimeSpan.FromSeconds(DiscoveryIntervalSeconds!.Value); /// <summary> /// FBM Request timeout @@ -92,7 +92,7 @@ namespace VNLib.Plugins.Extensions.VNCache public Uri[] GetInitialNodeUris() { _ = InitialNodes ?? throw new InvalidOperationException("Initial nodes have not been set"); - return InitialNodes.Select(x => new Uri(x)).ToArray(); + return InitialNodes.Select(static x => new Uri(x, UriKind.Absolute)).ToArray(); } void IOnConfigValidation.Validate() @@ -102,7 +102,7 @@ namespace VNLib.Plugins.Extensions.VNCache throw new ArgumentException("Your maxium message size should be a reasonable value greater than 0", "max_message_size"); } - if (!RetryIntervalSeconds.HasValue || RetryIntervalSeconds.Value < 1) + if (!DiscoveryIntervalSeconds.HasValue || DiscoveryIntervalSeconds.Value < 1) { throw new ArgumentException("You must specify a retry interval period greater than 0", "retry_interval_sec"); } @@ -128,7 +128,7 @@ namespace VNLib.Plugins.Extensions.VNCache } //Verify http connection - if(peer.Scheme != Uri.UriSchemeHttp || peer.Scheme != Uri.UriSchemeHttps) + if(peer.Scheme != Uri.UriSchemeHttp && peer.Scheme != Uri.UriSchemeHttps) { throw new ArgumentException("You must specify an HTTP or HTTPS URI for each initial node", "initial_nodes"); } |