/*
* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Plugins.Extensions.VNCache
* File: VnCacheClient.cs
*
* VnCacheClient.cs is part of VNLib.Plugins.Extensions.VNCache which is part of the larger
* VNLib collection of libraries and utilities.
*
* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Collections.Generic;
using System.Security.Cryptography;
using VNLib.Hashing;
using VNLib.Hashing.IdentityUtility;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
using VNLib.Data.Caching;
using VNLib.Data.Caching.Extensions;
using VNLib.Data.Caching.ObjectCache;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.Extensions.Clustering;
namespace VNLib.Plugins.Extensions.VNCache
{
/// <summary>
/// Exposes the two time spans that describe a cache refresh policy.
/// NOTE(review): nothing in this file consumes this interface; the member
/// descriptions below are inferred from the member names only - confirm
/// against the implementers.
/// </summary>
public interface ICacheRefreshPolicy
{
/// <summary>
/// Presumably the maximum age a cached entry may reach - TODO confirm
/// </summary>
TimeSpan MaxCacheAge { get; }
/// <summary>
/// Presumably the interval between cache refreshes - TODO confirm
/// </summary>
TimeSpan RefreshInterval { get; }
}
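/// <summary>
/// A cache client that manages the connection to a remote cache cluster
/// as background work and exposes object cache operations through that connection
/// </summary>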
[ConfigurationName(VNCacheExtensions.CACHE_CONFIG_KEY)]
internal class VnCacheClient : IGlobalCacheProvider, IAsyncBackgroundWork
{
//The client configuration, stored by the constructor after it has been validated
private readonly VnCacheClientConfig _config;
/// <summary>
/// The internal client
/// </summary>
public FBMClient Client { get; }
/// <summary>
/// Gets a value that determines if the client is currently connected to a server.
/// The background work loop sets this to true once a connection has been established
/// and resets it to false whenever the connection attempt or the connection itself ends.
/// </summary>
public bool IsConnected { get; private set; }
/// <summary>
/// Initializes the client for a plugin. The configuration scope is deserialized and
/// forwarded to the configuration-based constructor; the plugin log is passed as the
/// debug log only when the plugin reports debug mode, otherwise null is passed.
/// </summary>
/// <param name="plugin">The plugin that owns this client and supplies its log and secrets</param>
/// <param name="config">The configuration scope the client configuration is deserialized from</param>
public VnCacheClient(PluginBase plugin, IConfigScope config)
:this(
config.Deserialze(),
plugin.IsDebug() ? plugin.Log : null
)
{
//Set authenticator and error handler
//(the error handler always logs to the plugin log, independent of debug mode)
Client.GetCacheConfiguration()
.WithAuthenticator(new AuthManager(plugin))
.WithErrorHandler(new DiscoveryErrHAndler(plugin.Log));
}
/// <summary>
/// Initializes the client from an explicit configuration: validates the configuration,
/// creates the internal client from the default FBM settings, then applies the
/// TLS flag and the initial peer list to the client's cache configuration.
/// </summary>
/// <param name="config">The client configuration, validated before use</param>
/// <param name="debugLog">Optional log provider handed to the default FBM configuration, may be null</param>
public VnCacheClient(VnCacheClientConfig config, ILogProvider? debugLog)
{
    //Refuse an invalid configuration before anything is constructed
    (config as IOnConfigValidation).Validate();

    _config = config;

    //Default FBM settings sized from the configured message size and request timeout
    FBMClientConfig fbmConfig = FBMDataCacheExtensions.GetDefaultConfig(
        MemoryUtil.Shared,
        config.MaxMessageSize!.Value,
        config.RequestTimeout,
        debugLog
    );

    Client = new FBMClient(fbmConfig);

    //Apply the TLS setting and the initial peer addresses
    Client.GetCacheConfiguration()
        .WithTls(config.UseTls)
        .WithInitialPeers(config.GetInitialNodeUris());
}
/// <summary>
/// Background work method that manages the remote cache connection to the cache cluster.
/// Each cycle retries node discovery until a pass completes without throwing, connects
/// to a random cache node, then waits for that connection to end before starting the
/// next cycle. The internal client is disposed when the loop exits for any reason.
/// </summary>
/// <param name="pluginLog">The log provider that receives status and error messages</param>
/// <param name="exitToken">A token that ends the loop when cancelled</param>
/// <returns>A task that completes once the loop has exited and the client has been disposed</returns>
public virtual async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
{
    try
    {
        while (true)
        {
            //Discover nodes in the network, retried until a pass succeeds
            await DiscoverNodesAsync(pluginLog, exitToken);

            try
            {
                pluginLog.Debug("Connecting to random cache server");

                //Connect to a random server
                CacheNodeAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken);

                pluginLog.Debug("Connected to cache server {s}", selected.NodeId);

                //Set connection status flag
                IsConnected = true;

                //Wait for disconnect
                await Client.WaitForExitAsync(exitToken);

                pluginLog.Debug("Cache server disconnected");
            }
            catch (WebSocketException wse)
            {
                //Recoverable, fall through to the next discover/connect cycle
                pluginLog.Warn("Failed to connect to cache server {reason}", wse.Message);
            }
            catch (HttpRequestException he) when (he.InnerException is SocketException)
            {
                //Recoverable, fall through to the next discover/connect cycle
                pluginLog.Debug("Failed to connect to random cache server server");
            }
            finally
            {
                //Whatever ended the attempt, the client is no longer connected
                IsConnected = false;
            }
        }
    }
    catch (OperationCanceledException)
    {
        //Normal exit from listening loop
    }
    catch (KeyNotFoundException e)
    {
        pluginLog.Error("Missing required configuration variable for VnCache client: {0}", e.Message);
    }
    catch (FBMServerNegiationException fne)
    {
        pluginLog.Error("Failed to negotiate connection with cache server {reason}", fne.Message);
    }
    catch (Exception ex)
    {
        pluginLog.Error(ex, "Unhandled exception occured in background cache client listening task");
    }
    finally
    {
        //Dispose the client on exit
        Client.Dispose();
    }

    pluginLog.Information("Cache client exited");
}

/// <summary>
/// Runs node discovery repeatedly until one pass completes without throwing. Failed
/// passes are logged as warnings and followed by a random delay of 1000-1999 ms.
/// </summary>
/// <param name="pluginLog">The log provider that receives discovery status messages</param>
/// <param name="exitToken">A token that aborts discovery and the retry delay when cancelled</param>
/// <returns>A task that completes when a discovery pass has succeeded</returns>
/// <exception cref="OperationCanceledException">Thrown when <paramref name="exitToken"/> is cancelled</exception>
private async Task DiscoverNodesAsync(ILogProvider pluginLog, CancellationToken exitToken)
{
    while (true)
    {
        try
        {
            pluginLog.Debug("Discovering cluster nodes in network");

            //Get server list
            await Client.DiscoverAvailableNodesAsync(exitToken);
            return;
        }
        catch (OperationCanceledException) when (exitToken.IsCancellationRequested)
        {
            /*
             * Shutdown was requested. Rethrow so the catch-all below does not
             * report it as a broker failure. Cancellations that were NOT caused
             * by the exit token (e.g. a request timeout) still reach the
             * catch-all and are retried.
             */
            throw;
        }
        catch (HttpRequestException re) when (re.InnerException is SocketException)
        {
            pluginLog.Warn("Broker server is unreachable");
        }
        catch (CacheDiscoveryFailureException ce)
        {
            pluginLog.Warn("Failed to discover cache servers, reason {r}", ce);
        }
        catch (Exception ex)
        {
            pluginLog.Warn("Failed to get server list from broker, reason {r}", ex.Message);
        }

        //Gen random ms delay before the next attempt
        int randomMsDelay = RandomNumberGenerator.GetInt32(1000, 2000);
        await Task.Delay(randomMsDelay, exitToken);
    }
}
/// <summary>
/// Forwards an add-or-update request for <paramref name="value"/> to the internal client.
/// </summary>
/// <typeparam name="T">The type of the value to store</typeparam>
/// <param name="key">The key of the object to add or update</param>
/// <param name="newKey">
/// Optional key passed through unchanged to the internal client
/// (presumably a replacement key for the entry - confirm against AddOrUpdateObjectAsync)
/// </param>
/// <param name="value">The value to store</param>
/// <param name="cancellation">A token to cancel the operation</param>
/// <returns>The task returned by the internal client</returns>
/// <exception cref="InvalidOperationException">Thrown synchronously when the client is not connected</exception>
public virtual Task AddOrUpdateAsync<T>(string key, string? newKey, T value, CancellationToken cancellation)
{
    //T must be declared on the method, the enclosing class is not generic
    if (!IsConnected)
    {
        throw new InvalidOperationException("The underlying client is not connected to a cache node");
    }

    return Client.AddOrUpdateObjectAsync(key, newKey, value, cancellation);
}
/// <summary>
/// Forwards a delete request for <paramref name="key"/> to the internal client.
/// </summary>
/// <param name="key">The key of the object to delete</param>
/// <param name="cancellation">A token to cancel the operation</param>
/// <returns>The task returned by the internal client</returns>
/// <exception cref="InvalidOperationException">Thrown synchronously when the client is not connected</exception>
public virtual Task DeleteAsync(string key, CancellationToken cancellation)
{
    //Requests are only valid while a cache node connection exists
    if (!IsConnected)
    {
        throw new InvalidOperationException("The underlying client is not connected to a cache node");
    }

    return Client.DeleteObjectAsync(key, cancellation);
}
/// <summary>
/// Forwards a get request for <paramref name="key"/> to the internal client.
/// </summary>
/// <param name="key">The key of the object to read</param>
/// <param name="cancellation">A token to cancel the operation</param>
/// <returns>The task returned by the internal client</returns>
/// <exception cref="InvalidOperationException">Thrown synchronously when the client is not connected</exception>
public virtual Task GetAsync(string key, CancellationToken cancellation)
{
    //Requests are only valid while a cache node connection exists
    if (!IsConnected)
    {
        throw new InvalidOperationException("The underlying client is not connected to a cache node");
    }

    return Client.GetObjectAsync(key, cancellation);
}
/// <summary>
/// Forwards a get request for <paramref name="key"/> to the internal client,
/// passing along the caller supplied <paramref name="deserializer"/>.
/// </summary>
/// <param name="key">The key of the object to read</param>
/// <param name="deserializer">The deserializer handed to the internal client</param>
/// <param name="cancellation">A token to cancel the operation</param>
/// <returns>The task returned by the internal client</returns>
/// <exception cref="InvalidOperationException">Thrown synchronously when the client is not connected</exception>
public virtual Task GetAsync(string key, ICacheObjectDeserialzer deserializer, CancellationToken cancellation)
{
    //Requests are only valid while a cache node connection exists
    if (!IsConnected)
    {
        throw new InvalidOperationException("The underlying client is not connected to a cache node");
    }

    return Client.GetObjectAsync(key, deserializer, cancellation);
}
/// <summary>
/// Forwards an add-or-update request for <paramref name="value"/> to the internal client,
/// passing along the caller supplied <paramref name="serialzer"/>.
/// </summary>
/// <typeparam name="T">The type of the value to store</typeparam>
/// <param name="key">The key of the object to add or update</param>
/// <param name="newKey">
/// Optional key passed through unchanged to the internal client
/// (presumably a replacement key for the entry - confirm against AddOrUpdateObjectAsync)
/// </param>
/// <param name="value">The value to store</param>
/// <param name="serialzer">The serializer handed to the internal client</param>
/// <param name="cancellation">A token to cancel the operation</param>
/// <returns>The task returned by the internal client</returns>
/// <exception cref="InvalidOperationException">Thrown synchronously when the client is not connected</exception>
public virtual Task AddOrUpdateAsync<T>(string key, string? newKey, T value, ICacheObjectSerialzer serialzer, CancellationToken cancellation)
{
    //T must be declared on the method, the enclosing class is not generic
    if (!IsConnected)
    {
        throw new InvalidOperationException("The underlying client is not connected to a cache node");
    }

    return Client.AddOrUpdateObjectAsync(key, newKey, value, serialzer, cancellation);
}
/// <summary>
/// Authentication manager attached to the client's cache configuration. Signing uses
/// the plugin secret "client_private_key", verification uses the plugin secret
/// "cache_public_key". Both secrets are loaded lazily as json web keys.
/// </summary>
private sealed class AuthManager : ICacheAuthManager
{
    //Only assigned by the constructor
    private readonly IAsyncLazy _sigKey;
    private readonly IAsyncLazy _verKey;

    /// <summary>
    /// Starts lazy loading of the signing and verification keys from the plugin's secrets
    /// </summary>
    /// <param name="plugin">The plugin to load the key secrets from</param>
    public AuthManager(PluginBase plugin)
    {
        //Lazy load keys

        //Get the signing key
        _sigKey = plugin.GetSecretAsync("client_private_key").ToLazy(static r => r.GetJsonWebKey());

        //Lazy load cache public key
        _verKey = plugin.GetSecretAsync("cache_public_key").ToLazy(static r => r.GetJsonWebKey());
    }

    /// <summary>
    /// Awaits the signing key, then the verification key
    /// </summary>
    /// <returns>A task that completes when both lazy keys have been awaited</returns>
    public async Task AwaitLazyKeyLoad()
    {
        await _sigKey;
        await _verKey;
    }

    /// <summary>
    /// Gets the jwt header of the signing key
    /// </summary>
    /// <returns>The header exposed by the signing key</returns>
    public IReadOnlyDictionary GetJwtHeader()
    {
        //Get the signing key jwt header
        return _sigKey.Value.JwtHeader;
    }

    /// <summary>
    /// Signs the token with the signing key
    /// </summary>
    /// <param name="jwt">The token to sign</param>
    public void SignJwt(JsonWebToken jwt)
    {
        //Sign the jwt with signing key
        jwt.SignFromJwk(_sigKey.Value);
    }

    /// <summary>
    /// Signs a message hash with the private part of the signing key. RSA is tried
    /// first (PKCS#1 padding), then ECDSA.
    /// </summary>
    /// <param name="hash">The message hash to sign</param>
    /// <param name="alg">The hash algorithm that produced <paramref name="hash"/></param>
    /// <returns>The signature of the hash</returns>
    /// <exception cref="NotSupportedException">Thrown when the signing key is neither an RSA nor an ECDSA key</exception>
    public byte[] SignMessageHash(byte[] hash, HashAlg alg)
    {
        /*
         * Producing a signature requires the PRIVATE key parameters, an
         * algorithm instance that only holds the public parameters cannot sign.
         */

        //try to get the rsa alg for the signing key
        using RSA? rsa = _sigKey.Value.GetRSAPrivateKey();
        if (rsa is not null)
        {
            return rsa.SignHash(hash, alg.GetAlgName(), RSASignaturePadding.Pkcs1);
        }

        //try to get the ecdsa alg for the signing key
        using ECDsa? ecdsa = _sigKey.Value.GetECDsaPrivateKey();
        if (ecdsa is not null)
        {
            return ecdsa.SignHash(hash);
        }

        throw new NotSupportedException("The signing key is not a valid RSA or ECDSA key");
    }

    /// <summary>
    /// Verifies the token against the verification key
    /// </summary>
    /// <param name="jwt">The token to verify</param>
    /// <param name="isPeer">Not used by this implementation, the same key verifies every token</param>
    /// <returns>The result of the verification against the verification key</returns>
    public bool VerifyJwt(JsonWebToken jwt, bool isPeer)
    {
        return jwt.VerifyFromJwk(_verKey.Value);
    }

    /// <summary>
    /// Verifies the signature of a message hash with the public part of the
    /// verification key. RSA is tried first (PKCS#1 padding), then ECDSA.
    /// </summary>
    /// <param name="hash">The message hash that was signed</param>
    /// <param name="alg">The hash algorithm that produced <paramref name="hash"/></param>
    /// <param name="signature">The signature to verify</param>
    /// <param name="isPeer">Not used by this implementation, the same key verifies every message</param>
    /// <returns>True if the signature is valid for the hash, false otherwise</returns>
    /// <exception cref="NotSupportedException">Thrown when the verification key is neither an RSA nor an ECDSA key</exception>
    public bool VerifyMessageHash(ReadOnlySpan hash, HashAlg alg, ReadOnlySpan signature, bool isPeer)
    {
        //try to get the rsa alg for the verification key
        using RSA? rsa = _verKey.Value.GetRSAPublicKey();
        if (rsa is not null)
        {
            return rsa.VerifyHash(hash, signature, alg.GetAlgName(), RSASignaturePadding.Pkcs1);
        }

        //try to get the ecdsa alg for the verification key
        using ECDsa? ecdsa = _verKey.Value.GetECDsaPublicKey();
        if (ecdsa is not null)
        {
            return ecdsa.VerifyHash(hash, signature);
        }

        throw new NotSupportedException("The current key is not an RSA or ECDSA key and is not supported");
    }
}
/// <summary>
/// Discovery error handler that writes every reported failure to the supplied log
/// </summary>
/// <param name="Logger">The log provider that receives the discovery errors</param>
private sealed record class DiscoveryErrHAndler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
{
    /// <summary>
    /// Logs the id of the node that failed together with the exception that was raised
    /// </summary>
    /// <param name="errorNode">The node the discovery error is reported for</param>
    /// <param name="ex">The exception that caused the failure</param>
    public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
        => Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", errorNode.NodeId, ex);
}
}
}