From 1f2b3530ebeafa162fe4df41e691c33cb2ff0009 Mon Sep 17 00:00:00 2001 From: vman Date: Thu, 15 Dec 2022 01:45:03 -0500 Subject: JWK sigs, session cleanup v1 --- .../SessionCacheServer/ObjectCacheServerEntry.cs | 137 +++++++++++++-------- 1 file changed, 86 insertions(+), 51 deletions(-) (limited to 'Plugins/SessionCacheServer/ObjectCacheServerEntry.cs') diff --git a/Plugins/SessionCacheServer/ObjectCacheServerEntry.cs b/Plugins/SessionCacheServer/ObjectCacheServerEntry.cs index 17d8ba5..20a6268 100644 --- a/Plugins/SessionCacheServer/ObjectCacheServerEntry.cs +++ b/Plugins/SessionCacheServer/ObjectCacheServerEntry.cs @@ -33,12 +33,12 @@ using System.Net.Sockets; using System.Threading.Tasks; using System.Collections.Generic; using System.Security.Cryptography; -using System.Collections.Concurrent; using VNLib.Utils.Memory; using VNLib.Utils.Logging; using VNLib.Utils.Extensions; using VNLib.Hashing; +using VNLib.Hashing.IdentityUtility; using VNLib.Data.Caching; using VNLib.Data.Caching.Extensions; using VNLib.Data.Caching.ObjectCache; @@ -49,7 +49,7 @@ using VNLib.Plugins.Cache.Broker.Endpoints; using VNLib.Plugins.Extensions.Loading; using VNLib.Plugins.Extensions.Loading.Routing; using VNLib.Plugins.Essentials.Sessions.Server.Endpoints; -using VNLib.Utils.Memory.Caching; + namespace VNLib.Plugins.Essentials.Sessions.Server { @@ -58,7 +58,19 @@ namespace VNLib.Plugins.Essentials.Sessions.Server public override string PluginName => "ObjectCache.Service"; private string? BrokerHeartBeatToken; - + + private readonly object ServerLock = new(); + private readonly HashSet ListeningServers = new(); + + + private void RemoveServer(ActiveServer server) + { + lock (ServerLock) + { + ListeningServers.Remove(server); + } + } + protected override void OnLoad() { //Create default heap @@ -153,8 +165,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server try { //Get the broker config element - IReadOnlyDictionary clusterConfig = this.GetConfig("cluster"); - + IReadOnlyDictionary clusterConfig = this.GetConfig("cluster"); //Server id is just dns name for now string serverId = Dns.GetHostName(); @@ -179,11 +190,9 @@ namespace VNLib.Plugins.Essentials.Sessions.Server using BrokerRegistrationRequest request = new(); { string addr = clusterConfig["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'"); - - //Try to get the cache private key - string base64Priv = await this.TryGetSecretAsync("cache_private_key") ?? throw new KeyNotFoundException("Failed to load the cache private key"); - ReadOnlyMemory privKey = Convert.FromBase64String(base64Priv); + //Recover the certificate + ReadOnlyJsonWebKey cacheCert = await GetCachePrivate(); //Init url builder for payload, see if tls is enabled Uri connectAddress = new UriBuilder(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, Dns.GetHostName(), port, connectPath).Uri; @@ -191,10 +200,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server request.WithBroker(new(addr)) .WithRegistrationAddress(connectAddress.ToString()) .WithNodeId(serverId) - .WithPrivateKey(privKey.Span); - //Wipe memory - Memory.UnsafeZeroMemory(base64Priv); - Memory.UnsafeZeroMemory(privKey); + .WithSigningKey(cacheCert, true); } while (true) @@ -279,6 +285,19 @@ namespace VNLib.Plugins.Essentials.Sessions.Server #region Cluster + private async Task GetCachePrivate() + { + using SecretResult secret = await this.TryGetSecretAsync("cache_private_key") ?? throw new KeyNotFoundException("Failed to load the cache private key"); + return secret.GetJsonWebKey(); + } + + private async Task GetBrokerPublic() + { + using SecretResult secret = await this.TryGetSecretAsync("broker_public_key") ?? throw new KeyNotFoundException("Failed to load the broker's public key"); + return secret.GetJsonWebKey(); + } + + /// /// Starts a self-contained process-long task to discover other cache servers /// from a shared broker server @@ -291,31 +310,27 @@ namespace VNLib.Plugins.Essentials.Sessions.Server private async Task RunClientAsync(ObjectCacheStore cacheStore, Uri brokerAddress, FBMClientConfig clientConf) { TimeSpan noServerDelay = TimeSpan.FromSeconds(10); - //Init signing algs - ReadOnlyMemory clientPrivKey = null; - ReadOnlyMemory brokerPubKey = null; string nodeId = Dns.GetHostName(); + ListServerRequest listRequest = new(brokerAddress); try { //Get the broker config element IReadOnlyDictionary clusterConf = this.GetConfig("cluster"); int serverCheckMs = clusterConf["update_interval_sec"].GetInt32() * 1000; - //Get client priv key from secret store - string cpk = await this.TryGetSecretAsync("client_private_key") ?? throw new KeyNotFoundException("Failed to get the client private key from config"); - string bpub = await this.TryGetSecretAsync("broker_public_key") ?? throw new KeyNotFoundException("Failed to get the broker public key from config"); + //Setup signing and verification certificates + ReadOnlyJsonWebKey cacheSig = await GetCachePrivate(); + ReadOnlyJsonWebKey brokerPub = await GetBrokerPublic(); + //Import certificates + listRequest.WithVerificationKey(brokerPub) + .WithSigningKey(cacheSig); - //Load client private key - clientPrivKey = Convert.FromBase64String(cpk); - //Import broker public key - brokerPubKey = Convert.FromBase64String(bpub); - - //Concurrent dict to track remote servers - ConcurrentDictionary ActiveServers = new(); //Main event loop - Log.Information("Discovering available cluster nodes in broker"); + Log.Information("Begining cluster node discovery"); + + ILogProvider? debugLog = this.IsDebug() ? Log : null; - while (!UnloadToken.IsCancellationRequested) + while (true) { //Load the server list ActiveServer[]? servers; @@ -323,8 +338,10 @@ namespace VNLib.Plugins.Essentials.Sessions.Server { try { + debugLog?.Information("[CACHE] Requesting server list from broker"); + //Get server list - servers = await FBMDataCacheExtensions.ListServersAsync(brokerAddress, clientPrivKey, brokerPubKey, UnloadToken); + servers = await FBMDataCacheExtensions.ListServersAsync(listRequest, UnloadToken); //Servers are loaded, so continue break; } @@ -345,23 +362,37 @@ namespace VNLib.Plugins.Essentials.Sessions.Server //Delay await Task.Delay(randomMsDelay, UnloadToken); } - if(servers?.Length == 0) + + if(servers == null || servers.Length == 0) { Log.Information("No cluster nodes found, retrying"); //Delay await Task.Delay(noServerDelay, UnloadToken); continue; } + - //Select servers that are not the current server and are not already being monitored - IEnumerable serversToConnectTo = servers.Where(s => s.ServerId != nodeId) - .Where(s => !ActiveServers.ContainsKey(s.ServerId!)); - //Connect to servers - foreach (ActiveServer server in serversToConnectTo) + //Lock on sever set while enumerating + lock (ServerLock) { - _ = RunSyncTaskAsync(server, ActiveServers, cacheStore, clientConf, clientPrivKey, nodeId) - .ConfigureAwait(false); + //Select servers that are not the current server and are not already being monitored + IEnumerable serversToConnectTo = servers.Where(s => !nodeId.Equals(s.ServerId, StringComparison.OrdinalIgnoreCase)); + + //Connect to servers + foreach (ActiveServer server in serversToConnectTo) + { + //Make sure were not currently connected to the server + if (!ListeningServers.Contains(server)) + { + //Add the server to the set + ListeningServers.Add(server); + + //Run listener background task + _ = this.DeferTask(() => RunSyncTaskAsync(server, cacheStore, clientConf, nodeId)); + } + } } + //Delay until next check cycle await Task.Delay(serverCheckMs, UnloadToken); } @@ -383,19 +414,15 @@ namespace VNLib.Plugins.Essentials.Sessions.Server } finally { - Memory.UnsafeZeroMemory(clientPrivKey); - Memory.UnsafeZeroMemory(brokerPubKey); + listRequest.Dispose(); } Log.Debug("Cluster sync worker exited"); } - private async Task RunSyncTaskAsync(ActiveServer server, ConcurrentDictionary activeList, - ObjectCacheStore cacheStore, FBMClientConfig conf, ReadOnlyMemory privateKey, string nodeId) + private async Task RunSyncTaskAsync(ActiveServer server, ObjectCacheStore cacheStore, FBMClientConfig conf, string nodeId) { //Setup client FBMClient client = new(conf); - //Add server to active list, or replace its old value with the new one - activeList.AddOrUpdate(server.ServerId, server, (old, update) => server); try { async Task UpdateRecordAsync(string objectId, string newId) @@ -413,6 +440,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server using FBMResponse response = await client.SendAsync(modRequest, UnloadToken); response.ThrowIfNotSet(); + //Check response code string status = response.Headers.First(static s => s.Key == HeaderCommand.Status).Value.ToString(); if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal)) @@ -431,14 +459,21 @@ namespace VNLib.Plugins.Essentials.Sessions.Server client.ReturnRequest(modRequest); } } + + { + //Sign and verify requests with the cache private key since we are a peer + ReadOnlyJsonWebKey cachePriv = await GetCachePrivate(); + + //Configure cache + client.GetCacheConfiguration() + .WithVerificationKey(cachePriv) + .WithSigningCertificate(cachePriv) + .WithNodeId(nodeId) //set nodeid since were listening for changes + .WithTls(false); + } + + Log.Information("Connecting to {server}...", server.ServerId); - //Configure cache - client.GetCacheConfiguration() - .ImportVerificationKey(privateKey.Span) - .ImportVerificationKey(null) - .WithNodeId(nodeId) //set nodeid since were listening for changes - .WithTls(false); - //Connect to the server await client.ConnectToCacheAsync(server, UnloadToken); @@ -518,7 +553,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server finally { //Remove server from active list, since its been disconnected - _ = activeList.TryRemove(server.ServerId, out _); + RemoveServer(server); client.Dispose(); } } -- cgit