diff options
author | vnugent <public@vaughnnugent.com> | 2023-07-13 13:20:25 -0400 |
---|---|---|
committer | vnugent <public@vaughnnugent.com> | 2023-07-13 13:20:25 -0400 |
commit | 2f674e79d42e7d36225fa9ac7ecefbc5bc62d325 (patch) | |
tree | c58999489f5391bc044e7a9bb3e557afe2860415 /plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs | |
parent | 1a8ab1457244d15b19ddcc94958f645f5ec2abc7 (diff) |
Checkpoint, kind of working clustering
Diffstat (limited to 'plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs')
-rw-r--r-- | plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs | 71 |
1 files changed, 30 insertions, 41 deletions
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs index 8352635..5e794f8 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs @@ -46,7 +46,7 @@ using VNLib.Plugins.Essentials.Endpoints; using VNLib.Plugins.Essentials.Extensions; using VNLib.Plugins.Extensions.Loading.Routing; using VNLib.Data.Caching.ObjectCache.Server.Distribution; - +using VNLib.Data.Caching.Extensions.Clustering; namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { @@ -54,6 +54,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints [ConfigurationName("connect_endpoint")] internal sealed class ConnectEndpoint : ResourceEndpointBase { + private const string LOG_SCOPE_NAME = "CONEP"; + private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30); @@ -90,7 +92,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { string? path = config["path"].GetString(); - InitPathAndLog(path, plugin.Log); + InitPathAndLog(path, plugin.Log.CreateScope(LOG_SCOPE_NAME)); //Check for ip-verification flag VerifyIp = config.TryGetValue("verify_ip", out JsonElement vIpEl) && vIpEl.GetBoolean(); @@ -152,28 +154,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints // Parse jwt using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth)) { - bool verified = false; - //verify signature for client - if (NodeConfiguration.KeyStore.VerifyJwt(jwt)) + if (NodeConfiguration.KeyStore.VerifyJwt(jwt, false)) { - verified = true; + //Validated } //May be signed by a cache server - else + else if(NodeConfiguration.KeyStore.VerifyJwt(jwt, true)) { //Set peer and verified flag since the another cache server signed the request - isPeer = verified = NodeConfiguration.KeyStore.VerifyCachePeer(jwt); + isPeer = true; } - - //Check flag - if (!verified) + else { Log.Information("Client signature verification failed"); entity.CloseResponse(HttpStatusCode.Unauthorized); return VfReturnType.VirtualSkip; } - + //Recover json body using JsonDocument doc = jwt.GetPayload(); @@ -196,6 +194,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints auth.WriteHeader(NodeConfiguration.KeyStore.GetJwtHeader()); auth.InitPayloadClaim() .AddClaim("aud", AudienceLocalServerId) + .AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds()) .AddClaim("exp", entity.RequestedTimeUtc.Add(AuthTokenExpiration).ToUnixTimeSeconds()) .AddClaim("nonce", RandomHash.GetRandomBase32(8)) .AddClaim("chl", challenge!) @@ -240,7 +239,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } string? nodeId = null; - ICacheNodeAdvertisment? discoveryAd = null; + CacheNodeAdvertisment? discoveryAd = null; //Parse jwt using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth)) @@ -299,15 +298,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints //Verify the signature the client included of the auth token - if (isPeer) + //Verify token signature against a fellow cache public key + if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth, isPeer)) { - //Verify token signature against a fellow cache public key - if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth)) - { - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; - } + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; + } + if (isPeer) + { //Try to get the node advertisement header string? discoveryHeader = entity.Server.Headers[FBMDataCacheExtensions.X_NODE_DISCOVERY_HEADER]; @@ -317,15 +316,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(discoveryHeader); } } - else - { - //Not a peer, so verify against the client's public key - if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth)) - { - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; - } - } } try @@ -389,12 +379,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints //Notify peers of new connection Peers.OnPeerConnected(state); - //Inc connected count - Interlocked.Increment(ref _connectedClients); - //Register plugin exit token to cancel the connected socket CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll); + //Inc connected count + Interlocked.Increment(ref _connectedClients); + try { //Init listener args from request @@ -442,14 +432,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { Log.Debug(ex); } - finally - { - //Dec connected count - Interlocked.Decrement(ref _connectedClients); - //Unregister the - reg.Unregister(); - } + //Dec connected count + Interlocked.Decrement(ref _connectedClients); + + //Unregister the token + reg.Unregister(); //Notify monitor of disconnect Peers.OnPeerDisconnected(state); @@ -465,12 +453,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints public int MaxMessageSize { get; init; } public int MaxResponseBufferSize { get; init; } public string? NodeId { get; init; } - public ICacheNodeAdvertisment? Advertisment { get; init; } + public CacheNodeAdvertisment? Advertisment { get; init; } public override string ToString() { return - $"{nameof(RecvBufferSize)}:{RecvBufferSize}, {nameof(MaxHeaderBufferSize)}: {MaxHeaderBufferSize}, {nameof(MaxMessageSize)}:{MaxMessageSize}, {nameof(MaxResponseBufferSize)}:{MaxResponseBufferSize}"; + $"{nameof(RecvBufferSize)}:{RecvBufferSize}, {nameof(MaxHeaderBufferSize)}: {MaxHeaderBufferSize}, " + + $"{nameof(MaxMessageSize)}:{MaxMessageSize}, {nameof(MaxResponseBufferSize)}:{MaxResponseBufferSize}"; } } } |