aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src/Endpoints
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/ObjectCacheServer/src/Endpoints')
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs49
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs91
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs60
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs4
4 files changed, 102 insertions, 102 deletions
diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs
index d1591f8..48f4448 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -45,7 +45,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
public bool IsPeer { get; set; }
}
- internal sealed class CacheNegotationManager
+ internal sealed class CacheNegotationManager(PluginBase plugin)
{
/*
* Cache keys are centralized and may be shared between all cache server nodes. This means
@@ -64,21 +64,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
- private readonly string AudienceLocalServerId;
- private readonly NodeConfig _nodeConfig;
- private readonly CacheConfiguration _cacheConfig;
+ private readonly string AudienceLocalServerId = Guid.NewGuid().ToString("N");
- public CacheNegotationManager(PluginBase plugin)
- {
- //Get node configuration
- _nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>();
+ private readonly ObjectCacheSystemState _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>();
- //Get the cache store configuration
- _cacheConfig = plugin.GetConfigForType<CacheStore>().Deserialze<CacheConfiguration>();
+ private NodeConfig NodeConfig => _sysState.Configuration;
- AudienceLocalServerId = Guid.NewGuid().ToString("N");
- }
-
+ private CacheMemoryConfiguration CacheConfig => _sysState.MemoryConfiguration;
public bool IsClientNegotiationValid(string authToken, out ClientNegotiationState state)
{
@@ -88,12 +80,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
using JsonWebToken jwt = JsonWebToken.Parse(authToken);
//verify signature for client
- if (_nodeConfig.KeyStore.VerifyJwt(jwt, false))
+ if (NodeConfig.KeyStore.VerifyJwt(jwt, false))
{
//Validated as normal client
}
//May be signed by a cache server
- else if (_nodeConfig.KeyStore.VerifyJwt(jwt, true))
+ else if (NodeConfig.KeyStore.VerifyJwt(jwt, true))
{
//Set peer and verified flag since the another cache server signed the request
state.IsPeer = true;
@@ -117,12 +109,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
return true;
}
- public JsonWebToken ConfirmCLientNegotiation(ClientNegotiationState state, IPAddress clientIp, DateTimeOffset now)
+ public JsonWebToken ConfirmClientNegotiation(ClientNegotiationState state, IPAddress clientIp, DateTimeOffset now)
{
//Verified, now we can create an auth message with a short expiration
JsonWebToken auth = new();
- auth.WriteHeader(_nodeConfig.KeyStore.GetJwtHeader());
+ auth.WriteHeader(NodeConfig.KeyStore.GetJwtHeader());
auth.InitPayloadClaim()
.AddClaim("aud", AudienceLocalServerId)
.AddClaim("iat", now.ToUnixTimeSeconds())
@@ -136,24 +128,29 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
//Set ip address
.AddClaim("ip", clientIp.ToString())
//Add negotiaion args
- .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, _cacheConfig.MaxHeaderBufferSize)
- .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, _cacheConfig.MaxRecvBufferSize)
- .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, _cacheConfig.MaxMessageSize)
+ .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, CacheConfig.MaxHeaderBufferSize)
+ .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, CacheConfig.MaxRecvBufferSize)
+ .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, CacheConfig.MaxMessageSize)
.CommitClaims();
//Sign the auth message from our private key
- _nodeConfig.KeyStore.SignJwt(auth);
+ NodeConfig.KeyStore.SignJwt(auth);
return auth;
}
- public bool ValidateUpgrade(string upgradeToken, string tokenSignature, DateTimeOffset now, IPAddress connectionIp, ref string? nodeId, ref bool isPeer)
+ public bool ValidateUpgrade(string? upgradeToken, string? tokenSignature, DateTimeOffset now, IPAddress connectionIp, ref string? nodeId, ref bool isPeer)
{
+ if(string.IsNullOrWhiteSpace(upgradeToken) || string.IsNullOrWhiteSpace(tokenSignature))
+ {
+ return false;
+ }
+
//Parse jwt
using JsonWebToken jwt = JsonWebToken.Parse(upgradeToken);
//verify signature against the cache public key, since this server must have signed it
- if (!_nodeConfig.KeyStore.VerifyCachePeer(jwt))
+ if (!NodeConfig.KeyStore.VerifyCachePeer(jwt))
{
return false;
}
@@ -175,7 +172,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
//Check node ip address matches if required
- if (_nodeConfig.VerifyIp)
+ if (NodeConfig.VerifyIp)
{
if (!doc.RootElement.TryGetProperty("ip", out JsonElement ipEl))
{
@@ -201,7 +198,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
//Verify token signature against a fellow cache public key
- return _nodeConfig.KeyStore.VerifyUpgradeToken(tokenSignature, upgradeToken, isPeer);
+ return NodeConfig.KeyStore.VerifyUpgradeToken(tokenSignature, upgradeToken, isPeer);
}
}
}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
index 816e6c3..d6b733c 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -55,11 +55,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
internal const string LOG_SCOPE_NAME = "CONEP";
-
- private readonly ICacheEventQueueManager PubSubManager;
- private readonly IPeerMonitor Peers;
- private readonly BlobCacheListener<IPeerEventQueue> Store;
- private readonly NodeConfig NodeConfiguration;
+
+ private readonly ObjectCacheSystemState _sysState;
+
+ private PeerEventQueueManager PubSubManager => _sysState.PeerEventQueue;
+ private CachePeerMonitor Peers => _sysState.PeerMonitor;
+ private BlobCacheListener<IPeerEventQueue> Listener => _sysState.Listener;
+ private NodeConfig NodeConfiguration => _sysState.Configuration;
+
private readonly CacheNegotationManager AuthManager;
private uint _connectedClients;
@@ -72,7 +75,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
/// <summary>
/// The cache store configuration
/// </summary>
- public CacheConfiguration CacheConfig { get; }
+ public CacheMemoryConfiguration CacheConfig => _sysState.MemoryConfiguration;
//Loosen up protection settings
///<inheritdoc/>
@@ -83,24 +86,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
public ConnectEndpoint(PluginBase plugin)
{
- //Get node configuration
- NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>();
+ _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>();
//Init from config and create a new log scope
InitPathAndLog(NodeConfiguration.ConnectPath, plugin.Log.CreateScope(LOG_SCOPE_NAME));
-
- //Setup pub/sub manager
- PubSubManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>();
-
- //Get peer monitor
- Peers = plugin.GetOrCreateSingleton<CachePeerMonitor>();
-
- //Init the cache store
- Store = plugin.GetOrCreateSingleton<CacheStore>().Listener;
-
- //Get the cache store configuration
- CacheConfig = plugin.GetConfigForType<CacheStore>().Deserialze<CacheConfiguration>();
-
+
//Get the auth manager
AuthManager = plugin.GetOrCreateSingleton<CacheNegotationManager>();
}
@@ -127,6 +117,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
//Parse jwt from authoriation
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
+
if (string.IsNullOrWhiteSpace(jwtAuth))
{
return VirtualClose(entity, HttpStatusCode.Forbidden);
@@ -149,26 +140,34 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
//Verified, now we can create an auth message with a short expiration
- using JsonWebToken auth = AuthManager.ConfirmCLientNegotiation(state, entity.TrustedRemoteIp, entity.RequestedTimeUtc);
+ using JsonWebToken auth = AuthManager.ConfirmClientNegotiation(state, entity.TrustedRemoteIp, entity.RequestedTimeUtc);
- //Close response
+ //Close response by sending a copy of the signed token
entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer);
return VfReturnType.VirtualSkip;
}
protected override VfReturnType WebsocketRequested(HttpEntity entity)
{
+ /*
+ * Check to see if any more connections are allowed,
+ * otherwise deny the connection
+ *
+ * This is done here to prevent the server from being overloaded
+ * on a new connection. It would be ideal to not grant new tokens
+ * but malicious clients could cache a bunch of tokens and use them
+ * later, exhausting resources.
+ */
+ if(_connectedClients >= NodeConfiguration.MaxConcurrentConnections)
+ {
+ return VirtualClose(entity, HttpStatusCode.ServiceUnavailable);
+ }
+
//Parse jwt from authorization
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
string? clientSignature = entity.Server.Headers[FBMDataCacheExtensions.X_UPGRADE_SIG_HEADER];
string? optionalDiscovery = entity.Server.Headers[FBMDataCacheExtensions.X_NODE_DISCOVERY_HEADER];
- //Not null
- if (string.IsNullOrWhiteSpace(jwtAuth) || string.IsNullOrWhiteSpace(clientSignature))
- {
- return VfReturnType.Forbidden;
- }
-
string? nodeId = null;
bool isPeer = false;
@@ -178,15 +177,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
return VirtualClose(entity, HttpStatusCode.Unauthorized);
}
- CacheNodeAdvertisment? discoveryAd = null;
-
/*
* If the client is a peer server, it may offer a signed advertisment
* that this node will have the duty of making available to other peers
* if it is valid
*/
- if (isPeer && !string.IsNullOrWhiteSpace(optionalDiscovery))
+ CacheNodeAdvertisment? discoveryAd = null;
+
+ if (isPeer)
{
discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(optionalDiscovery);
}
@@ -196,11 +195,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
try
{
//Get query config suggestions from the client
- string recvBufCmd = entity.QueryArgs[FBMClient.REQ_RECV_BUF_QUERY_ARG];
- string maxHeaderCharCmd = entity.QueryArgs[FBMClient.REQ_HEAD_BUF_QUERY_ARG];
- string maxMessageSizeCmd = entity.QueryArgs[FBMClient.REQ_MAX_MESS_QUERY_ARG];
+ string? recvBufCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_RECV_BUF_QUERY_ARG);
+ string? maxHeaderCharCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_HEAD_BUF_QUERY_ARG);
+ string? maxMessageSizeCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_MAX_MESS_QUERY_ARG);
- //Parse recv buffer size
int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : CacheConfig.MinRecvBufferSize;
int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : CacheConfig.MinHeaderBufferSize;
int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : CacheConfig.MaxMessageSize;
@@ -253,9 +251,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
Peers.OnPeerConnected(state);
//Register plugin exit token to cancel the connected socket
- CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll);
-
- //Inc connected count
+ await using CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll);
+
Interlocked.Increment(ref _connectedClients);
try
@@ -280,7 +277,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
try
{
//Begin listening for messages with a queue
- await Store.ListenAsync(wss, queue, args);
+ await Listener.ListenAsync(wss, queue, args);
}
finally
{
@@ -291,7 +288,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
else
{
//Begin listening for messages without a queue
- await Store.ListenAsync(wss, null!, args);
+ await Listener.ListenAsync(wss, null!, args);
}
}
catch (OperationCanceledException)
@@ -303,15 +300,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
catch (Exception ex)
{
- Log.Debug(ex);
+ //If debug logging is enabled print a more detailed error message
+ Log.Error("An error occured on websocket connection: node {con} -> {error}", state.NodeId, ex.Message);
+ Log.Debug("Websocket connection error: node {con}\n{error}", state.NodeId, ex);
}
-
- //Dec connected count
+
Interlocked.Decrement(ref _connectedClients);
- //Unregister the token
- reg.Unregister();
-
//Notify monitor of disconnect
Peers.OnPeerDisconnected(state);
diff --git a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
index 7d376b8..56fe8cd 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -40,25 +40,29 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
internal sealed class PeerDiscoveryEndpoint : ResourceEndpointBase
{
- private readonly IPeerMonitor PeerMonitor;
- private readonly NodeConfig Config;
+ private readonly ObjectCacheSystemState SysState;
+
+ private CacheAuthKeyStore KeyStore => SysState.Configuration.KeyStore;
+
+ private CachePeerMonitor PeerMonitor => SysState.PeerMonitor;
+
+ private CacheNodeConfiguration NodeConfig => SysState.Configuration.Config;
- //Loosen up protection settings
///<inheritdoc/>
protected override ProtectionSettings EndpointProtectionSettings { get; } = new()
{
- DisableSessionsRequired = true
+ /*
+ * Sessions will not be used or required for this endpoint.
+ * We should also assume the session system is not even loaded
+ */
+ DisableSessionsRequired = true
};
public PeerDiscoveryEndpoint(PluginBase plugin)
{
- //Get the peer monitor
- PeerMonitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
-
- //Get the node config
- Config = plugin.GetOrCreateSingleton<NodeConfig>();
+ SysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>();
- InitPathAndLog(Config.DiscoveryPath, plugin.Log);
+ InitPathAndLog(SysState.Configuration.DiscoveryPath!, plugin.Log);
}
protected override VfReturnType Get(HttpEntity entity)
@@ -68,36 +72,41 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
if(string.IsNullOrWhiteSpace(authToken))
{
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
+ return VirtualClose(entity, HttpStatusCode.Unauthorized);
}
string subject = string.Empty;
string challenge = string.Empty;
- //Parse auth token
- using(JsonWebToken jwt = JsonWebToken.Parse(authToken))
+ try
{
+ //Parse auth token
+ using JsonWebToken jwt = JsonWebToken.Parse(authToken);
+
//try to verify against cache node first
- if (!Config.KeyStore.VerifyJwt(jwt, true))
+ if (!KeyStore.VerifyJwt(jwt, true))
{
//failed...
//try to verify against client key
- if (!Config.KeyStore.VerifyJwt(jwt, false))
+ if (!KeyStore.VerifyJwt(jwt, false))
{
//invalid token
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
+ return VirtualClose(entity, HttpStatusCode.Unauthorized);
}
}
using JsonDocument payload = jwt.GetPayload();
//Get client info to pass back
- subject = payload.RootElement.TryGetProperty("sub", out JsonElement subEl) ? subEl.GetString() ?? string.Empty : string.Empty;
+ subject = payload.RootElement.TryGetProperty("sub", out JsonElement subEl) ? subEl.GetString() ?? string.Empty : string.Empty;
challenge = payload.RootElement.GetProperty("chl").GetString() ?? string.Empty;
}
+ catch (FormatException)
+ {
+ //If tokens are invalid format, let the client know instead of a server error
+ return VfReturnType.BadRequest;
+ }
//Valid key, get peer list to send to client
CacheNodeAdvertisment[] peers = PeerMonitor.GetAllPeers()
@@ -109,10 +118,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
using JsonWebToken response = new();
//set header from cache config
- response.WriteHeader(Config.KeyStore.GetJwtHeader());
+ response.WriteHeader(KeyStore.GetJwtHeader());
response.InitPayloadClaim()
- .AddClaim("iss", Config.Config.NodeId)
+ .AddClaim("iss", NodeConfig.NodeId)
//Audience is the requestor id
.AddClaim("sub", subject)
.AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds())
@@ -122,10 +131,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
.AddClaim("chl", challenge)
.CommitClaims();
- //Sign the response
- Config.KeyStore.SignJwt(response);
-
- //Send response to client
+
+ KeyStore.SignJwt(response);
+
entity.CloseResponse(HttpStatusCode.OK, Net.Http.ContentType.Text, response.DataBuffer);
return VfReturnType.VirtualSkip;
}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs
index 87a471b..04380c5 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -59,7 +59,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
public WellKnownEndpoint(PluginBase plugin)
{
//Get the node config
- NodeConfig nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>();
+ NodeConfig nodeConfig = plugin.GetOrCreateSingleton<ObjectCacheSystemState>().Configuration;
//serialize the config, discovery may not be enabled
_advertisment = nodeConfig.Config.Advertisment;