aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src/Endpoints
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-11-02 01:50:05 -0400
committerLibravatar vnugent <public@vaughnnugent.com>2023-11-02 01:50:05 -0400
commitd2d812213b99ee17f9433f81871b694c4053ff23 (patch)
tree11a1106602112c134e65bf197ef701d1b8d63b67 /plugins/ObjectCacheServer/src/Endpoints
parent483c014b938e2d55ea7c89b67f6d19ba2c2d5b5e (diff)
also carried away
Diffstat (limited to 'plugins/ObjectCacheServer/src/Endpoints')
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs207
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs206
2 files changed, 247 insertions, 166 deletions
diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs
new file mode 100644
index 0000000..d1591f8
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs
@@ -0,0 +1,207 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: CacheNegotationManager.cs
+*
+* CacheNegotationManager.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Net;
+using System.Text.Json;
+
+using VNLib.Hashing;
+using VNLib.Hashing.IdentityUtility;
+using VNLib.Net.Messaging.FBM.Client;
+using VNLib.Plugins;
+using VNLib.Plugins.Extensions.Loading;
+using VNLib.Data.Caching.Extensions;
+using VNLib.Data.Caching.ObjectCache.Server.Cache;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
+{
+ internal class ClientNegotiationState
+ {
+ public string? Challenge { get; set; }
+
+ public string? NodeId { get; set; }
+
+ public bool IsPeer { get; set; }
+ }
+
+ internal sealed class CacheNegotationManager
+ {
+ /*
+ * Cache keys are centralized and may be shared between all cache server nodes. This means
+ * that any client would be able to get a signed negotiation from any server and use it to
+ * upgrade a connection to any other server. This is property is to be avoided because servers
+ * may have different configurations that a malicious client could exploit. To prevent that
+ * a unique server generated Audience ID is used in the negotiation token and verified when
+ * an upgrade is requested. This ensures that the client is connecting to the same server
+ * that issued the negotiation token.
+ *
+ * With this operational theory, someone has to expose their buffer configuration. At the moment
+ * I think it would be best for servers to keep their buffer configuration private, as it could
+ * cause more damage to the network. This is not really a protection measure because a malicious
+ * client could use trial and error to find the servers buffer configuration.
+ */
+
+ private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
+
+ private readonly string AudienceLocalServerId;
+ private readonly NodeConfig _nodeConfig;
+ private readonly CacheConfiguration _cacheConfig;
+
+ public CacheNegotationManager(PluginBase plugin)
+ {
+ //Get node configuration
+ _nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>();
+
+ //Get the cache store configuration
+ _cacheConfig = plugin.GetConfigForType<CacheStore>().Deserialze<CacheConfiguration>();
+
+ AudienceLocalServerId = Guid.NewGuid().ToString("N");
+ }
+
+
+ public bool IsClientNegotiationValid(string authToken, out ClientNegotiationState state)
+ {
+ state = new();
+
+ // Parse jwt
+ using JsonWebToken jwt = JsonWebToken.Parse(authToken);
+
+ //verify signature for client
+ if (_nodeConfig.KeyStore.VerifyJwt(jwt, false))
+ {
+ //Validated as normal client
+ }
+ //May be signed by a cache server
+ else if (_nodeConfig.KeyStore.VerifyJwt(jwt, true))
+ {
+ //Set peer and verified flag since the another cache server signed the request
+ state.IsPeer = true;
+ }
+ else
+ {
+ return false;
+ }
+
+ //Recover json body
+ using JsonDocument doc = jwt.GetPayload();
+
+ if (doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl))
+ {
+ state.NodeId = servIdEl.GetString();
+ }
+
+ //Challenge is required
+ state.Challenge = doc.RootElement.GetProperty("chl").GetString()!;
+
+ return true;
+ }
+
+ public JsonWebToken ConfirmCLientNegotiation(ClientNegotiationState state, IPAddress clientIp, DateTimeOffset now)
+ {
+ //Verified, now we can create an auth message with a short expiration
+ JsonWebToken auth = new();
+
+ auth.WriteHeader(_nodeConfig.KeyStore.GetJwtHeader());
+ auth.InitPayloadClaim()
+ .AddClaim("aud", AudienceLocalServerId)
+ .AddClaim("iat", now.ToUnixTimeSeconds())
+ .AddClaim("exp", now.Add(AuthTokenExpiration).ToUnixTimeSeconds())
+ .AddClaim("nonce", RandomHash.GetRandomBase32(8))
+ .AddClaim("chl", state.Challenge!)
+ //Set the ispeer flag if the request was signed by a cache server
+ .AddClaim("isPeer", state.IsPeer)
+ //Specify the server's node id if set
+ .AddClaim("sub", state.NodeId)
+ //Set ip address
+ .AddClaim("ip", clientIp.ToString())
+ //Add negotiaion args
+ .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, _cacheConfig.MaxHeaderBufferSize)
+ .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, _cacheConfig.MaxRecvBufferSize)
+ .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, _cacheConfig.MaxMessageSize)
+ .CommitClaims();
+
+ //Sign the auth message from our private key
+ _nodeConfig.KeyStore.SignJwt(auth);
+
+ return auth;
+ }
+
+ public bool ValidateUpgrade(string upgradeToken, string tokenSignature, DateTimeOffset now, IPAddress connectionIp, ref string? nodeId, ref bool isPeer)
+ {
+ //Parse jwt
+ using JsonWebToken jwt = JsonWebToken.Parse(upgradeToken);
+
+ //verify signature against the cache public key, since this server must have signed it
+ if (!_nodeConfig.KeyStore.VerifyCachePeer(jwt))
+ {
+ return false;
+ }
+
+ //Recover json body
+ using JsonDocument doc = jwt.GetPayload();
+
+ //Verify audience, expiration
+ if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl)
+ || !string.Equals(AudienceLocalServerId, audEl.GetString(), StringComparison.OrdinalIgnoreCase))
+ {
+ return false;
+ }
+
+ if (!doc.RootElement.TryGetProperty("exp", out JsonElement expEl)
+ || DateTimeOffset.FromUnixTimeSeconds(expEl.GetInt64()) < now)
+ {
+ return false;
+ }
+
+ //Check node ip address matches if required
+ if (_nodeConfig.VerifyIp)
+ {
+ if (!doc.RootElement.TryGetProperty("ip", out JsonElement ipEl))
+ {
+ return false;
+ }
+
+ string? clientIp = ipEl.GetString();
+
+ //Verify the client ip address matches the one in the token
+ if (clientIp == null || !IPAddress.TryParse(clientIp, out IPAddress? clientIpAddr) || !clientIpAddr.Equals(connectionIp))
+ {
+ return false;
+ }
+ }
+
+ //Check if the client is a peer
+ isPeer = doc.RootElement.TryGetProperty("isPeer", out JsonElement isPeerEl) && isPeerEl.GetBoolean();
+
+ //The node id is optional and stored in the 'sub' field, ignore if the client is not a peer
+ if (isPeer && doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl))
+ {
+ nodeId = servIdEl.GetString();
+ }
+
+ //Verify token signature against a fellow cache public key
+ return _nodeConfig.KeyStore.VerifyUpgradeToken(tokenSignature, upgradeToken, isPeer);
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
index d07c59e..816e6c3 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
@@ -24,12 +24,10 @@
using System;
using System.Net;
-using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
-using VNLib.Hashing;
using VNLib.Net.Http;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
@@ -49,22 +47,20 @@ using VNLib.Data.Caching.Extensions.Clustering;
using VNLib.Data.Caching.ObjectCache.Server.Cache;
using VNLib.Data.Caching.ObjectCache.Server.Clustering;
+
namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
internal sealed class ConnectEndpoint : ResourceEndpointBase
{
- private const string LOG_SCOPE_NAME = "CONEP";
-
- private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
-
+ internal const string LOG_SCOPE_NAME = "CONEP";
- private readonly NodeConfig NodeConfiguration;
+
private readonly ICacheEventQueueManager PubSubManager;
private readonly IPeerMonitor Peers;
- private readonly BlobCacheListener Store;
-
- private readonly string AudienceLocalServerId;
+ private readonly BlobCacheListener<IPeerEventQueue> Store;
+ private readonly NodeConfig NodeConfiguration;
+ private readonly CacheNegotationManager AuthManager;
private uint _connectedClients;
@@ -105,11 +101,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
//Get the cache store configuration
CacheConfig = plugin.GetConfigForType<CacheStore>().Deserialze<CacheConfiguration>();
- /*
- * Generate a random guid for the current server when created so we
- * know client tokens belong to us when singed by the same key
- */
- AudienceLocalServerId = Guid.NewGuid().ToString("N");
+ //Get the auth manager
+ AuthManager = plugin.GetOrCreateSingleton<CacheNegotationManager>();
}
@@ -136,52 +129,19 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
if (string.IsNullOrWhiteSpace(jwtAuth))
{
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
+ return VirtualClose(entity, HttpStatusCode.Forbidden);
}
- string? nodeId = null;
- string? challenge = null;
- bool isPeer = false;
-
- // Parse jwt
- using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
+ //Create negotiation state
+ if(!AuthManager.IsClientNegotiationValid(jwtAuth, out ClientNegotiationState state))
{
- //verify signature for client
- if (NodeConfiguration.KeyStore.VerifyJwt(jwt, false))
- {
- //Validated
- }
- //May be signed by a cache server
- else if(NodeConfiguration.KeyStore.VerifyJwt(jwt, true))
- {
- //Set peer and verified flag since the another cache server signed the request
- isPeer = true;
- }
- else
- {
- Log.Information("Client signature verification failed");
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
-
- //Recover json body
- using JsonDocument doc = jwt.GetPayload();
-
- if (doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl))
- {
- nodeId = servIdEl.GetString();
- }
-
- if (doc.RootElement.TryGetProperty("chl", out JsonElement challengeEl))
- {
- challenge = challengeEl.GetString();
- }
+ Log.Information("Initial negotiation client signature verification failed");
+ return VirtualClose(entity, HttpStatusCode.Unauthorized);
}
- if (isPeer)
+ if (state.IsPeer)
{
- Log.Debug("Received negotiation request from peer node {node}", nodeId);
+ Log.Debug("Received negotiation request from peer node {node}", state.NodeId);
}
else
{
@@ -189,29 +149,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
//Verified, now we can create an auth message with a short expiration
- using JsonWebToken auth = new();
-
- auth.WriteHeader(NodeConfiguration.KeyStore.GetJwtHeader());
- auth.InitPayloadClaim()
- .AddClaim("aud", AudienceLocalServerId)
- .AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds())
- .AddClaim("exp", entity.RequestedTimeUtc.Add(AuthTokenExpiration).ToUnixTimeSeconds())
- .AddClaim("nonce", RandomHash.GetRandomBase32(8))
- .AddClaim("chl", challenge!)
- //Set the ispeer flag if the request was signed by a cache server
- .AddClaim("isPeer", isPeer)
- //Specify the server's node id if set
- .AddClaim("sub", nodeId!)
- //Set ip address
- .AddClaim("ip", entity.TrustedRemoteIp.ToString())
- //Add negotiaion args
- .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, CacheConfig.MaxHeaderBufferSize)
- .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, CacheConfig.MaxRecvBufferSize)
- .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, CacheConfig.MaxMessageSize)
- .CommitClaims();
-
- //Sign the auth message from our private key
- NodeConfiguration.KeyStore.SignJwt(auth);
+ using JsonWebToken auth = AuthManager.ConfirmCLientNegotiation(state, entity.TrustedRemoteIp, entity.RequestedTimeUtc);
//Close response
entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer);
@@ -222,101 +160,35 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
//Parse jwt from authorization
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
-
- if (string.IsNullOrWhiteSpace(jwtAuth))
- {
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
-
- //Get the upgrade signature header
string? clientSignature = entity.Server.Headers[FBMDataCacheExtensions.X_UPGRADE_SIG_HEADER];
+ string? optionalDiscovery = entity.Server.Headers[FBMDataCacheExtensions.X_NODE_DISCOVERY_HEADER];
- if (string.IsNullOrWhiteSpace(clientSignature))
+ //Not null
+ if (string.IsNullOrWhiteSpace(jwtAuth) || string.IsNullOrWhiteSpace(clientSignature))
{
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
+ return VfReturnType.Forbidden;
}
string? nodeId = null;
- CacheNodeAdvertisment? discoveryAd = null;
+ bool isPeer = false;
- //Parse jwt
- using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
+ //Validate upgrade request
+ if (!AuthManager.ValidateUpgrade(jwtAuth, clientSignature, entity.RequestedTimeUtc, entity.TrustedRemoteIp, ref nodeId, ref isPeer))
{
- //verify signature against the cache public key, since this server must have signed it
- if (!NodeConfiguration.KeyStore.VerifyCachePeer(jwt))
- {
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
-
- //Recover json body
- using JsonDocument doc = jwt.GetPayload();
-
- //Verify audience, expiration
-
- if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl)
- || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase))
- {
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
-
- if (!doc.RootElement.TryGetProperty("exp", out JsonElement expEl)
- || DateTimeOffset.FromUnixTimeSeconds(expEl.GetInt64()) < entity.RequestedTimeUtc)
- {
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
-
- //Check node ip address matches if required
- if (NodeConfiguration.VerifyIp)
- {
- if (!doc.RootElement.TryGetProperty("ip", out JsonElement ipEl))
- {
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
-
- string? clientIp = ipEl.GetString();
- //Verify the client ip address matches the one in the token
- if (clientIp == null || !IPAddress.TryParse(clientIp, out IPAddress? clientIpAddr) || !clientIpAddr.Equals(entity.TrustedRemoteIp))
- {
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
- }
-
- //Check if the client is a peer
- bool isPeer = doc.RootElement.TryGetProperty("isPeer", out JsonElement isPeerEl) && isPeerEl.GetBoolean();
-
- //The node id is optional and stored in the 'sub' field, ignore if the client is not a peer
- if (isPeer && doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl))
- {
- nodeId = servIdEl.GetString();
- }
-
- //Verify the signature the client included of the auth token
+ return VirtualClose(entity, HttpStatusCode.Unauthorized);
+ }
- //Verify token signature against a fellow cache public key
- if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth, isPeer))
- {
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
+ CacheNodeAdvertisment? discoveryAd = null;
- if (isPeer)
- {
- //Try to get the node advertisement header
- string? discoveryHeader = entity.Server.Headers[FBMDataCacheExtensions.X_NODE_DISCOVERY_HEADER];
+ /*
+ * If the client is a peer server, it may offer a signed advertisment
+ * that this node will have the duty of making available to other peers
+ * if it is valid
+ */
- //Verify the node advertisement header and publish it
- if (!string.IsNullOrWhiteSpace(discoveryHeader))
- {
- discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(discoveryHeader);
- }
- }
+ if (isPeer && !string.IsNullOrWhiteSpace(optionalDiscovery))
+ {
+ discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(optionalDiscovery);
}
WsUserState state;
@@ -369,7 +241,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
Log.Debug("Client buffer state {state}", state);
//Accept socket and pass state object
- entity.AcceptWebSocket(WebsocketAcceptedAsync, state);
+ _ = entity.AcceptWebSocket(WebsocketAcceptedAsync, state);
return VfReturnType.VirtualSkip;
}
@@ -400,7 +272,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
};
//Check if the client is a peer node, if it is, subscribe to change events
- if (!string.IsNullOrWhiteSpace(state.NodeId))
+ if (state.IsPeer)
{
//Get the event queue for the current node
IPeerEventQueue queue = PubSubManager.Subscribe(state);
@@ -408,7 +280,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
try
{
//Begin listening for messages with a queue
- await Store.ListenAsync(wss, args, queue);
+ await Store.ListenAsync(wss, queue, args);
}
finally
{
@@ -419,7 +291,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
else
{
//Begin listening for messages without a queue
- await Store.ListenAsync(wss, args, null);
+ await Store.ListenAsync(wss, null!, args);
}
}
catch (OperationCanceledException)
@@ -456,6 +328,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
public string? NodeId { get; init; }
public CacheNodeAdvertisment? Advertisment { get; init; }
+ public bool IsPeer => !string.IsNullOrWhiteSpace(NodeId);
+
public override string ToString()
{
return