aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src/NodeConfig.cs
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/ObjectCacheServer/src/NodeConfig.cs')
-rw-r--r--plugins/ObjectCacheServer/src/NodeConfig.cs177
1 files changed, 33 insertions, 144 deletions
diff --git a/plugins/ObjectCacheServer/src/NodeConfig.cs b/plugins/ObjectCacheServer/src/NodeConfig.cs
index 614f0d6..81b8a32 100644
--- a/plugins/ObjectCacheServer/src/NodeConfig.cs
+++ b/plugins/ObjectCacheServer/src/NodeConfig.cs
@@ -3,9 +3,9 @@
*
* Library: VNLib
* Package: ObjectCacheServer
-* File: ObjectCacheServerEntry.cs
+* File: NodeConfig.cs
*
-* ObjectCacheServerEntry.cs is part of ObjectCacheServer which is part of the larger
+* NodeConfig.cs is part of ObjectCacheServer which is part of the larger
* VNLib collection of libraries and utilities.
*
* ObjectCacheServer is free software: you can redistribute it and/or modify
@@ -25,63 +25,48 @@
using System;
using System.Net;
using System.Linq;
-using System.Net.Http;
using System.Text.Json;
-using System.Threading;
-using System.Net.Sockets;
using System.Threading.Tasks;
-using System.Collections.Generic;
-using System.Security.Cryptography;
using VNLib.Plugins;
using VNLib.Utils;
using VNLib.Utils.Logging;
-using VNLib.Utils.Extensions;
-using VNLib.Hashing;
-using VNLib.Hashing.IdentityUtility;
using VNLib.Data.Caching.Extensions;
using VNLib.Plugins.Extensions.Loading;
-
+using VNLib.Data.Caching.ObjectCache.Server.Endpoints;
namespace VNLib.Data.Caching.ObjectCache.Server
{
[ConfigurationName("cluster")]
- internal sealed class NodeConfig : VnDisposeable, IAsyncConfigurable, IAsyncBackgroundWork, IBrokerHeartbeatNotifier
+ internal sealed class NodeConfig : VnDisposeable
{
const string CacheConfigTemplate =
@"
Cluster Configuration:
- Broker Address: {ba}
- Heartbeat Timeout: {hb}
Node Id: {id}
TlsEndabled: {tls},
Cache Endpoint: {ep}
";
public CacheNodeConfiguration Config { get; }
- public CacheAuthKeyStore KeyStore { get; }
-
- private readonly ManualResetEventSlim hearbeatHandle;
- private readonly TimeSpan _hearbeatTimeout;
- private string? _authToken;
+ public CacheAuthKeyStore KeyStore { get; }
public NodeConfig(PluginBase plugin, IConfigScope config)
{
//Server id is just dns name for now
string nodeId = Dns.GetHostName();
- Config = new();
- //Get the heartbeat interval
- TimeSpan heartBeatDelayMs = config["heartbeat_timeout_sec"].GetTimeSpan(TimeParseType.Seconds);
-
- string brokerAddr = config["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'");
+ Config = new();
//Get the port of the primary webserver
int port;
bool usingTls;
{
- JsonElement firstHost = plugin.HostConfig.GetProperty("virtual_hosts").EnumerateArray().First();
+ //Get the port number of the first virtual host
+ JsonElement firstHost = plugin.HostConfig.GetProperty("virtual_hosts")
+ .EnumerateArray()
+ .First();
port = firstHost.GetProperty("interface")
.GetProperty("port")
@@ -91,147 +76,51 @@ namespace VNLib.Data.Caching.ObjectCache.Server
usingTls = firstHost.TryGetProperty("ssl", out _);
}
- //Get the cache endpoint config
- IConfigScope cacheEpConfig = plugin.GetConfigForType<ConnectEndpoint>();
-
//The endpoint to advertise to cache clients that allows cache connections
- UriBuilder endpoint = new(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, nodeId, port, cacheEpConfig["path"].GetString());
+ Uri cacheEndpoint = GetEndpointUri<ConnectEndpoint>(plugin, usingTls, port, nodeId);
+
+ //Init key store
+ KeyStore = new(plugin);
//Setup cache node config
- Config.WithCacheEndpoint(endpoint.Uri)
+ Config.WithCacheEndpoint(cacheEndpoint)
.WithNodeId(nodeId)
- .WithTls(usingTls)
- .WithBroker(new(brokerAddr));
+ .WithAuthenticator(KeyStore)
+ .WithTls(usingTls);
//Check if advertising is enabled
if(config.TryGetValue("advertise", out JsonElement adEl) && adEl.GetBoolean())
{
- Config.EnableAdvertisment(true, "");
- }
-
- //Init key store
- KeyStore = new(plugin);
-
- //Init heartbeat handle unsiganled waiting for first heartbeat
- hearbeatHandle = new(false);
-
- //Schedule heartbeat
- _ = plugin.ObserveWork(this, 500);
+ //Get the the broadcast endpoint
+ Uri discoveryEndpoint = GetEndpointUri<PeerDiscoveryEndpoint>(plugin, usingTls, port, nodeId);
+ //Enable advertising
+ Config.EnableAdvertisment(discoveryEndpoint);
+ }
+
+
//log the config
plugin.Log.Information(CacheConfigTemplate,
- brokerAddr,
- heartBeatDelayMs,
nodeId,
usingTls,
- endpoint.Uri);
+ cacheEndpoint
+ );
}
- async Task IAsyncConfigurable.ConfigureServiceAsync(PluginBase plugin)
+ private static Uri GetEndpointUri<T>(PluginBase plugin, bool usingTls, int port, string hostName) where T: IEndpoint
{
- //Get cache private key for signing from the key store
- ReadOnlyJsonWebKey signingKey = await KeyStore.GetCachePrivateAsync();
-
- Config.WithSigningKey(signingKey);
-
- //Get broker public key for verifying from the key store
- ReadOnlyJsonWebKey brokerKey = await KeyStore.GetBrokerPublicAsync();
+ //Get the cache endpoint config
+ IConfigScope cacheEpConfig = plugin.GetConfigForType<T>();
- Config.WithBrokerVerificationKey(brokerKey);
+ //The endpoint to advertise to cache clients that allows cache connections
+ return new UriBuilder(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, hostName, port, cacheEpConfig["path"].GetString()).Uri;
}
+
protected override void Free()
{
- //Dispose the heartbeat handle
- hearbeatHandle.Dispose();
-
//cleanup keys
- Config.SigningKey?.Dispose();
- Config.VerificationKey?.Dispose();
- Config.BrokerVerificationKey?.Dispose();
- }
-
- ///<inheritdoc/>
- public void HearbeatReceived()
- {
- //Set the heartbeat handle as received
- hearbeatHandle.Set();
+
}
-
- ///<inheritdoc/>
- public string? GetAuthToken() => _authToken;
-
- ///<inheritdoc/>
- public Uri GetBrokerAddress() => Config.DiscoveryEndpoint!;
-
- ///<inheritdoc/>
- public ReadOnlyJsonWebKey GetBrokerPublicKey() => Config.BrokerVerificationKey!;
-
-
- /*
- * Worker loop for registering with the broker and monitoring hearbeat requests
- */
- async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
- {
- //Listen in loop
- while (true)
- {
- try
- {
- //Regen the auth token before registering
- _authToken = RandomHash.GetRandomBase32(32);
-
- pluginLog.Information("Registering with cache broker server with id {id}", Config.NodeId);
-
- //Register with the broker and pass the current auth token
- await Config.RegisterWithBrokerAsync(_authToken);
-
- //Enter heartbeat loop
- while (true)
- {
- //Wait for the heartbeat timeout
- await Task.Delay(_hearbeatTimeout, exitToken);
-
- //Confrim the hearbeat was received within the timeout period
- if (!hearbeatHandle.IsSet)
- {
- //If the heartbeat handle is not set, the heartbeat was not received, reg-register
- pluginLog.Information("Broker missed hearbeat request");
-
- //not received, break out of the heartbeat loop to re-register
- break;
- }
-
- //Reset the handle and continue the heartbeat loop
- hearbeatHandle.Reset();
- }
-
- //Add random delay to prevent all nodes from re-registering at the same time
- await Task.Delay(RandomNumberGenerator.GetInt32(1000, 5000), exitToken);
- }
- catch (OperationCanceledException)
- {
- pluginLog.Debug("Registration loop exited on unload");
- break;
- }
- catch (TimeoutException)
- {
- pluginLog.Warn("Failed to connect to cache broker server within the specified timeout period");
- }
- catch (HttpRequestException re) when (re.InnerException is SocketException)
- {
- pluginLog.Warn("Cache broker is unavailable or network is unavailable");
- }
- catch(HttpRequestException re) when (re.StatusCode.HasValue)
- {
- pluginLog.Warn("Failed to register with cache broker server, received status code {code}", re.StatusCode);
- }
- catch (Exception ex)
- {
- pluginLog.Warn("Exception occured in registraion loop: {ex}", ex!.Message);
- }
- }
- }
-
}
}