From 9bb5ddd8f19c0ecabd7af4ee58d80c16826bc183 Mon Sep 17 00:00:00 2001 From: vman Date: Wed, 28 Dec 2022 14:19:32 -0500 Subject: Cache provider abstractions, reduced deps --- .../VnCacheClient.cs | 201 --------------------- 1 file changed, 201 deletions(-) delete mode 100644 Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VnCacheClient.cs (limited to 'Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VnCacheClient.cs') diff --git a/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VnCacheClient.cs b/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VnCacheClient.cs deleted file mode 100644 index 2f7bdf2..0000000 --- a/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VnCacheClient.cs +++ /dev/null @@ -1,201 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Plugins.Essentials.Sessions.Runtime -* File: VnCacheClient.cs -* -* VnCacheClient.cs is part of VNLib.Plugins.Essentials.Sessions.Runtime which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Plugins.Essentials.Sessions.Runtime is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Plugins.Essentials.Sessions.Runtime is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Text.Json; -using System.Net.Sockets; -using System.Net.WebSockets; -using System.Security.Cryptography; - -using VNLib.Utils.Memory; -using VNLib.Utils.Logging; -using VNLib.Utils.Resources; -using VNLib.Utils.Extensions; -using VNLib.Data.Caching.Extensions; -using VNLib.Net.Messaging.FBM.Client; -using VNLib.Plugins.Extensions.Loading; -using VNLib.Hashing.IdentityUtility; - -namespace VNLib.Plugins.Essentials.Sessions.Runtime -{ - /// - /// A wrapper to simplify a cache client object - /// - public sealed class VnCacheClient : OpenResourceHandle - { - FBMClient? _client; - - /// - /// The wrapped client - /// - public override FBMClient? Resource => _client; - - - private TimeSpan RetryInterval; - - private readonly ILogProvider? DebugLog; - private readonly IUnmangedHeap? ClientHeap; - - /// - /// Initializes an emtpy client wrapper that still requires - /// configuration loading - /// - /// An optional debugging log - /// An optional for buffers - public VnCacheClient(ILogProvider? debugLog, IUnmangedHeap? heap = null) - { - DebugLog = debugLog; - //Default to 10 seconds - RetryInterval = TimeSpan.FromSeconds(10); - - ClientHeap = heap; - } - - /// - protected override void Free() - { - _client?.Dispose(); - _client = null; - } - - /// - /// Loads required configuration variables from the config store and - /// intializes the interal client - /// - /// - /// A dictionary of configuration varables - /// - public async Task LoadConfigAsync(PluginBase pbase, IReadOnlyDictionary config) - { - int maxMessageSize = config["max_message_size"].GetInt32(); - string? brokerAddress = config["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required configuration variable broker_address"); - - //Get keys async - Task clientPrivTask = pbase.TryGetSecretAsync("client_private_key"); - Task brokerPubTask = pbase.TryGetSecretAsync("broker_public_key"); - Task cachePubTask = pbase.TryGetSecretAsync("cache_public_key"); - - //Wait for all tasks to complete - _ = await Task.WhenAll(clientPrivTask, brokerPubTask, cachePubTask); - - using SecretResult clientPriv = await clientPrivTask ?? throw new KeyNotFoundException("Missing required secret client_private_key"); - using SecretResult brokerPub = await brokerPubTask ?? throw new KeyNotFoundException("Missing required secret broker_public_key"); - using SecretResult cachePub = await cachePubTask ?? throw new KeyNotFoundException("Missing required secret cache_public_key"); - - ReadOnlyJsonWebKey clientCert = clientPriv.GetJsonWebKey(); - ReadOnlyJsonWebKey brokerPubKey = brokerPub.GetJsonWebKey(); - ReadOnlyJsonWebKey cachePubKey = cachePub.GetJsonWebKey(); - - RetryInterval = config["retry_interval_sec"].GetTimeSpan(TimeParseType.Seconds); - - Uri brokerUri = new(brokerAddress); - - //Init the client with default settings - FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(ClientHeap ?? Memory.Shared, maxMessageSize, DebugLog); - - _client = new(conf); - - //Add the configuration to the client - _client.GetCacheConfiguration() - .WithBroker(brokerUri) - .WithVerificationKey(cachePubKey) - .WithSigningCertificate(clientCert) - .WithBrokerVerificationKey(brokerPubKey) - .WithTls(brokerUri.Scheme == Uri.UriSchemeHttps); - } - - /// - /// Discovers nodes in the configured cluster and connects to a random node - /// - /// A to write log events to - /// A token to cancel the operation - /// A task that completes when the operation has been cancelled or an unrecoverable error occured - /// - /// - public async Task RunAsync(ILogProvider Log, CancellationToken cancellationToken) - { - _ = Resource ?? throw new InvalidOperationException("Client configuration not loaded, cannot connect to cache servers"); - - while (true) - { - //Load the server list - ActiveServer[]? servers; - while (true) - { - try - { - Log.Debug("Discovering cluster nodes in broker"); - //Get server list - servers = await Resource.DiscoverCacheNodesAsync(cancellationToken); - break; - } - catch (HttpRequestException re) when (re.InnerException is SocketException) - { - Log.Warn("Broker server is unreachable"); - } - catch (Exception ex) - { - Log.Warn("Failed to get server list from broker, reason {r}", ex.Message); - } - - //Gen random ms delay - int randomMsDelay = RandomNumberGenerator.GetInt32(1000, 2000); - await Task.Delay(randomMsDelay, cancellationToken); - } - - if (servers?.Length == 0) - { - Log.Warn("No cluster nodes found, retrying"); - await Task.Delay(RetryInterval, cancellationToken); - continue; - } - - try - { - Log.Debug("Connecting to random cache server"); - - //Connect to a random server - ActiveServer selected = await Resource.ConnectToRandomCacheAsync(cancellationToken); - Log.Debug("Connected to cache server {s}", selected.ServerId); - - //Wait for disconnect - await Resource.WaitForExitAsync(cancellationToken); - - Log.Debug("Cache server disconnected"); - } - catch (WebSocketException wse) - { - Log.Warn("Failed to connect to cache server {reason}", wse.Message); - continue; - } - catch (HttpRequestException he) when (he.InnerException is SocketException) - { - Log.Debug("Failed to connect to random cache server server"); - //Continue next loop - continue; - } - } - } - } -} -- cgit