diff options
author | vman <public@vaughnnugent.com> | 2022-12-15 01:45:03 -0500 |
---|---|---|
committer | vman <public@vaughnnugent.com> | 2022-12-15 01:45:03 -0500 |
commit | 1f2b3530ebeafa162fe4df41e691c33cb2ff0009 (patch) | |
tree | 7f60d7c761cee2df89303c3ef0550743790a63e2 /Plugins/SessionCacheServer | |
parent | a0d5a8d40de9806e21e64475e3297a2a84effe22 (diff) |
JWK sigs, session cleanup v1
Diffstat (limited to 'Plugins/SessionCacheServer')
4 files changed, 171 insertions, 142 deletions
diff --git a/Plugins/SessionCacheServer/Endpoints/BrokerHeartBeat.cs b/Plugins/SessionCacheServer/Endpoints/BrokerHeartBeat.cs index bad0f7a..2e380a3 100644 --- a/Plugins/SessionCacheServer/Endpoints/BrokerHeartBeat.cs +++ b/Plugins/SessionCacheServer/Endpoints/BrokerHeartBeat.cs @@ -29,9 +29,7 @@ using System.Text.Json; using System.Threading; using System.Threading.Tasks; using System.Collections.Generic; -using System.Security.Cryptography; -using VNLib.Data.Caching.Extensions; using VNLib.Hashing.IdentityUtility; using VNLib.Plugins.Essentials.Endpoints; using VNLib.Plugins.Essentials.Extensions; @@ -48,7 +46,12 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints private readonly Task<IPAddress[]> BrokerIpList; private readonly PluginBase Pbase; - protected override ProtectionSettings EndpointProtectionSettings { get; } + protected override ProtectionSettings EndpointProtectionSettings { get; } = new() + { + DisableBrowsersOnly = true, + DisableSessionsRequired = true, + DisableVerifySessionCors = true + }; public BrokerHeartBeat(Func<string> token, ManualResetEvent keepaliveSet, Uri brokerUri, PluginBase pbase) { @@ -57,20 +60,13 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints BrokerIpList = Dns.GetHostAddressesAsync(brokerUri.DnsSafeHost); this.Pbase = pbase; - - EndpointProtectionSettings = new() - { - BrowsersOnly = false, - SessionsRequired = false, - VerifySessionCors = false, - }; } - private async Task<byte[]> GetBrokerPubAsync() + private async Task<ReadOnlyJsonWebKey> GetBrokerPubAsync() { - string? brokerPubKey = await Pbase.TryGetSecretAsync("broker_public_key") ?? throw new KeyNotFoundException("Missing required secret : broker_public_key"); + using SecretResult brokerPubKey = await Pbase.TryGetSecretAsync("broker_public_key") ?? throw new KeyNotFoundException("Missing required secret : broker_public_key"); - return Convert.FromBase64String(brokerPubKey); + return brokerPubKey.GetJsonWebKey(); } protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity) @@ -89,35 +85,36 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints } //Get the authorization jwt string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; + if (string.IsNullOrWhiteSpace(jwtAuth)) { //Token invalid entity.CloseResponse(HttpStatusCode.Forbidden); return VfReturnType.VirtualSkip; } + //Parse the jwt using JsonWebToken jwt = JsonWebToken.Parse(jwtAuth); - //Init signature alg - using (ECDsa alg = ECDsa.Create(FBMDataCacheExtensions.CacheCurve)) - { - //Get pub key - byte[] key = await GetBrokerPubAsync(); - alg.ImportSubjectPublicKeyInfo(key, out _); + //Verify the jwt using the broker's public key certificate + using (ReadOnlyJsonWebKey cert = await GetBrokerPubAsync()) + { //Verify the jwt - if (!jwt.Verify(alg, FBMDataCacheExtensions.CacheJwtAlgorithm)) + if (!jwt.VerifyFromJwk(cert)) { //Token invalid entity.CloseResponse(HttpStatusCode.Forbidden); return VfReturnType.VirtualSkip; } } + string? auth; //Recover the auth token from the jwt using (JsonDocument doc = jwt.GetPayload()) { auth = doc.RootElement.GetProperty("token").GetString(); } + //Verify token if(Token().Equals(auth, StringComparison.Ordinal)) { @@ -126,6 +123,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints entity.CloseResponse(HttpStatusCode.OK); return VfReturnType.VirtualSkip; } + //Token invalid entity.CloseResponse(HttpStatusCode.Forbidden); return VfReturnType.VirtualSkip; diff --git a/Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs b/Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs index fee1ea7..77acb13 100644 --- a/Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs +++ b/Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs @@ -24,25 +24,21 @@ using System; using System.Net; -using System.Text; using System.Text.Json; using System.Threading; using System.Threading.Tasks; using System.Threading.Channels; using System.Collections.Generic; -using System.Security.Cryptography; using System.Collections.Concurrent; using VNLib.Net.Http; using VNLib.Hashing; using VNLib.Utils.Async; -using VNLib.Utils.Memory; using VNLib.Utils.Logging; using VNLib.Hashing.IdentityUtility; using VNLib.Net.Messaging.FBM; using VNLib.Net.Messaging.FBM.Client; using VNLib.Net.Messaging.FBM.Server; -using VNLib.Data.Caching.Extensions; using VNLib.Data.Caching.ObjectCache; using VNLib.Plugins.Extensions.Loading; using VNLib.Plugins.Essentials.Endpoints; @@ -77,9 +73,9 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints //Loosen up protection settings protected override ProtectionSettings EndpointProtectionSettings { get; } = new() { - BrowsersOnly = false, - SessionsRequired = false, - CrossSiteDenied = false + DisableBrowsersOnly = true, + DisableSessionsRequired = true, + DisableCrossSiteDenied = true }; public ConnectEndpoint(string path, ObjectCacheStore store, PluginBase pbase) @@ -121,20 +117,33 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints string? nodeId = null; string? challenge = null; + bool isPeer = false; // Parse jwt using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth)) { - //Get the client public key - byte[] clientPub = await GetClientPubAsync(); + bool verified = false; - //Init sig alg - using ECDsa verAlg = ECDsa.Create(FBMDataCacheExtensions.CacheCurve); - //Import client pub key - verAlg.ImportSubjectPublicKeyInfo(clientPub, out _); - - //verify signature for client - if (!jwt.Verify(verAlg, in FBMDataCacheExtensions.CacheJwtAlgorithm)) + //Get the client public key certificate to verify the client's message + using(ReadOnlyJsonWebKey cert = await GetClientPubAsync()) + { + //verify signature for client + if (jwt.VerifyFromJwk(cert)) + { + verified = true; + } + //May be signed by a cahce server + else + { + using ReadOnlyJsonWebKey cacheCert = await GetCachePubAsync(); + + //Set peer and verified flag since the another cache server signed the request + isPeer = verified = jwt.VerifyFromJwk(cacheCert); + } + } + + //Check flag + if (!verified) { Log.Information("Client signature verification failed"); entity.CloseResponse(HttpStatusCode.Unauthorized); @@ -154,36 +163,28 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints } Log.Debug("Received negotiation request from node {node}", nodeId); - //Verified, now we can create an auth message with a short expiration using JsonWebToken auth = new(); - auth.WriteHeader(FBMDataCacheExtensions.JwtMessageHeader.Span); - auth.InitPayloadClaim() - .AddClaim("aud", AudienceLocalServerId) - .AddClaim("exp", DateTimeOffset.UtcNow.Add(AuthTokenExpiration).ToUnixTimeSeconds()) - .AddClaim("nonce", RandomHash.GetRandomHex(8)) - .AddClaim("chl", challenge) - //Specify the server's node id if set - .AddClaim("sub", nodeId) - //Add negotiaion args - .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, MAX_HEAD_BUF_SIZE) - .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, MAX_RECV_BUF_SIZE) - .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, MAX_MESSAGE_SIZE) - .CommitClaims(); - - //Sign the auth message - ECDsa sigAlg = ECDsa.Create(FBMDataCacheExtensions.CacheCurve); - byte[] cachePrivate = await GetCachePrivateKeyAsync(); - try + //Sign the auth message from the cache certificate's private key + using (ReadOnlyJsonWebKey cert = await GetCachePrivateKeyAsync()) { - sigAlg.ImportPkcs8PrivateKey(cachePrivate, out _); - //sign jwt - auth.Sign(sigAlg, in FBMDataCacheExtensions.CacheJwtAlgorithm, 256); - } - finally - { - Memory.InitializeBlock(cachePrivate.AsSpan()); - sigAlg.Dispose(); + auth.WriteHeader(cert.JwtHeader); + auth.InitPayloadClaim() + .AddClaim("aud", AudienceLocalServerId) + .AddClaim("exp", DateTimeOffset.UtcNow.Add(AuthTokenExpiration).ToUnixTimeSeconds()) + .AddClaim("nonce", RandomHash.GetRandomBase32(8)) + .AddClaim("chl", challenge!) + //Set the ispeer flag if the request was signed by a cache server + .AddClaim("isPeer", isPeer) + //Specify the server's node id if set + .AddClaim("sub", nodeId!) + //Add negotiaion args + .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, MAX_HEAD_BUF_SIZE) + .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, MAX_RECV_BUF_SIZE) + .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, MAX_MESSAGE_SIZE) + .CommitClaims(); + + auth.SignFromJwk(cert); } //Close response @@ -191,27 +192,23 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints return VfReturnType.VirtualSkip; } - private async Task<byte[]> GetClientPubAsync() + private async Task<ReadOnlyJsonWebKey> GetClientPubAsync() { - string? brokerPubKey = await Pbase.TryGetSecretAsync("client_public_key") ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); - - return Convert.FromBase64String(brokerPubKey); + using SecretResult brokerPubKey = await Pbase.TryGetSecretAsync("client_public_key") ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); + + return brokerPubKey.GetJsonWebKey(); } - private async Task<byte[]> GetCachePubAsync() + private async Task<ReadOnlyJsonWebKey> GetCachePubAsync() { - string? brokerPubKey = await Pbase.TryGetSecretAsync("cache_public_key") ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); + using SecretResult cachPublic = await Pbase.TryGetSecretAsync("cache_public_key") ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); - return Convert.FromBase64String(brokerPubKey); + return cachPublic.GetJsonWebKey(); } - private async Task<byte[]> GetCachePrivateKeyAsync() + private async Task<ReadOnlyJsonWebKey> GetCachePrivateKeyAsync() { - string? cachePrivate = await Pbase.TryGetSecretAsync("cache_private_key") ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); - - byte[] data = Convert.FromBase64String(cachePrivate); + using SecretResult cachePrivate = await Pbase.TryGetSecretAsync("cache_private_key") ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); - Memory.UnsafeZeroMemory<char>(cachePrivate); - - return data; + return cachePrivate.GetJsonWebKey(); } private async Task ChangeWorkerAsync(CancellationToken cancellation) @@ -265,16 +262,11 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints //Parse jwt using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth)) { - //Init sig alg, we will verify that the token was signed by this server - using (ECDsa sigAlg = ECDsa.Create(FBMDataCacheExtensions.CacheCurve)) + //Get the client public key certificate to verify the client's message + using (ReadOnlyJsonWebKey cert = await GetCachePubAsync()) { - //Get the cache public key - byte[] cachePub = await GetCachePubAsync(); - - sigAlg.ImportSubjectPublicKeyInfo(cachePub, out _); - - //verify signature against the cache public key, since this server have signed it - if (!jwt.Verify(sigAlg, in FBMDataCacheExtensions.CacheJwtAlgorithm)) + //verify signature against the cache public key, since this server must have signed it + if (!jwt.VerifyFromJwk(cert)) { entity.CloseResponse(HttpStatusCode.Unauthorized); return VfReturnType.VirtualSkip; @@ -286,8 +278,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints //Verify audience, expiration - if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl) - || AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase)) + if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl) || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase)) { entity.CloseResponse(HttpStatusCode.Unauthorized); return VfReturnType.VirtualSkip; @@ -300,8 +291,11 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints return VfReturnType.VirtualSkip; } - //The node id is optional and stored in the 'sub' field - if (doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl)) + //Check if the client is a peer + bool isPeer = doc.RootElement.TryGetProperty("isPeer", out JsonElement isPeerEl) && isPeerEl.GetBoolean(); + + //The node id is optional and stored in the 'sub' field, ignore if the client is not a peer + if (isPeer && doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl)) { nodeId = servIdEl.GetString(); } @@ -342,6 +336,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints //Get the queue nodeQueue = StatefulEventQueue[nodeId]; } + //Init new ws state object and clamp the suggested buffer sizes WsUserState state = new() { @@ -363,6 +358,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints return VfReturnType.BadRequest; } } + private async Task WebsocketAcceptedAsync(WebSocketSession wss) { //Inc connected count diff --git a/Plugins/SessionCacheServer/ObjectCacheServer.csproj b/Plugins/SessionCacheServer/ObjectCacheServer.csproj index 01b33f3..2cf298d 100644 --- a/Plugins/SessionCacheServer/ObjectCacheServer.csproj +++ b/Plugins/SessionCacheServer/ObjectCacheServer.csproj @@ -37,7 +37,7 @@ </PackageReference> </ItemGroup> <ItemGroup> - <ProjectReference Include="..\..\..\..\VNLib\Essentials\VNLib.Plugins.Essentials.csproj" /> + <ProjectReference Include="..\..\..\..\VNLib\Essentials\src\VNLib.Plugins.Essentials.csproj" /> <ProjectReference Include="..\..\..\DataCaching\VNLib.Data.Caching.Extensions\VNLib.Data.Caching.Extensions.csproj" /> <ProjectReference Include="..\..\..\DataCaching\VNLib.Data.Caching.ObjectCache\VNLib.Data.Caching.ObjectCache.csproj" /> <ProjectReference Include="..\..\..\PluginBase\VNLib.Plugins.PluginBase.csproj" /> diff --git a/Plugins/SessionCacheServer/ObjectCacheServerEntry.cs b/Plugins/SessionCacheServer/ObjectCacheServerEntry.cs index 17d8ba5..20a6268 100644 --- a/Plugins/SessionCacheServer/ObjectCacheServerEntry.cs +++ b/Plugins/SessionCacheServer/ObjectCacheServerEntry.cs @@ -33,12 +33,12 @@ using System.Net.Sockets; using System.Threading.Tasks; using System.Collections.Generic; using System.Security.Cryptography; -using System.Collections.Concurrent; using VNLib.Utils.Memory; using VNLib.Utils.Logging; using VNLib.Utils.Extensions; using VNLib.Hashing; +using VNLib.Hashing.IdentityUtility; using VNLib.Data.Caching; using VNLib.Data.Caching.Extensions; using VNLib.Data.Caching.ObjectCache; @@ -49,7 +49,7 @@ using VNLib.Plugins.Cache.Broker.Endpoints; using VNLib.Plugins.Extensions.Loading; using VNLib.Plugins.Extensions.Loading.Routing; using VNLib.Plugins.Essentials.Sessions.Server.Endpoints; -using VNLib.Utils.Memory.Caching; + namespace VNLib.Plugins.Essentials.Sessions.Server { @@ -58,7 +58,19 @@ namespace VNLib.Plugins.Essentials.Sessions.Server public override string PluginName => "ObjectCache.Service"; private string? BrokerHeartBeatToken; - + + private readonly object ServerLock = new(); + private readonly HashSet<ActiveServer> ListeningServers = new(); + + + private void RemoveServer(ActiveServer server) + { + lock (ServerLock) + { + ListeningServers.Remove(server); + } + } + protected override void OnLoad() { //Create default heap @@ -153,8 +165,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server try { //Get the broker config element - IReadOnlyDictionary<string, JsonElement> clusterConfig = this.GetConfig("cluster"); - + IReadOnlyDictionary<string, JsonElement> clusterConfig = this.GetConfig("cluster"); //Server id is just dns name for now string serverId = Dns.GetHostName(); @@ -179,11 +190,9 @@ namespace VNLib.Plugins.Essentials.Sessions.Server using BrokerRegistrationRequest request = new(); { string addr = clusterConfig["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'"); - - //Try to get the cache private key - string base64Priv = await this.TryGetSecretAsync("cache_private_key") ?? throw new KeyNotFoundException("Failed to load the cache private key"); - ReadOnlyMemory<byte> privKey = Convert.FromBase64String(base64Priv); + //Recover the certificate + ReadOnlyJsonWebKey cacheCert = await GetCachePrivate(); //Init url builder for payload, see if tls is enabled Uri connectAddress = new UriBuilder(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, Dns.GetHostName(), port, connectPath).Uri; @@ -191,10 +200,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server request.WithBroker(new(addr)) .WithRegistrationAddress(connectAddress.ToString()) .WithNodeId(serverId) - .WithPrivateKey(privKey.Span); - //Wipe memory - Memory.UnsafeZeroMemory<char>(base64Priv); - Memory.UnsafeZeroMemory(privKey); + .WithSigningKey(cacheCert, true); } while (true) @@ -279,6 +285,19 @@ namespace VNLib.Plugins.Essentials.Sessions.Server #region Cluster + private async Task<ReadOnlyJsonWebKey> GetCachePrivate() + { + using SecretResult secret = await this.TryGetSecretAsync("cache_private_key") ?? throw new KeyNotFoundException("Failed to load the cache private key"); + return secret.GetJsonWebKey(); + } + + private async Task<ReadOnlyJsonWebKey> GetBrokerPublic() + { + using SecretResult secret = await this.TryGetSecretAsync("broker_public_key") ?? throw new KeyNotFoundException("Failed to load the broker's public key"); + return secret.GetJsonWebKey(); + } + + /// <summary> /// Starts a self-contained process-long task to discover other cache servers /// from a shared broker server @@ -291,31 +310,27 @@ namespace VNLib.Plugins.Essentials.Sessions.Server private async Task RunClientAsync(ObjectCacheStore cacheStore, Uri brokerAddress, FBMClientConfig clientConf) { TimeSpan noServerDelay = TimeSpan.FromSeconds(10); - //Init signing algs - ReadOnlyMemory<byte> clientPrivKey = null; - ReadOnlyMemory<byte> brokerPubKey = null; string nodeId = Dns.GetHostName(); + ListServerRequest listRequest = new(brokerAddress); try { //Get the broker config element IReadOnlyDictionary<string, JsonElement> clusterConf = this.GetConfig("cluster"); int serverCheckMs = clusterConf["update_interval_sec"].GetInt32() * 1000; - //Get client priv key from secret store - string cpk = await this.TryGetSecretAsync("client_private_key") ?? throw new KeyNotFoundException("Failed to get the client private key from config"); - string bpub = await this.TryGetSecretAsync("broker_public_key") ?? throw new KeyNotFoundException("Failed to get the broker public key from config"); + //Setup signing and verification certificates + ReadOnlyJsonWebKey cacheSig = await GetCachePrivate(); + ReadOnlyJsonWebKey brokerPub = await GetBrokerPublic(); + //Import certificates + listRequest.WithVerificationKey(brokerPub) + .WithSigningKey(cacheSig); - //Load client private key - clientPrivKey = Convert.FromBase64String(cpk); - //Import broker public key - brokerPubKey = Convert.FromBase64String(bpub); - - //Concurrent dict to track remote servers - ConcurrentDictionary<string, ActiveServer> ActiveServers = new(); //Main event loop - Log.Information("Discovering available cluster nodes in broker"); + Log.Information("Begining cluster node discovery"); + + ILogProvider? debugLog = this.IsDebug() ? Log : null; - while (!UnloadToken.IsCancellationRequested) + while (true) { //Load the server list ActiveServer[]? servers; @@ -323,8 +338,10 @@ namespace VNLib.Plugins.Essentials.Sessions.Server { try { + debugLog?.Information("[CACHE] Requesting server list from broker"); + //Get server list - servers = await FBMDataCacheExtensions.ListServersAsync(brokerAddress, clientPrivKey, brokerPubKey, UnloadToken); + servers = await FBMDataCacheExtensions.ListServersAsync(listRequest, UnloadToken); //Servers are loaded, so continue break; } @@ -345,23 +362,37 @@ namespace VNLib.Plugins.Essentials.Sessions.Server //Delay await Task.Delay(randomMsDelay, UnloadToken); } - if(servers?.Length == 0) + + if(servers == null || servers.Length == 0) { Log.Information("No cluster nodes found, retrying"); //Delay await Task.Delay(noServerDelay, UnloadToken); continue; } + - //Select servers that are not the current server and are not already being monitored - IEnumerable<ActiveServer> serversToConnectTo = servers.Where(s => s.ServerId != nodeId) - .Where(s => !ActiveServers.ContainsKey(s.ServerId!)); - //Connect to servers - foreach (ActiveServer server in serversToConnectTo) + //Lock on sever set while enumerating + lock (ServerLock) { - _ = RunSyncTaskAsync(server, ActiveServers, cacheStore, clientConf, clientPrivKey, nodeId) - .ConfigureAwait(false); + //Select servers that are not the current server and are not already being monitored + IEnumerable<ActiveServer> serversToConnectTo = servers.Where(s => !nodeId.Equals(s.ServerId, StringComparison.OrdinalIgnoreCase)); + + //Connect to servers + foreach (ActiveServer server in serversToConnectTo) + { + //Make sure were not currently connected to the server + if (!ListeningServers.Contains(server)) + { + //Add the server to the set + ListeningServers.Add(server); + + //Run listener background task + _ = this.DeferTask(() => RunSyncTaskAsync(server, cacheStore, clientConf, nodeId)); + } + } } + //Delay until next check cycle await Task.Delay(serverCheckMs, UnloadToken); } @@ -383,19 +414,15 @@ namespace VNLib.Plugins.Essentials.Sessions.Server } finally { - Memory.UnsafeZeroMemory(clientPrivKey); - Memory.UnsafeZeroMemory(brokerPubKey); + listRequest.Dispose(); } Log.Debug("Cluster sync worker exited"); } - private async Task RunSyncTaskAsync(ActiveServer server, ConcurrentDictionary<string, ActiveServer> activeList, - ObjectCacheStore cacheStore, FBMClientConfig conf, ReadOnlyMemory<byte> privateKey, string nodeId) + private async Task RunSyncTaskAsync(ActiveServer server, ObjectCacheStore cacheStore, FBMClientConfig conf, string nodeId) { //Setup client FBMClient client = new(conf); - //Add server to active list, or replace its old value with the new one - activeList.AddOrUpdate(server.ServerId, server, (old, update) => server); try { async Task UpdateRecordAsync(string objectId, string newId) @@ -413,6 +440,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server using FBMResponse response = await client.SendAsync(modRequest, UnloadToken); response.ThrowIfNotSet(); + //Check response code string status = response.Headers.First(static s => s.Key == HeaderCommand.Status).Value.ToString(); if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal)) @@ -431,14 +459,21 @@ namespace VNLib.Plugins.Essentials.Sessions.Server client.ReturnRequest(modRequest); } } + + { + //Sign and verify requests with the cache private key since we are a peer + ReadOnlyJsonWebKey cachePriv = await GetCachePrivate(); + + //Configure cache + client.GetCacheConfiguration() + .WithVerificationKey(cachePriv) + .WithSigningCertificate(cachePriv) + .WithNodeId(nodeId) //set nodeid since were listening for changes + .WithTls(false); + } + + Log.Information("Connecting to {server}...", server.ServerId); - //Configure cache - client.GetCacheConfiguration() - .ImportVerificationKey(privateKey.Span) - .ImportVerificationKey(null) - .WithNodeId(nodeId) //set nodeid since were listening for changes - .WithTls(false); - //Connect to the server await client.ConnectToCacheAsync(server, UnloadToken); @@ -518,7 +553,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server finally { //Remove server from active list, since its been disconnected - _ = activeList.TryRemove(server.ServerId, out _); + RemoveServer(server); client.Dispose(); } } |