aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-07-13 13:20:25 -0400
committerLibravatar vnugent <public@vaughnnugent.com>2023-07-13 13:20:25 -0400
commit2f674e79d42e7d36225fa9ac7ecefbc5bc62d325 (patch)
treec58999489f5391bc044e7a9bb3e557afe2860415 /plugins/ObjectCacheServer
parent1a8ab1457244d15b19ddcc94958f645f5ec2abc7 (diff)
Checkpoint, kind of working clustering
Diffstat (limited to 'plugins/ObjectCacheServer')
-rw-r--r--plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs17
-rw-r--r--plugins/ObjectCacheServer/src/CacheEventQueueManager.cs6
-rw-r--r--plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs25
-rw-r--r--plugins/ObjectCacheServer/src/CacheStore.cs27
-rw-r--r--plugins/ObjectCacheServer/src/CacheSystemUtil.cs2
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs (renamed from plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs)158
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs (renamed from plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs)32
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/ICachePeerAdapter.cs (renamed from plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs)10
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/IPeerMonitor.cs (renamed from plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs)5
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs268
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/KnownPeerList.cs100
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs161
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs71
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs29
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs111
-rw-r--r--plugins/ObjectCacheServer/src/ICachePeer.cs4
-rw-r--r--plugins/ObjectCacheServer/src/NodeConfig.cs63
-rw-r--r--plugins/ObjectCacheServer/src/ObjectCacheServer.csproj5
-rw-r--r--plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs8
19 files changed, 666 insertions, 436 deletions
diff --git a/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs b/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs
index 6725fbe..5fc700b 100644
--- a/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs
+++ b/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs
@@ -32,7 +32,6 @@ using VNLib.Hashing.IdentityUtility;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.Extensions;
-
namespace VNLib.Data.Caching.ObjectCache.Server
{
sealed record class CacheAuthKeyStore : ICacheAuthManager
@@ -59,9 +58,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server
}
///<inheritdoc/>
- public bool VerifyJwt(JsonWebToken jwt)
+ public bool VerifyJwt(JsonWebToken jwt, bool isPeer)
{
- return jwt.VerifyFromJwk(_clientPub.Value);
+ //Verify from peer server or client
+ return isPeer ? jwt.VerifyFromJwk(_cachePriv.Value) : jwt.VerifyFromJwk(_clientPub.Value);
}
/// <summary>
@@ -85,7 +85,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
}
//try to get the ecdsa alg for the signing key
- using ECDsa? ecdsa = _cachePriv.Value.GetECDsaPublicKey();
+ using ECDsa? ecdsa = _cachePriv.Value.GetECDsaPrivateKey();
if (ecdsa != null)
{
return ecdsa.SignHash(hash);
@@ -95,17 +95,20 @@ namespace VNLib.Data.Caching.ObjectCache.Server
}
///<inheritdoc/>
- public bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature)
+ public bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature, bool isPeer)
{
+ //Determine the key to verify against
+ ReadOnlyJsonWebKey jwk = isPeer ? _cachePriv.Value : _clientPub.Value;
+
//try to get the rsa alg for the signing key
- using RSA? rsa = _clientPub.Value.GetRSAPublicKey();
+ using RSA? rsa = jwk.GetRSAPublicKey();
if (rsa != null)
{
return rsa.VerifyHash(hash, signature, alg.GetAlgName(), RSASignaturePadding.Pkcs1);
}
//try to get the ecdsa alg for the signing key
- using ECDsa? ecdsa = _clientPub.Value.GetECDsaPublicKey();
+ using ECDsa? ecdsa = jwk.GetECDsaPublicKey();
if (ecdsa != null)
{
return ecdsa.VerifyHash(hash, signature);
diff --git a/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs b/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs
index 049069e..3827121 100644
--- a/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs
+++ b/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs
@@ -104,7 +104,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
}
//Return the node's queue
- return nq.Queue;
+ return nq;
}
///<inheritdoc/>
@@ -183,8 +183,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//Interval to purge stale subscribers
Task IIntervalScheduleable.OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken)
{
- //Purge
+ log.Debug("Purging stale peer event queues");
+
PurgeStaleSubscribers();
+
return Task.CompletedTask;
}
diff --git a/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs b/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs
index 52b6abf..9c7388e 100644
--- a/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs
+++ b/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs
@@ -25,13 +25,13 @@
using System;
using System.Threading;
using System.Threading.Tasks;
+using System.Threading.Channels;
using VNLib.Utils.Async;
using VNLib.Utils.Logging;
using VNLib.Plugins;
using VNLib.Plugins.Extensions.Loading;
-
namespace VNLib.Data.Caching.ObjectCache.Server
{
internal sealed class CacheListenerPubQueue : ICacheListenerEventQueue, IAsyncBackgroundWork
@@ -44,10 +44,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server
{
_queueManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>();
_logProvider = plugin.Log;
- _listenerQueue = new AsyncQueue<ChangeEvent>(false, true);
-
- //Register processing worker
- _ = plugin.ObserveWork(this, 500);
+ _listenerQueue = new AsyncQueue<ChangeEvent>(new BoundedChannelOptions(10000)
+ {
+ AllowSynchronousContinuations = true,
+ FullMode = BoundedChannelFullMode.DropOldest,
+ SingleReader = true,
+ SingleWriter = false,
+ });
}
///<inheritdoc/>
@@ -59,25 +62,25 @@ namespace VNLib.Data.Caching.ObjectCache.Server
{
//Accumulator for events
ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize];
- int ptr = 0;
+ int index = 0;
//Listen for changes
while (true)
{
//Wait for next event
- accumulator[ptr++] = await _listenerQueue.DequeueAsync(exitToken);
+ accumulator[index++] = await _listenerQueue.DequeueAsync(exitToken);
//try to accumulate more events until we can't anymore
- while (_listenerQueue.TryDequeue(out ChangeEvent? ev) && ptr < accumulatorSize)
+ while (_listenerQueue.TryDequeue(out ChangeEvent? ev) && index < accumulatorSize)
{
- accumulator[ptr++] = ev;
+ accumulator[index++] = ev;
}
//Publish all events to subscribers
- _queueManager.PublishMultiple(accumulator.AsSpan(0, ptr));
+ _queueManager.PublishMultiple(accumulator.AsSpan(0, index));
//Reset pointer
- ptr = 0;
+ index = 0;
}
}
catch (OperationCanceledException)
diff --git a/plugins/ObjectCacheServer/src/CacheStore.cs b/plugins/ObjectCacheServer/src/CacheStore.cs
index e7a7c63..c1d47f6 100644
--- a/plugins/ObjectCacheServer/src/CacheStore.cs
+++ b/plugins/ObjectCacheServer/src/CacheStore.cs
@@ -30,7 +30,6 @@ using VNLib.Utils.Logging;
using VNLib.Plugins;
using VNLib.Plugins.Extensions.Loading;
-
namespace VNLib.Data.Caching.ObjectCache.Server
{
[ConfigurationName("cache")]
@@ -62,6 +61,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server
private static BlobCacheListener InitializeCache(ObjectCacheServerEntry plugin, IConfigScope config)
{
+ const string CacheConfigTemplate =
+@"
+Cache Configuration:
+ Max memory: {max} Mb
+ Buckets: {bc}
+ Entries per-bucket: {mc}
+";
+
//Deserialize the cache config
CacheConfiguration cacheConf = config.Deserialze<CacheConfiguration>();
@@ -74,22 +81,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server
if (cacheConf.MaxCacheEntries < 200)
{
plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache");
- }
-
- plugin.Log.Verbose("Creating cache store with {bc} buckets, with {mc} items/bucket", cacheConf.BucketCount, cacheConf.MaxCacheEntries);
+ }
//calculate the max memory usage
ulong maxByteSize = ((ulong)cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize);
- //Log max memory usage
- plugin.Log.Debug("Maxium memory consumption {mx}Mb", maxByteSize / (ulong)(1024 * 1000));
-
- //Load the blob cache table system
- IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, plugin.CacheHeap, cacheConf);
+ //Log the cache config
+ plugin.Log.Information(CacheConfigTemplate,
+ maxByteSize / (ulong)(1024 * 1000),
+ cacheConf.BucketCount,
+ cacheConf.MaxCacheEntries
+ );
//Get the event listener
ICacheListenerEventQueue queue = plugin.GetOrCreateSingleton<CacheListenerPubQueue>();
+ //Load the blob cache table system
+ IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, plugin.CacheHeap, cacheConf);
+
//Endpoint only allows for a single reader
return new(bc, queue, plugin.Log, plugin.CacheHeap);
}
diff --git a/plugins/ObjectCacheServer/src/CacheSystemUtil.cs b/plugins/ObjectCacheServer/src/CacheSystemUtil.cs
index 669b84f..f8aedae 100644
--- a/plugins/ObjectCacheServer/src/CacheSystemUtil.cs
+++ b/plugins/ObjectCacheServer/src/CacheSystemUtil.cs
@@ -70,7 +70,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
else
{
//Default type
- table = GetInternalBlobCache(heap, cacheConf, pCManager);
+ table = GetInternalBlobCache(heap, cacheConf, pCManager);
}
//Initialize the subsystem from the cache table
diff --git a/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
index a55e8e2..ffdd4f4 100644
--- a/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
@@ -3,10 +3,10 @@
*
* Library: VNLib
* Package: ObjectCacheServer
-* File: ObjectCacheServerEntry.cs
+* File: CacheNodeReplicationMaanger.cs
*
-* ObjectCacheServerEntry.cs is part of ObjectCacheServer which is part of the larger
-* VNLib collection of libraries and utilities.
+* CacheNodeReplicationMaanger.cs is part of ObjectCacheServer which is part
+* of the larger VNLib collection of libraries and utilities.
*
* ObjectCacheServer is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
@@ -35,52 +35,91 @@ using static VNLib.Data.Caching.Constants;
using VNLib.Net.Messaging.FBM;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Extensions.Loading;
+using VNLib.Data.Caching.Extensions.Clustering;
-namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
+
+ /*
+ * This class is responsible for replicating the cache with other nodes.
+ *
+ * It does this by connecting to other nodes and listening for change events.
+ * When a change event occurs, it takes action against the local cache store,
+ * to keep it consistent with the other nodes.
+ *
+ * Change events are only handled first-hand, meaning that events do not
+ * propagate to other nodes, they must be connected individually to each node
+ * and listen for changes.
+ */
+
internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork
{
+ private const string LOG_SCOPE_NAME = "REPL";
+
private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10);
+ private const int MAX_MESSAGE_SIZE = 12 * 1024;
- private readonly NodeConfig NodeConfig;
- private readonly ICachePeerAdapter PeerAdapter;
- private readonly ICacheStore CacheStore;
- private readonly FBMClientConfig ClientConfig;
- private readonly PluginBase Plugin;
+ private readonly PluginBase _plugin;
+ private readonly ILogProvider _log;
+ private readonly NodeConfig _nodeConfig;
+ private readonly ICacheStore _cacheStore;
+ private readonly ICachePeerAdapter _peerAdapter;
+ private readonly FBMClientConfig _replicationClientConfig;
+
+ private readonly bool _isDebug;
- private CacheNodeConfiguration CacheConfig => NodeConfig.Config;
+ private int _openConnections;
public CacheNodeReplicationMaanger(PluginBase plugin)
{
//Load the node config
- NodeConfig = plugin.GetOrCreateSingleton<NodeConfig>();
-
- //Get peer adapter
- PeerAdapter = plugin.GetOrCreateSingleton<PeerDiscoveryManager>();
-
- CacheStore = plugin.GetOrCreateSingleton<CacheStore>();
+ _nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>();
+ _cacheStore = plugin.GetOrCreateSingleton<CacheStore>();
+ _peerAdapter = plugin.GetOrCreateSingleton<PeerDiscoveryManager>();
+
+ //Init fbm config with fixed message size
+ _replicationClientConfig = FBMDataCacheExtensions.GetDefaultConfig(
+ (plugin as ObjectCacheServerEntry)!.CacheHeap,
+ MAX_MESSAGE_SIZE,
+ debugLog: plugin.IsDebug() ? plugin.Log : null
+ );
+
+ _plugin = plugin;
+ _isDebug = plugin.IsDebug();
+ _log = plugin.Log.CreateScope(LOG_SCOPE_NAME);
}
public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
{
- pluginLog.Information("[REPL] Initializing node replication worker");
+ _log.Information("Initializing node replication worker");
try
{
while (true)
{
//Get all new peers
- ICacheNodeAdvertisment[] peers = PeerAdapter.GetNewPeers();
+ CacheNodeAdvertisment[] peers = _peerAdapter.GetNewPeers();
- if (peers.Length == 0)
+ if (peers.Length == 0 && _isDebug)
{
- pluginLog.Verbose("[REPL] No new peers to connect to");
+ _log.Verbose("No new peers to connect to");
}
- //Connect to each peer as a background task
- foreach (ICacheNodeAdvertisment peer in peers)
+ //Make sure we don't exceed the max connections
+ if(_openConnections >= _nodeConfig.MaxPeerConnections)
{
- _ = Plugin.ObserveWork(() => OnNewPeerDoWorkAsync(peer, pluginLog, exitToken));
+ if (_isDebug)
+ {
+ _log.Verbose("Max peer connections reached, waiting for a connection to close");
+ }
+ }
+ else
+ {
+ //Connect to each peer as a background task
+ foreach (CacheNodeAdvertisment peer in peers)
+ {
+ _ = _plugin.ObserveWork(() => OnNewPeerDoWorkAsync(peer, _log, exitToken));
+ }
}
//Wait for a new peers
@@ -93,7 +132,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
}
catch
{
- pluginLog.Error("[REPL] Node replication worker exited with an error");
+ _log.Error("Node replication worker exited with an error");
throw;
}
finally
@@ -101,25 +140,27 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
}
- pluginLog.Information("[REPL] Node replication worker exited");
+ _log.Information("Node replication worker exited");
}
- private async Task OnNewPeerDoWorkAsync(ICacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken)
+ private async Task OnNewPeerDoWorkAsync(CacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken)
{
_ = newPeer ?? throw new ArgumentNullException(nameof(newPeer));
//Setup client
- FBMClient client = new(ClientConfig);
+ FBMClient client = new(_replicationClientConfig);
//Add peer to monitor
- PeerAdapter.OnPeerListenerAttached(newPeer);
+ _peerAdapter.OnPeerListenerAttached(newPeer);
+
+ Interlocked.Increment(ref _openConnections);
try
{
log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId);
//Connect to the server
- await client.ConnectToCacheAsync(newPeer, CacheConfig, exitToken);
+ await client.ConnectToCacheAsync(newPeer, _nodeConfig.Config, exitToken);
log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId);
@@ -176,16 +217,21 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
}
finally
{
+ Interlocked.Decrement(ref _openConnections);
+
client.Dispose();
//Notify monitor of disconnect
- PeerAdapter.OnPeerListenerDetatched(newPeer);
+ _peerAdapter.OnPeerListenerDetatched(newPeer);
}
}
//Wroker task callback method
private async Task ReplicationWorkerDoWorkAsync(FBMClient client, ILogProvider log, CancellationToken exitToken)
{
+ //Reusable request message
+ using FBMRequest request = new(client.Config);
+
//Listen for changes
while (true)
{
@@ -197,53 +243,47 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
switch (changedObject.Status)
{
case ResponseCodes.NotFound:
- log.Warn("Server cache not properly configured, worker exiting");
+ log.Error("Server cache not properly configured, worker exiting");
return;
case "deleted":
//Delete the object from the store
- await CacheStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None);
+ await _cacheStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None);
break;
case "modified":
//Reload the record from the store
- await UpdateRecordAsync(client, log, changedObject.CurrentId, changedObject.NewId, exitToken);
+ await UpdateRecordAsync(client, request, log, changedObject.CurrentId, changedObject.NewId, exitToken);
break;
}
+
+ //Reset request message
+ request.Reset();
}
}
- private async Task UpdateRecordAsync(FBMClient client, ILogProvider log, string objectId, string newId, CancellationToken cancellation)
+ private async Task UpdateRecordAsync(FBMClient client, FBMRequest modRequest, ILogProvider log, string objectId, string newId, CancellationToken cancellation)
{
- //Get request message
- FBMRequest modRequest = client.RentRequest();
- try
- {
- //Set action as get/create
- modRequest.WriteHeader(HeaderCommand.Action, Actions.Get);
- //Set session-id header
- modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId);
+ //Set action as get/create
+ modRequest.WriteHeader(HeaderCommand.Action, Actions.Get);
+ //Set session-id header
+ modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId);
- //Make request
- using FBMResponse response = await client.SendAsync(modRequest, GetItemTimeout, cancellation);
+ //Make request
+ using FBMResponse response = await client.SendAsync(modRequest, GetItemTimeout, cancellation);
- response.ThrowIfNotSet();
+ response.ThrowIfNotSet();
- //Check response code
- string status = response.Headers.First(static s => s.Header == HeaderCommand.Status).Value.ToString();
+ //Check response code
+ string status = response.Headers.First(static s => s.Header == HeaderCommand.Status).Value.ToString();
- if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal))
- {
- //Update the record
- await CacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation);
- log.Debug("Updated object {id}", objectId);
- }
- else
- {
- log.Warn("Object {id} was missing on the remote server", objectId);
- }
+ if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal))
+ {
+ //Update the record
+ await _cacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation);
+ log.Debug("Updated object {id}", objectId);
}
- finally
+ else
{
- client.ReturnRequest(modRequest);
+ log.Warn("Object {id} was missing on the remote server", objectId);
}
}
}
diff --git a/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs
index f191c9d..c49a54b 100644
--- a/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs
@@ -23,21 +23,38 @@
*/
using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
using System.Collections.Generic;
+using VNLib.Utils;
+using VNLib.Utils.Extensions;
using VNLib.Plugins;
-namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
- internal sealed class CachePeerMonitor : IPeerMonitor
+ internal sealed class CachePeerMonitor : VnDisposeable, IPeerMonitor
{
private readonly LinkedList<ICachePeer> peers = new();
+ private readonly ManualResetEvent newPeerTrigger = new (false);
public CachePeerMonitor(PluginBase plugin)
{ }
+ /// <summary>
+ /// Waits for new peers to connect to the server
+ /// </summary>
+ /// <returns>A task that complets when a new peer has connected</returns>
+ public async Task WaitForChangeAsync()
+ {
+ await newPeerTrigger.WaitAsync();
+
+ //Reset the trigger for next call
+ newPeerTrigger.Reset();
+ }
+
///<inheritdoc/>
public IEnumerable<ICachePeer> GetAllPeers()
{
@@ -55,6 +72,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
{
peers.AddLast(peer);
}
+
+ //Trigger monitor when change occurs
+ if(peer.Advertisment != null)
+ {
+ newPeerTrigger.Set();
+ }
}
///<inheritdoc/>
@@ -66,5 +89,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
peers.Remove(peer);
}
}
+
+ protected override void Free()
+ {
+ newPeerTrigger.Dispose();
+ }
}
}
diff --git a/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs b/plugins/ObjectCacheServer/src/Clustering/ICachePeerAdapter.cs
index c3fb022..dd426f6 100644
--- a/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/ICachePeerAdapter.cs
@@ -22,9 +22,9 @@
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
-using VNLib.Data.Caching.Extensions;
+using VNLib.Data.Caching.Extensions.Clustering;
-namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
internal interface ICachePeerAdapter
{
@@ -32,18 +32,18 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
/// Gets the peers that have been discovered but not yet connected to
/// </summary>
/// <returns>A collection of peers that have not been connected to yet</returns>
- ICacheNodeAdvertisment[] GetNewPeers();
+ CacheNodeAdvertisment[] GetNewPeers();
/// <summary>
/// Called when a peer has been connected to
/// </summary>
/// <param name="peer">The peer that has been connected</param>
- void OnPeerListenerAttached(ICacheNodeAdvertisment peer);
+ void OnPeerListenerAttached(CacheNodeAdvertisment peer);
/// <summary>
/// Called when a peer has been disconnected from
/// </summary>
/// <param name="peer">The disconnected peer</param>
- void OnPeerListenerDetatched(ICacheNodeAdvertisment peer);
+ void OnPeerListenerDetatched(CacheNodeAdvertisment peer);
}
}
diff --git a/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs b/plugins/ObjectCacheServer/src/Clustering/IPeerMonitor.cs
index 028171f..d8f358f 100644
--- a/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/IPeerMonitor.cs
@@ -24,7 +24,7 @@
using System.Collections.Generic;
-namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
/// <summary>
/// Represents a monitor for peer cache servers to advertise their presence
@@ -33,7 +33,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
internal interface IPeerMonitor
{
/// <summary>
- /// Notifies the monitor that a peer has connected to the cluster
+ /// Notifies the monitor that a remote peer has connected to the current node for
+ /// replication
/// </summary>
/// <param name="peer">The peer that connected</param>
void OnPeerConnected(ICachePeer peer);
diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
new file mode 100644
index 0000000..f132cab
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
@@ -0,0 +1,268 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: PeerDiscoveryManager.cs
+*
+* PeerDiscoveryManager.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Linq;
+using System.Net.Http;
+using System.Threading;
+using System.Net.Sockets;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+
+using VNLib.Utils.Logging;
+using VNLib.Data.Caching.Extensions;
+using VNLib.Plugins;
+using VNLib.Plugins.Extensions.Loading;
+using VNLib.Data.Caching.Extensions.Clustering;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
+{
+
+ /*
+ * This class is responsible for resolving and discovering peer nodes in the cluster network.
+ */
+
+ internal sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter
+ {
+ private const string LOG_SCOPE_NAME = "DISC";
+ private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(15);
+
+ private readonly List<CacheNodeAdvertisment> _connectedPeers;
+ private readonly NodeConfig _config;
+ private readonly CachePeerMonitor _monitor;
+ private readonly bool IsDebug;
+ private readonly ILogProvider _log;
+
+ public PeerDiscoveryManager(PluginBase plugin)
+ {
+ //Get config
+ _config = plugin.GetOrCreateSingleton<NodeConfig>();
+
+ //Get the known peers array from config, its allowed to be null for master nodes
+ IConfigScope? config = plugin.TryGetConfig("known_peers");
+ string[] kownPeers = config?.Deserialze<string[]>() ?? Array.Empty<string>();
+
+ //Add known peers to the monitor
+ _config.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s)));
+
+ plugin.Log.Information("Inital peer nodes: {nodes}", kownPeers);
+
+ //Get the peer monitor
+ _monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
+
+ _connectedPeers = new();
+
+ //Create scoped logger
+ _log = plugin.Log.CreateScope(LOG_SCOPE_NAME);
+
+ //Setup discovery error handler
+ _config.Config.WithErrorHandler(new ErrorHandler(_log));
+
+ IsDebug = plugin.IsDebug();
+ }
+
+ async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
+ {
+ /*
+ * This loop uses the peer monitor to keep track of all connected peers, then gets
+ * all the advertising peers (including known peers) and resolves all nodes across
+ * the network.
+ */
+
+ //Start the change listener
+ Task watcher = WatchForPeersAsync(exitToken);
+
+ _log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", InitialDelay);
+
+ try
+ {
+ //Wait for the initial delay
+ await Task.Delay(InitialDelay, exitToken);
+
+ _log.Debug("Begining discovery loop");
+
+ /*
+ * To avoid connecting to ourself, we add ourselves to the connected list
+ * and it should never get removed. This is because the monitor will never
+ * report our own advertisment.
+ */
+ _connectedPeers.Add(_config.Config.Advertisment);
+
+ while (true)
+ {
+ try
+ {
+ if (IsDebug)
+ {
+ _log.Debug("Begining node discovery");
+ }
+
+ //Resolve all known peers
+ CacheNodeAdvertisment[] wellKnown = await _config.Config.ResolveWellKnownAsync(exitToken);
+
+ //Use the monitor to get the initial peers
+ IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds();
+
+ //Combine well-known with new connected peers
+ CacheNodeAdvertisment[] allAds = ads.Union(wellKnown).ToArray();
+
+ if (allAds.Length > 0)
+ {
+ //Discover all kown nodes
+ await _config.Config.DiscoverNodesAsync(allAds, exitToken);
+ }
+
+ //Log the discovered nodes if verbose logging is enabled
+ if (IsDebug)
+ {
+ CacheNodeAdvertisment[] found = _config.Config.NodeCollection.GetAllNodes();
+
+ _log.Debug("Discovered {count} nodes\n\t{nodes}", found.Length, found.Select(static s => s.NodeId));
+ }
+ }
+ catch(OperationCanceledException)
+ {
+ throw;
+ }
+ catch (Exception ex)
+ {
+ _log.Error(ex, "Failed to discover new peer nodes");
+ }
+
+ //Delay the next discovery
+ await Task.Delay(_config.DiscoveryInterval, exitToken);
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ //Normal exit
+ _log.Information("Node discovery worker exiting");
+ }
+ finally
+ {
+
+ }
+
+ //Wait for the watcher to exit
+ await watcher.ConfigureAwait(false);
+ }
+
+ private IEnumerable<CacheNodeAdvertisment> GetMonitorAds()
+ {
+ return _monitor.GetAllPeers()
+ .Where(static p => p.Advertisment != null)
+ //Without us
+ .Where(n => n.NodeId != _config.Config.NodeId)
+ .Select(static p => p.Advertisment!);
+ }
+
+ //Wait for new peers and update the collection
+ private async Task WatchForPeersAsync(CancellationToken cancellation)
+ {
+ try
+ {
+ _log.Debug("Discovery worker waiting for new peers to connect");
+
+ while (true)
+ {
+
+ //Wait for changes, then get new peers
+ await _monitor.WaitForChangeAsync().WaitAsync(cancellation);
+
+ _log.Verbose("New peers connected");
+
+ //Use the monitor to get the initial peers
+ IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds();
+
+ ((NodeDiscoveryCollection)_config.Config.NodeCollection).AddManualNodes(ads);
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ //Normal ext
+ _log.Debug("Connected peer listener exited");
+ }
+ }
+
+
+ ///<inheritdoc/>
+ public CacheNodeAdvertisment[] GetNewPeers()
+ {
+ lock (_connectedPeers)
+ {
+ //Get all discovered peers
+ CacheNodeAdvertisment[] peers = _config.Config.NodeCollection.GetAllNodes();
+
+ //Get the difference between the discovered peers and the connected peers
+ return peers.Except(_connectedPeers).ToArray();
+ }
+ }
+
+ ///<inheritdoc/>
+ public void OnPeerListenerAttached(CacheNodeAdvertisment peer)
+ {
+ lock (_connectedPeers)
+ {
+ //Add to connected peers
+ _connectedPeers.Add(peer);
+ }
+ }
+
+ ///<inheritdoc/>
+ public void OnPeerListenerDetatched(CacheNodeAdvertisment peer)
+ {
+ //remove from connected peers
+ lock (_connectedPeers)
+ {
+ _connectedPeers.Remove(peer);
+ }
+ }
+
+
+ private sealed record class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
+ {
+ public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
+ {
+
+ if (ex is HttpRequestException hre)
+ {
+ if (hre.InnerException is SocketException se)
+ {
+ //traisnport failed
+ Logger.Warn("Failed to connect to server {serv} because {err}", errorNode, se.Message);
+ }
+ else
+ {
+ Logger.Error("Failed to connect to node {n}\n{err}", errorNode, hre);
+ }
+ }
+ else
+ {
+ Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex);
+ }
+
+ }
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Distribution/KnownPeerList.cs b/plugins/ObjectCacheServer/src/Distribution/KnownPeerList.cs
deleted file mode 100644
index 74df81f..0000000
--- a/plugins/ObjectCacheServer/src/Distribution/KnownPeerList.cs
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
-* Copyright (c) 2023 Vaughn Nugent
-*
-* Library: VNLib
-* Package: ObjectCacheServer
-* File: KnownPeerList.cs
-*
-* KnownPeerList.cs is part of ObjectCacheServer which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* ObjectCacheServer is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* ObjectCacheServer is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Linq;
-using System.Collections.Generic;
-using System.Text.Json.Serialization;
-
-using VNLib.Plugins;
-using VNLib.Data.Caching.Extensions;
-using VNLib.Plugins.Extensions.Loading;
-
-
-namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
-{
- [ConfigurationName("known_peers")]
- internal sealed class KnownPeerList
- {
- private readonly List<KnownPeer> _peers;
-
- public KnownPeerList(PluginBase plugin, IConfigScope config)
- {
- //Deserialze the known peers into an array
- KnownPeer[] peers = config.Deserialze<KnownPeer[]>();
-
- foreach (KnownPeer peer in peers)
- {
- //Validate the peer
- peer.Validate();
- }
-
- _peers = peers?.ToList() ?? new();
- }
-
- public IEnumerable<ICacheNodeAdvertisment> GetPeers()
- {
- return _peers;
- }
-
- private sealed class KnownPeer : ICacheNodeAdvertisment
- {
- public Uri? ConnectEndpoint { get; set; }
- public Uri? DiscoveryEndpoint { get; set; }
-
- [JsonPropertyName("node_id")]
- public string NodeId { get; set; }
-
- [JsonPropertyName("connect_url")]
- public string? ConnectEpPath
- {
- get => ConnectEndpoint?.ToString() ?? string.Empty;
- set => ConnectEndpoint = new Uri(value ?? string.Empty);
- }
-
- [JsonPropertyName("discovery_url")]
- public string? DiscoveryEpPath
- {
- get => DiscoveryEndpoint?.ToString() ?? string.Empty;
- set => DiscoveryEndpoint = new Uri(value ?? string.Empty);
- }
-
- public void Validate()
- {
- if (string.IsNullOrWhiteSpace(NodeId))
- {
- throw new ArgumentException("Node ID cannot be null or whitespace");
- }
- if (ConnectEndpoint is null)
- {
- throw new ArgumentException("Connect endpoint cannot be null");
- }
- if (DiscoveryEndpoint is null)
- {
- throw new ArgumentException("Discovery endpoint cannot be null");
- }
- }
- }
- }
-}
diff --git a/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs
deleted file mode 100644
index 26ec565..0000000
--- a/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
-* Copyright (c) 2023 Vaughn Nugent
-*
-* Library: VNLib
-* Package: ObjectCacheServer
-* File: PeerDiscoveryManager.cs
-*
-* PeerDiscoveryManager.cs is part of ObjectCacheServer which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* ObjectCacheServer is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* ObjectCacheServer is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Linq;
-using System.Threading;
-using System.Threading.Tasks;
-using System.Collections.Generic;
-
-using VNLib.Plugins;
-using VNLib.Utils.Logging;
-using VNLib.Data.Caching.Extensions;
-using VNLib.Plugins.Extensions.Loading;
-
-
-namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
-{
-
- sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter
- {
- private readonly List<ICacheNodeAdvertisment> _connectedPeers;
- private readonly NodeConfig _config;
- private readonly IPeerMonitor _monitor;
- private readonly KnownPeerList _knownPeers;
-
- public PeerDiscoveryManager(PluginBase plugin)
- {
- //Get config
- _config = plugin.GetOrCreateSingleton<NodeConfig>();
-
- //Get the peer monitor
- _monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
-
- //Get the known peer list
- _knownPeers = plugin.GetOrCreateSingleton<KnownPeerList>();
-
- _connectedPeers = new();
-
- //Setup discovery error handler
- _config.Config.WithErrorHandler(new ErrorHandler(plugin.Log));
- }
-
- async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
- {
- /*
- * This loop uses the peer monitor to keep track of all connected peers, then gets
- * all the advertising peers (including known peers) and resolves all nodes across
- * the network.
- */
-
- pluginLog.Information("Node discovery worker started");
-
- try
- {
- while (true)
- {
- try
- {
- //Use the monitor to get the initial peers
- IEnumerable<ICacheNodeAdvertisment> ads = _monitor.GetAllPeers()
- .Where(static p => p.Advertisment != null)
- .Select(static p => p.Advertisment!);
-
- //Add known peers to the initial list
- ads = ads.Union(_knownPeers.GetPeers());
-
- //Set initial peers
- _config.Config.WithInitialPeers(ads);
-
- //Discover all nodes
- await _config.Config.DiscoverNodesAsync(exitToken);
- }
- catch(OperationCanceledException)
- {
- throw;
- }
- catch (Exception ex)
- {
- pluginLog.Error(ex, "Failed to discover new peer nodes");
- }
-
- //Delay the next discovery
- await Task.Delay(_config.DiscoveryInterval, exitToken);
- }
- }
- catch (OperationCanceledException)
- {
- //Normal exit
- pluginLog.Information("Node discovery worker exiting");
- }
- finally
- {
-
- }
- }
-
-
- ///<inheritdoc/>
- public ICacheNodeAdvertisment[] GetNewPeers()
- {
- lock (_connectedPeers)
- {
- //Get all discovered peers
- ICacheNodeAdvertisment[] peers = _config.Config.NodeCollection.GetAllNodes();
-
- //Get the difference between the discovered peers and the connected peers
- return peers.Except(_connectedPeers).ToArray();
- }
- }
-
- ///<inheritdoc/>
- public void OnPeerListenerAttached(ICacheNodeAdvertisment peer)
- {
- lock (_connectedPeers)
- {
- //Add to connected peers
- _connectedPeers.Add(peer);
- }
- }
-
- ///<inheritdoc/>
- public void OnPeerListenerDetatched(ICacheNodeAdvertisment peer)
- {
- //remove from connected peers
- lock (_connectedPeers)
- {
- _connectedPeers.Remove(peer);
- }
- }
-
-
- private sealed record class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
- {
- public void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex)
- {
- Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex);
- }
- }
- }
-}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
index 8352635..5e794f8 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
@@ -46,7 +46,7 @@ using VNLib.Plugins.Essentials.Endpoints;
using VNLib.Plugins.Essentials.Extensions;
using VNLib.Plugins.Extensions.Loading.Routing;
using VNLib.Data.Caching.ObjectCache.Server.Distribution;
-
+using VNLib.Data.Caching.Extensions.Clustering;
namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
@@ -54,6 +54,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
[ConfigurationName("connect_endpoint")]
internal sealed class ConnectEndpoint : ResourceEndpointBase
{
+ private const string LOG_SCOPE_NAME = "CONEP";
+
private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
@@ -90,7 +92,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
string? path = config["path"].GetString();
- InitPathAndLog(path, plugin.Log);
+ InitPathAndLog(path, plugin.Log.CreateScope(LOG_SCOPE_NAME));
//Check for ip-verification flag
VerifyIp = config.TryGetValue("verify_ip", out JsonElement vIpEl) && vIpEl.GetBoolean();
@@ -152,28 +154,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
// Parse jwt
using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
{
- bool verified = false;
-
//verify signature for client
- if (NodeConfiguration.KeyStore.VerifyJwt(jwt))
+ if (NodeConfiguration.KeyStore.VerifyJwt(jwt, false))
{
- verified = true;
+ //Validated
}
//May be signed by a cache server
- else
+ else if(NodeConfiguration.KeyStore.VerifyJwt(jwt, true))
{
//Set peer and verified flag since the another cache server signed the request
- isPeer = verified = NodeConfiguration.KeyStore.VerifyCachePeer(jwt);
+ isPeer = true;
}
-
- //Check flag
- if (!verified)
+ else
{
Log.Information("Client signature verification failed");
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
}
-
+
//Recover json body
using JsonDocument doc = jwt.GetPayload();
@@ -196,6 +194,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
auth.WriteHeader(NodeConfiguration.KeyStore.GetJwtHeader());
auth.InitPayloadClaim()
.AddClaim("aud", AudienceLocalServerId)
+ .AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds())
.AddClaim("exp", entity.RequestedTimeUtc.Add(AuthTokenExpiration).ToUnixTimeSeconds())
.AddClaim("nonce", RandomHash.GetRandomBase32(8))
.AddClaim("chl", challenge!)
@@ -240,7 +239,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
string? nodeId = null;
- ICacheNodeAdvertisment? discoveryAd = null;
+ CacheNodeAdvertisment? discoveryAd = null;
//Parse jwt
using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
@@ -299,15 +298,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
//Verify the signature the client included of the auth token
- if (isPeer)
+ //Verify token signature against a fellow cache public key
+ if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth, isPeer))
{
- //Verify token signature against a fellow cache public key
- if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth))
- {
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
+ }
+ if (isPeer)
+ {
//Try to get the node advertisement header
string? discoveryHeader = entity.Server.Headers[FBMDataCacheExtensions.X_NODE_DISCOVERY_HEADER];
@@ -317,15 +316,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(discoveryHeader);
}
}
- else
- {
- //Not a peer, so verify against the client's public key
- if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth))
- {
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
- }
}
try
@@ -389,12 +379,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
//Notify peers of new connection
Peers.OnPeerConnected(state);
- //Inc connected count
- Interlocked.Increment(ref _connectedClients);
-
//Register plugin exit token to cancel the connected socket
CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll);
+ //Inc connected count
+ Interlocked.Increment(ref _connectedClients);
+
try
{
//Init listener args from request
@@ -442,14 +432,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
Log.Debug(ex);
}
- finally
- {
- //Dec connected count
- Interlocked.Decrement(ref _connectedClients);
- //Unregister the
- reg.Unregister();
- }
+ //Dec connected count
+ Interlocked.Decrement(ref _connectedClients);
+
+ //Unregister the token
+ reg.Unregister();
//Notify monitor of disconnect
Peers.OnPeerDisconnected(state);
@@ -465,12 +453,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
public int MaxMessageSize { get; init; }
public int MaxResponseBufferSize { get; init; }
public string? NodeId { get; init; }
- public ICacheNodeAdvertisment? Advertisment { get; init; }
+ public CacheNodeAdvertisment? Advertisment { get; init; }
public override string ToString()
{
return
- $"{nameof(RecvBufferSize)}:{RecvBufferSize}, {nameof(MaxHeaderBufferSize)}: {MaxHeaderBufferSize}, {nameof(MaxMessageSize)}:{MaxMessageSize}, {nameof(MaxResponseBufferSize)}:{MaxResponseBufferSize}";
+ $"{nameof(RecvBufferSize)}:{RecvBufferSize}, {nameof(MaxHeaderBufferSize)}: {MaxHeaderBufferSize}, " +
+ $"{nameof(MaxMessageSize)}:{MaxMessageSize}, {nameof(MaxResponseBufferSize)}:{MaxResponseBufferSize}";
}
}
}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
index 90ffca0..77d59dd 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
@@ -26,7 +26,6 @@ using System;
using System.Net;
using System.Linq;
using System.Text.Json;
-using System.Threading.Tasks;
using VNLib.Hashing;
using VNLib.Hashing.IdentityUtility;
@@ -35,17 +34,25 @@ using VNLib.Plugins.Essentials;
using VNLib.Plugins.Essentials.Endpoints;
using VNLib.Plugins.Essentials.Extensions;
using VNLib.Plugins.Extensions.Loading;
-using VNLib.Data.Caching.Extensions;
using VNLib.Data.Caching.ObjectCache.Server.Distribution;
+using VNLib.Data.Caching.Extensions.Clustering;
namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
[ConfigurationName("discovery_endpoint")]
- internal sealed class PeerDiscoveryEndpoint : UnprotectedWebEndpoint
+ internal sealed class PeerDiscoveryEndpoint : ResourceEndpointBase
{
private readonly IPeerMonitor PeerMonitor;
private readonly NodeConfig Config;
+ //Loosen up protection settings
+ ///<inheritdoc/>
+ protected override ProtectionSettings EndpointProtectionSettings { get; } = new()
+ {
+ DisableBrowsersOnly = true,
+ DisableSessionsRequired = true
+ };
+
public PeerDiscoveryEndpoint(PluginBase plugin, IConfigScope config)
{
string? path = config["path"].GetString();
@@ -59,7 +66,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
Config = plugin.GetOrCreateSingleton<NodeConfig>();
}
- protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity)
+ protected override VfReturnType Get(HttpEntity entity)
{
//Get auth token
string? authToken = entity.Server.Headers[HttpRequestHeader.Authorization];
@@ -71,17 +78,18 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
string subject = string.Empty;
+ string challenge = string.Empty;
//Parse auth token
using(JsonWebToken jwt = JsonWebToken.Parse(authToken))
{
//try to verify against cache node first
- if (!Config.KeyStore.VerifyCachePeer(jwt))
+ if (!Config.KeyStore.VerifyJwt(jwt, true))
{
//failed...
//try to verify against client key
- if (!Config.KeyStore.VerifyJwt(jwt))
+ if (!Config.KeyStore.VerifyJwt(jwt, false))
{
//invalid token
entity.CloseResponse(HttpStatusCode.Unauthorized);
@@ -90,12 +98,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
using JsonDocument payload = jwt.GetPayload();
-
+
+ //Get client info to pass back
subject = payload.RootElement.GetProperty("sub").GetString() ?? string.Empty;
+ challenge = payload.RootElement.GetProperty("chl").GetString() ?? string.Empty;
}
//Valid key, get peer list to send to client
- ICacheNodeAdvertisment[] peers = PeerMonitor.GetAllPeers()
+ CacheNodeAdvertisment[] peers = PeerMonitor.GetAllPeers()
.Where(static p => p.Advertisment != null)
.Select(static p => p.Advertisment!)
.ToArray();
@@ -113,7 +123,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
.AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds())
//Send all peers as a json array
.AddClaim("peers", peers)
- .AddClaim("nonce", RandomHash.GetRandomBase32(24))
+ //Send the challenge back
+ .AddClaim("chl", challenge)
.CommitClaims();
//Sign the response
diff --git a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs
new file mode 100644
index 0000000..99c7f19
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs
@@ -0,0 +1,111 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: WellKnownEndpoint.cs
+*
+* WellKnownEndpoint.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Net;
+using System.Text.Json;
+
+using VNLib.Data.Caching.Extensions;
+using VNLib.Data.Caching.Extensions.Clustering;
+using VNLib.Hashing;
+using VNLib.Hashing.IdentityUtility;
+using VNLib.Plugins;
+using VNLib.Plugins.Essentials;
+using VNLib.Plugins.Essentials.Endpoints;
+using VNLib.Plugins.Essentials.Extensions;
+using VNLib.Plugins.Extensions.Loading;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
+{
+
+ /*
+ * The well-known endpoint is used to advertise the node's existence to
+ * the network. Clients need to know the endpoint layout to be able to
+ * connect and discover other nodes.
+ */
+
+ [ConfigurationName("well_known", Required = false)]
+ internal sealed class WellKnownEndpoint : ResourceEndpointBase
+ {
+ //Default path for the well known endpoint
+ const string DefaultPath = "/.well-known/vncache";
+
+ //Store serialized advertisment
+ private readonly CacheNodeAdvertisment _advertisment;
+ private readonly ICacheAuthManager _keyStore;
+
+ //Loosen up security requirements
+ protected override ProtectionSettings EndpointProtectionSettings { get; } = new()
+ {
+ DisableBrowsersOnly = true,
+ DisableSessionsRequired = true,
+ };
+
+ public WellKnownEndpoint(PluginBase plugin):this(plugin, null)
+ { }
+
+ public WellKnownEndpoint(PluginBase plugin, IConfigScope? config)
+ {
+ //Get the node config
+ NodeConfig nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>();
+
+ //serialize the config, discovery may not be enabled
+ _advertisment = nodeConfig.Config.Advertisment;
+ _keyStore = nodeConfig.KeyStore;
+
+ //Default to the well known path
+ string path = DefaultPath;
+
+ //See if the user configured a path
+ if(config != null && config.TryGetValue("path", out JsonElement pathEl))
+ {
+ path = pathEl.GetString() ?? DefaultPath;
+ }
+
+ InitPathAndLog(path, plugin.Log);
+ }
+
+ protected override VfReturnType Get(HttpEntity entity)
+ {
+ string entropy = RandomHash.GetRandomBase32(16);
+
+ //Create jwt signed for the client
+ using JsonWebToken jwt = new();
+
+ jwt.WriteHeader(_keyStore.GetJwtHeader());
+ //Write the advertisment as the message body
+ jwt.InitPayloadClaim()
+ .AddClaim("sub", _advertisment)
+ .AddClaim("chl", entropy)
+ .AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds())
+ .CommitClaims();
+
+ _keyStore.SignJwt(jwt);
+
+ //Return the advertisment jwt
+ entity.CloseResponse(HttpStatusCode.OK, Net.Http.ContentType.Text, jwt.DataBuffer);
+ return VfReturnType.VirtualSkip;
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/ICachePeer.cs b/plugins/ObjectCacheServer/src/ICachePeer.cs
index 97b406f..897655a 100644
--- a/plugins/ObjectCacheServer/src/ICachePeer.cs
+++ b/plugins/ObjectCacheServer/src/ICachePeer.cs
@@ -22,7 +22,7 @@
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
-using VNLib.Data.Caching.Extensions;
+using VNLib.Data.Caching.Extensions.Clustering;
namespace VNLib.Data.Caching.ObjectCache.Server
{
@@ -39,6 +39,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server
/// <summary>
/// An optional signed advertisment message for other peers
/// </summary>
- ICacheNodeAdvertisment? Advertisment { get; }
+ CacheNodeAdvertisment? Advertisment { get; }
}
}
diff --git a/plugins/ObjectCacheServer/src/NodeConfig.cs b/plugins/ObjectCacheServer/src/NodeConfig.cs
index 81b8a32..a6c5be9 100644
--- a/plugins/ObjectCacheServer/src/NodeConfig.cs
+++ b/plugins/ObjectCacheServer/src/NodeConfig.cs
@@ -26,36 +26,43 @@ using System;
using System.Net;
using System.Linq;
using System.Text.Json;
-using System.Threading.Tasks;
using VNLib.Plugins;
-using VNLib.Utils;
using VNLib.Utils.Logging;
-using VNLib.Data.Caching.Extensions;
+using VNLib.Utils.Extensions;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.ObjectCache.Server.Endpoints;
+using VNLib.Data.Caching.Extensions.Clustering;
namespace VNLib.Data.Caching.ObjectCache.Server
{
[ConfigurationName("cluster")]
- internal sealed class NodeConfig : VnDisposeable
+ internal sealed class NodeConfig
{
const string CacheConfigTemplate =
@"
- Cluster Configuration:
- Node Id: {id}
- TlsEndabled: {tls},
- Cache Endpoint: {ep}
+Cluster Configuration:
+ Node Id: {id}
+ TlsEndabled: {tls}
+ Cache Endpoint: {ep}
+ Discovery Endpoint: {dep}
+ Discovery Interval: {di}
+ Max Peers: {mpc}
";
public CacheNodeConfiguration Config { get; }
public CacheAuthKeyStore KeyStore { get; }
+ public TimeSpan DiscoveryInterval { get; }
+
+ /// <summary>
+ /// The maximum number of peer connections to allow
+ /// </summary>
+ public uint MaxPeerConnections { get; } = 10;
+
public NodeConfig(PluginBase plugin, IConfigScope config)
- {
- //Server id is just dns name for now
- string nodeId = Dns.GetHostName();
+ {
Config = new();
@@ -75,9 +82,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//If the ssl element is present, ssl is enabled for the server
usingTls = firstHost.TryGetProperty("ssl", out _);
}
+ string hostname = Dns.GetHostName();
+
+ //Server id is just dns name for now
+ string nodeId = $"{hostname}:{port}";
//The endpoint to advertise to cache clients that allows cache connections
- Uri cacheEndpoint = GetEndpointUri<ConnectEndpoint>(plugin, usingTls, port, nodeId);
+ Uri cacheEndpoint = GetEndpointUri<ConnectEndpoint>(plugin, usingTls, port, hostname);
//Init key store
KeyStore = new(plugin);
@@ -89,21 +100,32 @@ namespace VNLib.Data.Caching.ObjectCache.Server
.WithTls(usingTls);
//Check if advertising is enabled
- if(config.TryGetValue("advertise", out JsonElement adEl) && adEl.GetBoolean())
+ if(plugin.HasConfigForType<PeerDiscoveryEndpoint>())
{
//Get the the broadcast endpoint
- Uri discoveryEndpoint = GetEndpointUri<PeerDiscoveryEndpoint>(plugin, usingTls, port, nodeId);
+ Uri discoveryEndpoint = GetEndpointUri<PeerDiscoveryEndpoint>(plugin, usingTls, port, hostname);
//Enable advertising
Config.EnableAdvertisment(discoveryEndpoint);
}
-
-
+
+
+ DiscoveryInterval = config["discovery_interval_sec"].GetTimeSpan(TimeParseType.Seconds);
+
+ //Get the max peer connections
+ if(config.TryGetValue("max_peers", out JsonElement maxPeerEl))
+ {
+ MaxPeerConnections = maxPeerEl.GetUInt32();
+ }
+
//log the config
plugin.Log.Information(CacheConfigTemplate,
nodeId,
usingTls,
- cacheEndpoint
+ cacheEndpoint,
+ Config.DiscoveryEndpoint,
+ DiscoveryInterval,
+ MaxPeerConnections
);
}
@@ -115,12 +137,5 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//The endpoint to advertise to cache clients that allows cache connections
return new UriBuilder(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, hostName, port, cacheEpConfig["path"].GetString()).Uri;
}
-
-
- protected override void Free()
- {
- //cleanup keys
-
- }
}
}
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj
index e8d6291..5273b64 100644
--- a/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj
+++ b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj
@@ -41,4 +41,9 @@
<ProjectReference Include="..\..\..\lib\VNLib.Data.Caching.Extensions\src\VNLib.Data.Caching.Extensions.csproj" />
<ProjectReference Include="..\..\..\lib\VNLib.Data.Caching.ObjectCache\src\VNLib.Data.Caching.ObjectCache.csproj" />
</ItemGroup>
+
+ <Target Condition="'$(BuildingInsideVisualStudio)' == true" Name="PostBuild" AfterTargets="PostBuildEvent">
+ <Exec Command="start xcopy &quot;$(TargetDir)&quot; &quot;F:\downloads\cache-test\plugins\$(TargetName)&quot; /E /Y /R" />
+ </Target>
+
</Project>
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
index 5d9a50b..1ddf49b 100644
--- a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
+++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
@@ -33,7 +33,7 @@ using VNLib.Utils.Memory.Diagnostics;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Extensions.Loading.Routing;
using VNLib.Data.Caching.ObjectCache.Server.Endpoints;
-
+using VNLib.Data.Caching.ObjectCache.Server.Distribution;
namespace VNLib.Data.Caching.ObjectCache.Server
{
@@ -76,9 +76,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server
{
try
{
+ //Route well-known endpoint
+ this.Route<WellKnownEndpoint>();
+
//Init connect endpoint
this.Route<ConnectEndpoint>();
+ //We must initialize the replication manager
+ _ = this.GetOrCreateSingleton<CacheNodeReplicationMaanger>();
+
//Setup discovery endpoint
if(this.HasConfigForType<PeerDiscoveryEndpoint>())
{