aboutsummaryrefslogtreecommitdiff
path: root/Plugins/SessionCacheServer
diff options
context:
space:
mode:
Diffstat (limited to 'Plugins/SessionCacheServer')
-rw-r--r--Plugins/SessionCacheServer/Endpoints/BrokerHeartBeat.cs38
-rw-r--r--Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs136
-rw-r--r--Plugins/SessionCacheServer/ObjectCacheServer.csproj2
-rw-r--r--Plugins/SessionCacheServer/ObjectCacheServerEntry.cs137
4 files changed, 171 insertions, 142 deletions
diff --git a/Plugins/SessionCacheServer/Endpoints/BrokerHeartBeat.cs b/Plugins/SessionCacheServer/Endpoints/BrokerHeartBeat.cs
index bad0f7a..2e380a3 100644
--- a/Plugins/SessionCacheServer/Endpoints/BrokerHeartBeat.cs
+++ b/Plugins/SessionCacheServer/Endpoints/BrokerHeartBeat.cs
@@ -29,9 +29,7 @@ using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
-using System.Security.Cryptography;
-using VNLib.Data.Caching.Extensions;
using VNLib.Hashing.IdentityUtility;
using VNLib.Plugins.Essentials.Endpoints;
using VNLib.Plugins.Essentials.Extensions;
@@ -48,7 +46,12 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
private readonly Task<IPAddress[]> BrokerIpList;
private readonly PluginBase Pbase;
- protected override ProtectionSettings EndpointProtectionSettings { get; }
+ protected override ProtectionSettings EndpointProtectionSettings { get; } = new()
+ {
+ DisableBrowsersOnly = true,
+ DisableSessionsRequired = true,
+ DisableVerifySessionCors = true
+ };
public BrokerHeartBeat(Func<string> token, ManualResetEvent keepaliveSet, Uri brokerUri, PluginBase pbase)
{
@@ -57,20 +60,13 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
BrokerIpList = Dns.GetHostAddressesAsync(brokerUri.DnsSafeHost);
this.Pbase = pbase;
-
- EndpointProtectionSettings = new()
- {
- BrowsersOnly = false,
- SessionsRequired = false,
- VerifySessionCors = false,
- };
}
- private async Task<byte[]> GetBrokerPubAsync()
+ private async Task<ReadOnlyJsonWebKey> GetBrokerPubAsync()
{
- string? brokerPubKey = await Pbase.TryGetSecretAsync("broker_public_key") ?? throw new KeyNotFoundException("Missing required secret : broker_public_key");
+ using SecretResult brokerPubKey = await Pbase.TryGetSecretAsync("broker_public_key") ?? throw new KeyNotFoundException("Missing required secret : broker_public_key");
- return Convert.FromBase64String(brokerPubKey);
+ return brokerPubKey.GetJsonWebKey();
}
protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity)
@@ -89,35 +85,36 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
}
//Get the authorization jwt
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
+
if (string.IsNullOrWhiteSpace(jwtAuth))
{
//Token invalid
entity.CloseResponse(HttpStatusCode.Forbidden);
return VfReturnType.VirtualSkip;
}
+
//Parse the jwt
using JsonWebToken jwt = JsonWebToken.Parse(jwtAuth);
- //Init signature alg
- using (ECDsa alg = ECDsa.Create(FBMDataCacheExtensions.CacheCurve))
- {
- //Get pub key
- byte[] key = await GetBrokerPubAsync();
- alg.ImportSubjectPublicKeyInfo(key, out _);
+ //Verify the jwt using the broker's public key certificate
+ using (ReadOnlyJsonWebKey cert = await GetBrokerPubAsync())
+ {
//Verify the jwt
- if (!jwt.Verify(alg, FBMDataCacheExtensions.CacheJwtAlgorithm))
+ if (!jwt.VerifyFromJwk(cert))
{
//Token invalid
entity.CloseResponse(HttpStatusCode.Forbidden);
return VfReturnType.VirtualSkip;
}
}
+
string? auth;
//Recover the auth token from the jwt
using (JsonDocument doc = jwt.GetPayload())
{
auth = doc.RootElement.GetProperty("token").GetString();
}
+
//Verify token
if(Token().Equals(auth, StringComparison.Ordinal))
{
@@ -126,6 +123,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
entity.CloseResponse(HttpStatusCode.OK);
return VfReturnType.VirtualSkip;
}
+
//Token invalid
entity.CloseResponse(HttpStatusCode.Forbidden);
return VfReturnType.VirtualSkip;
diff --git a/Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs b/Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs
index fee1ea7..77acb13 100644
--- a/Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs
+++ b/Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs
@@ -24,25 +24,21 @@
using System;
using System.Net;
-using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Channels;
using System.Collections.Generic;
-using System.Security.Cryptography;
using System.Collections.Concurrent;
using VNLib.Net.Http;
using VNLib.Hashing;
using VNLib.Utils.Async;
-using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
using VNLib.Hashing.IdentityUtility;
using VNLib.Net.Messaging.FBM;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Net.Messaging.FBM.Server;
-using VNLib.Data.Caching.Extensions;
using VNLib.Data.Caching.ObjectCache;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Essentials.Endpoints;
@@ -77,9 +73,9 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
//Loosen up protection settings
protected override ProtectionSettings EndpointProtectionSettings { get; } = new()
{
- BrowsersOnly = false,
- SessionsRequired = false,
- CrossSiteDenied = false
+ DisableBrowsersOnly = true,
+ DisableSessionsRequired = true,
+ DisableCrossSiteDenied = true
};
public ConnectEndpoint(string path, ObjectCacheStore store, PluginBase pbase)
@@ -121,20 +117,33 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
string? nodeId = null;
string? challenge = null;
+ bool isPeer = false;
// Parse jwt
using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
{
- //Get the client public key
- byte[] clientPub = await GetClientPubAsync();
+ bool verified = false;
- //Init sig alg
- using ECDsa verAlg = ECDsa.Create(FBMDataCacheExtensions.CacheCurve);
- //Import client pub key
- verAlg.ImportSubjectPublicKeyInfo(clientPub, out _);
-
- //verify signature for client
- if (!jwt.Verify(verAlg, in FBMDataCacheExtensions.CacheJwtAlgorithm))
+ //Get the client public key certificate to verify the client's message
+ using(ReadOnlyJsonWebKey cert = await GetClientPubAsync())
+ {
+ //verify signature for client
+ if (jwt.VerifyFromJwk(cert))
+ {
+ verified = true;
+ }
+ //May be signed by a cahce server
+ else
+ {
+ using ReadOnlyJsonWebKey cacheCert = await GetCachePubAsync();
+
+ //Set peer and verified flag since the another cache server signed the request
+ isPeer = verified = jwt.VerifyFromJwk(cacheCert);
+ }
+ }
+
+ //Check flag
+ if (!verified)
{
Log.Information("Client signature verification failed");
entity.CloseResponse(HttpStatusCode.Unauthorized);
@@ -154,36 +163,28 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
}
Log.Debug("Received negotiation request from node {node}", nodeId);
-
//Verified, now we can create an auth message with a short expiration
using JsonWebToken auth = new();
- auth.WriteHeader(FBMDataCacheExtensions.JwtMessageHeader.Span);
- auth.InitPayloadClaim()
- .AddClaim("aud", AudienceLocalServerId)
- .AddClaim("exp", DateTimeOffset.UtcNow.Add(AuthTokenExpiration).ToUnixTimeSeconds())
- .AddClaim("nonce", RandomHash.GetRandomHex(8))
- .AddClaim("chl", challenge)
- //Specify the server's node id if set
- .AddClaim("sub", nodeId)
- //Add negotiaion args
- .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, MAX_HEAD_BUF_SIZE)
- .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, MAX_RECV_BUF_SIZE)
- .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, MAX_MESSAGE_SIZE)
- .CommitClaims();
-
- //Sign the auth message
- ECDsa sigAlg = ECDsa.Create(FBMDataCacheExtensions.CacheCurve);
- byte[] cachePrivate = await GetCachePrivateKeyAsync();
- try
+ //Sign the auth message from the cache certificate's private key
+ using (ReadOnlyJsonWebKey cert = await GetCachePrivateKeyAsync())
{
- sigAlg.ImportPkcs8PrivateKey(cachePrivate, out _);
- //sign jwt
- auth.Sign(sigAlg, in FBMDataCacheExtensions.CacheJwtAlgorithm, 256);
- }
- finally
- {
- Memory.InitializeBlock(cachePrivate.AsSpan());
- sigAlg.Dispose();
+ auth.WriteHeader(cert.JwtHeader);
+ auth.InitPayloadClaim()
+ .AddClaim("aud", AudienceLocalServerId)
+ .AddClaim("exp", DateTimeOffset.UtcNow.Add(AuthTokenExpiration).ToUnixTimeSeconds())
+ .AddClaim("nonce", RandomHash.GetRandomBase32(8))
+ .AddClaim("chl", challenge!)
+ //Set the ispeer flag if the request was signed by a cache server
+ .AddClaim("isPeer", isPeer)
+ //Specify the server's node id if set
+ .AddClaim("sub", nodeId!)
+ //Add negotiaion args
+ .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, MAX_HEAD_BUF_SIZE)
+ .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, MAX_RECV_BUF_SIZE)
+ .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, MAX_MESSAGE_SIZE)
+ .CommitClaims();
+
+ auth.SignFromJwk(cert);
}
//Close response
@@ -191,27 +192,23 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
return VfReturnType.VirtualSkip;
}
- private async Task<byte[]> GetClientPubAsync()
+ private async Task<ReadOnlyJsonWebKey> GetClientPubAsync()
{
- string? brokerPubKey = await Pbase.TryGetSecretAsync("client_public_key") ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
-
- return Convert.FromBase64String(brokerPubKey);
+ using SecretResult brokerPubKey = await Pbase.TryGetSecretAsync("client_public_key") ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
+
+ return brokerPubKey.GetJsonWebKey();
}
- private async Task<byte[]> GetCachePubAsync()
+ private async Task<ReadOnlyJsonWebKey> GetCachePubAsync()
{
- string? brokerPubKey = await Pbase.TryGetSecretAsync("cache_public_key") ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
+ using SecretResult cachPublic = await Pbase.TryGetSecretAsync("cache_public_key") ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
- return Convert.FromBase64String(brokerPubKey);
+ return cachPublic.GetJsonWebKey();
}
- private async Task<byte[]> GetCachePrivateKeyAsync()
+ private async Task<ReadOnlyJsonWebKey> GetCachePrivateKeyAsync()
{
- string? cachePrivate = await Pbase.TryGetSecretAsync("cache_private_key") ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
-
- byte[] data = Convert.FromBase64String(cachePrivate);
+ using SecretResult cachePrivate = await Pbase.TryGetSecretAsync("cache_private_key") ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
- Memory.UnsafeZeroMemory<char>(cachePrivate);
-
- return data;
+ return cachePrivate.GetJsonWebKey();
}
private async Task ChangeWorkerAsync(CancellationToken cancellation)
@@ -265,16 +262,11 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
//Parse jwt
using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
{
- //Init sig alg, we will verify that the token was signed by this server
- using (ECDsa sigAlg = ECDsa.Create(FBMDataCacheExtensions.CacheCurve))
+ //Get the client public key certificate to verify the client's message
+ using (ReadOnlyJsonWebKey cert = await GetCachePubAsync())
{
- //Get the cache public key
- byte[] cachePub = await GetCachePubAsync();
-
- sigAlg.ImportSubjectPublicKeyInfo(cachePub, out _);
-
- //verify signature against the cache public key, since this server have signed it
- if (!jwt.Verify(sigAlg, in FBMDataCacheExtensions.CacheJwtAlgorithm))
+ //verify signature against the cache public key, since this server must have signed it
+ if (!jwt.VerifyFromJwk(cert))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
@@ -286,8 +278,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
//Verify audience, expiration
- if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl)
- || AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase))
+ if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl) || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
@@ -300,8 +291,11 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
return VfReturnType.VirtualSkip;
}
- //The node id is optional and stored in the 'sub' field
- if (doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl))
+ //Check if the client is a peer
+ bool isPeer = doc.RootElement.TryGetProperty("isPeer", out JsonElement isPeerEl) && isPeerEl.GetBoolean();
+
+ //The node id is optional and stored in the 'sub' field, ignore if the client is not a peer
+ if (isPeer && doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl))
{
nodeId = servIdEl.GetString();
}
@@ -342,6 +336,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
//Get the queue
nodeQueue = StatefulEventQueue[nodeId];
}
+
//Init new ws state object and clamp the suggested buffer sizes
WsUserState state = new()
{
@@ -363,6 +358,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
return VfReturnType.BadRequest;
}
}
+
private async Task WebsocketAcceptedAsync(WebSocketSession wss)
{
//Inc connected count
diff --git a/Plugins/SessionCacheServer/ObjectCacheServer.csproj b/Plugins/SessionCacheServer/ObjectCacheServer.csproj
index 01b33f3..2cf298d 100644
--- a/Plugins/SessionCacheServer/ObjectCacheServer.csproj
+++ b/Plugins/SessionCacheServer/ObjectCacheServer.csproj
@@ -37,7 +37,7 @@
</PackageReference>
</ItemGroup>
<ItemGroup>
- <ProjectReference Include="..\..\..\..\VNLib\Essentials\VNLib.Plugins.Essentials.csproj" />
+ <ProjectReference Include="..\..\..\..\VNLib\Essentials\src\VNLib.Plugins.Essentials.csproj" />
<ProjectReference Include="..\..\..\DataCaching\VNLib.Data.Caching.Extensions\VNLib.Data.Caching.Extensions.csproj" />
<ProjectReference Include="..\..\..\DataCaching\VNLib.Data.Caching.ObjectCache\VNLib.Data.Caching.ObjectCache.csproj" />
<ProjectReference Include="..\..\..\PluginBase\VNLib.Plugins.PluginBase.csproj" />
diff --git a/Plugins/SessionCacheServer/ObjectCacheServerEntry.cs b/Plugins/SessionCacheServer/ObjectCacheServerEntry.cs
index 17d8ba5..20a6268 100644
--- a/Plugins/SessionCacheServer/ObjectCacheServerEntry.cs
+++ b/Plugins/SessionCacheServer/ObjectCacheServerEntry.cs
@@ -33,12 +33,12 @@ using System.Net.Sockets;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Security.Cryptography;
-using System.Collections.Concurrent;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
using VNLib.Utils.Extensions;
using VNLib.Hashing;
+using VNLib.Hashing.IdentityUtility;
using VNLib.Data.Caching;
using VNLib.Data.Caching.Extensions;
using VNLib.Data.Caching.ObjectCache;
@@ -49,7 +49,7 @@ using VNLib.Plugins.Cache.Broker.Endpoints;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Extensions.Loading.Routing;
using VNLib.Plugins.Essentials.Sessions.Server.Endpoints;
-using VNLib.Utils.Memory.Caching;
+
namespace VNLib.Plugins.Essentials.Sessions.Server
{
@@ -58,7 +58,19 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
public override string PluginName => "ObjectCache.Service";
private string? BrokerHeartBeatToken;
-
+
+ private readonly object ServerLock = new();
+ private readonly HashSet<ActiveServer> ListeningServers = new();
+
+
+ private void RemoveServer(ActiveServer server)
+ {
+ lock (ServerLock)
+ {
+ ListeningServers.Remove(server);
+ }
+ }
+
protected override void OnLoad()
{
//Create default heap
@@ -153,8 +165,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
try
{
//Get the broker config element
- IReadOnlyDictionary<string, JsonElement> clusterConfig = this.GetConfig("cluster");
-
+ IReadOnlyDictionary<string, JsonElement> clusterConfig = this.GetConfig("cluster");
//Server id is just dns name for now
string serverId = Dns.GetHostName();
@@ -179,11 +190,9 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
using BrokerRegistrationRequest request = new();
{
string addr = clusterConfig["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'");
-
- //Try to get the cache private key
- string base64Priv = await this.TryGetSecretAsync("cache_private_key") ?? throw new KeyNotFoundException("Failed to load the cache private key");
- ReadOnlyMemory<byte> privKey = Convert.FromBase64String(base64Priv);
+ //Recover the certificate
+ ReadOnlyJsonWebKey cacheCert = await GetCachePrivate();
//Init url builder for payload, see if tls is enabled
Uri connectAddress = new UriBuilder(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, Dns.GetHostName(), port, connectPath).Uri;
@@ -191,10 +200,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
request.WithBroker(new(addr))
.WithRegistrationAddress(connectAddress.ToString())
.WithNodeId(serverId)
- .WithPrivateKey(privKey.Span);
- //Wipe memory
- Memory.UnsafeZeroMemory<char>(base64Priv);
- Memory.UnsafeZeroMemory(privKey);
+ .WithSigningKey(cacheCert, true);
}
while (true)
@@ -279,6 +285,19 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
#region Cluster
+ private async Task<ReadOnlyJsonWebKey> GetCachePrivate()
+ {
+ using SecretResult secret = await this.TryGetSecretAsync("cache_private_key") ?? throw new KeyNotFoundException("Failed to load the cache private key");
+ return secret.GetJsonWebKey();
+ }
+
+ private async Task<ReadOnlyJsonWebKey> GetBrokerPublic()
+ {
+ using SecretResult secret = await this.TryGetSecretAsync("broker_public_key") ?? throw new KeyNotFoundException("Failed to load the broker's public key");
+ return secret.GetJsonWebKey();
+ }
+
+
/// <summary>
/// Starts a self-contained process-long task to discover other cache servers
/// from a shared broker server
@@ -291,31 +310,27 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
private async Task RunClientAsync(ObjectCacheStore cacheStore, Uri brokerAddress, FBMClientConfig clientConf)
{
TimeSpan noServerDelay = TimeSpan.FromSeconds(10);
- //Init signing algs
- ReadOnlyMemory<byte> clientPrivKey = null;
- ReadOnlyMemory<byte> brokerPubKey = null;
string nodeId = Dns.GetHostName();
+ ListServerRequest listRequest = new(brokerAddress);
try
{
//Get the broker config element
IReadOnlyDictionary<string, JsonElement> clusterConf = this.GetConfig("cluster");
int serverCheckMs = clusterConf["update_interval_sec"].GetInt32() * 1000;
- //Get client priv key from secret store
- string cpk = await this.TryGetSecretAsync("client_private_key") ?? throw new KeyNotFoundException("Failed to get the client private key from config");
- string bpub = await this.TryGetSecretAsync("broker_public_key") ?? throw new KeyNotFoundException("Failed to get the broker public key from config");
+ //Setup signing and verification certificates
+ ReadOnlyJsonWebKey cacheSig = await GetCachePrivate();
+ ReadOnlyJsonWebKey brokerPub = await GetBrokerPublic();
+ //Import certificates
+ listRequest.WithVerificationKey(brokerPub)
+ .WithSigningKey(cacheSig);
- //Load client private key
- clientPrivKey = Convert.FromBase64String(cpk);
- //Import broker public key
- brokerPubKey = Convert.FromBase64String(bpub);
-
- //Concurrent dict to track remote servers
- ConcurrentDictionary<string, ActiveServer> ActiveServers = new();
//Main event loop
- Log.Information("Discovering available cluster nodes in broker");
+ Log.Information("Begining cluster node discovery");
+
+ ILogProvider? debugLog = this.IsDebug() ? Log : null;
- while (!UnloadToken.IsCancellationRequested)
+ while (true)
{
//Load the server list
ActiveServer[]? servers;
@@ -323,8 +338,10 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
{
try
{
+ debugLog?.Information("[CACHE] Requesting server list from broker");
+
//Get server list
- servers = await FBMDataCacheExtensions.ListServersAsync(brokerAddress, clientPrivKey, brokerPubKey, UnloadToken);
+ servers = await FBMDataCacheExtensions.ListServersAsync(listRequest, UnloadToken);
//Servers are loaded, so continue
break;
}
@@ -345,23 +362,37 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
//Delay
await Task.Delay(randomMsDelay, UnloadToken);
}
- if(servers?.Length == 0)
+
+ if(servers == null || servers.Length == 0)
{
Log.Information("No cluster nodes found, retrying");
//Delay
await Task.Delay(noServerDelay, UnloadToken);
continue;
}
+
- //Select servers that are not the current server and are not already being monitored
- IEnumerable<ActiveServer> serversToConnectTo = servers.Where(s => s.ServerId != nodeId)
- .Where(s => !ActiveServers.ContainsKey(s.ServerId!));
- //Connect to servers
- foreach (ActiveServer server in serversToConnectTo)
+ //Lock on sever set while enumerating
+ lock (ServerLock)
{
- _ = RunSyncTaskAsync(server, ActiveServers, cacheStore, clientConf, clientPrivKey, nodeId)
- .ConfigureAwait(false);
+ //Select servers that are not the current server and are not already being monitored
+ IEnumerable<ActiveServer> serversToConnectTo = servers.Where(s => !nodeId.Equals(s.ServerId, StringComparison.OrdinalIgnoreCase));
+
+ //Connect to servers
+ foreach (ActiveServer server in serversToConnectTo)
+ {
+ //Make sure were not currently connected to the server
+ if (!ListeningServers.Contains(server))
+ {
+ //Add the server to the set
+ ListeningServers.Add(server);
+
+ //Run listener background task
+ _ = this.DeferTask(() => RunSyncTaskAsync(server, cacheStore, clientConf, nodeId));
+ }
+ }
}
+
//Delay until next check cycle
await Task.Delay(serverCheckMs, UnloadToken);
}
@@ -383,19 +414,15 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
}
finally
{
- Memory.UnsafeZeroMemory(clientPrivKey);
- Memory.UnsafeZeroMemory(brokerPubKey);
+ listRequest.Dispose();
}
Log.Debug("Cluster sync worker exited");
}
- private async Task RunSyncTaskAsync(ActiveServer server, ConcurrentDictionary<string, ActiveServer> activeList,
- ObjectCacheStore cacheStore, FBMClientConfig conf, ReadOnlyMemory<byte> privateKey, string nodeId)
+ private async Task RunSyncTaskAsync(ActiveServer server, ObjectCacheStore cacheStore, FBMClientConfig conf, string nodeId)
{
//Setup client
FBMClient client = new(conf);
- //Add server to active list, or replace its old value with the new one
- activeList.AddOrUpdate(server.ServerId, server, (old, update) => server);
try
{
async Task UpdateRecordAsync(string objectId, string newId)
@@ -413,6 +440,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
using FBMResponse response = await client.SendAsync(modRequest, UnloadToken);
response.ThrowIfNotSet();
+
//Check response code
string status = response.Headers.First(static s => s.Key == HeaderCommand.Status).Value.ToString();
if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal))
@@ -431,14 +459,21 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
client.ReturnRequest(modRequest);
}
}
+
+ {
+ //Sign and verify requests with the cache private key since we are a peer
+ ReadOnlyJsonWebKey cachePriv = await GetCachePrivate();
+
+ //Configure cache
+ client.GetCacheConfiguration()
+ .WithVerificationKey(cachePriv)
+ .WithSigningCertificate(cachePriv)
+ .WithNodeId(nodeId) //set nodeid since were listening for changes
+ .WithTls(false);
+ }
+
+ Log.Information("Connecting to {server}...", server.ServerId);
- //Configure cache
- client.GetCacheConfiguration()
- .ImportVerificationKey(privateKey.Span)
- .ImportVerificationKey(null)
- .WithNodeId(nodeId) //set nodeid since were listening for changes
- .WithTls(false);
-
//Connect to the server
await client.ConnectToCacheAsync(server, UnloadToken);
@@ -518,7 +553,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
finally
{
//Remove server from active list, since its been disconnected
- _ = activeList.TryRemove(server.ServerId, out _);
+ RemoveServer(server);
client.Dispose();
}
}