/*
* Copyright (c) 2022 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Plugins.Essentials.Sessions.Runtime
* File: VnCacheClient.cs
*
* VnCacheClient.cs is part of VNLib.Plugins.Essentials.Sessions.Runtime which is part of the larger
* VNLib collection of libraries and utilities.
*
* VNLib.Plugins.Essentials.Sessions.Runtime is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* VNLib.Plugins.Essentials.Sessions.Runtime is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
using System;
using System.Text.Json;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Security.Cryptography;
using VNLib.Utils;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
using VNLib.Utils.Extensions;
using VNLib.Data.Caching.Extensions;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Extensions.Loading;
namespace VNLib.Plugins.Essentials.Sessions.Runtime
{
///
/// A wrapper to simplify a cache client object
///
public sealed class VnCacheClient : OpenResourceHandle
{
FBMClient? _client;
///
/// The wrapped client
///
public override FBMClient? Resource => _client;
private TimeSpan RetryInterval;
private readonly ILogProvider? DebugLog;
private readonly IUnmangedHeap? ClientHeap;
///
/// Initializes an emtpy client wrapper that still requires
/// configuration loading
///
/// An optional debugging log
/// An optional for buffers
public VnCacheClient(ILogProvider? debugLog, IUnmangedHeap? heap = null)
{
DebugLog = debugLog;
//Default to 10 seconds
RetryInterval = TimeSpan.FromSeconds(10);
ClientHeap = heap;
}
protected override void Free()
{
_client?.Dispose();
_client = null;
}
///
/// Loads required configuration variables from the config store and
/// intializes the interal client
///
/// A dictionary of configuration varables
///
public async Task LoadConfigAsync(PluginBase pbase, IReadOnlyDictionary config)
{
int maxMessageSize = config["max_message_size"].GetInt32();
string? brokerAddress = config["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required configuration variable broker_address");
//Get keys async
Task clientPrivTask = pbase.TryGetSecretAsync("client_private_key");
Task brokerPubTask = pbase.TryGetSecretAsync("broker_public_key");
//Wait for all tasks to complete
string?[] keys = await Task.WhenAll(clientPrivTask, brokerPubTask);
byte[] privKey = Convert.FromBase64String(keys[0] ?? throw new KeyNotFoundException("Missing required secret client_private_key"));
byte[] brokerPub = Convert.FromBase64String(keys[1] ?? throw new KeyNotFoundException("Missing required secret broker_public_key"));
RetryInterval = config["retry_interval_sec"].GetTimeSpan(TimeParseType.Seconds);
Uri brokerUri = new(brokerAddress);
//Init the client with default settings
FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(ClientHeap ?? Memory.Shared, maxMessageSize, DebugLog);
_client = new(conf);
//Add the configuration
_client.UseBroker(brokerUri)
.ImportBrokerPublicKey(brokerPub)
.ImportClientPrivateKey(privKey)
.UseTls(brokerUri.Scheme == Uri.UriSchemeHttps);
//Zero the key memory
Memory.InitializeBlock(privKey.AsSpan());
Memory.InitializeBlock(brokerPub.AsSpan());
}
///
/// Discovers nodes in the configured cluster and connects to a random node
///
/// A to write log events to
/// A token to cancel the operation
/// A task that completes when the operation has been cancelled or an unrecoverable error occured
///
///
public async Task RunAsync(ILogProvider Log, CancellationToken cancellationToken)
{
_ = Resource ?? throw new InvalidOperationException("Client configuration not loaded, cannot connect to cache servers");
while (true)
{
//Load the server list
ActiveServer[]? servers;
while (true)
{
try
{
Log.Debug("Discovering cluster nodes in broker");
//Get server list
servers = await Resource.DiscoverNodesAsync(cancellationToken);
break;
}
catch (HttpRequestException re) when (re.InnerException is SocketException)
{
Log.Warn("Broker server is unreachable");
}
catch (Exception ex)
{
Log.Warn("Failed to get server list from broker, reason {r}", ex.Message);
}
//Gen random ms delay
int randomMsDelay = RandomNumberGenerator.GetInt32(1000, 2000);
await Task.Delay(randomMsDelay, cancellationToken);
}
if (servers?.Length == 0)
{
Log.Warn("No cluster nodes found, retrying");
await Task.Delay(RetryInterval, cancellationToken);
continue;
}
//select random server from the list of servers
ActiveServer selected = servers!.SelectRandom();
try
{
Log.Debug("Connecting to server {server}", selected.ServerId);
//Try to connect to server
await Resource.ConnectAndWaitForExitAsync(selected, cancellationToken);
Log.Debug("Cache server disconnected");
}
catch (WebSocketException wse)
{
Log.Warn("Failed to connect to cache server {reason}", wse.Message);
continue;
}
catch (HttpRequestException he) when (he.InnerException is SocketException)
{
Log.Debug("Failed to connect to recommended server {server}", selected.ServerId);
//Continue next loop
continue;
}
}
}
}
}