aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src/Endpoints
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-06-12 19:04:15 -0400
committerLibravatar vnugent <public@vaughnnugent.com>2023-06-12 19:04:15 -0400
commitdc0fc53fd3c3f6c32c8b0d063922c7018fa2c48f (patch)
tree92f963014624a1016f6cb645af5afd18278c54c3 /plugins/ObjectCacheServer/src/Endpoints
parent392b38a40e01f2d4dbd457da122dfaf7a1ffe00f (diff)
Baby steps for autonomous nodes
Diffstat (limited to 'plugins/ObjectCacheServer/src/Endpoints')
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs (renamed from plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs)69
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs57
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs91
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs437
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs56
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs129
6 files changed, 483 insertions, 356 deletions
diff --git a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs b/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs
index 97061b3..b9c00e6 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs
@@ -1,11 +1,11 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
-* File: BrokerHeartBeat.cs
+* File: BrokerHeartBeatEndpoint.cs
*
-* BrokerHeartBeat.cs is part of ObjectCacheServer which is part of the larger
+* BrokerHeartBeatEndpoint.cs is part of ObjectCacheServer which is part of the larger
* VNLib collection of libraries and utilities.
*
* ObjectCacheServer is free software: you can redistribute it and/or modify
@@ -26,11 +26,11 @@ using System;
using System.Net;
using System.Linq;
using System.Text.Json;
-using System.Threading;
using System.Threading.Tasks;
-using System.Collections.Generic;
+
using VNLib.Plugins;
+using VNLib.Utils.Logging;
using VNLib.Plugins.Essentials;
using VNLib.Hashing.IdentityUtility;
using VNLib.Plugins.Essentials.Endpoints;
@@ -39,14 +39,11 @@ using VNLib.Plugins.Extensions.Loading;
namespace VNLib.Data.Caching.ObjectCache.Server
{
- internal sealed class BrokerHeartBeat : ResourceEndpointBase
+ internal sealed class BrokerHeartBeatEndpoint : ResourceEndpointBase
{
- public override string Path => "/heartbeat";
-
- private readonly Func<string> Token;
- private readonly ManualResetEvent KeepaliveSet;
+ private readonly IBrokerHeartbeatNotifier _heartBeat;
private readonly Task<IPAddress[]> BrokerIpList;
- private readonly PluginBase Pbase;
+ private readonly bool DebugMode;
///<inheritdoc/>
protected override ProtectionSettings EndpointProtectionSettings { get; } = new()
@@ -55,19 +52,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server
DisableSessionsRequired = true
};
- public BrokerHeartBeat(Func<string> token, ManualResetEvent keepaliveSet, Uri brokerUri, PluginBase pbase)
+ public BrokerHeartBeatEndpoint(PluginBase plugin)
{
- Token = token;
- KeepaliveSet = keepaliveSet;
- BrokerIpList = Dns.GetHostAddressesAsync(brokerUri.DnsSafeHost);
-
- this.Pbase = pbase;
- }
+ //Get debug flag
+ DebugMode = plugin.IsDebug();
- private async Task<ReadOnlyJsonWebKey> GetBrokerPubAsync()
- {
- return await Pbase.TryGetSecretAsync("broker_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : broker_public_key");
+ //Get or create the current node config
+ _heartBeat = plugin.GetOrCreateSingleton<NodeConfig>();
+
+ /*
+ * Resolve the ip address of the broker and store it to verify connections
+ * later
+ */
+ BrokerIpList = Dns.GetHostAddressesAsync(_heartBeat.GetBrokerAddress().DnsSafeHost);
+
+ //Setup endpoint
+ InitPathAndLog("/heartbeat", plugin.Log);
}
+
protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity)
{
@@ -76,13 +78,20 @@ namespace VNLib.Data.Caching.ObjectCache.Server
{
//Load and verify the broker's ip address matches with an address we have stored
IPAddress[] addresses = await BrokerIpList;
+
if (!addresses.Contains(entity.TrustedRemoteIp))
{
+ if (DebugMode)
+ {
+ Log.Debug("Received connection {ip} that was not a DNS safe address for the broker server, access denied");
+ }
+
//Token invalid
entity.CloseResponse(HttpStatusCode.Forbidden);
return VfReturnType.VirtualSkip;
}
}
+
//Get the authorization jwt
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
@@ -97,7 +106,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
using JsonWebToken jwt = JsonWebToken.Parse(jwtAuth);
//Verify the jwt using the broker's public key certificate
- using (ReadOnlyJsonWebKey cert = await GetBrokerPubAsync())
+ using (ReadOnlyJsonWebKey cert = _heartBeat.GetBrokerPublicKey())
{
//Verify the jwt
if (!jwt.VerifyFromJwk(cert))
@@ -114,16 +123,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server
{
auth = doc.RootElement.GetProperty("token").GetString();
}
-
+
+ //Get our stored token used for registration
+ string? selfToken = _heartBeat.GetAuthToken();
+
//Verify token
- if(Token().Equals(auth, StringComparison.Ordinal))
+ if (selfToken != null && selfToken.Equals(auth, StringComparison.Ordinal))
{
//Signal keepalive
- KeepaliveSet.Set();
+ _heartBeat.HearbeatReceived();
entity.CloseResponse(HttpStatusCode.OK);
return VfReturnType.VirtualSkip;
}
-
+
+ if (DebugMode)
+ {
+ Log.Debug("Invalid auth token recieved from broker sever, access denied");
+ }
+
//Token invalid
entity.CloseResponse(HttpStatusCode.Forbidden);
return VfReturnType.VirtualSkip;
diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs
deleted file mode 100644
index f7adeb3..0000000
--- a/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
-* Copyright (c) 2023 Vaughn Nugent
-*
-* Library: VNLib
-* Package: ObjectCacheServer
-* File: CacheConfiguration.cs
-*
-* CacheConfiguration.cs is part of ObjectCacheServer which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* ObjectCacheServer is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* ObjectCacheServer is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System.Text.Json.Serialization;
-
-namespace VNLib.Data.Caching.ObjectCache.Server
-{
- internal sealed class CacheConfiguration
- {
- [JsonPropertyName("buffer_recv_max")]
- public int MaxRecvBufferSize { get; set; } = 1000 * 1024;
- [JsonPropertyName("buffer_recv_min")]
- public int MinRecvBufferSize { get; set; } = 8 * 1024;
-
-
- [JsonPropertyName("buffer_header_max")]
- public int MaxHeaderBufferSize { get; set; } = 2 * 1024;
- [JsonPropertyName("buffer_header_min")]
- public int MinHeaderBufferSize { get; set; } = 128;
-
-
- [JsonPropertyName("max_message_size")]
- public int MaxMessageSize { get; set; } = 1000 * 1024;
-
-
- [JsonPropertyName("change_queue_max_depth")]
- public int MaxEventQueueDepth { get; set; } = 10 * 1000;
-
-
- [JsonPropertyName("max_cache")]
- public uint MaxCacheEntries { get; set; } = 10000;
-
- [JsonPropertyName("buckets")]
- public uint BucketCount { get; set; } = 10;
- }
-}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs
new file mode 100644
index 0000000..67db433
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs
@@ -0,0 +1,91 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: ConnectEndpoint.cs
+*
+* ConnectEndpoint.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using VNLib.Utils.Logging;
+using VNLib.Plugins;
+using VNLib.Plugins.Extensions.Loading;
+
+namespace VNLib.Data.Caching.ObjectCache.Server
+{
+ [ConfigurationName("cache")]
+ sealed class CacheStore : ICacheStore, IDisposable
+ {
+ public BlobCacheListener Listener { get; }
+
+
+ public CacheStore(PluginBase plugin, IConfigScope config)
+ {
+ //Init cache
+ Listener = InitializeCache((ObjectCacheServerEntry)plugin, config);
+ }
+
+ ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token)
+ {
+ return Listener.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token);
+ }
+
+ void ICacheStore.Clear()
+ {
+ throw new NotImplementedException();
+ }
+
+ ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token)
+ {
+ return Listener.Cache.DeleteObjectAsync(id, token);
+ }
+
+ private static BlobCacheListener InitializeCache(ObjectCacheServerEntry plugin, IConfigScope config)
+ {
+ //Deserialize the cache config
+ CacheConfiguration cacheConf = config.Deserialze<CacheConfiguration>();
+
+ if (cacheConf.MaxCacheEntries < 2)
+ {
+ throw new ArgumentException("You must configure a 'max_cache' size larger than 1 item");
+ }
+
+ //Suggestion
+ if (cacheConf.MaxCacheEntries < 200)
+ {
+ plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache");
+ }
+
+ plugin.Log.Verbose("Creating cache store with {bc} buckets, with {mc} items/bucket", cacheConf.BucketCount, cacheConf.MaxCacheEntries);
+
+ //Load the blob cache table system
+ IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, plugin.CacheHeap, cacheConf);
+
+ //Endpoint only allows for a single reader
+ return new(bc, plugin.Log, plugin.CacheHeap, true);
+ }
+
+ public void Dispose()
+ {
+ Listener.Dispose();
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
index 2f896bc..167a7e9 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
@@ -27,40 +27,46 @@ using System.Net;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
-using System.Threading.Channels;
using System.Collections.Generic;
-using System.Collections.Concurrent;
-using VNLib.Plugins;
using VNLib.Hashing;
using VNLib.Net.Http;
using VNLib.Utils.Async;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
+using VNLib.Utils.Extensions;
using VNLib.Data.Caching;
+using VNLib.Data.Caching.Extensions;
using VNLib.Hashing.IdentityUtility;
using VNLib.Net.Messaging.FBM;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Net.Messaging.FBM.Server;
+using VNLib.Plugins;
using VNLib.Plugins.Essentials;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Essentials.Endpoints;
using VNLib.Plugins.Essentials.Extensions;
-
+using VNLib.Plugins.Extensions.Loading.Routing;
+using VNLib.Data.Caching.ObjectCache.Server.Distribution;
namespace VNLib.Data.Caching.ObjectCache.Server
{
- [ConfigurationName("store")]
- internal sealed class ConnectEndpoint : ResourceEndpointBase, IDisposable, IAsyncBackgroundWork
+ [ConfigurationName("connect_endpoint")]
+ internal sealed class ConnectEndpoint : ResourceEndpointBase, IAsyncBackgroundWork
{
- private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
+ private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
+
+
+ private readonly CacheNodeConfiguration NodeConfiguration;
+ private readonly ICacheEventQueueManager PubSubManager;
+ private readonly IPeerMonitor Peers;
- private readonly string AudienceLocalServerId;
private readonly BlobCacheListener Store;
- private readonly PluginBase Pbase;
+ private readonly CacheAuthKeyStore KeyStore;
- private readonly ConcurrentDictionary<string, AsyncQueue<ChangeEvent>> StatefulEventQueue;
+ private readonly bool VerifyIp;
+ private readonly string AudienceLocalServerId;
private uint _connectedClients;
@@ -87,27 +93,26 @@ namespace VNLib.Data.Caching.ObjectCache.Server
string? path = config["path"].GetString();
InitPathAndLog(path, plugin.Log);
-
- Pbase = plugin;
- //Parse cache config or use default
- if(config.TryGetValue("cache", out JsonElement confEl))
- {
- CacheConfig = confEl.Deserialize<CacheConfiguration>()!;
- }
- else
- {
- //Init default config if not fount
- CacheConfig = new();
+ KeyStore = new(plugin);
- Log.Verbose("Loading default cache buffer configuration");
- }
+ //Check for ip-verification flag
+ VerifyIp = config.TryGetValue("verify_ip", out JsonElement vIpEl) && vIpEl.GetBoolean();
- //Create event queue client lookup table
- StatefulEventQueue = new(StringComparer.OrdinalIgnoreCase);
+ //Setup pub/sub manager
+ PubSubManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>();
+
+ //Get node configuration
+ NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>().Config;
+
+ //Get peer monitor
+ Peers = plugin.GetOrCreateSingleton<CachePeerMonitor>();
//Init the cache store
- Store = InitializeCache((ObjectCacheServerEntry)plugin, CacheConfig, config);
+ Store = plugin.GetOrCreateSingleton<CacheStore>().Listener;
+
+ //Get the cache store configuration
+ CacheConfig = plugin.GetConfigForType<CacheStore>().Deserialze<CacheConfiguration>();
/*
* Generate a random guid for the current server when created so we
@@ -118,60 +123,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//Schedule the queue worker to be run
_ = plugin.ObserveWork(this, 100);
}
-
-
- private static BlobCacheListener InitializeCache(ObjectCacheServerEntry plugin, CacheConfiguration cacheConf, IConfigScope config)
- {
- if(cacheConf.MaxCacheEntries < 2)
- {
- throw new ArgumentException("You must configure a 'max_cache' size larger than 1 item");
- }
-
- //Suggestion
- if(cacheConf.MaxCacheEntries < 200)
- {
- plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache");
- }
-
- plugin.Log.Verbose("Creating cache store with {bc} buckets, with {mc} items/bucket", cacheConf.BucketCount, cacheConf.MaxCacheEntries);
-
- //Load the blob cache table system
- IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, plugin.CacheHeap, cacheConf);
-
- //Endpoint only allows for a single reader
- return new (bc, plugin.Log, plugin.CacheHeap, true);
- }
-
-
- /// <summary>
- /// Gets the configured cache store
- /// </summary>
- /// <returns></returns>
- public ICacheStore GetCacheStore() => new CacheStore(Store);
- //Dispose will be called by the host plugin on unload
- void IDisposable.Dispose()
- {
- //Dispose the store on cleanup
- Store.Dispose();
- }
-
-
- private async Task<ReadOnlyJsonWebKey> GetClientPubAsync()
- {
- return await Pbase.TryGetSecretAsync("client_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
- }
- private async Task<ReadOnlyJsonWebKey> GetCachePubAsync()
- {
- return await Pbase.TryGetSecretAsync("cache_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
- }
- private async Task<ReadOnlyJsonWebKey> GetCachePrivateKeyAsync()
- {
- return await Pbase.TryGetSecretAsync("cache_private_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
- }
-
-
/*
* Used as a client negotiation and verification request
*
@@ -183,6 +136,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server
*
* The tokens are very short lived as requests are intended to be made
* directly after verification
+ *
+ * Clients must also sign the entire token with their private key and
+ * set the signature in the x-upgrade-sig header so we can verify they
+ * received the messages properly
*/
protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity)
@@ -205,7 +162,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
bool verified = false;
//Get the client public key certificate to verify the client's message
- using(ReadOnlyJsonWebKey cert = await GetClientPubAsync())
+ using(ReadOnlyJsonWebKey cert = await KeyStore.GetClientPublicKeyAsync())
{
//verify signature for client
if (jwt.VerifyFromJwk(cert))
@@ -215,10 +172,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//May be signed by a cache server
else
{
- using ReadOnlyJsonWebKey cacheCert = await GetCachePubAsync();
-
//Set peer and verified flag since the another cache server signed the request
- isPeer = verified = jwt.VerifyFromJwk(cacheCert);
+ isPeer = verified = NodeConfiguration.VerifyCache(jwt);
}
}
@@ -232,10 +187,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//Recover json body
using JsonDocument doc = jwt.GetPayload();
+
if (doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl))
{
nodeId = servIdEl.GetString();
}
+
if (doc.RootElement.TryGetProperty("chl", out JsonElement challengeEl))
{
challenge = challengeEl.GetString();
@@ -246,133 +203,147 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//Verified, now we can create an auth message with a short expiration
using JsonWebToken auth = new();
+
+ auth.WriteHeader(NodeConfiguration.GetJwtHeader());
+ auth.InitPayloadClaim()
+ .AddClaim("aud", AudienceLocalServerId)
+ .AddClaim("exp", entity.RequestedTimeUtc.Add(AuthTokenExpiration).ToUnixTimeSeconds())
+ .AddClaim("nonce", RandomHash.GetRandomBase32(8))
+ .AddClaim("chl", challenge!)
+ //Set the ispeer flag if the request was signed by a cache server
+ .AddClaim("isPeer", isPeer)
+ //Specify the server's node id if set
+ .AddClaim("sub", nodeId!)
+ //Set ip address
+ .AddClaim("ip", entity.TrustedRemoteIp.ToString())
+ //Add negotiaion args
+ .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, CacheConfig.MaxHeaderBufferSize)
+ .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, CacheConfig.MaxRecvBufferSize)
+ .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, CacheConfig.MaxMessageSize)
+ .CommitClaims();
+
+ //Sign the auth message from our private key
+ NodeConfiguration.SignJwt(auth);
- //Sign the auth message from the cache certificate's private key
- using (ReadOnlyJsonWebKey cert = await GetCachePrivateKeyAsync())
- {
- auth.WriteHeader(cert.JwtHeader);
- auth.InitPayloadClaim()
- .AddClaim("aud", AudienceLocalServerId)
- .AddClaim("exp", entity.RequestedTimeUtc.Add(AuthTokenExpiration).ToUnixTimeSeconds())
- .AddClaim("nonce", RandomHash.GetRandomBase32(8))
- .AddClaim("chl", challenge!)
- //Set the ispeer flag if the request was signed by a cache server
- .AddClaim("isPeer", isPeer)
- //Specify the server's node id if set
- .AddClaim("sub", nodeId!)
- //Add negotiaion args
- .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, CacheConfig.MaxHeaderBufferSize)
- .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, CacheConfig.MaxRecvBufferSize)
- .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, CacheConfig.MaxMessageSize)
- .CommitClaims();
-
- auth.SignFromJwk(cert);
- }
-
//Close response
entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer);
return VfReturnType.VirtualSkip;
}
-
- //Background worker to process event queue items
- async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
+ protected override async ValueTask<VfReturnType> WebsocketRequestedAsync(HttpEntity entity)
{
- try
- {
- //Listen for changes
- while (true)
- {
- ChangeEvent ev = await Store.EventQueue.DequeueAsync(exitToken);
+ //Parse jwt from authorization
+ string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
- //Add event to queues
- foreach (AsyncQueue<ChangeEvent> queue in StatefulEventQueue.Values)
- {
- if (!queue.TryEnque(ev))
- {
- Log.Debug("Listener queue has exeeded capacity, change events will be lost");
- }
- }
- }
- }
- catch (OperationCanceledException)
+ if (string.IsNullOrWhiteSpace(jwtAuth))
{
- //Normal exit
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
}
- }
- private class WsUserState
- {
- public int RecvBufferSize { get; init; }
- public int MaxHeaderBufferSize { get; init; }
- public int MaxMessageSize { get; init; }
- public int MaxResponseBufferSize { get; init; }
- public AsyncQueue<ChangeEvent>? SyncQueue { get; init; }
+ //Get the upgrade signature header
+ string? clientSignature = entity.Server.Headers[FBMDataCacheExtensions.X_UPGRADE_SIG_HEADER];
- public override string ToString()
+ if (string.IsNullOrWhiteSpace(clientSignature))
{
- return
- $"{nameof(RecvBufferSize)}:{RecvBufferSize}, {nameof(MaxHeaderBufferSize)}: {MaxHeaderBufferSize}, {nameof(MaxMessageSize)}:{MaxMessageSize}, {nameof(MaxResponseBufferSize)}:{MaxResponseBufferSize}";
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
}
- }
- protected override async ValueTask<VfReturnType> WebsocketRequestedAsync(HttpEntity entity)
- {
- try
+ string? nodeId = null;
+ ICachePeerAdvertisment? discoveryAd = null;
+
+ //Parse jwt
+ using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
{
- //Parse jwt from authorization
- string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
+ //verify signature against the cache public key, since this server must have signed it
+ if (!NodeConfiguration.VerifyCache(jwt))
+ {
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
+ }
+
+ //Recover json body
+ using JsonDocument doc = jwt.GetPayload();
+
+ //Verify audience, expiration
- if (string.IsNullOrWhiteSpace(jwtAuth))
+ if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl) || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase))
+ {
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
+ }
+
+ if (!doc.RootElement.TryGetProperty("exp", out JsonElement expEl)
+ || DateTimeOffset.FromUnixTimeSeconds(expEl.GetInt64()) < entity.RequestedTimeUtc)
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
}
-
- string? nodeId = null;
- //Parse jwt
- using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
+ //Check node ip address matches if required
+ if (VerifyIp)
{
- //Get the client public key certificate to verify the client's message
- using (ReadOnlyJsonWebKey cert = await GetCachePubAsync())
+ if (!doc.RootElement.TryGetProperty("ip", out JsonElement ipEl))
{
- //verify signature against the cache public key, since this server must have signed it
- if (!jwt.VerifyFromJwk(cert))
- {
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
}
-
- //Recover json body
- using JsonDocument doc = jwt.GetPayload();
-
- //Verify audience, expiration
- if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl) || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase))
+ string? clientIp = ipEl.GetString();
+ //Verify the client ip address matches the one in the token
+ if (clientIp == null || !IPAddress.TryParse(clientIp, out IPAddress? clientIpAddr) || !clientIpAddr.Equals(entity.TrustedRemoteIp))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
}
+ }
+
+ //Check if the client is a peer
+ bool isPeer = doc.RootElement.TryGetProperty("isPeer", out JsonElement isPeerEl) && isPeerEl.GetBoolean();
- if (!doc.RootElement.TryGetProperty("exp", out JsonElement expEl)
- || DateTimeOffset.FromUnixTimeSeconds(expEl.GetInt64()) < entity.RequestedTimeUtc)
+ //The node id is optional and stored in the 'sub' field, ignore if the client is not a peer
+ if (isPeer && doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl))
+ {
+ nodeId = servIdEl.GetString();
+ }
+
+ //Verify the signature the client included of the auth token
+
+ if (isPeer)
+ {
+ //Verify token signature against a fellow cache public key
+ if (!NodeConfiguration.VerifyUpgradeToken(clientSignature, jwtAuth))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
}
- //Check if the client is a peer
- bool isPeer = doc.RootElement.TryGetProperty("isPeer", out JsonElement isPeerEl) && isPeerEl.GetBoolean();
+ //Try to get the node advertisement header
+ string? discoveryHeader = entity.Server.Headers[FBMDataCacheExtensions.X_NODE_DISCOVERY_HEADER];
- //The node id is optional and stored in the 'sub' field, ignore if the client is not a peer
- if (isPeer && doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl))
+ //Verify the node advertisement header and publish it
+ if (!string.IsNullOrWhiteSpace(discoveryHeader))
{
- nodeId = servIdEl.GetString();
+ discoveryAd = NodeConfiguration.VerifyPeerAdvertisment(discoveryHeader);
}
}
-
+ else
+ {
+ //Not a peer, so verify against the client's public key
+ using ReadOnlyJsonWebKey clientPub = await KeyStore.GetClientPublicKeyAsync();
+
+ //Verify token signature
+ if (!FBMDataCacheExtensions.VerifyUpgradeToken(clientSignature, jwtAuth, clientPub))
+ {
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
+ }
+ }
+ }
+
+ try
+ {
//Get query config suggestions from the client
string recvBufCmd = entity.QueryArgs[FBMClient.REQ_RECV_BUF_QUERY_ARG];
string maxHeaderCharCmd = entity.QueryArgs[FBMClient.REQ_HEAD_BUF_QUERY_ARG];
@@ -382,34 +353,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server
int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : CacheConfig.MinRecvBufferSize;
int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : CacheConfig.MinHeaderBufferSize;
int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : CacheConfig.MaxMessageSize;
-
- AsyncQueue<ChangeEvent>? nodeQueue = null;
-
- //The connection may be a caching server node, so get its node-id
- if (!string.IsNullOrWhiteSpace(nodeId))
- {
- /*
- * Store a new async queue, or get an old queue for the current node
- *
- * We should use a bounded queue and disacard LRU items, we also know
- * only a single writer is needed as the queue is processed on a single thread
- * and change events may be processed on mutliple threads.
- */
-
- BoundedChannelOptions queueOptions = new(CacheConfig.MaxEventQueueDepth)
- {
- AllowSynchronousContinuations = true,
- SingleReader = false,
- SingleWriter = true,
- //Drop oldest item in queue if full
- FullMode = BoundedChannelFullMode.DropOldest,
- };
-
- _ = StatefulEventQueue.TryAdd(nodeId, new(queueOptions));
-
- //Get the queue
- nodeQueue = StatefulEventQueue[nodeId];
- }
/*
* Buffer sizing can get messy as the response/resquest sizes can vary
@@ -434,7 +377,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server
*/
MaxResponseBufferSize = (int)MemoryUtil.NearestPage(maxMessageSizeClamp),
- SyncQueue = nodeQueue
+ NodeId = nodeId,
+ Advertisment = discoveryAd
};
Log.Debug("Client recv buffer suggestion {recv}, header buffer size {head}, response buffer size {r}", recvBufCmd, maxHeaderCharCmd, state.MaxResponseBufferSize);
@@ -454,14 +398,19 @@ namespace VNLib.Data.Caching.ObjectCache.Server
private async Task WebsocketAcceptedAsync(WebSocketSession wss)
{
+ WsUserState state = (WsUserState)wss.UserState!;
+
+ //Notify peers of new connection
+ Peers.OnPeerConnected(state);
+
//Inc connected count
Interlocked.Increment(ref _connectedClients);
+
//Register plugin exit token to cancel the connected socket
- CancellationTokenRegistration reg = Pbase.UnloadToken.Register(wss.CancelAll);
+ CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll);
+
try
{
- WsUserState state = (wss.UserState as WsUserState)!;
-
//Init listener args from request
FBMListenerSessionParams args = new()
{
@@ -473,12 +422,33 @@ namespace VNLib.Data.Caching.ObjectCache.Server
HeaderEncoding = Helpers.DefaultEncoding,
};
- //Listen for requests
- await Store.ListenAsync(wss, args, state.SyncQueue);
+ //Check if the client is a peer node, if it is, subscribe to change events
+ if (!string.IsNullOrWhiteSpace(state.NodeId))
+ {
+ //Get the event queue for the current node
+ AsyncQueue<ChangeEvent> queue = PubSubManager.Subscribe(state);
+
+ try
+ {
+ //Begin listening for messages with a queue
+ await Store.ListenAsync(wss, args, queue);
+ }
+ finally
+ {
+ //ALAWYS Detatch listener
+ PubSubManager.Unsubscribe(state);
+ }
+ }
+ else
+ {
+ //Begin listening for messages without a queue
+ await Store.ListenAsync(wss, args, null);
+ }
}
catch (OperationCanceledException)
{
Log.Debug("Websocket connection was canceled");
+
//Disconnect the socket
await wss.CloseSocketOutputAsync(System.Net.WebSockets.WebSocketCloseStatus.NormalClosure, "unload", CancellationToken.None);
}
@@ -490,35 +460,68 @@ namespace VNLib.Data.Caching.ObjectCache.Server
{
//Dec connected count
Interlocked.Decrement(ref _connectedClients);
+
//Unregister the
reg.Unregister();
}
+
+ //Notify monitor of disconnect
+ Peers.OnPeerDisconnected(state);
+
Log.Debug("Server websocket exited");
}
-
- private sealed class CacheStore : ICacheStore
+
+ //Background worker to process event queue items
+ async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
{
- private readonly BlobCacheListener _cache;
+ const int accumulatorSize = 64;
- public CacheStore(BlobCacheListener cache)
+ try
{
- _cache = cache;
- }
+ //Accumulator for events
+ ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize];
+ int ptr = 0;
- ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token)
- {
- return _cache.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token);
- }
+ //Listen for changes
+ while (true)
+ {
+ //Wait for next event
+ accumulator[ptr++] = await Store.EventQueue.DequeueAsync(exitToken);
- void ICacheStore.Clear()
+ //try to accumulate more events until we can't anymore
+ while (Store.EventQueue.TryDequeue(out ChangeEvent? ev) && ptr < accumulatorSize)
+ {
+ accumulator[ptr++] = ev;
+ }
+
+ //Publish all events to subscribers
+ PubSubManager.PublishMultiple(accumulator.AsSpan(0, ptr));
+
+ //Reset pointer
+ ptr = 0;
+ }
+ }
+ catch (OperationCanceledException)
{
- throw new NotImplementedException();
+ //Normal exit
+ pluginLog.Debug("Change queue listener worker exited");
}
+ }
+
+ private class WsUserState : ICachePeer
+ {
+ public int RecvBufferSize { get; init; }
+ public int MaxHeaderBufferSize { get; init; }
+ public int MaxMessageSize { get; init; }
+ public int MaxResponseBufferSize { get; init; }
+ public string? NodeId { get; init; }
+ public ICachePeerAdvertisment? Advertisment { get; init; }
- ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token)
+ public override string ToString()
{
- return _cache.Cache.DeleteObjectAsync(id, token);
+ return
+ $"{nameof(RecvBufferSize)}:{RecvBufferSize}, {nameof(MaxHeaderBufferSize)}: {MaxHeaderBufferSize}, {nameof(MaxMessageSize)}:{MaxMessageSize}, {nameof(MaxResponseBufferSize)}:{MaxResponseBufferSize}";
}
}
}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs b/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs
deleted file mode 100644
index f911af9..0000000
--- a/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
-* Copyright (c) 2023 Vaughn Nugent
-*
-* Library: VNLib
-* Package: ObjectCacheServer
-* File: ICacheStore.cs
-*
-* ICacheStore.cs is part of ObjectCacheServer which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* ObjectCacheServer is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* ObjectCacheServer is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace VNLib.Data.Caching.ObjectCache.Server
-{
- internal interface ICacheStore
- {
- /// <summary>
- /// Asynchronously adds or updates an object in the store and optionally update's its id
- /// </summary>
- /// <param name="objectId">The current (or old) id of the object</param>
- /// <param name="alternateId">An optional id to update the blob to</param>
- /// <param name="bodyData">A callback that returns the data for the blob</param>
- /// <param name="state">The state parameter to pass to the data callback</param>
- /// <param name="token">A token to cancel the async operation</param>
- /// <returns>A value task that represents the async operation</returns>
- ValueTask AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token = default);
-
- /// <summary>
- /// Clears all items from the store
- /// </summary>
- void Clear();
-
- /// <summary>
- /// Asynchronously deletes a previously stored item
- /// </summary>
- /// <param name="id">The id of the object to delete</param>
- /// <param name="token">A token to cancel the async lock await</param>
- /// <returns>A task that completes when the item has been deleted</returns>
- ValueTask<bool> DeleteItemAsync(string id, CancellationToken token = default);
- }
-}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
new file mode 100644
index 0000000..670d624
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
@@ -0,0 +1,129 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: PeerDiscoveryEndpoint.cs
+*
+* PeerDiscoveryEndpoint.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Linq;
+using System.Net;
+using System.Text.Json;
+using System.Threading.Tasks;
+
+using VNLib.Hashing;
+using VNLib.Hashing.IdentityUtility;
+using VNLib.Plugins;
+using VNLib.Plugins.Essentials;
+using VNLib.Plugins.Essentials.Endpoints;
+using VNLib.Plugins.Essentials.Extensions;
+using VNLib.Plugins.Extensions.Loading;
+using VNLib.Data.Caching.Extensions;
+using VNLib.Data.Caching.ObjectCache.Server.Distribution;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
+{
+ [ConfigurationName("discovery_endpoint")]
+ internal sealed class PeerDiscoveryEndpoint : UnprotectedWebEndpoint
+ {
+ private readonly IPeerMonitor PeerMonitor;
+ private readonly NodeConfig Config;
+
+ public PeerDiscoveryEndpoint(PluginBase plugin, IConfigScope config)
+ {
+ string? path = config["path"].GetString();
+
+ InitPathAndLog(path, plugin.Log);
+
+ //Get the peer monitor
+ PeerMonitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
+
+ //Get the node config
+ Config = plugin.GetOrCreateSingleton<NodeConfig>();
+ }
+
+ protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity)
+ {
+ //Get auth token
+ string? authToken = entity.Server.Headers[HttpRequestHeader.Authorization];
+
+ if(string.IsNullOrWhiteSpace(authToken))
+ {
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
+ }
+
+ string subject = string.Empty;
+
+ //Parse auth token
+ using(JsonWebToken jwt = JsonWebToken.Parse(authToken))
+ {
+ //try to verify against cache node first
+ if (!Config.Config.VerifyCache(jwt))
+ {
+ //failed...
+
+ //try to verify against client key
+ using ReadOnlyJsonWebKey clientPub = await Config.KeyStore.GetClientPublicKeyAsync();
+
+ if (!jwt.VerifyFromJwk(clientPub))
+ {
+ //invalid token
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
+ }
+ }
+
+ using JsonDocument payload = jwt.GetPayload();
+
+ subject = payload.RootElement.GetProperty("sub").GetString() ?? string.Empty;
+ }
+
+ //Valid key, get peer list to send to client
+ ICachePeerAdvertisment[] peers = PeerMonitor.GetAllPeers()
+ .Where(static p => p.Advertisment != null)
+ .Select(static p => p.Advertisment!)
+ .ToArray();
+
+ //Build response jwt
+ using JsonWebToken response = new();
+
+ //set header from cache config
+ response.WriteHeader(Config.Config.GetJwtHeader());
+
+ response.InitPayloadClaim()
+ .AddClaim("iss", Config.Config.NodeId)
+ //Audience is the requestor id
+ .AddClaim("sub", subject)
+ .AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds())
+ //Send all peers as a json array
+ .AddClaim("peers", peers)
+ .AddClaim("nonce", RandomHash.GetRandomBase32(24))
+ .CommitClaims();
+
+ //Sign the response
+ Config.Config.SignJwt(response);
+
+ //Send response to client
+ entity.CloseResponse(HttpStatusCode.OK, Net.Http.ContentType.Text, response.DataBuffer);
+ return VfReturnType.VirtualSkip;
+ }
+ }
+}