diff options
Diffstat (limited to 'plugins/ObjectCacheServer/src/NodeConfig.cs')
-rw-r--r-- | plugins/ObjectCacheServer/src/NodeConfig.cs | 177 |
1 files changed, 33 insertions, 144 deletions
diff --git a/plugins/ObjectCacheServer/src/NodeConfig.cs b/plugins/ObjectCacheServer/src/NodeConfig.cs index 614f0d6..81b8a32 100644 --- a/plugins/ObjectCacheServer/src/NodeConfig.cs +++ b/plugins/ObjectCacheServer/src/NodeConfig.cs @@ -3,9 +3,9 @@ * * Library: VNLib * Package: ObjectCacheServer -* File: ObjectCacheServerEntry.cs +* File: NodeConfig.cs * -* ObjectCacheServerEntry.cs is part of ObjectCacheServer which is part of the larger +* NodeConfig.cs is part of ObjectCacheServer which is part of the larger * VNLib collection of libraries and utilities. * * ObjectCacheServer is free software: you can redistribute it and/or modify @@ -25,63 +25,48 @@ using System; using System.Net; using System.Linq; -using System.Net.Http; using System.Text.Json; -using System.Threading; -using System.Net.Sockets; using System.Threading.Tasks; -using System.Collections.Generic; -using System.Security.Cryptography; using VNLib.Plugins; using VNLib.Utils; using VNLib.Utils.Logging; -using VNLib.Utils.Extensions; -using VNLib.Hashing; -using VNLib.Hashing.IdentityUtility; using VNLib.Data.Caching.Extensions; using VNLib.Plugins.Extensions.Loading; - +using VNLib.Data.Caching.ObjectCache.Server.Endpoints; namespace VNLib.Data.Caching.ObjectCache.Server { [ConfigurationName("cluster")] - internal sealed class NodeConfig : VnDisposeable, IAsyncConfigurable, IAsyncBackgroundWork, IBrokerHeartbeatNotifier + internal sealed class NodeConfig : VnDisposeable { const string CacheConfigTemplate = @" Cluster Configuration: - Broker Address: {ba} - Heartbeat Timeout: {hb} Node Id: {id} TlsEndabled: {tls}, Cache Endpoint: {ep} "; public CacheNodeConfiguration Config { get; } - public CacheAuthKeyStore KeyStore { get; } - - private readonly ManualResetEventSlim hearbeatHandle; - private readonly TimeSpan _hearbeatTimeout; - private string? _authToken; + public CacheAuthKeyStore KeyStore { get; } public NodeConfig(PluginBase plugin, IConfigScope config) { //Server id is just dns name for now string nodeId = Dns.GetHostName(); - Config = new(); - //Get the heartbeat interval - TimeSpan heartBeatDelayMs = config["heartbeat_timeout_sec"].GetTimeSpan(TimeParseType.Seconds); - - string brokerAddr = config["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'"); + Config = new(); //Get the port of the primary webserver int port; bool usingTls; { - JsonElement firstHost = plugin.HostConfig.GetProperty("virtual_hosts").EnumerateArray().First(); + //Get the port number of the first virtual host + JsonElement firstHost = plugin.HostConfig.GetProperty("virtual_hosts") + .EnumerateArray() + .First(); port = firstHost.GetProperty("interface") .GetProperty("port") @@ -91,147 +76,51 @@ namespace VNLib.Data.Caching.ObjectCache.Server usingTls = firstHost.TryGetProperty("ssl", out _); } - //Get the cache endpoint config - IConfigScope cacheEpConfig = plugin.GetConfigForType<ConnectEndpoint>(); - //The endpoint to advertise to cache clients that allows cache connections - UriBuilder endpoint = new(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, nodeId, port, cacheEpConfig["path"].GetString()); + Uri cacheEndpoint = GetEndpointUri<ConnectEndpoint>(plugin, usingTls, port, nodeId); + + //Init key store + KeyStore = new(plugin); //Setup cache node config - Config.WithCacheEndpoint(endpoint.Uri) + Config.WithCacheEndpoint(cacheEndpoint) .WithNodeId(nodeId) - .WithTls(usingTls) - .WithBroker(new(brokerAddr)); + .WithAuthenticator(KeyStore) + .WithTls(usingTls); //Check if advertising is enabled if(config.TryGetValue("advertise", out JsonElement adEl) && adEl.GetBoolean()) { - Config.EnableAdvertisment(true, ""); - } - - //Init key store - KeyStore = new(plugin); - - //Init heartbeat handle unsiganled waiting for first heartbeat - hearbeatHandle = new(false); - - //Schedule heartbeat - _ = plugin.ObserveWork(this, 500); + //Get the the broadcast endpoint + Uri discoveryEndpoint = GetEndpointUri<PeerDiscoveryEndpoint>(plugin, usingTls, port, nodeId); + //Enable advertising + Config.EnableAdvertisment(discoveryEndpoint); + } + + //log the config plugin.Log.Information(CacheConfigTemplate, - brokerAddr, - heartBeatDelayMs, nodeId, usingTls, - endpoint.Uri); + cacheEndpoint + ); } - async Task IAsyncConfigurable.ConfigureServiceAsync(PluginBase plugin) + private static Uri GetEndpointUri<T>(PluginBase plugin, bool usingTls, int port, string hostName) where T: IEndpoint { - //Get cache private key for signing from the key store - ReadOnlyJsonWebKey signingKey = await KeyStore.GetCachePrivateAsync(); - - Config.WithSigningKey(signingKey); - - //Get broker public key for verifying from the key store - ReadOnlyJsonWebKey brokerKey = await KeyStore.GetBrokerPublicAsync(); + //Get the cache endpoint config + IConfigScope cacheEpConfig = plugin.GetConfigForType<T>(); - Config.WithBrokerVerificationKey(brokerKey); + //The endpoint to advertise to cache clients that allows cache connections + return new UriBuilder(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, hostName, port, cacheEpConfig["path"].GetString()).Uri; } + protected override void Free() { - //Dispose the heartbeat handle - hearbeatHandle.Dispose(); - //cleanup keys - Config.SigningKey?.Dispose(); - Config.VerificationKey?.Dispose(); - Config.BrokerVerificationKey?.Dispose(); - } - - ///<inheritdoc/> - public void HearbeatReceived() - { - //Set the heartbeat handle as received - hearbeatHandle.Set(); + } - - ///<inheritdoc/> - public string? GetAuthToken() => _authToken; - - ///<inheritdoc/> - public Uri GetBrokerAddress() => Config.DiscoveryEndpoint!; - - ///<inheritdoc/> - public ReadOnlyJsonWebKey GetBrokerPublicKey() => Config.BrokerVerificationKey!; - - - /* - * Worker loop for registering with the broker and monitoring hearbeat requests - */ - async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) - { - //Listen in loop - while (true) - { - try - { - //Regen the auth token before registering - _authToken = RandomHash.GetRandomBase32(32); - - pluginLog.Information("Registering with cache broker server with id {id}", Config.NodeId); - - //Register with the broker and pass the current auth token - await Config.RegisterWithBrokerAsync(_authToken); - - //Enter heartbeat loop - while (true) - { - //Wait for the heartbeat timeout - await Task.Delay(_hearbeatTimeout, exitToken); - - //Confrim the hearbeat was received within the timeout period - if (!hearbeatHandle.IsSet) - { - //If the heartbeat handle is not set, the heartbeat was not received, reg-register - pluginLog.Information("Broker missed hearbeat request"); - - //not received, break out of the heartbeat loop to re-register - break; - } - - //Reset the handle and continue the heartbeat loop - hearbeatHandle.Reset(); - } - - //Add random delay to prevent all nodes from re-registering at the same time - await Task.Delay(RandomNumberGenerator.GetInt32(1000, 5000), exitToken); - } - catch (OperationCanceledException) - { - pluginLog.Debug("Registration loop exited on unload"); - break; - } - catch (TimeoutException) - { - pluginLog.Warn("Failed to connect to cache broker server within the specified timeout period"); - } - catch (HttpRequestException re) when (re.InnerException is SocketException) - { - pluginLog.Warn("Cache broker is unavailable or network is unavailable"); - } - catch(HttpRequestException re) when (re.StatusCode.HasValue) - { - pluginLog.Warn("Failed to register with cache broker server, received status code {code}", re.StatusCode); - } - catch (Exception ex) - { - pluginLog.Warn("Exception occured in registraion loop: {ex}", ex!.Message); - } - } - } - } } |