aboutsummaryrefslogtreecommitdiff
path: root/lib/VNLib.Plugins.Extensions.VNCache/src
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-01-12 17:47:40 -0500
committerLibravatar vnugent <public@vaughnnugent.com>2023-01-12 17:47:40 -0500
commitb75668b164d398b99ee942beced06aa27ef65a50 (patch)
treec1faf6df3caa78083dcc38eb1a7247e456bbe754 /lib/VNLib.Plugins.Extensions.VNCache/src
parentcea64e619e714f6dbe51d37ca8329b58d8c271fb (diff)
Large project reorder and consolidation
Diffstat (limited to 'lib/VNLib.Plugins.Extensions.VNCache/src')
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs108
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/VNLib.Plugins.Extensions.VNCache.csproj23
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs231
3 files changed, 362 insertions, 0 deletions
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs
new file mode 100644
index 0000000..5f58142
--- /dev/null
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs
@@ -0,0 +1,108 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Plugins.Extensions.VNCache
+* File: VNCacheExtensions.cs
+*
+* VNCacheExtensions.cs is part of VNLib.Plugins.Extensions.VNCache which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Text.Json;
+
+using VNLib.Utils.Logging;
+using VNLib.Data.Caching;
+using VNLib.Data.Caching.Extensions;
+using VNLib.Plugins.Extensions.Loading;
+
+namespace VNLib.Plugins.Extensions.VNCache
+{
+ /// <summary>
+ /// Contains extension methods for aquiring a Plugin managed
+ /// global cache provider.
+ /// </summary>
+ public static class VNCacheExtensions
+ {
+ /// <summary>
+ /// Loads the shared cache provider for the current plugin
+ /// </summary>
+ /// <param name="pbase"></param>
+ /// <param name="localized">A localized log provider to write cache logging information to</param>
+ /// <returns>The shared <see cref="IGlobalCacheProvider"/> </returns>
+ /// <remarks>
+ /// The returned instance, background work, logging, and its lifetime
+ /// are managed by the current plugin. Beware when calling this method
+ /// network connections may be spawend and managed in the background by
+ /// this library.
+ /// </remarks>
+ public static VnCacheClient GetGlobalCache(this PluginBase pbase, ILogProvider? localized = null)
+ => LoadingExtensions.GetOrCreateSingleton<VnCacheClient>(pbase, localized == null ? LoadCacheClient : (pbase) => LoadCacheClient(pbase, localized));
+
+ private static VnCacheClient LoadCacheClient(PluginBase pbase) => LoadCacheClient(pbase, pbase.Log);
+
+ private static VnCacheClient LoadCacheClient(PluginBase pbase, ILogProvider localized)
+ {
+ //Get config for client
+ IReadOnlyDictionary<string, JsonElement> config = pbase.GetConfigForType<VnCacheClient>();
+
+ //Init client
+ ILogProvider? debugLog = pbase.IsDebug() ? pbase.Log : null;
+ VnCacheClient client = new(debugLog);
+
+ //Begin cache connections by scheduling a task on the plugin's scheduler
+ _ = pbase.DeferTask(() => RunClientAsync(pbase, config, localized, client), 250);
+
+ return client;
+ }
+
+ private static async Task RunClientAsync(PluginBase pbase, IReadOnlyDictionary<string, JsonElement> config, ILogProvider localized, VnCacheClient client)
+ {
+ ILogProvider Log = localized;
+
+ try
+ {
+ //Try loading config
+ await client.LoadConfigAsync(pbase, config);
+
+ Log.Verbose("VNCache client configration loaded successfully");
+
+ //Run and wait for exit
+ await client.RunAsync(Log, pbase.UnloadToken);
+ }
+ catch (OperationCanceledException)
+ { }
+ catch (KeyNotFoundException e)
+ {
+ Log.Error("Missing required configuration variable for VnCache client: {0}", e.Message);
+ }
+ catch (FBMServerNegiationException fne)
+ {
+ Log.Error("Failed to negotiate connection with cache server {reason}", fne.Message);
+ }
+ catch (Exception ex)
+ {
+ Log.Error(ex, "Unhandled exception occured in background cache client listening task");
+ }
+ finally
+ {
+ client.Dispose();
+ }
+
+ Log.Information("Cache client exited");
+ }
+ }
+}
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VNLib.Plugins.Extensions.VNCache.csproj b/lib/VNLib.Plugins.Extensions.VNCache/src/VNLib.Plugins.Extensions.VNCache.csproj
new file mode 100644
index 0000000..a664006
--- /dev/null
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VNLib.Plugins.Extensions.VNCache.csproj
@@ -0,0 +1,23 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+ <PropertyGroup>
+ <TargetFramework>net6.0</TargetFramework>
+ <ImplicitUsings>enable</ImplicitUsings>
+ <Nullable>enable</Nullable>
+ <GenerateDocumentationFile>True</GenerateDocumentationFile>
+ <Version>1.0.1.1</Version>
+ <Authors>Vaughn Nugent</Authors>
+ <Copyright>Copyright © 2023 Vaughn Nugent</Copyright>
+ <PackageProjectUrl>https://www.vaughnnugent.com/resources</PackageProjectUrl>
+ <CopyLocalLockFileAssemblies>true</CopyLocalLockFileAssemblies>
+ <SignAssembly>True</SignAssembly>
+ <AssemblyOriginatorKeyFile>\\vaughnnugent.com\Internal\Folder Redirection\vman\Documents\Programming\Software\StrongNameingKey.snk</AssemblyOriginatorKeyFile>
+ </PropertyGroup>
+
+ <ItemGroup>
+ <ProjectReference Include="..\..\..\..\Extensions\lib\VNLib.Plugins.Extensions.Loading\src\VNLib.Plugins.Extensions.Loading.csproj" />
+ <ProjectReference Include="..\..\VNLib.Data.Caching.Extensions\src\VNLib.Data.Caching.Extensions.csproj" />
+ <ProjectReference Include="..\..\VNLib.Data.Caching\src\VNLib.Data.Caching.csproj" />
+ </ItemGroup>
+
+</Project>
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
new file mode 100644
index 0000000..a34e611
--- /dev/null
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
@@ -0,0 +1,231 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Plugins.Extensions.VNCache
+* File: VnCacheClient.cs
+*
+* VnCacheClient.cs is part of VNLib.Plugins.Extensions.VNCache which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Text.Json;
+using System.Net.Sockets;
+using System.Net.WebSockets;
+using System.Security.Cryptography;
+
+using VNLib.Utils;
+using VNLib.Utils.Memory;
+using VNLib.Utils.Logging;
+using VNLib.Utils.Extensions;
+using VNLib.Hashing.IdentityUtility;
+using VNLib.Data.Caching;
+using VNLib.Data.Caching.Extensions;
+using VNLib.Net.Messaging.FBM.Client;
+using VNLib.Plugins.Extensions.Loading;
+
+
+namespace VNLib.Plugins.Extensions.VNCache
+{
+ /// <summary>
+ /// A wrapper to simplify a shared global cache client
+ /// </summary>
+ [ConfigurationName("vncache")]
+ public sealed class VnCacheClient : VnDisposeable, IGlobalCacheProvider
+ {
+ FBMClient? _client;
+
+ private TimeSpan RetryInterval;
+
+ private readonly ILogProvider? DebugLog;
+ private readonly IUnmangedHeap? ClientHeap;
+
+ /// <summary>
+ /// Initializes an emtpy client wrapper that still requires
+ /// configuration loading
+ /// </summary>
+ /// <param name="debugLog">An optional debugging log</param>
+ /// <param name="heap">An optional <see cref="IUnmangedHeap"/> for <see cref="FBMClient"/> buffers</param>
+ internal VnCacheClient(ILogProvider? debugLog, IUnmangedHeap? heap = null)
+ {
+ DebugLog = debugLog;
+ //Default to 10 seconds
+ RetryInterval = TimeSpan.FromSeconds(10);
+
+ ClientHeap = heap;
+ }
+
+ ///<inheritdoc/>
+ protected override void Free()
+ {
+ _client?.Dispose();
+ _client = null;
+ }
+
+
+ /// <summary>
+ /// Loads required configuration variables from the config store and
+ /// intializes the interal client
+ /// </summary>
+ /// <param name="pbase"></param>
+ /// <param name="config">A dictionary of configuration varables</param>
+ /// <exception cref="KeyNotFoundException"></exception>
+ internal async Task LoadConfigAsync(PluginBase pbase, IReadOnlyDictionary<string, JsonElement> config)
+ {
+ int maxMessageSize = config["max_message_size"].GetInt32();
+ string? brokerAddress = config["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required configuration variable broker_address");
+
+ //Get keys async
+ Task<ReadOnlyJsonWebKey?> clientPrivTask = pbase.TryGetSecretAsync("client_private_key").ToJsonWebKey();
+ Task<ReadOnlyJsonWebKey?> brokerPubTask = pbase.TryGetSecretAsync("broker_public_key").ToJsonWebKey();
+ Task<ReadOnlyJsonWebKey?> cachePubTask = pbase.TryGetSecretAsync("cache_public_key").ToJsonWebKey();
+
+ //Wait for all tasks to complete
+ _ = await Task.WhenAll(clientPrivTask, brokerPubTask, cachePubTask);
+
+ ReadOnlyJsonWebKey clientPriv = await clientPrivTask ?? throw new KeyNotFoundException("Missing required secret client_private_key");
+ ReadOnlyJsonWebKey brokerPub = await brokerPubTask ?? throw new KeyNotFoundException("Missing required secret broker_public_key");
+ ReadOnlyJsonWebKey cachePub = await cachePubTask ?? throw new KeyNotFoundException("Missing required secret cache_public_key");
+
+ RetryInterval = config["retry_interval_sec"].GetTimeSpan(TimeParseType.Seconds);
+
+ Uri brokerUri = new(brokerAddress);
+
+ //Init the client with default settings
+ FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(ClientHeap ?? Memory.Shared, maxMessageSize, DebugLog);
+
+ _client = new(conf);
+
+ //Add the configuration to the client
+ _client.GetCacheConfiguration()
+ .WithBroker(brokerUri)
+ .WithVerificationKey(cachePub)
+ .WithSigningCertificate(clientPriv)
+ .WithBrokerVerificationKey(brokerPub)
+ .WithTls(brokerUri.Scheme == Uri.UriSchemeHttps);
+ }
+
+ /// <summary>
+ /// Discovers nodes in the configured cluster and connects to a random node
+ /// </summary>
+ /// <param name="Log">A <see cref="ILogProvider"/> to write log events to</param>
+ /// <param name="cancellationToken">A token to cancel the operation</param>
+ /// <returns>A task that completes when the operation has been cancelled or an unrecoverable error occured</returns>
+ /// <exception cref="InvalidOperationException"></exception>
+ /// <exception cref="OperationCanceledException"></exception>
+ internal async Task RunAsync(ILogProvider Log, CancellationToken cancellationToken)
+ {
+ _ = _client ?? throw new InvalidOperationException("Client configuration not loaded, cannot connect to cache servers");
+
+ while (true)
+ {
+ //Load the server list
+ ActiveServer[]? servers;
+ while (true)
+ {
+ try
+ {
+ Log.Debug("Discovering cluster nodes in broker");
+ //Get server list
+ servers = await _client.DiscoverCacheNodesAsync(cancellationToken);
+ break;
+ }
+ catch (HttpRequestException re) when (re.InnerException is SocketException)
+ {
+ Log.Warn("Broker server is unreachable");
+ }
+ catch (Exception ex)
+ {
+ Log.Warn("Failed to get server list from broker, reason {r}", ex.Message);
+ }
+
+ //Gen random ms delay
+ int randomMsDelay = RandomNumberGenerator.GetInt32(1000, 2000);
+ await Task.Delay(randomMsDelay, cancellationToken);
+ }
+
+ if (servers?.Length == 0)
+ {
+ Log.Warn("No cluster nodes found, retrying");
+ await Task.Delay(RetryInterval, cancellationToken);
+ continue;
+ }
+
+ try
+ {
+ Log.Debug("Connecting to random cache server");
+
+ //Connect to a random server
+ ActiveServer selected = await _client.ConnectToRandomCacheAsync(cancellationToken);
+ Log.Debug("Connected to cache server {s}", selected.ServerId);
+
+ //Set connection status flag
+ IsConnected = true;
+
+ //Wait for disconnect
+ await _client.WaitForExitAsync(cancellationToken);
+
+ Log.Debug("Cache server disconnected");
+ }
+ catch (WebSocketException wse)
+ {
+ Log.Warn("Failed to connect to cache server {reason}", wse.Message);
+ continue;
+ }
+ catch (HttpRequestException he) when (he.InnerException is SocketException)
+ {
+ Log.Debug("Failed to connect to random cache server server");
+ //Continue next loop
+ continue;
+ }
+ finally
+ {
+ IsConnected = false;
+ }
+ }
+ }
+
+
+ ///<inheritdoc/>
+ public bool IsConnected { get; private set; }
+
+
+ ///<inheritdoc/>
+ public Task AddOrUpdateAsync<T>(string key, string? newKey, T value, CancellationToken cancellation)
+ {
+ return !IsConnected
+ ? throw new InvalidOperationException("The underlying client is not connected to a cache node")
+ : _client!.AddOrUpdateObjectAsync(key, newKey, value, cancellation);
+ }
+
+ ///<inheritdoc/>
+ public Task DeleteAsync(string key, CancellationToken cancellation)
+ {
+ return !IsConnected
+ ? throw new InvalidOperationException("The underlying client is not connected to a cache node")
+ : _client!.DeleteObjectAsync(key, cancellation);
+ }
+
+
+ ///<inheritdoc/>
+ public Task<T?> GetAsync<T>(string key, CancellationToken cancellation)
+ {
+ return !IsConnected
+ ? throw new InvalidOperationException("The underlying client is not connected to a cache node")
+ : _client!.GetObjectAsync<T>(key, cancellation);
+ }
+ }
+} \ No newline at end of file