aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/ObjectCacheServer')
-rw-r--r--plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs117
-rw-r--r--plugins/ObjectCacheServer/src/CacheEventQueueManager.cs17
-rw-r--r--plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs117
-rw-r--r--plugins/ObjectCacheServer/src/CacheStore.cs (renamed from plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs)19
-rw-r--r--plugins/ObjectCacheServer/src/CacheSystemUtil.cs (renamed from plugins/ObjectCacheServer/src/Endpoints/CacheSystemUtil.cs)0
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs16
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs29
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs10
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryCollection.cs58
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryEnumerator.cs45
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs4
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/KnownPeerList.cs100
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/NodeDiscoveryCollection.cs99
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs91
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs149
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs105
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs14
-rw-r--r--plugins/ObjectCacheServer/src/ICacheEventQueueManager.cs9
-rw-r--r--plugins/ObjectCacheServer/src/ICachePeer.cs2
-rw-r--r--plugins/ObjectCacheServer/src/IPeerEventQueue.cs50
-rw-r--r--plugins/ObjectCacheServer/src/NodeConfig.cs177
-rw-r--r--plugins/ObjectCacheServer/src/ObjectCacheServer.csproj2
-rw-r--r--plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs78
23 files changed, 569 insertions, 739 deletions
diff --git a/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs b/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs
new file mode 100644
index 0000000..6725fbe
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs
@@ -0,0 +1,117 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: ObjectCacheServerEntry.cs
+*
+* ObjectCacheServerEntry.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Collections.Generic;
+using System.Security.Cryptography;
+
+using VNLib.Hashing;
+using VNLib.Plugins;
+using VNLib.Hashing.IdentityUtility;
+using VNLib.Plugins.Extensions.Loading;
+using VNLib.Data.Caching.Extensions;
+
+
+namespace VNLib.Data.Caching.ObjectCache.Server
+{
+ sealed record class CacheAuthKeyStore : ICacheAuthManager
+ {
+ private readonly IAsyncLazy<ReadOnlyJsonWebKey> _clientPub;
+ private readonly IAsyncLazy<ReadOnlyJsonWebKey> _cachePriv;
+
+ public CacheAuthKeyStore(PluginBase plugin)
+ {
+ _clientPub = plugin.GetSecretAsync("client_public_key").ToLazy(r => r.GetJsonWebKey());
+ _cachePriv = plugin.GetSecretAsync("cache_private_key").ToLazy(r => r.GetJsonWebKey());
+ }
+
+ ///<inheritdoc/>
+ public IReadOnlyDictionary<string, string?> GetJwtHeader()
+ {
+ return _cachePriv.Value.JwtHeader;
+ }
+
+ ///<inheritdoc/>
+ public void SignJwt(JsonWebToken jwt)
+ {
+ jwt.SignFromJwk(_cachePriv.Value);
+ }
+
+ ///<inheritdoc/>
+ public bool VerifyJwt(JsonWebToken jwt)
+ {
+ return jwt.VerifyFromJwk(_clientPub.Value);
+ }
+
+ /// <summary>
+ /// Verifies the message against the stored cache key
+ /// </summary>
+ /// <param name="jwt">The token to verify</param>
+ /// <returns>True if the token was verified, false otherwise</returns>
+ public bool VerifyCachePeer(JsonWebToken jwt)
+ {
+ return jwt.VerifyFromJwk(_cachePriv.Value);
+ }
+
+ ///<inheritdoc/>
+ public byte[] SignMessageHash(byte[] hash, HashAlg alg)
+ {
+ //try to get the rsa alg for the signing key
+ using RSA? rsa = _cachePriv.Value.GetRSAPublicKey();
+ if (rsa != null)
+ {
+ return rsa.SignHash(hash, alg.GetAlgName(), RSASignaturePadding.Pkcs1);
+ }
+
+ //try to get the ecdsa alg for the signing key
+ using ECDsa? ecdsa = _cachePriv.Value.GetECDsaPublicKey();
+ if (ecdsa != null)
+ {
+ return ecdsa.SignHash(hash);
+ }
+
+ throw new NotSupportedException("The signing key is not a valid RSA or ECDSA key");
+ }
+
+ ///<inheritdoc/>
+ public bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature)
+ {
+ //try to get the rsa alg for the signing key
+ using RSA? rsa = _clientPub.Value.GetRSAPublicKey();
+ if (rsa != null)
+ {
+ return rsa.VerifyHash(hash, signature, alg.GetAlgName(), RSASignaturePadding.Pkcs1);
+ }
+
+ //try to get the ecdsa alg for the signing key
+ using ECDsa? ecdsa = _clientPub.Value.GetECDsaPublicKey();
+ if (ecdsa != null)
+ {
+ return ecdsa.VerifyHash(hash, signature);
+ }
+
+ throw new NotSupportedException("The current key is not an RSA or ECDSA key and is not supported");
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs b/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs
index 5fb6d2a..049069e 100644
--- a/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs
+++ b/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs
@@ -39,6 +39,7 @@ using VNLib.Plugins.Extensions.Loading.Events;
namespace VNLib.Data.Caching.ObjectCache.Server
{
+
[ConfigurationName("event_manager")]
internal sealed class CacheEventQueueManager : ICacheEventQueueManager, IDisposable, IIntervalScheduleable
{
@@ -70,7 +71,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
}
///<inheritdoc/>
- public AsyncQueue<ChangeEvent> Subscribe(ICachePeer peer)
+ public IPeerEventQueue Subscribe(ICachePeer peer)
{
NodeQueue? nq;
@@ -198,7 +199,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
* attached to the queue
*/
- private sealed class NodeQueue
+ private sealed class NodeQueue : IPeerEventQueue
{
public int Listeners;
@@ -243,6 +244,18 @@ namespace VNLib.Data.Caching.ObjectCache.Server
Queue.TryEnque(changes[i]);
}
}
+
+ ///<inheritdoc/>
+ public ValueTask<ChangeEvent> DequeueAsync(CancellationToken cancellation)
+ {
+ return Queue.DequeueAsync(cancellation);
+ }
+
+ ///<inheritdoc/>
+ public bool TryDequeue(out ChangeEvent change)
+ {
+ return Queue.TryDequeue(out change);
+ }
}
}
}
diff --git a/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs b/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs
new file mode 100644
index 0000000..52b6abf
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs
@@ -0,0 +1,117 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: CacheListenerPubQueue.cs
+*
+* CacheListenerPubQueue.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+using VNLib.Utils.Async;
+using VNLib.Utils.Logging;
+using VNLib.Plugins;
+using VNLib.Plugins.Extensions.Loading;
+
+
+namespace VNLib.Data.Caching.ObjectCache.Server
+{
+ internal sealed class CacheListenerPubQueue : ICacheListenerEventQueue, IAsyncBackgroundWork
+ {
+ private readonly AsyncQueue<ChangeEvent> _listenerQueue;
+ private readonly ILogProvider _logProvider;
+ private readonly ICacheEventQueueManager _queueManager;
+
+ public CacheListenerPubQueue(PluginBase plugin)
+ {
+ _queueManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>();
+ _logProvider = plugin.Log;
+ _listenerQueue = new AsyncQueue<ChangeEvent>(false, true);
+
+ //Register processing worker
+ _ = plugin.ObserveWork(this, 500);
+ }
+
+ ///<inheritdoc/>
+ public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
+ {
+ const int accumulatorSize = 64;
+
+ try
+ {
+ //Accumulator for events
+ ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize];
+ int ptr = 0;
+
+ //Listen for changes
+ while (true)
+ {
+ //Wait for next event
+ accumulator[ptr++] = await _listenerQueue.DequeueAsync(exitToken);
+
+ //try to accumulate more events until we can't anymore
+ while (_listenerQueue.TryDequeue(out ChangeEvent? ev) && ptr < accumulatorSize)
+ {
+ accumulator[ptr++] = ev;
+ }
+
+ //Publish all events to subscribers
+ _queueManager.PublishMultiple(accumulator.AsSpan(0, ptr));
+
+ //Reset pointer
+ ptr = 0;
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ //Normal exit
+ pluginLog.Debug("Change queue listener worker exited");
+ }
+ }
+
+ ///<inheritdoc/>
+ public bool IsEnabled(object userState)
+ {
+ return userState is IPeerEventQueue;
+ }
+
+ ///<inheritdoc/>
+ public void PublishEvent(ChangeEvent changeEvent)
+ {
+ if (!_listenerQueue.TryEnque(changeEvent))
+ {
+ _logProvider.Warn("Cache listener event queue is overflowing");
+ }
+ }
+
+ ///<inheritdoc/>
+ public bool TryDequeue(object userState, out ChangeEvent changeEvent)
+ {
+ return (userState as IPeerEventQueue)!.TryDequeue(out changeEvent);
+ }
+
+ ///<inheritdoc/>
+ public ValueTask<ChangeEvent> DequeueAsync(object userState, CancellationToken cancellation)
+ {
+ return (userState as IPeerEventQueue)!.DequeueAsync(cancellation);
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs b/plugins/ObjectCacheServer/src/CacheStore.cs
index 67db433..e7a7c63 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs
+++ b/plugins/ObjectCacheServer/src/CacheStore.cs
@@ -3,9 +3,9 @@
*
* Library: VNLib
* Package: ObjectCacheServer
-* File: ConnectEndpoint.cs
+* File: CacheStore.cs
*
-* ConnectEndpoint.cs is part of ObjectCacheServer which is part of the larger
+* CacheStore.cs is part of ObjectCacheServer which is part of the larger
* VNLib collection of libraries and utilities.
*
* ObjectCacheServer is free software: you can redistribute it and/or modify
@@ -25,14 +25,16 @@
using System;
using System.Threading;
using System.Threading.Tasks;
+
using VNLib.Utils.Logging;
using VNLib.Plugins;
using VNLib.Plugins.Extensions.Loading;
+
namespace VNLib.Data.Caching.ObjectCache.Server
{
[ConfigurationName("cache")]
- sealed class CacheStore : ICacheStore, IDisposable
+ internal sealed class CacheStore : ICacheStore, IDisposable
{
public BlobCacheListener Listener { get; }
@@ -76,11 +78,20 @@ namespace VNLib.Data.Caching.ObjectCache.Server
plugin.Log.Verbose("Creating cache store with {bc} buckets, with {mc} items/bucket", cacheConf.BucketCount, cacheConf.MaxCacheEntries);
+ //calculate the max memory usage
+ ulong maxByteSize = ((ulong)cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize);
+
+ //Log max memory usage
+ plugin.Log.Debug("Maxium memory consumption {mx}Mb", maxByteSize / (ulong)(1024 * 1000));
+
//Load the blob cache table system
IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, plugin.CacheHeap, cacheConf);
+ //Get the event listener
+ ICacheListenerEventQueue queue = plugin.GetOrCreateSingleton<CacheListenerPubQueue>();
+
//Endpoint only allows for a single reader
- return new(bc, plugin.Log, plugin.CacheHeap, true);
+ return new(bc, queue, plugin.Log, plugin.CacheHeap);
}
public void Dispose()
diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheSystemUtil.cs b/plugins/ObjectCacheServer/src/CacheSystemUtil.cs
index 669b84f..669b84f 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/CacheSystemUtil.cs
+++ b/plugins/ObjectCacheServer/src/CacheSystemUtil.cs
diff --git a/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs
index b453dcc..a55e8e2 100644
--- a/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs
+++ b/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs
@@ -58,7 +58,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
//Get peer adapter
PeerAdapter = plugin.GetOrCreateSingleton<PeerDiscoveryManager>();
-
+ CacheStore = plugin.GetOrCreateSingleton<CacheStore>();
}
public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
@@ -70,15 +70,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
while (true)
{
//Get all new peers
- ICachePeerAdvertisment[] peers = PeerAdapter.GetNewPeers();
+ ICacheNodeAdvertisment[] peers = PeerAdapter.GetNewPeers();
if (peers.Length == 0)
{
pluginLog.Verbose("[REPL] No new peers to connect to");
}
- //Connect to each peer
- foreach (ICachePeerAdvertisment peer in peers)
+ //Connect to each peer as a background task
+ foreach (ICacheNodeAdvertisment peer in peers)
{
_ = Plugin.ObserveWork(() => OnNewPeerDoWorkAsync(peer, pluginLog, exitToken));
}
@@ -104,13 +104,16 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
pluginLog.Information("[REPL] Node replication worker exited");
}
- private async Task OnNewPeerDoWorkAsync(ICachePeerAdvertisment newPeer, ILogProvider log, CancellationToken exitToken)
+ private async Task OnNewPeerDoWorkAsync(ICacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken)
{
_ = newPeer ?? throw new ArgumentNullException(nameof(newPeer));
//Setup client
FBMClient client = new(ClientConfig);
+ //Add peer to monitor
+ PeerAdapter.OnPeerListenerAttached(newPeer);
+
try
{
log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId);
@@ -198,7 +201,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
return;
case "deleted":
//Delete the object from the store
- await CacheStore.DeleteItemAsync(changedObject.CurrentId);
+ await CacheStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None);
break;
case "modified":
//Reload the record from the store
@@ -226,6 +229,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
//Check response code
string status = response.Headers.First(static s => s.Header == HeaderCommand.Status).Value.ToString();
+
if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal))
{
//Update the record
diff --git a/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs b/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs
index 82b280c..f191c9d 100644
--- a/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs
+++ b/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs
@@ -22,7 +22,7 @@
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
-using System;
+using System.Linq;
using System.Collections.Generic;
using VNLib.Plugins;
@@ -32,24 +32,39 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
internal sealed class CachePeerMonitor : IPeerMonitor
{
- public CachePeerMonitor(PluginBase plugin)
- {
- }
+ private readonly LinkedList<ICachePeer> peers = new();
+
+ public CachePeerMonitor(PluginBase plugin)
+ { }
+ ///<inheritdoc/>
public IEnumerable<ICachePeer> GetAllPeers()
{
- throw new NotImplementedException();
+ lock(peers)
+ {
+ return peers.ToArray();
+ }
}
+ ///<inheritdoc/>
public void OnPeerConnected(ICachePeer peer)
{
- throw new NotImplementedException();
+ //When a peer is connected we can add it to the list so the replication manager can see it
+ lock(peers)
+ {
+ peers.AddLast(peer);
+ }
}
+ ///<inheritdoc/>
public void OnPeerDisconnected(ICachePeer peer)
{
- throw new NotImplementedException();
+ //When a peer is disconnected we can remove it from the list
+ lock(peers)
+ {
+ peers.Remove(peer);
+ }
}
}
}
diff --git a/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs b/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs
index d029f10..c3fb022 100644
--- a/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs
+++ b/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs
@@ -3,9 +3,9 @@
*
* Library: VNLib
* Package: ObjectCacheServer
-* File: ObjectCacheServerEntry.cs
+* File: ICachePeerAdapter.cs
*
-* ObjectCacheServerEntry.cs is part of ObjectCacheServer which is part of the larger
+* ICachePeerAdapter.cs is part of ObjectCacheServer which is part of the larger
* VNLib collection of libraries and utilities.
*
* ObjectCacheServer is free software: you can redistribute it and/or modify
@@ -32,18 +32,18 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
/// Gets the peers that have been discovered but not yet connected to
/// </summary>
/// <returns>A collection of peers that have not been connected to yet</returns>
- ICachePeerAdvertisment[] GetNewPeers();
+ ICacheNodeAdvertisment[] GetNewPeers();
/// <summary>
/// Called when a peer has been connected to
/// </summary>
/// <param name="peer">The peer that has been connected</param>
- void OnPeerListenerAttached(ICachePeerAdvertisment peer);
+ void OnPeerListenerAttached(ICacheNodeAdvertisment peer);
/// <summary>
/// Called when a peer has been disconnected from
/// </summary>
/// <param name="peer">The disconnected peer</param>
- void OnPeerListenerDetatched(ICachePeerAdvertisment peer);
+ void OnPeerListenerDetatched(ICacheNodeAdvertisment peer);
}
}
diff --git a/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryCollection.cs b/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryCollection.cs
deleted file mode 100644
index d69da40..0000000
--- a/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryCollection.cs
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
-* Copyright (c) 2023 Vaughn Nugent
-*
-* Library: VNLib
-* Package: ObjectCacheServer
-* File: INodeDiscoveryCollection.cs
-*
-* INodeDiscoveryCollection.cs is part of ObjectCacheServer which is
-* part of the larger VNLib collection of libraries and utilities.
-*
-* ObjectCacheServer is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* ObjectCacheServer is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System.Collections.Generic;
-
-using VNLib.Data.Caching.Extensions;
-
-namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
-{
- internal interface INodeDiscoveryCollection
- {
- /// <summary>
- /// Begins a new discovery and gets an enumerator for the discovery process
- /// </summary>
- /// <returns>An enumerator that simplifies discovery of unique nodes</returns>
- INodeDiscoveryEnumerator BeginDiscovery();
-
- /// <summary>
- /// Begins a new discovery and gets an enumerator for the discovery process
- /// </summary>
- /// <param name="initialPeers">An initial collection of peers to add to the enumeration</param>
- /// <returns>An enumerator that simplifies discovery of unique nodes</returns>
- INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICachePeerAdvertisment> initialPeers);
-
- /// <summary>
- /// Gets a snapshot of all discovered nodes in the current collection.
- /// </summary>
- /// <returns>The current collection of notes</returns>
- ICachePeerAdvertisment[] GetAllNodes();
-
- /// <summary>
- /// Completes a discovery process and updates the collection with the results
- /// </summary>
- /// <param name="enumerator">The enumerator used to collect discovered nodes</param>
- void CompleteDiscovery(INodeDiscoveryEnumerator enumerator);
- }
-}
diff --git a/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryEnumerator.cs b/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryEnumerator.cs
deleted file mode 100644
index 5cddf9c..0000000
--- a/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryEnumerator.cs
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
-* Copyright (c) 2023 Vaughn Nugent
-*
-* Library: VNLib
-* Package: ObjectCacheServer
-* File: INodeDiscoveryEnumerator.cs
-*
-* INodeDiscoveryEnumerator.cs is part of ObjectCacheServer which is part
-* of the larger VNLib collection of libraries and utilities.
-*
-* ObjectCacheServer is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* ObjectCacheServer is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System.Collections.Generic;
-
-using VNLib.Data.Caching.Extensions;
-
-namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
-{
- internal interface INodeDiscoveryEnumerator
- {
- /// <summary>
- /// Moves the enumerator to the next peer in the discovery process and returns the result
- /// </summary>
- /// <returns>The next peer advertisment in the enumeration</returns>
- ICachePeerAdvertisment? GetNextPeer();
-
- /// <summary>
- /// Adds the specified peer to the collection of discovered peers
- /// </summary>
- /// <param name="discoveredPeers">The peer collection</param>
- void OnPeerDiscoveryComplete(IEnumerable<ICachePeerAdvertisment> discoveredPeers);
- }
-}
diff --git a/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs b/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs
index b4cb840..028171f 100644
--- a/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs
+++ b/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs
@@ -45,9 +45,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
void OnPeerDisconnected(ICachePeer peer);
/// <summary>
- /// Gets an enumerable of all peers currently active in the current peer
+ /// Gets an enumerable of all peers currently connected to this node
/// </summary>
- /// <returns></returns>
+ /// <returns>The collection of all connected peers</returns>
IEnumerable<ICachePeer> GetAllPeers();
}
}
diff --git a/plugins/ObjectCacheServer/src/Distribution/KnownPeerList.cs b/plugins/ObjectCacheServer/src/Distribution/KnownPeerList.cs
new file mode 100644
index 0000000..74df81f
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Distribution/KnownPeerList.cs
@@ -0,0 +1,100 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: KnownPeerList.cs
+*
+* KnownPeerList.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Linq;
+using System.Collections.Generic;
+using System.Text.Json.Serialization;
+
+using VNLib.Plugins;
+using VNLib.Data.Caching.Extensions;
+using VNLib.Plugins.Extensions.Loading;
+
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+{
+ [ConfigurationName("known_peers")]
+ internal sealed class KnownPeerList
+ {
+ private readonly List<KnownPeer> _peers;
+
+ public KnownPeerList(PluginBase plugin, IConfigScope config)
+ {
+ //Deserialze the known peers into an array
+ KnownPeer[] peers = config.Deserialze<KnownPeer[]>();
+
+ foreach (KnownPeer peer in peers)
+ {
+ //Validate the peer
+ peer.Validate();
+ }
+
+ _peers = peers?.ToList() ?? new();
+ }
+
+ public IEnumerable<ICacheNodeAdvertisment> GetPeers()
+ {
+ return _peers;
+ }
+
+ private sealed class KnownPeer : ICacheNodeAdvertisment
+ {
+ public Uri? ConnectEndpoint { get; set; }
+ public Uri? DiscoveryEndpoint { get; set; }
+
+ [JsonPropertyName("node_id")]
+ public string NodeId { get; set; }
+
+ [JsonPropertyName("connect_url")]
+ public string? ConnectEpPath
+ {
+ get => ConnectEndpoint?.ToString() ?? string.Empty;
+ set => ConnectEndpoint = new Uri(value ?? string.Empty);
+ }
+
+ [JsonPropertyName("discovery_url")]
+ public string? DiscoveryEpPath
+ {
+ get => DiscoveryEndpoint?.ToString() ?? string.Empty;
+ set => DiscoveryEndpoint = new Uri(value ?? string.Empty);
+ }
+
+ public void Validate()
+ {
+ if (string.IsNullOrWhiteSpace(NodeId))
+ {
+ throw new ArgumentException("Node ID cannot be null or whitespace");
+ }
+ if (ConnectEndpoint is null)
+ {
+ throw new ArgumentException("Connect endpoint cannot be null");
+ }
+ if (DiscoveryEndpoint is null)
+ {
+ throw new ArgumentException("Discovery endpoint cannot be null");
+ }
+ }
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Distribution/NodeDiscoveryCollection.cs b/plugins/ObjectCacheServer/src/Distribution/NodeDiscoveryCollection.cs
deleted file mode 100644
index f773a2e..0000000
--- a/plugins/ObjectCacheServer/src/Distribution/NodeDiscoveryCollection.cs
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
-* Copyright (c) 2023 Vaughn Nugent
-*
-* Library: VNLib
-* Package: ObjectCacheServer
-* File: PeerDiscoveryManager.cs
-*
-* PeerDiscoveryManager.cs is part of ObjectCacheServer which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* ObjectCacheServer is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* ObjectCacheServer is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Linq;
-using System.Collections.Generic;
-
-using VNLib.Plugins;
-using VNLib.Data.Caching.Extensions;
-
-namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
-{
- sealed class NodeDiscoveryCollection : INodeDiscoveryCollection
- {
- private LinkedList<ICachePeerAdvertisment> _peers;
-
-
- public NodeDiscoveryCollection(PluginBase plugin)
- {
- _peers = new();
- }
-
- ///<inheritdoc/>
- public INodeDiscoveryEnumerator BeginDiscovery()
- {
- return new NodeEnumerator(new());
- }
-
- ///<inheritdoc/>
- public INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICachePeerAdvertisment> initialPeers)
- {
- //Init new enumerator with the initial peers
- return new NodeEnumerator(new(initialPeers));
- }
-
- ///<inheritdoc/>
- public void CompleteDiscovery(INodeDiscoveryEnumerator enumerator)
- {
- _ = enumerator ?? throw new ArgumentNullException(nameof(enumerator));
-
- //Capture all nodes from the enumerator and store them as our current peers
- _peers = (enumerator as NodeEnumerator)!.Peers;
- }
-
- ///<inheritdoc/>
- public ICachePeerAdvertisment[] GetAllNodes()
- {
- //Capture all current peers
- return _peers.ToArray();
- }
-
- private sealed record class NodeEnumerator(LinkedList<ICachePeerAdvertisment> Peers) : INodeDiscoveryEnumerator
- {
- //Keep track of the current node in the collection so we can move down the list
- private LinkedListNode<ICachePeerAdvertisment>? _currentNode = Peers.First;
-
- public ICachePeerAdvertisment? GetNextPeer()
- {
- //Move to the next peer in the collection
- _currentNode = _currentNode?.Next;
-
- return _currentNode?.Value;
- }
-
- public void OnPeerDiscoveryComplete(IEnumerable<ICachePeerAdvertisment> discoveredPeers)
- {
- //Get only the peers from the discovery that are not already in the collection
- IEnumerable<ICachePeerAdvertisment> newPeers = discoveredPeers.Except(Peers);
-
- //Add them to the end of the collection
- foreach(ICachePeerAdvertisment ad in newPeers)
- {
- Peers.AddLast(ad);
- }
- }
- }
- }
-}
diff --git a/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs
index 54e4258..26ec565 100644
--- a/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs
+++ b/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs
@@ -33,14 +33,16 @@ using VNLib.Utils.Logging;
using VNLib.Data.Caching.Extensions;
using VNLib.Plugins.Extensions.Loading;
+
namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
{
sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter
{
+ private readonly List<ICacheNodeAdvertisment> _connectedPeers;
private readonly NodeConfig _config;
- private readonly IPeerMonitor _monitor;
- private readonly INodeDiscoveryCollection _peers;
+ private readonly IPeerMonitor _monitor;
+ private readonly KnownPeerList _knownPeers;
public PeerDiscoveryManager(PluginBase plugin)
{
@@ -50,14 +52,23 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
//Get the peer monitor
_monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
- //Get the node collection
- _peers = plugin.GetOrCreateSingleton<NodeDiscoveryCollection>();
+ //Get the known peer list
+ _knownPeers = plugin.GetOrCreateSingleton<KnownPeerList>();
_connectedPeers = new();
+
+ //Setup discovery error handler
+ _config.Config.WithErrorHandler(new ErrorHandler(plugin.Log));
}
async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
{
+ /*
+ * This loop uses the peer monitor to keep track of all connected peers, then gets
+ * all the advertising peers (including known peers) and resolves all nodes across
+ * the network.
+ */
+
pluginLog.Information("Node discovery worker started");
try
@@ -66,7 +77,19 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
{
try
{
- await DiscoverAllNodesAsync(pluginLog, exitToken);
+ //Use the monitor to get the initial peers
+ IEnumerable<ICacheNodeAdvertisment> ads = _monitor.GetAllPeers()
+ .Where(static p => p.Advertisment != null)
+ .Select(static p => p.Advertisment!);
+
+ //Add known peers to the initial list
+ ads = ads.Union(_knownPeers.GetPeers());
+
+ //Set initial peers
+ _config.Config.WithInitialPeers(ads);
+
+ //Discover all nodes
+ await _config.Config.DiscoverNodesAsync(exitToken);
}
catch(OperationCanceledException)
{
@@ -91,54 +114,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
}
}
-
- async Task DiscoverAllNodesAsync(ILogProvider log, CancellationToken cancellation)
- {
- //Use the monitor to get the initial peers
- IEnumerable<ICachePeerAdvertisment> ads = _monitor.GetAllPeers()
- .Where(static p => p.Advertisment != null)
- .Select(static p => p.Advertisment!);
-
- //Init enumerator with initial peers
- INodeDiscoveryEnumerator enumerator = _peers.BeginDiscovery(ads);
-
- do
- {
- //Load the initial peer
- ICachePeerAdvertisment? peer = enumerator.GetNextPeer();
-
- if (peer == null)
- {
- break;
- }
-
- log.Verbose("Discovering peer nodes from {Peer}", peer.NodeId);
-
- //Discover nodes from this peer
- ICachePeerAdvertisment[]? newNodes = await _config.Config.DiscoverClusterNodesAsync(peer, cancellation);
-
- //Add nodes to the enumerator
- if (newNodes != null)
- {
- enumerator.OnPeerDiscoveryComplete(newNodes);
- }
-
- } while (true);
-
- //Commit peer updates
- _peers.CompleteDiscovery(enumerator);
- }
-
-
- private readonly List<ICachePeerAdvertisment> _connectedPeers;
+
///<inheritdoc/>
- public ICachePeerAdvertisment[] GetNewPeers()
+ public ICacheNodeAdvertisment[] GetNewPeers()
{
lock (_connectedPeers)
{
//Get all discovered peers
- ICachePeerAdvertisment[] peers = _peers.GetAllNodes();
+ ICacheNodeAdvertisment[] peers = _config.Config.NodeCollection.GetAllNodes();
//Get the difference between the discovered peers and the connected peers
return peers.Except(_connectedPeers).ToArray();
@@ -146,7 +130,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
}
///<inheritdoc/>
- public void OnPeerListenerAttached(ICachePeerAdvertisment peer)
+ public void OnPeerListenerAttached(ICacheNodeAdvertisment peer)
{
lock (_connectedPeers)
{
@@ -156,7 +140,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
}
///<inheritdoc/>
- public void OnPeerListenerDetatched(ICachePeerAdvertisment peer)
+ public void OnPeerListenerDetatched(ICacheNodeAdvertisment peer)
{
//remove from connected peers
lock (_connectedPeers)
@@ -164,5 +148,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
_connectedPeers.Remove(peer);
}
}
+
+
+ private sealed record class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
+ {
+ public void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex)
+ {
+ Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex);
+ }
+ }
}
}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs
deleted file mode 100644
index b9c00e6..0000000
--- a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
-* Copyright (c) 2023 Vaughn Nugent
-*
-* Library: VNLib
-* Package: ObjectCacheServer
-* File: BrokerHeartBeatEndpoint.cs
-*
-* BrokerHeartBeatEndpoint.cs is part of ObjectCacheServer which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* ObjectCacheServer is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* ObjectCacheServer is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Net;
-using System.Linq;
-using System.Text.Json;
-using System.Threading.Tasks;
-
-
-using VNLib.Plugins;
-using VNLib.Utils.Logging;
-using VNLib.Plugins.Essentials;
-using VNLib.Hashing.IdentityUtility;
-using VNLib.Plugins.Essentials.Endpoints;
-using VNLib.Plugins.Essentials.Extensions;
-using VNLib.Plugins.Extensions.Loading;
-
-namespace VNLib.Data.Caching.ObjectCache.Server
-{
- internal sealed class BrokerHeartBeatEndpoint : ResourceEndpointBase
- {
- private readonly IBrokerHeartbeatNotifier _heartBeat;
- private readonly Task<IPAddress[]> BrokerIpList;
- private readonly bool DebugMode;
-
- ///<inheritdoc/>
- protected override ProtectionSettings EndpointProtectionSettings { get; } = new()
- {
- DisableBrowsersOnly = true,
- DisableSessionsRequired = true
- };
-
- public BrokerHeartBeatEndpoint(PluginBase plugin)
- {
- //Get debug flag
- DebugMode = plugin.IsDebug();
-
- //Get or create the current node config
- _heartBeat = plugin.GetOrCreateSingleton<NodeConfig>();
-
- /*
- * Resolve the ip address of the broker and store it to verify connections
- * later
- */
- BrokerIpList = Dns.GetHostAddressesAsync(_heartBeat.GetBrokerAddress().DnsSafeHost);
-
- //Setup endpoint
- InitPathAndLog("/heartbeat", plugin.Log);
- }
-
-
- protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity)
- {
- //If-not loopback then verify server address
- if (!entity.Server.IsLoopBack())
- {
- //Load and verify the broker's ip address matches with an address we have stored
- IPAddress[] addresses = await BrokerIpList;
-
- if (!addresses.Contains(entity.TrustedRemoteIp))
- {
- if (DebugMode)
- {
- Log.Debug("Received connection {ip} that was not a DNS safe address for the broker server, access denied");
- }
-
- //Token invalid
- entity.CloseResponse(HttpStatusCode.Forbidden);
- return VfReturnType.VirtualSkip;
- }
- }
-
- //Get the authorization jwt
- string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
-
- if (string.IsNullOrWhiteSpace(jwtAuth))
- {
- //Token invalid
- entity.CloseResponse(HttpStatusCode.Forbidden);
- return VfReturnType.VirtualSkip;
- }
-
- //Parse the jwt
- using JsonWebToken jwt = JsonWebToken.Parse(jwtAuth);
-
- //Verify the jwt using the broker's public key certificate
- using (ReadOnlyJsonWebKey cert = _heartBeat.GetBrokerPublicKey())
- {
- //Verify the jwt
- if (!jwt.VerifyFromJwk(cert))
- {
- //Token invalid
- entity.CloseResponse(HttpStatusCode.Forbidden);
- return VfReturnType.VirtualSkip;
- }
- }
-
- string? auth;
- //Recover the auth token from the jwt
- using (JsonDocument doc = jwt.GetPayload())
- {
- auth = doc.RootElement.GetProperty("token").GetString();
- }
-
- //Get our stored token used for registration
- string? selfToken = _heartBeat.GetAuthToken();
-
- //Verify token
- if (selfToken != null && selfToken.Equals(auth, StringComparison.Ordinal))
- {
- //Signal keepalive
- _heartBeat.HearbeatReceived();
- entity.CloseResponse(HttpStatusCode.OK);
- return VfReturnType.VirtualSkip;
- }
-
- if (DebugMode)
- {
- Log.Debug("Invalid auth token recieved from broker sever, access denied");
- }
-
- //Token invalid
- entity.CloseResponse(HttpStatusCode.Forbidden);
- return VfReturnType.VirtualSkip;
- }
- }
-}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
index 167a7e9..8352635 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
@@ -31,10 +31,8 @@ using System.Collections.Generic;
using VNLib.Hashing;
using VNLib.Net.Http;
-using VNLib.Utils.Async;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
-using VNLib.Utils.Extensions;
using VNLib.Data.Caching;
using VNLib.Data.Caching.Extensions;
using VNLib.Hashing.IdentityUtility;
@@ -49,21 +47,21 @@ using VNLib.Plugins.Essentials.Extensions;
using VNLib.Plugins.Extensions.Loading.Routing;
using VNLib.Data.Caching.ObjectCache.Server.Distribution;
-namespace VNLib.Data.Caching.ObjectCache.Server
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
[ConfigurationName("connect_endpoint")]
- internal sealed class ConnectEndpoint : ResourceEndpointBase, IAsyncBackgroundWork
+ internal sealed class ConnectEndpoint : ResourceEndpointBase
{
private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
- private readonly CacheNodeConfiguration NodeConfiguration;
+ private readonly NodeConfig NodeConfiguration;
private readonly ICacheEventQueueManager PubSubManager;
private readonly IPeerMonitor Peers;
private readonly BlobCacheListener Store;
- private readonly CacheAuthKeyStore KeyStore;
private readonly bool VerifyIp;
private readonly string AudienceLocalServerId;
@@ -94,8 +92,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server
InitPathAndLog(path, plugin.Log);
- KeyStore = new(plugin);
-
//Check for ip-verification flag
VerifyIp = config.TryGetValue("verify_ip", out JsonElement vIpEl) && vIpEl.GetBoolean();
@@ -103,7 +99,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
PubSubManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>();
//Get node configuration
- NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>().Config;
+ NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>();
//Get peer monitor
Peers = plugin.GetOrCreateSingleton<CachePeerMonitor>();
@@ -119,9 +115,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server
* know client tokens belong to us when singed by the same key
*/
AudienceLocalServerId = Guid.NewGuid().ToString("N");
-
- //Schedule the queue worker to be run
- _ = plugin.ObserveWork(this, 100);
}
@@ -142,7 +135,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
* received the messages properly
*/
- protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity)
+ protected override VfReturnType Get(HttpEntity entity)
{
//Parse jwt from authoriation
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
@@ -161,22 +154,18 @@ namespace VNLib.Data.Caching.ObjectCache.Server
{
bool verified = false;
- //Get the client public key certificate to verify the client's message
- using(ReadOnlyJsonWebKey cert = await KeyStore.GetClientPublicKeyAsync())
+ //verify signature for client
+ if (NodeConfiguration.KeyStore.VerifyJwt(jwt))
{
- //verify signature for client
- if (jwt.VerifyFromJwk(cert))
- {
- verified = true;
- }
- //May be signed by a cache server
- else
- {
- //Set peer and verified flag since the another cache server signed the request
- isPeer = verified = NodeConfiguration.VerifyCache(jwt);
- }
+ verified = true;
+ }
+ //May be signed by a cache server
+ else
+ {
+ //Set peer and verified flag since the another cache server signed the request
+ isPeer = verified = NodeConfiguration.KeyStore.VerifyCachePeer(jwt);
}
-
+
//Check flag
if (!verified)
{
@@ -204,7 +193,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//Verified, now we can create an auth message with a short expiration
using JsonWebToken auth = new();
- auth.WriteHeader(NodeConfiguration.GetJwtHeader());
+ auth.WriteHeader(NodeConfiguration.KeyStore.GetJwtHeader());
auth.InitPayloadClaim()
.AddClaim("aud", AudienceLocalServerId)
.AddClaim("exp", entity.RequestedTimeUtc.Add(AuthTokenExpiration).ToUnixTimeSeconds())
@@ -223,14 +212,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server
.CommitClaims();
//Sign the auth message from our private key
- NodeConfiguration.SignJwt(auth);
+ NodeConfiguration.KeyStore.SignJwt(auth);
//Close response
entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer);
return VfReturnType.VirtualSkip;
}
- protected override async ValueTask<VfReturnType> WebsocketRequestedAsync(HttpEntity entity)
+ protected override VfReturnType WebsocketRequested(HttpEntity entity)
{
//Parse jwt from authorization
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
@@ -251,13 +240,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server
}
string? nodeId = null;
- ICachePeerAdvertisment? discoveryAd = null;
+ ICacheNodeAdvertisment? discoveryAd = null;
//Parse jwt
using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
{
//verify signature against the cache public key, since this server must have signed it
- if (!NodeConfiguration.VerifyCache(jwt))
+ if (!NodeConfiguration.KeyStore.VerifyCachePeer(jwt))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
@@ -313,7 +302,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
if (isPeer)
{
//Verify token signature against a fellow cache public key
- if (!NodeConfiguration.VerifyUpgradeToken(clientSignature, jwtAuth))
+ if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
@@ -325,16 +314,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//Verify the node advertisement header and publish it
if (!string.IsNullOrWhiteSpace(discoveryHeader))
{
- discoveryAd = NodeConfiguration.VerifyPeerAdvertisment(discoveryHeader);
+ discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(discoveryHeader);
}
}
else
{
//Not a peer, so verify against the client's public key
- using ReadOnlyJsonWebKey clientPub = await KeyStore.GetClientPublicKeyAsync();
-
- //Verify token signature
- if (!FBMDataCacheExtensions.VerifyUpgradeToken(clientSignature, jwtAuth, clientPub))
+ if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
@@ -426,7 +412,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
if (!string.IsNullOrWhiteSpace(state.NodeId))
{
//Get the event queue for the current node
- AsyncQueue<ChangeEvent> queue = PubSubManager.Subscribe(state);
+ IPeerEventQueue queue = PubSubManager.Subscribe(state);
try
{
@@ -470,44 +456,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
Log.Debug("Server websocket exited");
}
-
-
- //Background worker to process event queue items
- async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
- {
- const int accumulatorSize = 64;
-
- try
- {
- //Accumulator for events
- ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize];
- int ptr = 0;
-
- //Listen for changes
- while (true)
- {
- //Wait for next event
- accumulator[ptr++] = await Store.EventQueue.DequeueAsync(exitToken);
-
- //try to accumulate more events until we can't anymore
- while (Store.EventQueue.TryDequeue(out ChangeEvent? ev) && ptr < accumulatorSize)
- {
- accumulator[ptr++] = ev;
- }
-
- //Publish all events to subscribers
- PubSubManager.PublishMultiple(accumulator.AsSpan(0, ptr));
-
- //Reset pointer
- ptr = 0;
- }
- }
- catch (OperationCanceledException)
- {
- //Normal exit
- pluginLog.Debug("Change queue listener worker exited");
- }
- }
+
private class WsUserState : ICachePeer
{
@@ -516,7 +465,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
public int MaxMessageSize { get; init; }
public int MaxResponseBufferSize { get; init; }
public string? NodeId { get; init; }
- public ICachePeerAdvertisment? Advertisment { get; init; }
+ public ICacheNodeAdvertisment? Advertisment { get; init; }
public override string ToString()
{
diff --git a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
index 670d624..90ffca0 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
@@ -23,8 +23,8 @@
*/
using System;
-using System.Linq;
using System.Net;
+using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
@@ -76,14 +76,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
using(JsonWebToken jwt = JsonWebToken.Parse(authToken))
{
//try to verify against cache node first
- if (!Config.Config.VerifyCache(jwt))
+ if (!Config.KeyStore.VerifyCachePeer(jwt))
{
//failed...
//try to verify against client key
- using ReadOnlyJsonWebKey clientPub = await Config.KeyStore.GetClientPublicKeyAsync();
-
- if (!jwt.VerifyFromJwk(clientPub))
+ if (!Config.KeyStore.VerifyJwt(jwt))
{
//invalid token
entity.CloseResponse(HttpStatusCode.Unauthorized);
@@ -97,7 +95,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
//Valid key, get peer list to send to client
- ICachePeerAdvertisment[] peers = PeerMonitor.GetAllPeers()
+ ICacheNodeAdvertisment[] peers = PeerMonitor.GetAllPeers()
.Where(static p => p.Advertisment != null)
.Select(static p => p.Advertisment!)
.ToArray();
@@ -106,7 +104,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
using JsonWebToken response = new();
//set header from cache config
- response.WriteHeader(Config.Config.GetJwtHeader());
+ response.WriteHeader(Config.KeyStore.GetJwtHeader());
response.InitPayloadClaim()
.AddClaim("iss", Config.Config.NodeId)
@@ -119,7 +117,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
.CommitClaims();
//Sign the response
- Config.Config.SignJwt(response);
+ Config.KeyStore.SignJwt(response);
//Send response to client
entity.CloseResponse(HttpStatusCode.OK, Net.Http.ContentType.Text, response.DataBuffer);
diff --git a/plugins/ObjectCacheServer/src/ICacheEventQueueManager.cs b/plugins/ObjectCacheServer/src/ICacheEventQueueManager.cs
index 6b07000..20edf0b 100644
--- a/plugins/ObjectCacheServer/src/ICacheEventQueueManager.cs
+++ b/plugins/ObjectCacheServer/src/ICacheEventQueueManager.cs
@@ -24,9 +24,6 @@
using System;
-using VNLib.Utils.Async;
-
-
namespace VNLib.Data.Caching.ObjectCache.Server
{
/// <summary>
@@ -49,14 +46,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server
/// <summary>
/// Attatches a subscriber that will receive all published changes
/// </summary>
- /// <param name="nodeId">The id of the node to get the queue for</param>
+ /// <param name="peer">The peer node that wishes to subscribe for events</param>
/// <returns>The initilaizes event queue for the single subscriber</returns>
- AsyncQueue<ChangeEvent> Subscribe(ICachePeer peer);
+ IPeerEventQueue Subscribe(ICachePeer peer);
/// <summary>
/// Detatches a subscriber from the event queue
/// </summary>
- /// <param name="nodeId">The id of the nede to detach</param>
+ /// <param name="peer">The peer to unsubscribe from events</param>
void Unsubscribe(ICachePeer peer);
/// <summary>
diff --git a/plugins/ObjectCacheServer/src/ICachePeer.cs b/plugins/ObjectCacheServer/src/ICachePeer.cs
index d374400..97b406f 100644
--- a/plugins/ObjectCacheServer/src/ICachePeer.cs
+++ b/plugins/ObjectCacheServer/src/ICachePeer.cs
@@ -39,6 +39,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server
/// <summary>
/// An optional signed advertisment message for other peers
/// </summary>
- ICachePeerAdvertisment? Advertisment { get; }
+ ICacheNodeAdvertisment? Advertisment { get; }
}
}
diff --git a/plugins/ObjectCacheServer/src/IPeerEventQueue.cs b/plugins/ObjectCacheServer/src/IPeerEventQueue.cs
new file mode 100644
index 0000000..63dbd3f
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/IPeerEventQueue.cs
@@ -0,0 +1,50 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: CacheEventQueueManager.cs
+*
+* CacheEventQueueManager.cs is part of ObjectCacheServer which is
+* part of the larger VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Threading;
+using System.Threading.Tasks;
+
+
+namespace VNLib.Data.Caching.ObjectCache.Server
+{
+ /// <summary>
+ /// Represents a queue of events for a specific peer node
+ /// </summary>
+ internal interface IPeerEventQueue
+ {
+ /// <summary>
+ /// Dequeues an event from the queue asynchronously
+ /// </summary>
+ /// <param name="cancellation">A token to cancel the operation</param>
+ /// <returns>The value task that represents the wait</returns>
+ ValueTask<ChangeEvent> DequeueAsync(CancellationToken cancellation);
+
+ /// <summary>
+ /// Attemts to dequeue an event from the queue
+ /// </summary>
+ /// <param name="change">The change event that was dequeued if possible</param>
+ /// <returns>True if the event was dequeued</returns>
+ bool TryDequeue(out ChangeEvent change);
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/NodeConfig.cs b/plugins/ObjectCacheServer/src/NodeConfig.cs
index 614f0d6..81b8a32 100644
--- a/plugins/ObjectCacheServer/src/NodeConfig.cs
+++ b/plugins/ObjectCacheServer/src/NodeConfig.cs
@@ -3,9 +3,9 @@
*
* Library: VNLib
* Package: ObjectCacheServer
-* File: ObjectCacheServerEntry.cs
+* File: NodeConfig.cs
*
-* ObjectCacheServerEntry.cs is part of ObjectCacheServer which is part of the larger
+* NodeConfig.cs is part of ObjectCacheServer which is part of the larger
* VNLib collection of libraries and utilities.
*
* ObjectCacheServer is free software: you can redistribute it and/or modify
@@ -25,63 +25,48 @@
using System;
using System.Net;
using System.Linq;
-using System.Net.Http;
using System.Text.Json;
-using System.Threading;
-using System.Net.Sockets;
using System.Threading.Tasks;
-using System.Collections.Generic;
-using System.Security.Cryptography;
using VNLib.Plugins;
using VNLib.Utils;
using VNLib.Utils.Logging;
-using VNLib.Utils.Extensions;
-using VNLib.Hashing;
-using VNLib.Hashing.IdentityUtility;
using VNLib.Data.Caching.Extensions;
using VNLib.Plugins.Extensions.Loading;
-
+using VNLib.Data.Caching.ObjectCache.Server.Endpoints;
namespace VNLib.Data.Caching.ObjectCache.Server
{
[ConfigurationName("cluster")]
- internal sealed class NodeConfig : VnDisposeable, IAsyncConfigurable, IAsyncBackgroundWork, IBrokerHeartbeatNotifier
+ internal sealed class NodeConfig : VnDisposeable
{
const string CacheConfigTemplate =
@"
Cluster Configuration:
- Broker Address: {ba}
- Heartbeat Timeout: {hb}
Node Id: {id}
TlsEndabled: {tls},
Cache Endpoint: {ep}
";
public CacheNodeConfiguration Config { get; }
- public CacheAuthKeyStore KeyStore { get; }
-
- private readonly ManualResetEventSlim hearbeatHandle;
- private readonly TimeSpan _hearbeatTimeout;
- private string? _authToken;
+ public CacheAuthKeyStore KeyStore { get; }
public NodeConfig(PluginBase plugin, IConfigScope config)
{
//Server id is just dns name for now
string nodeId = Dns.GetHostName();
- Config = new();
- //Get the heartbeat interval
- TimeSpan heartBeatDelayMs = config["heartbeat_timeout_sec"].GetTimeSpan(TimeParseType.Seconds);
-
- string brokerAddr = config["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'");
+ Config = new();
//Get the port of the primary webserver
int port;
bool usingTls;
{
- JsonElement firstHost = plugin.HostConfig.GetProperty("virtual_hosts").EnumerateArray().First();
+ //Get the port number of the first virtual host
+ JsonElement firstHost = plugin.HostConfig.GetProperty("virtual_hosts")
+ .EnumerateArray()
+ .First();
port = firstHost.GetProperty("interface")
.GetProperty("port")
@@ -91,147 +76,51 @@ namespace VNLib.Data.Caching.ObjectCache.Server
usingTls = firstHost.TryGetProperty("ssl", out _);
}
- //Get the cache endpoint config
- IConfigScope cacheEpConfig = plugin.GetConfigForType<ConnectEndpoint>();
-
//The endpoint to advertise to cache clients that allows cache connections
- UriBuilder endpoint = new(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, nodeId, port, cacheEpConfig["path"].GetString());
+ Uri cacheEndpoint = GetEndpointUri<ConnectEndpoint>(plugin, usingTls, port, nodeId);
+
+ //Init key store
+ KeyStore = new(plugin);
//Setup cache node config
- Config.WithCacheEndpoint(endpoint.Uri)
+ Config.WithCacheEndpoint(cacheEndpoint)
.WithNodeId(nodeId)
- .WithTls(usingTls)
- .WithBroker(new(brokerAddr));
+ .WithAuthenticator(KeyStore)
+ .WithTls(usingTls);
//Check if advertising is enabled
if(config.TryGetValue("advertise", out JsonElement adEl) && adEl.GetBoolean())
{
- Config.EnableAdvertisment(true, "");
- }
-
- //Init key store
- KeyStore = new(plugin);
-
- //Init heartbeat handle unsiganled waiting for first heartbeat
- hearbeatHandle = new(false);
-
- //Schedule heartbeat
- _ = plugin.ObserveWork(this, 500);
+ //Get the the broadcast endpoint
+ Uri discoveryEndpoint = GetEndpointUri<PeerDiscoveryEndpoint>(plugin, usingTls, port, nodeId);
+ //Enable advertising
+ Config.EnableAdvertisment(discoveryEndpoint);
+ }
+
+
//log the config
plugin.Log.Information(CacheConfigTemplate,
- brokerAddr,
- heartBeatDelayMs,
nodeId,
usingTls,
- endpoint.Uri);
+ cacheEndpoint
+ );
}
- async Task IAsyncConfigurable.ConfigureServiceAsync(PluginBase plugin)
+ private static Uri GetEndpointUri<T>(PluginBase plugin, bool usingTls, int port, string hostName) where T: IEndpoint
{
- //Get cache private key for signing from the key store
- ReadOnlyJsonWebKey signingKey = await KeyStore.GetCachePrivateAsync();
-
- Config.WithSigningKey(signingKey);
-
- //Get broker public key for verifying from the key store
- ReadOnlyJsonWebKey brokerKey = await KeyStore.GetBrokerPublicAsync();
+ //Get the cache endpoint config
+ IConfigScope cacheEpConfig = plugin.GetConfigForType<T>();
- Config.WithBrokerVerificationKey(brokerKey);
+ //The endpoint to advertise to cache clients that allows cache connections
+ return new UriBuilder(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, hostName, port, cacheEpConfig["path"].GetString()).Uri;
}
+
protected override void Free()
{
- //Dispose the heartbeat handle
- hearbeatHandle.Dispose();
-
//cleanup keys
- Config.SigningKey?.Dispose();
- Config.VerificationKey?.Dispose();
- Config.BrokerVerificationKey?.Dispose();
- }
-
- ///<inheritdoc/>
- public void HearbeatReceived()
- {
- //Set the heartbeat handle as received
- hearbeatHandle.Set();
+
}
-
- ///<inheritdoc/>
- public string? GetAuthToken() => _authToken;
-
- ///<inheritdoc/>
- public Uri GetBrokerAddress() => Config.DiscoveryEndpoint!;
-
- ///<inheritdoc/>
- public ReadOnlyJsonWebKey GetBrokerPublicKey() => Config.BrokerVerificationKey!;
-
-
- /*
- * Worker loop for registering with the broker and monitoring hearbeat requests
- */
- async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
- {
- //Listen in loop
- while (true)
- {
- try
- {
- //Regen the auth token before registering
- _authToken = RandomHash.GetRandomBase32(32);
-
- pluginLog.Information("Registering with cache broker server with id {id}", Config.NodeId);
-
- //Register with the broker and pass the current auth token
- await Config.RegisterWithBrokerAsync(_authToken);
-
- //Enter heartbeat loop
- while (true)
- {
- //Wait for the heartbeat timeout
- await Task.Delay(_hearbeatTimeout, exitToken);
-
- //Confrim the hearbeat was received within the timeout period
- if (!hearbeatHandle.IsSet)
- {
- //If the heartbeat handle is not set, the heartbeat was not received, reg-register
- pluginLog.Information("Broker missed hearbeat request");
-
- //not received, break out of the heartbeat loop to re-register
- break;
- }
-
- //Reset the handle and continue the heartbeat loop
- hearbeatHandle.Reset();
- }
-
- //Add random delay to prevent all nodes from re-registering at the same time
- await Task.Delay(RandomNumberGenerator.GetInt32(1000, 5000), exitToken);
- }
- catch (OperationCanceledException)
- {
- pluginLog.Debug("Registration loop exited on unload");
- break;
- }
- catch (TimeoutException)
- {
- pluginLog.Warn("Failed to connect to cache broker server within the specified timeout period");
- }
- catch (HttpRequestException re) when (re.InnerException is SocketException)
- {
- pluginLog.Warn("Cache broker is unavailable or network is unavailable");
- }
- catch(HttpRequestException re) when (re.StatusCode.HasValue)
- {
- pluginLog.Warn("Failed to register with cache broker server, received status code {code}", re.StatusCode);
- }
- catch (Exception ex)
- {
- pluginLog.Warn("Exception occured in registraion loop: {ex}", ex!.Message);
- }
- }
- }
-
}
}
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj
index 0c53095..e8d6291 100644
--- a/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj
+++ b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj
@@ -37,8 +37,8 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\..\..\core\lib\Plugins.PluginBase\src\VNLib.Plugins.PluginBase.csproj" />
+ <ProjectReference Include="..\..\..\..\Extensions\lib\VNLib.Plugins.Extensions.Loading\src\VNLib.Plugins.Extensions.Loading.csproj" />
<ProjectReference Include="..\..\..\lib\VNLib.Data.Caching.Extensions\src\VNLib.Data.Caching.Extensions.csproj" />
<ProjectReference Include="..\..\..\lib\VNLib.Data.Caching.ObjectCache\src\VNLib.Data.Caching.ObjectCache.csproj" />
- <ProjectReference Include="..\..\CacheBroker\src\CacheBroker.csproj" />
</ItemGroup>
</Project>
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
index c1a6ad2..5d9a50b 100644
--- a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
+++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
@@ -23,80 +23,20 @@
*/
using System;
-using System.IO;
-using System.Net;
-using System.Linq;
-using System.Net.Http;
using System.Threading;
-using System.Net.Sockets;
-using System.Threading.Tasks;
using System.Collections.Generic;
-using System.Security.Cryptography;
using VNLib.Plugins;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
using VNLib.Utils.Memory.Diagnostics;
-using VNLib.Hashing.IdentityUtility;
-using VNLib.Data.Caching.Extensions;
-using static VNLib.Data.Caching.Constants;
-using VNLib.Net.Messaging.FBM;
-using VNLib.Net.Messaging.FBM.Client;
-using VNLib.Plugins.Cache.Broker.Endpoints;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Extensions.Loading.Routing;
using VNLib.Data.Caching.ObjectCache.Server.Endpoints;
+
namespace VNLib.Data.Caching.ObjectCache.Server
{
- sealed record class CacheAuthKeyStore(PluginBase Plugin)
- {
- public Task<ReadOnlyJsonWebKey> GetCachePublicAsync()
- {
- return Plugin.TryGetSecretAsync("cache_private_key").ToJsonWebKey(true);
- }
-
- public Task<ReadOnlyJsonWebKey> GetCachePrivateAsync()
- {
- return Plugin.TryGetSecretAsync("cache_private_key").ToJsonWebKey(true);
- }
-
- public Task<ReadOnlyJsonWebKey> GetBrokerPublicAsync()
- {
- return Plugin.TryGetSecretAsync("broker_public_key").ToJsonWebKey(true);
- }
-
- public Task<ReadOnlyJsonWebKey> GetClientPublicKeyAsync()
- {
- return Plugin.TryGetSecretAsync("client_public_key").ToJsonWebKey(true);
- }
- }
-
- internal interface IBrokerHeartbeatNotifier
- {
- /// <summary>
- /// Called when the heartbeat endpoint receives a heartbeat from the broker
- /// </summary>
- void HearbeatReceived();
-
- /// <summary>
- /// Gets the current auth token sent to the broker, which is expected to be sent back in the heartbeat
- /// </summary>
- /// <returns>The heartbeat auth token if set</returns>
- string? GetAuthToken();
-
- /// <summary>
- /// Gets the address of the broker server
- /// </summary>
- /// <returns>The full address of the broker server to connect to</returns>
- Uri GetBrokerAddress();
-
- /// <summary>
- /// Gets the public key of the broker server
- /// </summary>
- /// <returns>The broker's public key</returns>
- ReadOnlyJsonWebKey GetBrokerPublicKey();
- }
public sealed class ObjectCacheServerEntry : PluginBase
{
@@ -136,26 +76,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server
{
try
{
- //Setup Node config
- NodeConfig nodeConf = this.GetOrCreateSingleton<NodeConfig>();
-
//Init connect endpoint
- ConnectEndpoint endpoint = this.Route<ConnectEndpoint>();
-
- //Route the broker endpoint
- this.Route<BrokerHeartBeatEndpoint>();
+ this.Route<ConnectEndpoint>();
//Setup discovery endpoint
if(this.HasConfigForType<PeerDiscoveryEndpoint>())
{
this.Route<PeerDiscoveryEndpoint>();
- }
-
- ulong maxByteSize = ((ulong)endpoint.CacheConfig.MaxCacheEntries * (ulong)endpoint.CacheConfig.BucketCount * (ulong)endpoint.CacheConfig.MaxMessageSize);
-
- //Log max memory usage
- Log.Debug("Maxium memory consumption {mx}Mb", maxByteSize / (ulong)(1024 * 1000));
-
+ }
Log.Information("Plugin loaded");
}