aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-11-02 01:50:05 -0400
committerLibravatar vnugent <public@vaughnnugent.com>2023-11-02 01:50:05 -0400
commitd2d812213b99ee17f9433f81871b694c4053ff23 (patch)
tree11a1106602112c134e65bf197ef701d1b8d63b67 /plugins/ObjectCacheServer/src
parent483c014b938e2d55ea7c89b67f6d19ba2c2d5b5e (diff)
also carried away
Diffstat (limited to 'plugins/ObjectCacheServer/src')
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs12
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheStore.cs13
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs18
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs207
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs206
-rw-r--r--plugins/ObjectCacheServer/src/ICacheStore.cs2
6 files changed, 278 insertions, 180 deletions
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs
index ba39db6..6942828 100644
--- a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs
+++ b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs
@@ -43,7 +43,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
* their individual queues.
*/
- internal sealed class CacheListenerPubQueue : ICacheListenerEventQueue, IAsyncBackgroundWork
+ internal sealed class CacheListenerPubQueue : ICacheListenerEventQueue<IPeerEventQueue>, IAsyncBackgroundWork
{
private const int MAX_LOCAL_QUEUE_ITEMS = 10000;
private const string LOG_SCOPE_NAME = "QUEUE";
@@ -110,7 +110,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
}
///<inheritdoc/>
- public bool IsEnabled([NotNullWhen(true)] object? userState)
+ public bool IsEnabled([NotNullWhen(true)] IPeerEventQueue? userState)
{
return userState is IPeerEventQueue;
}
@@ -125,15 +125,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
}
///<inheritdoc/>
- public bool TryDequeue(object userState, out ChangeEvent changeEvent)
+ public bool TryDequeue(IPeerEventQueue userState, out ChangeEvent changeEvent)
{
- return (userState as IPeerEventQueue)!.TryDequeue(out changeEvent);
+ return userState.TryDequeue(out changeEvent);
}
///<inheritdoc/>
- public ValueTask<ChangeEvent> DequeueAsync(object userState, CancellationToken cancellation)
+ public ValueTask<ChangeEvent> DequeueAsync(IPeerEventQueue userState, CancellationToken cancellation)
{
- return (userState as IPeerEventQueue)!.DequeueAsync(cancellation);
+ return userState.DequeueAsync(cancellation);
}
}
}
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs
index 5795222..02ed9b1 100644
--- a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs
+++ b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs
@@ -27,6 +27,7 @@ using System.Threading;
using System.Threading.Tasks;
using VNLib.Utils.Logging;
+using VNLib.Net.Messaging.FBM;
using VNLib.Plugins;
using VNLib.Plugins.Extensions.Loading;
@@ -43,7 +44,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
/// <summary>
/// Gets the underlying cache listener
/// </summary>
- public BlobCacheListener Listener { get; }
+ public BlobCacheListener<IPeerEventQueue> Listener { get; }
public CacheStore(PluginBase plugin, IConfigScope config)
@@ -53,7 +54,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
}
///<inheritdoc/>
- ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token)
+ ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, ObjectDataReader<T> bodyData, T state, CancellationToken token)
{
return Listener.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token);
}
@@ -70,7 +71,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
return Listener.Cache.DeleteObjectAsync(id, token);
}
- private static BlobCacheListener InitializeCache(ObjectCacheServerEntry plugin, IConfigScope config)
+ private static BlobCacheListener<IPeerEventQueue> InitializeCache(ObjectCacheServerEntry plugin, IConfigScope config)
{
const string CacheConfigTemplate =
@"
@@ -105,7 +106,7 @@ Cache Configuration:
);
//Get the event listener
- ICacheListenerEventQueue queue = plugin.GetOrCreateSingleton<CacheListenerPubQueue>();
+ ICacheListenerEventQueue<IPeerEventQueue> queue = plugin.GetOrCreateSingleton<CacheListenerPubQueue>();
//Get the memory manager
ICacheMemoryManagerFactory manager = plugin.GetOrCreateSingleton<BucketLocalManagerFactory>();
@@ -113,8 +114,10 @@ Cache Configuration:
//Load the blob cache table system
IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, manager, cacheConf);
+ FallbackFBMMemoryManager fbmMemManager = new(plugin.ListenerHeap);
+
//Endpoint only allows for a single reader
- return new(bc, queue, plugin.Log, plugin.ListenerHeap);
+ return new(bc, queue, plugin.Log, fbmMemManager);
}
/*
diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
index 19f09dc..dbfd091 100644
--- a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
@@ -231,18 +231,25 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
private async Task ReplicationWorkerDoWorkAsync(FBMClient client, ILogProvider log, CancellationToken exitToken)
{
//Reusable request message
- using FBMRequest request = new(client.Config);
+ using FBMRequest request = new(in client.Config);
+
+ WaitForChangeResult changedObject = new();
//Listen for changes
while (true)
{
//Wait for changes
- WaitForChangeResult changedObject = await client.WaitForChangeAsync(exitToken);
+ await client.WaitForChangeAsync(changedObject, exitToken);
log.Debug("Object changed {typ} {obj}", changedObject.Status, changedObject.CurrentId);
switch (changedObject.Status)
{
+ /*
+ * During a WFC operation, if a NotFound response is received, it
+ * means a wait queue was not found for the connection, usually meaning
+ * the server does not support replication.
+ */
case ResponseCodes.NotFound:
log.Error("Server cache not properly configured, worker exiting");
return;
@@ -254,8 +261,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//Reload the record from the store
await UpdateRecordAsync(client, request, log, changedObject.CurrentId, changedObject.NewId, exitToken);
break;
+ default:
+ log.Error("Unknown status {status} received from server", changedObject.Status);
+ break;
}
+ changedObject.Status = null;
+ changedObject.CurrentId = null;
+ changedObject.NewId = null;
+
//Reset request message
request.Reset();
}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs
new file mode 100644
index 0000000..d1591f8
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs
@@ -0,0 +1,207 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: CacheNegotationManager.cs
+*
+* CacheNegotationManager.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Net;
+using System.Text.Json;
+
+using VNLib.Hashing;
+using VNLib.Hashing.IdentityUtility;
+using VNLib.Net.Messaging.FBM.Client;
+using VNLib.Plugins;
+using VNLib.Plugins.Extensions.Loading;
+using VNLib.Data.Caching.Extensions;
+using VNLib.Data.Caching.ObjectCache.Server.Cache;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
+{
+ internal class ClientNegotiationState
+ {
+ public string? Challenge { get; set; }
+
+ public string? NodeId { get; set; }
+
+ public bool IsPeer { get; set; }
+ }
+
+ internal sealed class CacheNegotationManager
+ {
+ /*
+ * Cache keys are centralized and may be shared between all cache server nodes. This means
+ * that any client would be able to get a signed negotiation from any server and use it to
+ * upgrade a connection to any other server. This is property is to be avoided because servers
+ * may have different configurations that a malicious client could exploit. To prevent that
+ * a unique server generated Audience ID is used in the negotiation token and verified when
+ * an upgrade is requested. This ensures that the client is connecting to the same server
+ * that issued the negotiation token.
+ *
+ * With this operational theory, someone has to expose their buffer configuration. At the moment
+ * I think it would be best for servers to keep their buffer configuration private, as it could
+ * cause more damage to the network. This is not really a protection measure because a malicious
+ * client could use trial and error to find the servers buffer configuration.
+ */
+
+ private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
+
+ private readonly string AudienceLocalServerId;
+ private readonly NodeConfig _nodeConfig;
+ private readonly CacheConfiguration _cacheConfig;
+
+ public CacheNegotationManager(PluginBase plugin)
+ {
+ //Get node configuration
+ _nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>();
+
+ //Get the cache store configuration
+ _cacheConfig = plugin.GetConfigForType<CacheStore>().Deserialze<CacheConfiguration>();
+
+ AudienceLocalServerId = Guid.NewGuid().ToString("N");
+ }
+
+
+ public bool IsClientNegotiationValid(string authToken, out ClientNegotiationState state)
+ {
+ state = new();
+
+ // Parse jwt
+ using JsonWebToken jwt = JsonWebToken.Parse(authToken);
+
+ //verify signature for client
+ if (_nodeConfig.KeyStore.VerifyJwt(jwt, false))
+ {
+ //Validated as normal client
+ }
+ //May be signed by a cache server
+ else if (_nodeConfig.KeyStore.VerifyJwt(jwt, true))
+ {
+ //Set peer and verified flag since the another cache server signed the request
+ state.IsPeer = true;
+ }
+ else
+ {
+ return false;
+ }
+
+ //Recover json body
+ using JsonDocument doc = jwt.GetPayload();
+
+ if (doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl))
+ {
+ state.NodeId = servIdEl.GetString();
+ }
+
+ //Challenge is required
+ state.Challenge = doc.RootElement.GetProperty("chl").GetString()!;
+
+ return true;
+ }
+
+ public JsonWebToken ConfirmCLientNegotiation(ClientNegotiationState state, IPAddress clientIp, DateTimeOffset now)
+ {
+ //Verified, now we can create an auth message with a short expiration
+ JsonWebToken auth = new();
+
+ auth.WriteHeader(_nodeConfig.KeyStore.GetJwtHeader());
+ auth.InitPayloadClaim()
+ .AddClaim("aud", AudienceLocalServerId)
+ .AddClaim("iat", now.ToUnixTimeSeconds())
+ .AddClaim("exp", now.Add(AuthTokenExpiration).ToUnixTimeSeconds())
+ .AddClaim("nonce", RandomHash.GetRandomBase32(8))
+ .AddClaim("chl", state.Challenge!)
+ //Set the ispeer flag if the request was signed by a cache server
+ .AddClaim("isPeer", state.IsPeer)
+ //Specify the server's node id if set
+ .AddClaim("sub", state.NodeId)
+ //Set ip address
+ .AddClaim("ip", clientIp.ToString())
+ //Add negotiaion args
+ .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, _cacheConfig.MaxHeaderBufferSize)
+ .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, _cacheConfig.MaxRecvBufferSize)
+ .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, _cacheConfig.MaxMessageSize)
+ .CommitClaims();
+
+ //Sign the auth message from our private key
+ _nodeConfig.KeyStore.SignJwt(auth);
+
+ return auth;
+ }
+
+ public bool ValidateUpgrade(string upgradeToken, string tokenSignature, DateTimeOffset now, IPAddress connectionIp, ref string? nodeId, ref bool isPeer)
+ {
+ //Parse jwt
+ using JsonWebToken jwt = JsonWebToken.Parse(upgradeToken);
+
+ //verify signature against the cache public key, since this server must have signed it
+ if (!_nodeConfig.KeyStore.VerifyCachePeer(jwt))
+ {
+ return false;
+ }
+
+ //Recover json body
+ using JsonDocument doc = jwt.GetPayload();
+
+ //Verify audience, expiration
+ if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl)
+ || !string.Equals(AudienceLocalServerId, audEl.GetString(), StringComparison.OrdinalIgnoreCase))
+ {
+ return false;
+ }
+
+ if (!doc.RootElement.TryGetProperty("exp", out JsonElement expEl)
+ || DateTimeOffset.FromUnixTimeSeconds(expEl.GetInt64()) < now)
+ {
+ return false;
+ }
+
+ //Check node ip address matches if required
+ if (_nodeConfig.VerifyIp)
+ {
+ if (!doc.RootElement.TryGetProperty("ip", out JsonElement ipEl))
+ {
+ return false;
+ }
+
+ string? clientIp = ipEl.GetString();
+
+ //Verify the client ip address matches the one in the token
+ if (clientIp == null || !IPAddress.TryParse(clientIp, out IPAddress? clientIpAddr) || !clientIpAddr.Equals(connectionIp))
+ {
+ return false;
+ }
+ }
+
+ //Check if the client is a peer
+ isPeer = doc.RootElement.TryGetProperty("isPeer", out JsonElement isPeerEl) && isPeerEl.GetBoolean();
+
+ //The node id is optional and stored in the 'sub' field, ignore if the client is not a peer
+ if (isPeer && doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl))
+ {
+ nodeId = servIdEl.GetString();
+ }
+
+ //Verify token signature against a fellow cache public key
+ return _nodeConfig.KeyStore.VerifyUpgradeToken(tokenSignature, upgradeToken, isPeer);
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
index d07c59e..816e6c3 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
@@ -24,12 +24,10 @@
using System;
using System.Net;
-using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
-using VNLib.Hashing;
using VNLib.Net.Http;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
@@ -49,22 +47,20 @@ using VNLib.Data.Caching.Extensions.Clustering;
using VNLib.Data.Caching.ObjectCache.Server.Cache;
using VNLib.Data.Caching.ObjectCache.Server.Clustering;
+
namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
internal sealed class ConnectEndpoint : ResourceEndpointBase
{
- private const string LOG_SCOPE_NAME = "CONEP";
-
- private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
-
+ internal const string LOG_SCOPE_NAME = "CONEP";
- private readonly NodeConfig NodeConfiguration;
+
private readonly ICacheEventQueueManager PubSubManager;
private readonly IPeerMonitor Peers;
- private readonly BlobCacheListener Store;
-
- private readonly string AudienceLocalServerId;
+ private readonly BlobCacheListener<IPeerEventQueue> Store;
+ private readonly NodeConfig NodeConfiguration;
+ private readonly CacheNegotationManager AuthManager;
private uint _connectedClients;
@@ -105,11 +101,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
//Get the cache store configuration
CacheConfig = plugin.GetConfigForType<CacheStore>().Deserialze<CacheConfiguration>();
- /*
- * Generate a random guid for the current server when created so we
- * know client tokens belong to us when singed by the same key
- */
- AudienceLocalServerId = Guid.NewGuid().ToString("N");
+ //Get the auth manager
+ AuthManager = plugin.GetOrCreateSingleton<CacheNegotationManager>();
}
@@ -136,52 +129,19 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
if (string.IsNullOrWhiteSpace(jwtAuth))
{
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
+ return VirtualClose(entity, HttpStatusCode.Forbidden);
}
- string? nodeId = null;
- string? challenge = null;
- bool isPeer = false;
-
- // Parse jwt
- using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
+ //Create negotiation state
+ if(!AuthManager.IsClientNegotiationValid(jwtAuth, out ClientNegotiationState state))
{
- //verify signature for client
- if (NodeConfiguration.KeyStore.VerifyJwt(jwt, false))
- {
- //Validated
- }
- //May be signed by a cache server
- else if(NodeConfiguration.KeyStore.VerifyJwt(jwt, true))
- {
- //Set peer and verified flag since the another cache server signed the request
- isPeer = true;
- }
- else
- {
- Log.Information("Client signature verification failed");
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
-
- //Recover json body
- using JsonDocument doc = jwt.GetPayload();
-
- if (doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl))
- {
- nodeId = servIdEl.GetString();
- }
-
- if (doc.RootElement.TryGetProperty("chl", out JsonElement challengeEl))
- {
- challenge = challengeEl.GetString();
- }
+ Log.Information("Initial negotiation client signature verification failed");
+ return VirtualClose(entity, HttpStatusCode.Unauthorized);
}
- if (isPeer)
+ if (state.IsPeer)
{
- Log.Debug("Received negotiation request from peer node {node}", nodeId);
+ Log.Debug("Received negotiation request from peer node {node}", state.NodeId);
}
else
{
@@ -189,29 +149,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
//Verified, now we can create an auth message with a short expiration
- using JsonWebToken auth = new();
-
- auth.WriteHeader(NodeConfiguration.KeyStore.GetJwtHeader());
- auth.InitPayloadClaim()
- .AddClaim("aud", AudienceLocalServerId)
- .AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds())
- .AddClaim("exp", entity.RequestedTimeUtc.Add(AuthTokenExpiration).ToUnixTimeSeconds())
- .AddClaim("nonce", RandomHash.GetRandomBase32(8))
- .AddClaim("chl", challenge!)
- //Set the ispeer flag if the request was signed by a cache server
- .AddClaim("isPeer", isPeer)
- //Specify the server's node id if set
- .AddClaim("sub", nodeId!)
- //Set ip address
- .AddClaim("ip", entity.TrustedRemoteIp.ToString())
- //Add negotiaion args
- .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, CacheConfig.MaxHeaderBufferSize)
- .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, CacheConfig.MaxRecvBufferSize)
- .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, CacheConfig.MaxMessageSize)
- .CommitClaims();
-
- //Sign the auth message from our private key
- NodeConfiguration.KeyStore.SignJwt(auth);
+ using JsonWebToken auth = AuthManager.ConfirmCLientNegotiation(state, entity.TrustedRemoteIp, entity.RequestedTimeUtc);
//Close response
entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer);
@@ -222,101 +160,35 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
//Parse jwt from authorization
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
-
- if (string.IsNullOrWhiteSpace(jwtAuth))
- {
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
-
- //Get the upgrade signature header
string? clientSignature = entity.Server.Headers[FBMDataCacheExtensions.X_UPGRADE_SIG_HEADER];
+ string? optionalDiscovery = entity.Server.Headers[FBMDataCacheExtensions.X_NODE_DISCOVERY_HEADER];
- if (string.IsNullOrWhiteSpace(clientSignature))
+ //Not null
+ if (string.IsNullOrWhiteSpace(jwtAuth) || string.IsNullOrWhiteSpace(clientSignature))
{
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
+ return VfReturnType.Forbidden;
}
string? nodeId = null;
- CacheNodeAdvertisment? discoveryAd = null;
+ bool isPeer = false;
- //Parse jwt
- using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
+ //Validate upgrade request
+ if (!AuthManager.ValidateUpgrade(jwtAuth, clientSignature, entity.RequestedTimeUtc, entity.TrustedRemoteIp, ref nodeId, ref isPeer))
{
- //verify signature against the cache public key, since this server must have signed it
- if (!NodeConfiguration.KeyStore.VerifyCachePeer(jwt))
- {
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
-
- //Recover json body
- using JsonDocument doc = jwt.GetPayload();
-
- //Verify audience, expiration
-
- if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl)
- || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase))
- {
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
-
- if (!doc.RootElement.TryGetProperty("exp", out JsonElement expEl)
- || DateTimeOffset.FromUnixTimeSeconds(expEl.GetInt64()) < entity.RequestedTimeUtc)
- {
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
-
- //Check node ip address matches if required
- if (NodeConfiguration.VerifyIp)
- {
- if (!doc.RootElement.TryGetProperty("ip", out JsonElement ipEl))
- {
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
-
- string? clientIp = ipEl.GetString();
- //Verify the client ip address matches the one in the token
- if (clientIp == null || !IPAddress.TryParse(clientIp, out IPAddress? clientIpAddr) || !clientIpAddr.Equals(entity.TrustedRemoteIp))
- {
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
- }
-
- //Check if the client is a peer
- bool isPeer = doc.RootElement.TryGetProperty("isPeer", out JsonElement isPeerEl) && isPeerEl.GetBoolean();
-
- //The node id is optional and stored in the 'sub' field, ignore if the client is not a peer
- if (isPeer && doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl))
- {
- nodeId = servIdEl.GetString();
- }
-
- //Verify the signature the client included of the auth token
+ return VirtualClose(entity, HttpStatusCode.Unauthorized);
+ }
- //Verify token signature against a fellow cache public key
- if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth, isPeer))
- {
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
+ CacheNodeAdvertisment? discoveryAd = null;
- if (isPeer)
- {
- //Try to get the node advertisement header
- string? discoveryHeader = entity.Server.Headers[FBMDataCacheExtensions.X_NODE_DISCOVERY_HEADER];
+ /*
+ * If the client is a peer server, it may offer a signed advertisment
+ * that this node will have the duty of making available to other peers
+ * if it is valid
+ */
- //Verify the node advertisement header and publish it
- if (!string.IsNullOrWhiteSpace(discoveryHeader))
- {
- discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(discoveryHeader);
- }
- }
+ if (isPeer && !string.IsNullOrWhiteSpace(optionalDiscovery))
+ {
+ discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(optionalDiscovery);
}
WsUserState state;
@@ -369,7 +241,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
Log.Debug("Client buffer state {state}", state);
//Accept socket and pass state object
- entity.AcceptWebSocket(WebsocketAcceptedAsync, state);
+ _ = entity.AcceptWebSocket(WebsocketAcceptedAsync, state);
return VfReturnType.VirtualSkip;
}
@@ -400,7 +272,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
};
//Check if the client is a peer node, if it is, subscribe to change events
- if (!string.IsNullOrWhiteSpace(state.NodeId))
+ if (state.IsPeer)
{
//Get the event queue for the current node
IPeerEventQueue queue = PubSubManager.Subscribe(state);
@@ -408,7 +280,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
try
{
//Begin listening for messages with a queue
- await Store.ListenAsync(wss, args, queue);
+ await Store.ListenAsync(wss, queue, args);
}
finally
{
@@ -419,7 +291,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
else
{
//Begin listening for messages without a queue
- await Store.ListenAsync(wss, args, null);
+ await Store.ListenAsync(wss, null!, args);
}
}
catch (OperationCanceledException)
@@ -456,6 +328,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
public string? NodeId { get; init; }
public CacheNodeAdvertisment? Advertisment { get; init; }
+ public bool IsPeer => !string.IsNullOrWhiteSpace(NodeId);
+
public override string ToString()
{
return
diff --git a/plugins/ObjectCacheServer/src/ICacheStore.cs b/plugins/ObjectCacheServer/src/ICacheStore.cs
index f911af9..a638169 100644
--- a/plugins/ObjectCacheServer/src/ICacheStore.cs
+++ b/plugins/ObjectCacheServer/src/ICacheStore.cs
@@ -38,7 +38,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
/// <param name="state">The state parameter to pass to the data callback</param>
/// <param name="token">A token to cancel the async operation</param>
/// <returns>A value task that represents the async operation</returns>
- ValueTask AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token = default);
+ ValueTask AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, ObjectDataReader<T> bodyData, T state, CancellationToken token = default);
/// <summary>
/// Clears all items from the store