using System;
using System.Text.Json;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Security.Cryptography;

using VNLib.Utils;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
using VNLib.Utils.Extensions;
using VNLib.Data.Caching.Extensions;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Extensions.Loading;

namespace VNLib.Plugins.Essentials.Sessions
{
    /// <summary>
    /// A wrapper to simplify a cache client object
    /// </summary>
    public sealed class VnCacheClient : OpenResourceHandle<FBMClient?>
    {
        private FBMClient? _client;

        /// <summary>
        /// The wrapped client, or null until <see cref="LoadConfigAsync"/> has
        /// completed (and again after the handle has been freed)
        /// </summary>
        public override FBMClient? Resource => _client;

        //Delay applied before re-querying the broker when it reports no nodes
        private TimeSpan RetryInterval;

        private readonly ILogProvider? DebugLog;
        private readonly IUnmangedHeap? ClientHeap;

        /// <summary>
        /// Initializes an empty client wrapper that still requires
        /// configuration loading
        /// </summary>
        /// <param name="debugLog">An optional debugging log</param>
        /// <param name="heap">
        /// An optional <see cref="IUnmangedHeap"/> for buffers. When null, the shared
        /// heap is used at configuration time
        /// </param>
        public VnCacheClient(ILogProvider? debugLog, IUnmangedHeap? heap = null)
        {
            DebugLog = debugLog;
            //Default to 10 seconds
            RetryInterval = TimeSpan.FromSeconds(10);
            ClientHeap = heap;
        }

        /// <summary>
        /// Disposes the wrapped client (if any) and clears the reference
        /// </summary>
        protected override void Free()
        {
            _client?.Dispose();
            _client = null;
        }

        /// <summary>
        /// Loads required configuration variables from the config store and
        /// initializes the internal client
        /// </summary>
        /// <param name="pbase">The plugin used to look up the client/broker key secrets</param>
        /// <param name="config">
        /// A dictionary of configuration variables. Reads <c>max_message_size</c>,
        /// <c>broker_address</c> and <c>retry_interval_sec</c>
        /// </param>
        /// <returns>A task that completes when the client has been created and configured</returns>
        /// <exception cref="KeyNotFoundException">
        /// Thrown when <c>broker_address</c> is null, or when either the
        /// <c>client_private_key</c> or <c>broker_public_key</c> secret is missing
        /// </exception>
        /// <exception cref="FormatException">Thrown when a key secret is not valid base64</exception>
        public async Task LoadConfigAsync(PluginBase pbase, IReadOnlyDictionary<string, JsonElement> config)
        {
            int maxMessageSize = config["max_message_size"].GetInt32();

            string brokerAddress = config["broker_address"].GetString()
                ?? throw new KeyNotFoundException("Missing required configuration variable broker_address");

            //Get keys async, both lookups run concurrently
            Task<string?> clientPrivTask = pbase.TryGetSecretAsync("client_private_key");
            Task<string?> brokerPubTask = pbase.TryGetSecretAsync("broker_public_key");

            //Wait for all tasks to complete
            string?[] keys = await Task.WhenAll(clientPrivTask, brokerPubTask);

            byte[]? privKey = null;
            byte[]? brokerPub = null;

            try
            {
                privKey = Convert.FromBase64String(keys[0]
                    ?? throw new KeyNotFoundException("Missing required secret client_private_key"));

                brokerPub = Convert.FromBase64String(keys[1]
                    ?? throw new KeyNotFoundException("Missing required secret broker_public_key"));

                RetryInterval = config["retry_interval_sec"].GetTimeSpan(TimeParseType.Seconds);

                Uri brokerUri = new(brokerAddress);

                //Init the client with default settings
                FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(ClientHeap ?? Memory.Shared, maxMessageSize, DebugLog);

                _client = new(conf);

                //Add the configuration, TLS is enabled only for an https broker address
                _client.UseBroker(brokerUri)
                    .ImportBrokerPublicKey(brokerPub)
                    .ImportClientPrivateKey(privKey)
                    .UseTls(brokerUri.Scheme == Uri.UriSchemeHttps);
            }
            finally
            {
                //Zero the key memory on every exit path, a failure above must not leave key bytes behind
                if (privKey is not null)
                {
                    Memory.InitializeBlock(privKey.AsSpan());
                }

                if (brokerPub is not null)
                {
                    Memory.InitializeBlock(brokerPub.AsSpan());
                }
            }
        }

        /// <summary>
        /// Discovers nodes in the configured cluster and connects to a random node.
        /// When the connection exits, or fails with a socket/websocket error, discovery
        /// is repeated and another node is selected
        /// </summary>
        /// <param name="Log">A <see cref="ILogProvider"/> to write log events to</param>
        /// <param name="cancellationToken">A token to cancel the operation</param>
        /// <returns>A task that completes when the operation has been cancelled or an unrecoverable error occurred</returns>
        /// <exception cref="InvalidOperationException">Thrown when the client configuration has not been loaded</exception>
        public async Task RunAsync(ILogProvider Log, CancellationToken cancellationToken)
        {
            //Capture once so the loop never dereferences a nullable property
            FBMClient client = Resource
                ?? throw new InvalidOperationException("Client configuration not loaded, cannot connect to cache servers");

            while (true)
            {
                //Load the server list
                ActiveServer[]? servers = await DiscoverNodesWithRetryAsync(client, Log, cancellationToken);

                //A null list is handled the same as an empty one, there is nothing to select from
                if (servers is null || servers.Length == 0)
                {
                    Log.Warn("No cluster nodes found, retrying");
                    await Task.Delay(RetryInterval, cancellationToken);
                    continue;
                }

                //select random server from the list of servers
                ActiveServer selected = servers.SelectRandom();

                try
                {
                    Log.Debug("Connecting to server {server}", selected.ServerId);

                    //Try to connect to server, returns once the connection has exited
                    await client.ConnectAndWaitForExitAsync(selected, cancellationToken);

                    Log.Debug("Cache server disconnected");
                }
                catch (WebSocketException wse)
                {
                    Log.Warn("Failed to connect to cache server {reason}", wse.Message);
                }
                catch (HttpRequestException he) when (he.InnerException is SocketException)
                {
                    Log.Debug("Failed to connect to recommended server {server}", selected.ServerId);
                }

                //Continue next loop: rediscover and pick a node again
            }
        }

        /// <summary>
        /// Queries the broker for the cluster node list, retrying after a random
        /// 1-2 second delay each time the request fails
        /// </summary>
        private static async Task<ActiveServer[]?> DiscoverNodesWithRetryAsync(FBMClient client, ILogProvider log, CancellationToken cancellationToken)
        {
            while (true)
            {
                try
                {
                    log.Debug("Discovering cluster nodes in broker");
                    //Get server list
                    return await client.DiscoverNodesAsync(cancellationToken);
                }
                catch (HttpRequestException re) when (re.InnerException is SocketException)
                {
                    log.Warn("Broker server is unreachable");
                }
                catch (Exception ex)
                {
                    log.Warn("Failed to get server list from broker, reason {r}", ex.Message);
                }

                //Gen random ms delay so retries are not in lock-step; a cancelled token makes this delay throw
                int randomMsDelay = RandomNumberGenerator.GetInt32(1000, 2000);
                await Task.Delay(randomMsDelay, cancellationToken);
            }
        }
    }
}