From d2d812213b99ee17f9433f81871b694c4053ff23 Mon Sep 17 00:00:00 2001 From: vnugent Date: Thu, 2 Nov 2023 01:50:05 -0400 Subject: also carried away --- .../src/VnCacheClient.cs | 397 --------------------- 1 file changed, 397 deletions(-) delete mode 100644 lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs (limited to 'lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs') diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs deleted file mode 100644 index e2d0176..0000000 --- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs +++ /dev/null @@ -1,397 +0,0 @@ -/* -* Copyright (c) 2023 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Plugins.Extensions.VNCache -* File: VnCacheClient.cs -* -* VnCacheClient.cs is part of VNLib.Plugins.Extensions.VNCache which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Net.Http; -using System.Threading; -using System.Threading.Tasks; -using System.Net.Sockets; -using System.Net.WebSockets; -using System.Collections.Generic; -using System.Security.Cryptography; - -using VNLib.Hashing; -using VNLib.Hashing.IdentityUtility; -using VNLib.Utils.Memory; -using VNLib.Utils.Logging; -using VNLib.Data.Caching; -using VNLib.Data.Caching.Extensions; -using VNLib.Data.Caching.ObjectCache; -using VNLib.Net.Messaging.FBM.Client; -using VNLib.Plugins.Extensions.Loading; -using VNLib.Data.Caching.Extensions.Clustering; -using VNLib.Plugins.Extensions.Loading.Events; -using VNLib.Plugins.Extensions.VNCache.Clustering; - -namespace VNLib.Plugins.Extensions.VNCache -{ - public interface ICacheRefreshPolicy - { - TimeSpan MaxCacheAge { get; } - - TimeSpan RefreshInterval { get; } - } - - /// - /// A base class that manages - /// - [ConfigurationName(VNCacheExtensions.CACHE_CONFIG_KEY)] - internal class VnCacheClient : IGlobalCacheProvider, IAsyncBackgroundWork - { - private const string LOG_NAME = "CLIENT"; - private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(10); - private static readonly TimeSpan NoNodeDelay = TimeSpan.FromSeconds(10); - - private readonly VnCacheClientConfig _config; - private readonly ClusterNodeIndex _index; - - /// - /// The internal client - /// - public FBMClient Client { get; } - - /// - /// Gets a value that determines if the client is currently connected to a server - /// - public bool IsConnected { get; private set; } - - public VnCacheClient(PluginBase plugin, IConfigScope config) - :this( - config.Deserialze(), - plugin.IsDebug() ? plugin.Log : null - ) - { - ILogProvider scoped = plugin.Log.CreateScope(LOG_NAME); - - //Set authenticator and error handler - Client.GetCacheConfiguration() - .WithAuthenticator(new AuthManager(plugin)) - .WithErrorHandler(new DiscoveryErrHAndler(scoped)); - - //Schedule discovery interval - plugin.ScheduleInterval(_index, _config.DiscoveryInterval); - - //Run discovery after initial delay if interval is greater than initial delay - if(_config.DiscoveryInterval > InitialDelay) - { - //Run a manual initial load - scoped.Information("Running initial discovery in {delay}", InitialDelay); - _ = plugin.ObserveWork(() => _index.OnIntervalAsync(scoped, plugin.UnloadToken), (int)InitialDelay.TotalMilliseconds); - } - } - - public VnCacheClient(VnCacheClientConfig config, ILogProvider? debugLog) - { - //Validate config - (config as IOnConfigValidation).Validate(); - - _config = config; - - //Init the client with default settings - FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(MemoryUtil.Shared, config.MaxMessageSize!.Value, config.RequestTimeout, debugLog); - - Client = new(conf); - - //Add the configuration to the client - Client.GetCacheConfiguration() - .WithTls(config.UseTls) - .WithInitialPeers(config.GetInitialNodeUris()); - - //Init index - _index = new ClusterNodeIndex(Client.GetCacheConfiguration()); - } - - /* - * Background work method manages the remote cache connection - * to the cache cluster - */ - public virtual async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) - { - //Scope log - pluginLog = pluginLog.CreateScope(LOG_NAME); - - try - { - //Initial delay - pluginLog.Debug("Worker started, waiting for startup delay"); - await Task.Delay((int)InitialDelay.TotalMilliseconds + 1000, exitToken); - - while (true) - { - try - { - //Wait for a discovery to complete - await _index.WaitForDiscoveryAsync(exitToken); - } - catch(CacheDiscoveryFailureException cdfe) - { - pluginLog.Error("Failed to discover nodes, will try again\n{err}", cdfe.Message); - //Continue - } - - //Get the next node to connect to - CacheNodeAdvertisment? node = _index.GetNextNode(); - - if (node is null) - { - pluginLog.Warn("No nodes available to connect to, trying again in {delay}", NoNodeDelay); - await Task.Delay(NoNodeDelay, exitToken); - - //Run another manual discovery if the interval is greater than the delay - if (_config.DiscoveryInterval > NoNodeDelay) - { - pluginLog.Debug("Forcing a manual discovery"); - - //We dont need to await this because it is awaited at the top of the loop - _ = _index.OnIntervalAsync(pluginLog, exitToken); - } - - continue; - } - - //Ready to connect - - try - { - pluginLog.Debug("Connecting to {node}", node); - - //Connect to the node - await Client.ConnectToCacheAsync(node, exitToken); - - if (pluginLog.IsEnabled(LogLevel.Debug)) - { - pluginLog.Debug("Connected server: {s}", node); - } - else - { - pluginLog.Information("Successfully connected to cache node"); - } - - //Set connection status flag - IsConnected = true; - - //Wait for disconnect - await Client.WaitForExitAsync(exitToken); - - pluginLog.Information("Cache server disconnected"); - } - catch(TimeoutException) - { - pluginLog.Warn("Failed to establish a websocket connection to cache server within the timeout period"); - } - catch (WebSocketException wse) - { - pluginLog.Warn("Failed to establish a websocket connection to cache server {reason}", wse.Message); - pluginLog.Verbose("Stack trace: {re}", wse); - } - //SEs may be raised when the server is not available - catch (HttpRequestException he) when (he.InnerException is SocketException) - { - pluginLog.Debug("Failed to connect to random cache server because a TCP connection could not be established"); - pluginLog.Verbose("Stack trace: {re}", he.InnerException); - } - catch(HttpRequestException he) when(he.StatusCode.HasValue) - { - pluginLog.Warn("Failed to negotiate with cache server {reason}", he.Message); - pluginLog.Verbose("Stack trace: {re}", he); - await Task.Delay(1000, exitToken); - } - finally - { - IsConnected = false; - } - - //Loop again - } - } - catch (OperationCanceledException) - { - //Normal exit from listening loop - } - catch (FBMServerNegiationException fne) - { - pluginLog.Error("Failed to negotiate connection with cache server. Please check your configuration\n {reason}", fne.Message); - } - catch (Exception ex) - { - pluginLog.Error(ex, "Unhandled exception occured in background cache client listening task"); - } - finally - { - //Dispose the client on exit - Client.Dispose(); - } - pluginLog.Information("Cache client exited"); - } - - - /// - public virtual Task AddOrUpdateAsync(string key, string? newKey, T value, CancellationToken cancellation) - { - return !IsConnected - ? throw new InvalidOperationException("The underlying client is not connected to a cache node") - : Client!.AddOrUpdateObjectAsync(key, newKey, value, cancellation); - } - - /// - public virtual Task DeleteAsync(string key, CancellationToken cancellation) - { - return !IsConnected - ? throw new InvalidOperationException("The underlying client is not connected to a cache node") - : Client!.DeleteObjectAsync(key, cancellation); - } - - /// - public virtual Task GetAsync(string key, CancellationToken cancellation) - { - return !IsConnected - ? throw new InvalidOperationException("The underlying client is not connected to a cache node") - : Client!.GetObjectAsync(key, cancellation); - } - - /// - public virtual Task GetAsync(string key, ICacheObjectDeserialzer deserializer, CancellationToken cancellation) - { - return !IsConnected - ? throw new InvalidOperationException("The underlying client is not connected to a cache node") - : Client!.GetObjectAsync(key, deserializer, cancellation); - } - - /// - public virtual Task AddOrUpdateAsync(string key, string? newKey, T value, ICacheObjectSerialzer serialzer, CancellationToken cancellation) - { - return !IsConnected - ? throw new InvalidOperationException("The underlying client is not connected to a cache node") - : Client!.AddOrUpdateObjectAsync(key, newKey, value, serialzer, cancellation); - } - - /// - public virtual Task GetAsync(string key, IObjectData rawData, CancellationToken cancellation) - { - return !IsConnected - ? throw new InvalidOperationException("The underlying client is not connected to a cache node") - : Client!.GetObjectAsync(key, rawData, cancellation); - } - - /// - public virtual Task AddOrUpdateAsync(string key, string? newKey, IObjectData rawData, CancellationToken cancellation) - { - return !IsConnected - ? throw new InvalidOperationException("The underlying client is not connected to a cache node") - : Client!.AddOrUpdateObjectAsync(key, newKey, rawData, cancellation); - } - - private sealed class AuthManager : ICacheAuthManager - { - - private IAsyncLazy _sigKey; - private IAsyncLazy _verKey; - - public AuthManager(PluginBase plugin) - { - //Lazy load keys - - //Get the signing key - _sigKey = plugin.GetSecretAsync("client_private_key").ToLazy(static r => r.GetJsonWebKey()); - - //Lazy load cache public key - _verKey = plugin.GetSecretAsync("cache_public_key").ToLazy(static r => r.GetJsonWebKey()); - } - - public async Task AwaitLazyKeyLoad() - { - await _sigKey; - await _verKey; - } - - /// - public IReadOnlyDictionary GetJwtHeader() - { - //Get the signing key jwt header - return _sigKey.Value.JwtHeader; - } - - /// - public void SignJwt(JsonWebToken jwt) - { - //Sign the jwt with signing key - jwt.SignFromJwk(_sigKey.Value); - } - - /// - public byte[] SignMessageHash(byte[] hash, HashAlg alg) - { - //try to get the rsa alg for the signing key - using RSA? rsa = _sigKey.Value.GetRSAPrivateKey(); - if(rsa != null) - { - return rsa.SignHash(hash, alg.GetAlgName(), RSASignaturePadding.Pkcs1); - } - - //try to get the ecdsa alg for the signing key - using ECDsa? ecdsa = _sigKey.Value.GetECDsaPrivateKey(); - if(ecdsa != null) - { - return ecdsa.SignHash(hash); - } - - throw new NotSupportedException("The signing key is not a valid RSA or ECDSA key"); - } - - /// - public bool VerifyJwt(JsonWebToken jwt, bool isPeer) - { - return jwt.VerifyFromJwk(_verKey.Value); - } - - /// - public bool VerifyMessageHash(ReadOnlySpan hash, HashAlg alg, ReadOnlySpan signature, bool isPeer) - { - //try to get the rsa alg for the signing key - using RSA? rsa = _verKey.Value.GetRSAPublicKey(); - if (rsa != null) - { - return rsa.VerifyHash(hash, signature, alg.GetAlgName(), RSASignaturePadding.Pkcs1); - } - - //try to get the ecdsa alg for the signing key - using ECDsa? ecdsa = _verKey.Value.GetECDsaPublicKey(); - if (ecdsa != null) - { - return ecdsa.VerifyHash(hash, signature); - } - - throw new NotSupportedException("The current key is not an RSA or ECDSA key and is not supported"); - } - } - - private sealed record class DiscoveryErrHAndler(ILogProvider Logger) : ICacheDiscoveryErrorHandler - { - public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex) - { - Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", errorNode?.NodeId, ex); - } - } - } -} \ No newline at end of file -- cgit