From d2d812213b99ee17f9433f81871b694c4053ff23 Mon Sep 17 00:00:00 2001 From: vnugent Date: Thu, 2 Nov 2023 01:50:05 -0400 Subject: also carried away --- .../src/Cache/CacheListenerPubQueue.cs | 12 +- plugins/ObjectCacheServer/src/Cache/CacheStore.cs | 13 +- .../src/Clustering/CacheNodeReplicationMaanger.cs | 18 +- .../src/Endpoints/CacheNegotationManager.cs | 207 +++++++++++++++++++++ .../src/Endpoints/ConnectEndpoint.cs | 206 ++++---------------- plugins/ObjectCacheServer/src/ICacheStore.cs | 2 +- 6 files changed, 278 insertions(+), 180 deletions(-) create mode 100644 plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs (limited to 'plugins/ObjectCacheServer') diff --git a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs index ba39db6..6942828 100644 --- a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs +++ b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs @@ -43,7 +43,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache * their individual queues. */ - internal sealed class CacheListenerPubQueue : ICacheListenerEventQueue, IAsyncBackgroundWork + internal sealed class CacheListenerPubQueue : ICacheListenerEventQueue, IAsyncBackgroundWork { private const int MAX_LOCAL_QUEUE_ITEMS = 10000; private const string LOG_SCOPE_NAME = "QUEUE"; @@ -110,7 +110,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache } /// - public bool IsEnabled([NotNullWhen(true)] object? userState) + public bool IsEnabled([NotNullWhen(true)] IPeerEventQueue? userState) { return userState is IPeerEventQueue; } @@ -125,15 +125,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache } /// - public bool TryDequeue(object userState, out ChangeEvent changeEvent) + public bool TryDequeue(IPeerEventQueue userState, out ChangeEvent changeEvent) { - return (userState as IPeerEventQueue)!.TryDequeue(out changeEvent); + return userState.TryDequeue(out changeEvent); } /// - public ValueTask DequeueAsync(object userState, CancellationToken cancellation) + public ValueTask DequeueAsync(IPeerEventQueue userState, CancellationToken cancellation) { - return (userState as IPeerEventQueue)!.DequeueAsync(cancellation); + return userState.DequeueAsync(cancellation); } } } diff --git a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs index 5795222..02ed9b1 100644 --- a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs +++ b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs @@ -27,6 +27,7 @@ using System.Threading; using System.Threading.Tasks; using VNLib.Utils.Logging; +using VNLib.Net.Messaging.FBM; using VNLib.Plugins; using VNLib.Plugins.Extensions.Loading; @@ -43,7 +44,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache /// /// Gets the underlying cache listener /// - public BlobCacheListener Listener { get; } + public BlobCacheListener Listener { get; } public CacheStore(PluginBase plugin, IConfigScope config) @@ -53,7 +54,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache } /// - ValueTask ICacheStore.AddOrUpdateBlobAsync(string objectId, string? alternateId, GetBodyDataCallback bodyData, T state, CancellationToken token) + ValueTask ICacheStore.AddOrUpdateBlobAsync(string objectId, string? alternateId, ObjectDataReader bodyData, T state, CancellationToken token) { return Listener.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token); } @@ -70,7 +71,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache return Listener.Cache.DeleteObjectAsync(id, token); } - private static BlobCacheListener InitializeCache(ObjectCacheServerEntry plugin, IConfigScope config) + private static BlobCacheListener InitializeCache(ObjectCacheServerEntry plugin, IConfigScope config) { const string CacheConfigTemplate = @" @@ -105,7 +106,7 @@ Cache Configuration: ); //Get the event listener - ICacheListenerEventQueue queue = plugin.GetOrCreateSingleton(); + ICacheListenerEventQueue queue = plugin.GetOrCreateSingleton(); //Get the memory manager ICacheMemoryManagerFactory manager = plugin.GetOrCreateSingleton(); @@ -113,8 +114,10 @@ Cache Configuration: //Load the blob cache table system IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, manager, cacheConf); + FallbackFBMMemoryManager fbmMemManager = new(plugin.ListenerHeap); + //Endpoint only allows for a single reader - return new(bc, queue, plugin.Log, plugin.ListenerHeap); + return new(bc, queue, plugin.Log, fbmMemManager); } /* diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs index 19f09dc..dbfd091 100644 --- a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs +++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs @@ -231,18 +231,25 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering private async Task ReplicationWorkerDoWorkAsync(FBMClient client, ILogProvider log, CancellationToken exitToken) { //Reusable request message - using FBMRequest request = new(client.Config); + using FBMRequest request = new(in client.Config); + + WaitForChangeResult changedObject = new(); //Listen for changes while (true) { //Wait for changes - WaitForChangeResult changedObject = await client.WaitForChangeAsync(exitToken); + await client.WaitForChangeAsync(changedObject, exitToken); log.Debug("Object changed {typ} {obj}", changedObject.Status, changedObject.CurrentId); switch (changedObject.Status) { + /* + * During a WFC operation, if a NotFound response is received, it + * means a wait queue was not found for the connection, usually meaning + * the server does not support replication. + */ case ResponseCodes.NotFound: log.Error("Server cache not properly configured, worker exiting"); return; @@ -254,8 +261,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering //Reload the record from the store await UpdateRecordAsync(client, request, log, changedObject.CurrentId, changedObject.NewId, exitToken); break; + default: + log.Error("Unknown status {status} received from server", changedObject.Status); + break; } + changedObject.Status = null; + changedObject.CurrentId = null; + changedObject.NewId = null; + //Reset request message request.Reset(); } diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs new file mode 100644 index 0000000..d1591f8 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs @@ -0,0 +1,207 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: CacheNegotationManager.cs +* +* CacheNegotationManager.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Net; +using System.Text.Json; + +using VNLib.Hashing; +using VNLib.Hashing.IdentityUtility; +using VNLib.Net.Messaging.FBM.Client; +using VNLib.Plugins; +using VNLib.Plugins.Extensions.Loading; +using VNLib.Data.Caching.Extensions; +using VNLib.Data.Caching.ObjectCache.Server.Cache; + +namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints +{ + internal class ClientNegotiationState + { + public string? Challenge { get; set; } + + public string? NodeId { get; set; } + + public bool IsPeer { get; set; } + } + + internal sealed class CacheNegotationManager + { + /* + * Cache keys are centralized and may be shared between all cache server nodes. This means + * that any client would be able to get a signed negotiation from any server and use it to + * upgrade a connection to any other server. This is property is to be avoided because servers + * may have different configurations that a malicious client could exploit. To prevent that + * a unique server generated Audience ID is used in the negotiation token and verified when + * an upgrade is requested. This ensures that the client is connecting to the same server + * that issued the negotiation token. + * + * With this operational theory, someone has to expose their buffer configuration. At the moment + * I think it would be best for servers to keep their buffer configuration private, as it could + * cause more damage to the network. This is not really a protection measure because a malicious + * client could use trial and error to find the servers buffer configuration. + */ + + private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30); + + private readonly string AudienceLocalServerId; + private readonly NodeConfig _nodeConfig; + private readonly CacheConfiguration _cacheConfig; + + public CacheNegotationManager(PluginBase plugin) + { + //Get node configuration + _nodeConfig = plugin.GetOrCreateSingleton(); + + //Get the cache store configuration + _cacheConfig = plugin.GetConfigForType().Deserialze(); + + AudienceLocalServerId = Guid.NewGuid().ToString("N"); + } + + + public bool IsClientNegotiationValid(string authToken, out ClientNegotiationState state) + { + state = new(); + + // Parse jwt + using JsonWebToken jwt = JsonWebToken.Parse(authToken); + + //verify signature for client + if (_nodeConfig.KeyStore.VerifyJwt(jwt, false)) + { + //Validated as normal client + } + //May be signed by a cache server + else if (_nodeConfig.KeyStore.VerifyJwt(jwt, true)) + { + //Set peer and verified flag since the another cache server signed the request + state.IsPeer = true; + } + else + { + return false; + } + + //Recover json body + using JsonDocument doc = jwt.GetPayload(); + + if (doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl)) + { + state.NodeId = servIdEl.GetString(); + } + + //Challenge is required + state.Challenge = doc.RootElement.GetProperty("chl").GetString()!; + + return true; + } + + public JsonWebToken ConfirmCLientNegotiation(ClientNegotiationState state, IPAddress clientIp, DateTimeOffset now) + { + //Verified, now we can create an auth message with a short expiration + JsonWebToken auth = new(); + + auth.WriteHeader(_nodeConfig.KeyStore.GetJwtHeader()); + auth.InitPayloadClaim() + .AddClaim("aud", AudienceLocalServerId) + .AddClaim("iat", now.ToUnixTimeSeconds()) + .AddClaim("exp", now.Add(AuthTokenExpiration).ToUnixTimeSeconds()) + .AddClaim("nonce", RandomHash.GetRandomBase32(8)) + .AddClaim("chl", state.Challenge!) + //Set the ispeer flag if the request was signed by a cache server + .AddClaim("isPeer", state.IsPeer) + //Specify the server's node id if set + .AddClaim("sub", state.NodeId) + //Set ip address + .AddClaim("ip", clientIp.ToString()) + //Add negotiaion args + .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, _cacheConfig.MaxHeaderBufferSize) + .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, _cacheConfig.MaxRecvBufferSize) + .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, _cacheConfig.MaxMessageSize) + .CommitClaims(); + + //Sign the auth message from our private key + _nodeConfig.KeyStore.SignJwt(auth); + + return auth; + } + + public bool ValidateUpgrade(string upgradeToken, string tokenSignature, DateTimeOffset now, IPAddress connectionIp, ref string? nodeId, ref bool isPeer) + { + //Parse jwt + using JsonWebToken jwt = JsonWebToken.Parse(upgradeToken); + + //verify signature against the cache public key, since this server must have signed it + if (!_nodeConfig.KeyStore.VerifyCachePeer(jwt)) + { + return false; + } + + //Recover json body + using JsonDocument doc = jwt.GetPayload(); + + //Verify audience, expiration + if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl) + || !string.Equals(AudienceLocalServerId, audEl.GetString(), StringComparison.OrdinalIgnoreCase)) + { + return false; + } + + if (!doc.RootElement.TryGetProperty("exp", out JsonElement expEl) + || DateTimeOffset.FromUnixTimeSeconds(expEl.GetInt64()) < now) + { + return false; + } + + //Check node ip address matches if required + if (_nodeConfig.VerifyIp) + { + if (!doc.RootElement.TryGetProperty("ip", out JsonElement ipEl)) + { + return false; + } + + string? clientIp = ipEl.GetString(); + + //Verify the client ip address matches the one in the token + if (clientIp == null || !IPAddress.TryParse(clientIp, out IPAddress? clientIpAddr) || !clientIpAddr.Equals(connectionIp)) + { + return false; + } + } + + //Check if the client is a peer + isPeer = doc.RootElement.TryGetProperty("isPeer", out JsonElement isPeerEl) && isPeerEl.GetBoolean(); + + //The node id is optional and stored in the 'sub' field, ignore if the client is not a peer + if (isPeer && doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl)) + { + nodeId = servIdEl.GetString(); + } + + //Verify token signature against a fellow cache public key + return _nodeConfig.KeyStore.VerifyUpgradeToken(tokenSignature, upgradeToken, isPeer); + } + } +} diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs index d07c59e..816e6c3 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs @@ -24,12 +24,10 @@ using System; using System.Net; -using System.Text.Json; using System.Threading; using System.Threading.Tasks; using System.Collections.Generic; -using VNLib.Hashing; using VNLib.Net.Http; using VNLib.Utils.Memory; using VNLib.Utils.Logging; @@ -49,22 +47,20 @@ using VNLib.Data.Caching.Extensions.Clustering; using VNLib.Data.Caching.ObjectCache.Server.Cache; using VNLib.Data.Caching.ObjectCache.Server.Clustering; + namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { internal sealed class ConnectEndpoint : ResourceEndpointBase { - private const string LOG_SCOPE_NAME = "CONEP"; - - private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30); - + internal const string LOG_SCOPE_NAME = "CONEP"; - private readonly NodeConfig NodeConfiguration; + private readonly ICacheEventQueueManager PubSubManager; private readonly IPeerMonitor Peers; - private readonly BlobCacheListener Store; - - private readonly string AudienceLocalServerId; + private readonly BlobCacheListener Store; + private readonly NodeConfig NodeConfiguration; + private readonly CacheNegotationManager AuthManager; private uint _connectedClients; @@ -105,11 +101,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints //Get the cache store configuration CacheConfig = plugin.GetConfigForType().Deserialze(); - /* - * Generate a random guid for the current server when created so we - * know client tokens belong to us when singed by the same key - */ - AudienceLocalServerId = Guid.NewGuid().ToString("N"); + //Get the auth manager + AuthManager = plugin.GetOrCreateSingleton(); } @@ -136,52 +129,19 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; if (string.IsNullOrWhiteSpace(jwtAuth)) { - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; + return VirtualClose(entity, HttpStatusCode.Forbidden); } - string? nodeId = null; - string? challenge = null; - bool isPeer = false; - - // Parse jwt - using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth)) + //Create negotiation state + if(!AuthManager.IsClientNegotiationValid(jwtAuth, out ClientNegotiationState state)) { - //verify signature for client - if (NodeConfiguration.KeyStore.VerifyJwt(jwt, false)) - { - //Validated - } - //May be signed by a cache server - else if(NodeConfiguration.KeyStore.VerifyJwt(jwt, true)) - { - //Set peer and verified flag since the another cache server signed the request - isPeer = true; - } - else - { - Log.Information("Client signature verification failed"); - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; - } - - //Recover json body - using JsonDocument doc = jwt.GetPayload(); - - if (doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl)) - { - nodeId = servIdEl.GetString(); - } - - if (doc.RootElement.TryGetProperty("chl", out JsonElement challengeEl)) - { - challenge = challengeEl.GetString(); - } + Log.Information("Initial negotiation client signature verification failed"); + return VirtualClose(entity, HttpStatusCode.Unauthorized); } - if (isPeer) + if (state.IsPeer) { - Log.Debug("Received negotiation request from peer node {node}", nodeId); + Log.Debug("Received negotiation request from peer node {node}", state.NodeId); } else { @@ -189,29 +149,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } //Verified, now we can create an auth message with a short expiration - using JsonWebToken auth = new(); - - auth.WriteHeader(NodeConfiguration.KeyStore.GetJwtHeader()); - auth.InitPayloadClaim() - .AddClaim("aud", AudienceLocalServerId) - .AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds()) - .AddClaim("exp", entity.RequestedTimeUtc.Add(AuthTokenExpiration).ToUnixTimeSeconds()) - .AddClaim("nonce", RandomHash.GetRandomBase32(8)) - .AddClaim("chl", challenge!) - //Set the ispeer flag if the request was signed by a cache server - .AddClaim("isPeer", isPeer) - //Specify the server's node id if set - .AddClaim("sub", nodeId!) - //Set ip address - .AddClaim("ip", entity.TrustedRemoteIp.ToString()) - //Add negotiaion args - .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, CacheConfig.MaxHeaderBufferSize) - .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, CacheConfig.MaxRecvBufferSize) - .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, CacheConfig.MaxMessageSize) - .CommitClaims(); - - //Sign the auth message from our private key - NodeConfiguration.KeyStore.SignJwt(auth); + using JsonWebToken auth = AuthManager.ConfirmCLientNegotiation(state, entity.TrustedRemoteIp, entity.RequestedTimeUtc); //Close response entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer); @@ -222,101 +160,35 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { //Parse jwt from authorization string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; - - if (string.IsNullOrWhiteSpace(jwtAuth)) - { - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; - } - - //Get the upgrade signature header string? clientSignature = entity.Server.Headers[FBMDataCacheExtensions.X_UPGRADE_SIG_HEADER]; + string? optionalDiscovery = entity.Server.Headers[FBMDataCacheExtensions.X_NODE_DISCOVERY_HEADER]; - if (string.IsNullOrWhiteSpace(clientSignature)) + //Not null + if (string.IsNullOrWhiteSpace(jwtAuth) || string.IsNullOrWhiteSpace(clientSignature)) { - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; + return VfReturnType.Forbidden; } string? nodeId = null; - CacheNodeAdvertisment? discoveryAd = null; + bool isPeer = false; - //Parse jwt - using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth)) + //Validate upgrade request + if (!AuthManager.ValidateUpgrade(jwtAuth, clientSignature, entity.RequestedTimeUtc, entity.TrustedRemoteIp, ref nodeId, ref isPeer)) { - //verify signature against the cache public key, since this server must have signed it - if (!NodeConfiguration.KeyStore.VerifyCachePeer(jwt)) - { - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; - } - - //Recover json body - using JsonDocument doc = jwt.GetPayload(); - - //Verify audience, expiration - - if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl) - || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase)) - { - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; - } - - if (!doc.RootElement.TryGetProperty("exp", out JsonElement expEl) - || DateTimeOffset.FromUnixTimeSeconds(expEl.GetInt64()) < entity.RequestedTimeUtc) - { - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; - } - - //Check node ip address matches if required - if (NodeConfiguration.VerifyIp) - { - if (!doc.RootElement.TryGetProperty("ip", out JsonElement ipEl)) - { - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; - } - - string? clientIp = ipEl.GetString(); - //Verify the client ip address matches the one in the token - if (clientIp == null || !IPAddress.TryParse(clientIp, out IPAddress? clientIpAddr) || !clientIpAddr.Equals(entity.TrustedRemoteIp)) - { - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; - } - } - - //Check if the client is a peer - bool isPeer = doc.RootElement.TryGetProperty("isPeer", out JsonElement isPeerEl) && isPeerEl.GetBoolean(); - - //The node id is optional and stored in the 'sub' field, ignore if the client is not a peer - if (isPeer && doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl)) - { - nodeId = servIdEl.GetString(); - } - - //Verify the signature the client included of the auth token + return VirtualClose(entity, HttpStatusCode.Unauthorized); + } - //Verify token signature against a fellow cache public key - if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth, isPeer)) - { - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; - } + CacheNodeAdvertisment? discoveryAd = null; - if (isPeer) - { - //Try to get the node advertisement header - string? discoveryHeader = entity.Server.Headers[FBMDataCacheExtensions.X_NODE_DISCOVERY_HEADER]; + /* + * If the client is a peer server, it may offer a signed advertisment + * that this node will have the duty of making available to other peers + * if it is valid + */ - //Verify the node advertisement header and publish it - if (!string.IsNullOrWhiteSpace(discoveryHeader)) - { - discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(discoveryHeader); - } - } + if (isPeer && !string.IsNullOrWhiteSpace(optionalDiscovery)) + { + discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(optionalDiscovery); } WsUserState state; @@ -369,7 +241,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints Log.Debug("Client buffer state {state}", state); //Accept socket and pass state object - entity.AcceptWebSocket(WebsocketAcceptedAsync, state); + _ = entity.AcceptWebSocket(WebsocketAcceptedAsync, state); return VfReturnType.VirtualSkip; } @@ -400,7 +272,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints }; //Check if the client is a peer node, if it is, subscribe to change events - if (!string.IsNullOrWhiteSpace(state.NodeId)) + if (state.IsPeer) { //Get the event queue for the current node IPeerEventQueue queue = PubSubManager.Subscribe(state); @@ -408,7 +280,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints try { //Begin listening for messages with a queue - await Store.ListenAsync(wss, args, queue); + await Store.ListenAsync(wss, queue, args); } finally { @@ -419,7 +291,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints else { //Begin listening for messages without a queue - await Store.ListenAsync(wss, args, null); + await Store.ListenAsync(wss, null!, args); } } catch (OperationCanceledException) @@ -456,6 +328,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints public string? NodeId { get; init; } public CacheNodeAdvertisment? Advertisment { get; init; } + public bool IsPeer => !string.IsNullOrWhiteSpace(NodeId); + public override string ToString() { return diff --git a/plugins/ObjectCacheServer/src/ICacheStore.cs b/plugins/ObjectCacheServer/src/ICacheStore.cs index f911af9..a638169 100644 --- a/plugins/ObjectCacheServer/src/ICacheStore.cs +++ b/plugins/ObjectCacheServer/src/ICacheStore.cs @@ -38,7 +38,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server /// The state parameter to pass to the data callback /// A token to cancel the async operation /// A value task that represents the async operation - ValueTask AddOrUpdateBlobAsync(string objectId, string? alternateId, GetBodyDataCallback bodyData, T state, CancellationToken token = default); + ValueTask AddOrUpdateBlobAsync(string objectId, string? alternateId, ObjectDataReader bodyData, T state, CancellationToken token = default); /// /// Clears all items from the store -- cgit