aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-07-13 13:20:25 -0400
committerLibravatar vnugent <public@vaughnnugent.com>2023-07-13 13:20:25 -0400
commit2f674e79d42e7d36225fa9ac7ecefbc5bc62d325 (patch)
treec58999489f5391bc044e7a9bb3e557afe2860415 /plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
parent1a8ab1457244d15b19ddcc94958f645f5ec2abc7 (diff)
Checkpoint, kind of working clustering
Diffstat (limited to 'plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs')
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs71
1 files changed, 30 insertions, 41 deletions
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
index 8352635..5e794f8 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
@@ -46,7 +46,7 @@ using VNLib.Plugins.Essentials.Endpoints;
using VNLib.Plugins.Essentials.Extensions;
using VNLib.Plugins.Extensions.Loading.Routing;
using VNLib.Data.Caching.ObjectCache.Server.Distribution;
-
+using VNLib.Data.Caching.Extensions.Clustering;
namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
@@ -54,6 +54,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
[ConfigurationName("connect_endpoint")]
internal sealed class ConnectEndpoint : ResourceEndpointBase
{
+ private const string LOG_SCOPE_NAME = "CONEP";
+
private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
@@ -90,7 +92,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
string? path = config["path"].GetString();
- InitPathAndLog(path, plugin.Log);
+ InitPathAndLog(path, plugin.Log.CreateScope(LOG_SCOPE_NAME));
//Check for ip-verification flag
VerifyIp = config.TryGetValue("verify_ip", out JsonElement vIpEl) && vIpEl.GetBoolean();
@@ -152,28 +154,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
// Parse jwt
using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
{
- bool verified = false;
-
//verify signature for client
- if (NodeConfiguration.KeyStore.VerifyJwt(jwt))
+ if (NodeConfiguration.KeyStore.VerifyJwt(jwt, false))
{
- verified = true;
+ //Validated
}
//May be signed by a cache server
- else
+ else if(NodeConfiguration.KeyStore.VerifyJwt(jwt, true))
{
//Set peer and verified flag since the another cache server signed the request
- isPeer = verified = NodeConfiguration.KeyStore.VerifyCachePeer(jwt);
+ isPeer = true;
}
-
- //Check flag
- if (!verified)
+ else
{
Log.Information("Client signature verification failed");
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
}
-
+
//Recover json body
using JsonDocument doc = jwt.GetPayload();
@@ -196,6 +194,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
auth.WriteHeader(NodeConfiguration.KeyStore.GetJwtHeader());
auth.InitPayloadClaim()
.AddClaim("aud", AudienceLocalServerId)
+ .AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds())
.AddClaim("exp", entity.RequestedTimeUtc.Add(AuthTokenExpiration).ToUnixTimeSeconds())
.AddClaim("nonce", RandomHash.GetRandomBase32(8))
.AddClaim("chl", challenge!)
@@ -240,7 +239,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
string? nodeId = null;
- ICacheNodeAdvertisment? discoveryAd = null;
+ CacheNodeAdvertisment? discoveryAd = null;
//Parse jwt
using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
@@ -299,15 +298,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
//Verify the signature the client included of the auth token
- if (isPeer)
+ //Verify token signature against a fellow cache public key
+ if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth, isPeer))
{
- //Verify token signature against a fellow cache public key
- if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth))
- {
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
+ }
+ if (isPeer)
+ {
//Try to get the node advertisement header
string? discoveryHeader = entity.Server.Headers[FBMDataCacheExtensions.X_NODE_DISCOVERY_HEADER];
@@ -317,15 +316,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(discoveryHeader);
}
}
- else
- {
- //Not a peer, so verify against the client's public key
- if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth))
- {
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
- }
}
try
@@ -389,12 +379,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
//Notify peers of new connection
Peers.OnPeerConnected(state);
- //Inc connected count
- Interlocked.Increment(ref _connectedClients);
-
//Register plugin exit token to cancel the connected socket
CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll);
+ //Inc connected count
+ Interlocked.Increment(ref _connectedClients);
+
try
{
//Init listener args from request
@@ -442,14 +432,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
Log.Debug(ex);
}
- finally
- {
- //Dec connected count
- Interlocked.Decrement(ref _connectedClients);
- //Unregister the
- reg.Unregister();
- }
+ //Dec connected count
+ Interlocked.Decrement(ref _connectedClients);
+
+ //Unregister the token
+ reg.Unregister();
//Notify monitor of disconnect
Peers.OnPeerDisconnected(state);
@@ -465,12 +453,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
public int MaxMessageSize { get; init; }
public int MaxResponseBufferSize { get; init; }
public string? NodeId { get; init; }
- public ICacheNodeAdvertisment? Advertisment { get; init; }
+ public CacheNodeAdvertisment? Advertisment { get; init; }
public override string ToString()
{
return
- $"{nameof(RecvBufferSize)}:{RecvBufferSize}, {nameof(MaxHeaderBufferSize)}: {MaxHeaderBufferSize}, {nameof(MaxMessageSize)}:{MaxMessageSize}, {nameof(MaxResponseBufferSize)}:{MaxResponseBufferSize}";
+ $"{nameof(RecvBufferSize)}:{RecvBufferSize}, {nameof(MaxHeaderBufferSize)}: {MaxHeaderBufferSize}, " +
+ $"{nameof(MaxMessageSize)}:{MaxMessageSize}, {nameof(MaxResponseBufferSize)}:{MaxResponseBufferSize}";
}
}
}