diff options
Diffstat (limited to 'lib/VNLib.Plugins.Extensions.VNCache/src')
3 files changed, 365 insertions, 0 deletions
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs new file mode 100644 index 0000000..d4aa88a --- /dev/null +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs @@ -0,0 +1,108 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Plugins.Extensions.VNCache +* File: VNCacheExtensions.cs +* +* VNCacheExtensions.cs is part of VNLib.Plugins.Extensions.VNCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Text.Json; + +using VNLib.Utils.Logging; +using VNLib.Data.Caching; +using VNLib.Data.Caching.Extensions; +using VNLib.Plugins.Extensions.Loading; + +namespace VNLib.Plugins.Extensions.VNCache +{ + /// <summary> + /// Contains extension methods for aquiring a Plugin managed + /// global cache provider. + /// </summary> + public static class VNCacheExtensions + { + /// <summary> + /// Loads the shared cache provider for the current plugin + /// </summary> + /// <param name="pbase"></param> + /// <param name="localized">A localized log provider to write cache logging information to</param> + /// <returns>The shared <see cref="IGlobalCacheProvider"/> </returns> + /// <remarks> + /// The returned instance, background work, logging, and its lifetime + /// are managed by the current plugin. Beware when calling this method + /// network connections may be spawend and managed in the background by + /// this library. + /// </remarks> + public static VnCacheClient GetGlobalCache(this PluginBase pbase, ILogProvider? localized = null) + => LoadingExtensions.GetOrCreateSingleton<VnCacheClient>(pbase, localized == null ? LoadCacheClient : (pbase) => LoadCacheClient(pbase, localized)); + + private static VnCacheClient LoadCacheClient(PluginBase pbase) => LoadCacheClient(pbase, pbase.Log); + + private static VnCacheClient LoadCacheClient(PluginBase pbase, ILogProvider localized) + { + //Get config for client + IReadOnlyDictionary<string, JsonElement> config = pbase.GetConfigForType<VnCacheClient>(); + + //Init client + ILogProvider? debugLog = pbase.IsDebug() ? pbase.Log : null; + VnCacheClient client = new(debugLog); + + //Begin cache connections by scheduling a task on the plugin's scheduler + _ = pbase.DeferTask(() => RunClientAsync(pbase, config, localized, client), 250); + + return client; + } + + private static async Task RunClientAsync(PluginBase pbase, IReadOnlyDictionary<string, JsonElement> config, ILogProvider localized, VnCacheClient client) + { + ILogProvider Log = localized; + + try + { + //Try loading config + await client.LoadConfigAsync(pbase, config); + + Log.Verbose("VNCache client configration loaded successfully"); + + //Run and wait for exit + await client.RunAsync(Log, pbase.UnloadToken); + } + catch (OperationCanceledException) + { } + catch (KeyNotFoundException e) + { + Log.Error("Missing required configuration variable for VnCache client: {0}", e.Message); + } + catch (FBMServerNegiationException fne) + { + Log.Error("Failed to negotiate connection with cache server {reason}", fne.Message); + } + catch (Exception ex) + { + Log.Error(ex, "Cache client error occured in session provider"); + } + finally + { + client.Dispose(); + } + + Log.Information("Cache client exited"); + } + } +} diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VNLib.Plugins.Extensions.VNCache.csproj b/lib/VNLib.Plugins.Extensions.VNCache/src/VNLib.Plugins.Extensions.VNCache.csproj new file mode 100644 index 0000000..f913366 --- /dev/null +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VNLib.Plugins.Extensions.VNCache.csproj @@ -0,0 +1,26 @@ +<Project Sdk="Microsoft.NET.Sdk"> + + <PropertyGroup> + <TargetFramework>net6.0</TargetFramework> + <ImplicitUsings>enable</ImplicitUsings> + <Nullable>enable</Nullable> + <GenerateDocumentationFile>True</GenerateDocumentationFile> + <Version>1.0.1.1</Version> + <Authors>Vaughn Nugent</Authors> + <Copyright>Copyright © 2022 Vaughn Nugent</Copyright> + <PackageProjectUrl>https://www.vaughnnugent.com/resources</PackageProjectUrl> + <CopyLocalLockFileAssemblies>true</CopyLocalLockFileAssemblies> + <SignAssembly>True</SignAssembly> + <AssemblyOriginatorKeyFile>\\vaughnnugent.com\Internal\Folder Redirection\vman\Documents\Programming\Software\StrongNameingKey.snk</AssemblyOriginatorKeyFile> + </PropertyGroup> + + <ItemGroup> + <ProjectReference Include="..\..\..\VNLib\Hashing\src\VNLib.Hashing.Portable.csproj" /> + <ProjectReference Include="..\..\..\VNLib\VNLib.Net.Messaging.FBM\src\VNLib.Net.Messaging.FBM.csproj" /> + <ProjectReference Include="..\..\DataCaching\VNLib.Data.Caching.Extensions\VNLib.Data.Caching.Extensions.csproj" /> + <ProjectReference Include="..\..\DataCaching\VNLib.Data.Caching\VNLib.Data.Caching.csproj" /> + <ProjectReference Include="..\..\PluginBase\VNLib.Plugins.PluginBase.csproj" /> + <ProjectReference Include="..\VNLib.Plugins.Extensions.Loading\VNLib.Plugins.Extensions.Loading.csproj" /> + </ItemGroup> + +</Project> diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs new file mode 100644 index 0000000..a34e611 --- /dev/null +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs @@ -0,0 +1,231 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Plugins.Extensions.VNCache +* File: VnCacheClient.cs +* +* VnCacheClient.cs is part of VNLib.Plugins.Extensions.VNCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Text.Json; +using System.Net.Sockets; +using System.Net.WebSockets; +using System.Security.Cryptography; + +using VNLib.Utils; +using VNLib.Utils.Memory; +using VNLib.Utils.Logging; +using VNLib.Utils.Extensions; +using VNLib.Hashing.IdentityUtility; +using VNLib.Data.Caching; +using VNLib.Data.Caching.Extensions; +using VNLib.Net.Messaging.FBM.Client; +using VNLib.Plugins.Extensions.Loading; + + +namespace VNLib.Plugins.Extensions.VNCache +{ + /// <summary> + /// A wrapper to simplify a shared global cache client + /// </summary> + [ConfigurationName("vncache")] + public sealed class VnCacheClient : VnDisposeable, IGlobalCacheProvider + { + FBMClient? _client; + + private TimeSpan RetryInterval; + + private readonly ILogProvider? DebugLog; + private readonly IUnmangedHeap? ClientHeap; + + /// <summary> + /// Initializes an emtpy client wrapper that still requires + /// configuration loading + /// </summary> + /// <param name="debugLog">An optional debugging log</param> + /// <param name="heap">An optional <see cref="IUnmangedHeap"/> for <see cref="FBMClient"/> buffers</param> + internal VnCacheClient(ILogProvider? debugLog, IUnmangedHeap? heap = null) + { + DebugLog = debugLog; + //Default to 10 seconds + RetryInterval = TimeSpan.FromSeconds(10); + + ClientHeap = heap; + } + + ///<inheritdoc/> + protected override void Free() + { + _client?.Dispose(); + _client = null; + } + + + /// <summary> + /// Loads required configuration variables from the config store and + /// intializes the interal client + /// </summary> + /// <param name="pbase"></param> + /// <param name="config">A dictionary of configuration varables</param> + /// <exception cref="KeyNotFoundException"></exception> + internal async Task LoadConfigAsync(PluginBase pbase, IReadOnlyDictionary<string, JsonElement> config) + { + int maxMessageSize = config["max_message_size"].GetInt32(); + string? brokerAddress = config["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required configuration variable broker_address"); + + //Get keys async + Task<ReadOnlyJsonWebKey?> clientPrivTask = pbase.TryGetSecretAsync("client_private_key").ToJsonWebKey(); + Task<ReadOnlyJsonWebKey?> brokerPubTask = pbase.TryGetSecretAsync("broker_public_key").ToJsonWebKey(); + Task<ReadOnlyJsonWebKey?> cachePubTask = pbase.TryGetSecretAsync("cache_public_key").ToJsonWebKey(); + + //Wait for all tasks to complete + _ = await Task.WhenAll(clientPrivTask, brokerPubTask, cachePubTask); + + ReadOnlyJsonWebKey clientPriv = await clientPrivTask ?? throw new KeyNotFoundException("Missing required secret client_private_key"); + ReadOnlyJsonWebKey brokerPub = await brokerPubTask ?? throw new KeyNotFoundException("Missing required secret broker_public_key"); + ReadOnlyJsonWebKey cachePub = await cachePubTask ?? throw new KeyNotFoundException("Missing required secret cache_public_key"); + + RetryInterval = config["retry_interval_sec"].GetTimeSpan(TimeParseType.Seconds); + + Uri brokerUri = new(brokerAddress); + + //Init the client with default settings + FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(ClientHeap ?? Memory.Shared, maxMessageSize, DebugLog); + + _client = new(conf); + + //Add the configuration to the client + _client.GetCacheConfiguration() + .WithBroker(brokerUri) + .WithVerificationKey(cachePub) + .WithSigningCertificate(clientPriv) + .WithBrokerVerificationKey(brokerPub) + .WithTls(brokerUri.Scheme == Uri.UriSchemeHttps); + } + + /// <summary> + /// Discovers nodes in the configured cluster and connects to a random node + /// </summary> + /// <param name="Log">A <see cref="ILogProvider"/> to write log events to</param> + /// <param name="cancellationToken">A token to cancel the operation</param> + /// <returns>A task that completes when the operation has been cancelled or an unrecoverable error occured</returns> + /// <exception cref="InvalidOperationException"></exception> + /// <exception cref="OperationCanceledException"></exception> + internal async Task RunAsync(ILogProvider Log, CancellationToken cancellationToken) + { + _ = _client ?? throw new InvalidOperationException("Client configuration not loaded, cannot connect to cache servers"); + + while (true) + { + //Load the server list + ActiveServer[]? servers; + while (true) + { + try + { + Log.Debug("Discovering cluster nodes in broker"); + //Get server list + servers = await _client.DiscoverCacheNodesAsync(cancellationToken); + break; + } + catch (HttpRequestException re) when (re.InnerException is SocketException) + { + Log.Warn("Broker server is unreachable"); + } + catch (Exception ex) + { + Log.Warn("Failed to get server list from broker, reason {r}", ex.Message); + } + + //Gen random ms delay + int randomMsDelay = RandomNumberGenerator.GetInt32(1000, 2000); + await Task.Delay(randomMsDelay, cancellationToken); + } + + if (servers?.Length == 0) + { + Log.Warn("No cluster nodes found, retrying"); + await Task.Delay(RetryInterval, cancellationToken); + continue; + } + + try + { + Log.Debug("Connecting to random cache server"); + + //Connect to a random server + ActiveServer selected = await _client.ConnectToRandomCacheAsync(cancellationToken); + Log.Debug("Connected to cache server {s}", selected.ServerId); + + //Set connection status flag + IsConnected = true; + + //Wait for disconnect + await _client.WaitForExitAsync(cancellationToken); + + Log.Debug("Cache server disconnected"); + } + catch (WebSocketException wse) + { + Log.Warn("Failed to connect to cache server {reason}", wse.Message); + continue; + } + catch (HttpRequestException he) when (he.InnerException is SocketException) + { + Log.Debug("Failed to connect to random cache server server"); + //Continue next loop + continue; + } + finally + { + IsConnected = false; + } + } + } + + + ///<inheritdoc/> + public bool IsConnected { get; private set; } + + + ///<inheritdoc/> + public Task AddOrUpdateAsync<T>(string key, string? newKey, T value, CancellationToken cancellation) + { + return !IsConnected + ? throw new InvalidOperationException("The underlying client is not connected to a cache node") + : _client!.AddOrUpdateObjectAsync(key, newKey, value, cancellation); + } + + ///<inheritdoc/> + public Task DeleteAsync(string key, CancellationToken cancellation) + { + return !IsConnected + ? throw new InvalidOperationException("The underlying client is not connected to a cache node") + : _client!.DeleteObjectAsync(key, cancellation); + } + + + ///<inheritdoc/> + public Task<T?> GetAsync<T>(string key, CancellationToken cancellation) + { + return !IsConnected + ? throw new InvalidOperationException("The underlying client is not connected to a cache node") + : _client!.GetObjectAsync<T>(key, cancellation); + } + } +}
\ No newline at end of file |