aboutsummaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/CacheBroker/src/Endpoints/BrokerRegistrationEndpoint.cs6
-rw-r--r--plugins/ObjectCacheServer/src/CacheConfiguration.cs (renamed from plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs)0
-rw-r--r--plugins/ObjectCacheServer/src/CacheEventQueueManager.cs248
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs246
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs55
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs49
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryCollection.cs58
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryEnumerator.cs45
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs53
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/NodeDiscoveryCollection.cs99
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs168
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs (renamed from plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs)69
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs91
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs437
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs129
-rw-r--r--plugins/ObjectCacheServer/src/ICacheEventQueueManager.cs67
-rw-r--r--plugins/ObjectCacheServer/src/ICachePeer.cs44
-rw-r--r--plugins/ObjectCacheServer/src/ICacheStore.cs (renamed from plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs)0
-rw-r--r--plugins/ObjectCacheServer/src/NodeConfig.cs237
-rw-r--r--plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs540
20 files changed, 1918 insertions, 723 deletions
diff --git a/plugins/CacheBroker/src/Endpoints/BrokerRegistrationEndpoint.cs b/plugins/CacheBroker/src/Endpoints/BrokerRegistrationEndpoint.cs
index db82887..cb22176 100644
--- a/plugins/CacheBroker/src/Endpoints/BrokerRegistrationEndpoint.cs
+++ b/plugins/CacheBroker/src/Endpoints/BrokerRegistrationEndpoint.cs
@@ -109,18 +109,18 @@ namespace VNLib.Plugins.Cache.Broker.Endpoints
private async Task<ReadOnlyJsonWebKey> GetClientPublic()
{
- return await this.GetPlugin().TryGetSecretAsync("client_public_key").ToJsonWebKey() ?? throw new InvalidOperationException("Client public key not found in vault");
+ return await this.GetPlugin().GetSecretAsync("client_public_key").ToJsonWebKey();
}
private async Task<ReadOnlyJsonWebKey> GetCachePublic()
{
- using SecretResult secret = await this.GetPlugin().TryGetSecretAsync("cache_public_key") ?? throw new InvalidOperationException("Cache public key not found in vault");
+ using ISecretResult secret = await this.GetPlugin().GetSecretAsync("cache_public_key");
return secret.GetJsonWebKey();
}
private async Task<ReadOnlyJsonWebKey> GetBrokerCertificate()
{
- using SecretResult secret = await this.GetPlugin().TryGetSecretAsync("broker_private_key") ?? throw new InvalidOperationException("Broker private key not found in vault");
+ using ISecretResult secret = await this.GetPlugin().TryGetSecretAsync("broker_private_key");
return secret.GetJsonWebKey();
}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs b/plugins/ObjectCacheServer/src/CacheConfiguration.cs
index f7adeb3..f7adeb3 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs
+++ b/plugins/ObjectCacheServer/src/CacheConfiguration.cs
diff --git a/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs b/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs
new file mode 100644
index 0000000..5fb6d2a
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs
@@ -0,0 +1,248 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: CacheEventQueueManager.cs
+*
+* CacheEventQueueManager.cs is part of ObjectCacheServer which is
+* part of the larger VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Threading.Channels;
+using System.Collections.Generic;
+
+using VNLib.Plugins;
+using VNLib.Utils.Async;
+using VNLib.Utils.Logging;
+using VNLib.Utils.Extensions;
+using VNLib.Plugins.Extensions.Loading;
+using VNLib.Plugins.Extensions.Loading.Events;
+
+
+namespace VNLib.Data.Caching.ObjectCache.Server
+{
+ [ConfigurationName("event_manager")]
+ internal sealed class CacheEventQueueManager : ICacheEventQueueManager, IDisposable, IIntervalScheduleable
+ {
+ private readonly int MaxQueueDepth;
+
+ private readonly object SubLock;
+ private readonly LinkedList<NodeQueue> Subscribers;
+
+ private readonly object StoreLock;
+ private readonly Dictionary<string, NodeQueue> QueueStore;
+
+
+ public CacheEventQueueManager(PluginBase plugin, IConfigScope config)
+ {
+ //Get purge interval
+ TimeSpan purgeInterval = config["purge_interval_sec"].GetTimeSpan(TimeParseType.Seconds);
+
+ //Get max queue depth
+ MaxQueueDepth = (int)config["max_depth"].GetUInt32();
+
+ //Create purge interval
+ plugin.ScheduleInterval(this, purgeInterval);
+
+ SubLock = new();
+ Subscribers = new();
+
+ StoreLock = new();
+ QueueStore = new(StringComparer.OrdinalIgnoreCase);
+ }
+
+ ///<inheritdoc/>
+ public AsyncQueue<ChangeEvent> Subscribe(ICachePeer peer)
+ {
+ NodeQueue? nq;
+
+ bool isNew = false;
+
+ //Enter sync lock
+ lock (StoreLock)
+ {
+ //Try to recover the queue for the node
+ if(!QueueStore.TryGetValue(peer.NodeId, out nq))
+ {
+ //Create new queue
+ nq = new(peer.NodeId, MaxQueueDepth);
+ QueueStore.Add(peer.NodeId, nq);
+ isNew = true;
+ }
+
+ //Increment listener count
+ nq.Listeners++;
+ }
+
+ //Publish new peer to subscribers list
+ if (isNew)
+ {
+ lock (SubLock)
+ {
+ //Add peer to subscribers list
+ Subscribers.AddLast(nq);
+ }
+ }
+
+ //Return the node's queue
+ return nq.Queue;
+ }
+
+ ///<inheritdoc/>
+ public void Unsubscribe(ICachePeer peer)
+ {
+ //Detach a listener for a node
+ lock (StoreLock)
+ {
+ //Get the queue and decrement the listener count
+ NodeQueue nq = QueueStore[peer.NodeId];
+ nq.Listeners--;
+ }
+ }
+
+ ///<inheritdoc/>
+ public void PublishSingle(ChangeEvent change)
+ {
+ //Wait to enter the sub lock
+ lock (SubLock)
+ {
+ //Loop through ll the fast way
+ LinkedListNode<NodeQueue>? q = Subscribers.First;
+
+ while (q != null)
+ {
+ //Pub single event node
+ q.Value.PublishChange(change);
+
+ //Get next queue
+ q = q.Next;
+ }
+ }
+ }
+
+ ///<inheritdoc/>
+ public void PublishMultiple(Span<ChangeEvent> changes)
+ {
+ //Wait to enter the sub lock
+ lock (SubLock)
+ {
+ //Loop through ll the fast way
+ LinkedListNode<NodeQueue>? q = Subscribers.First;
+
+ while (q != null)
+ {
+ //Publish multiple
+ q.Value.PublishChanges(changes);
+
+ //Get next queue
+ q = q.Next;
+ }
+ }
+ }
+
+ ///<inheritdoc/>
+ public void PurgeStaleSubscribers()
+ {
+ //Enter locks
+ lock(SubLock)
+ lock(StoreLock)
+ {
+ //Get all stale queues (queues without listeners)
+ NodeQueue[] staleQueues = QueueStore.Values.Where(static nq => nq.Listeners == 0).ToArray();
+
+ foreach (NodeQueue nq in staleQueues)
+ {
+ //Remove from store
+ QueueStore.Remove(nq.NodeId);
+
+ //remove from subscribers
+ Subscribers.Remove(nq);
+ }
+ }
+ }
+
+ //Interval to purge stale subscribers
+ Task IIntervalScheduleable.OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken)
+ {
+ //Purge
+ PurgeStaleSubscribers();
+ return Task.CompletedTask;
+ }
+
+ void IDisposable.Dispose()
+ {
+ QueueStore.Clear();
+ Subscribers.Clear();
+ }
+
+ /*
+ * Holds queues for each node and keeps track of the number of listeners
+ * attached to the queue
+ */
+
+ private sealed class NodeQueue
+ {
+ public int Listeners;
+
+ public string NodeId { get; }
+
+ public AsyncQueue<ChangeEvent> Queue { get; }
+
+ public NodeQueue(string nodeId, int maxDepth)
+ {
+ NodeId = nodeId;
+
+ /*
+ * Create a bounded channel that acts as a lru and evicts
+ * the oldest item when the queue is full
+ *
+ * There will also only ever be a single thread writing events
+ * to the queue
+ */
+
+ BoundedChannelOptions queueOptions = new(maxDepth)
+ {
+ AllowSynchronousContinuations = true,
+ SingleReader = false,
+ SingleWriter = true,
+ //Drop oldest item in queue if full
+ FullMode = BoundedChannelFullMode.DropOldest,
+ };
+
+ //Init queue/channel
+ Queue = new(queueOptions);
+ }
+
+ public void PublishChange(ChangeEvent change)
+ {
+ Queue.TryEnque(change);
+ }
+
+ public void PublishChanges(Span<ChangeEvent> changes)
+ {
+ for(int i = 0; i < changes.Length; i++)
+ {
+ Queue.TryEnque(changes[i]);
+ }
+ }
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs
new file mode 100644
index 0000000..b453dcc
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs
@@ -0,0 +1,246 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: ObjectCacheServerEntry.cs
+*
+* ObjectCacheServerEntry.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+
+using VNLib.Plugins;
+using VNLib.Utils.Logging;
+using VNLib.Data.Caching.Extensions;
+using static VNLib.Data.Caching.Constants;
+using VNLib.Net.Messaging.FBM;
+using VNLib.Net.Messaging.FBM.Client;
+using VNLib.Plugins.Extensions.Loading;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+{
+ internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork
+ {
+ private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10);
+
+ private readonly NodeConfig NodeConfig;
+ private readonly ICachePeerAdapter PeerAdapter;
+ private readonly ICacheStore CacheStore;
+ private readonly FBMClientConfig ClientConfig;
+ private readonly PluginBase Plugin;
+
+ private CacheNodeConfiguration CacheConfig => NodeConfig.Config;
+
+ public CacheNodeReplicationMaanger(PluginBase plugin)
+ {
+ //Load the node config
+ NodeConfig = plugin.GetOrCreateSingleton<NodeConfig>();
+
+ //Get peer adapter
+ PeerAdapter = plugin.GetOrCreateSingleton<PeerDiscoveryManager>();
+
+
+ }
+
+ public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
+ {
+ pluginLog.Information("[REPL] Initializing node replication worker");
+
+ try
+ {
+ while (true)
+ {
+ //Get all new peers
+ ICachePeerAdvertisment[] peers = PeerAdapter.GetNewPeers();
+
+ if (peers.Length == 0)
+ {
+ pluginLog.Verbose("[REPL] No new peers to connect to");
+ }
+
+ //Connect to each peer
+ foreach (ICachePeerAdvertisment peer in peers)
+ {
+ _ = Plugin.ObserveWork(() => OnNewPeerDoWorkAsync(peer, pluginLog, exitToken));
+ }
+
+ //Wait for a new peers
+ await Task.Delay(10000, exitToken);
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ //Normal exit
+ }
+ catch
+ {
+ pluginLog.Error("[REPL] Node replication worker exited with an error");
+ throw;
+ }
+ finally
+ {
+
+ }
+
+ pluginLog.Information("[REPL] Node replication worker exited");
+ }
+
+ private async Task OnNewPeerDoWorkAsync(ICachePeerAdvertisment newPeer, ILogProvider log, CancellationToken exitToken)
+ {
+ _ = newPeer ?? throw new ArgumentNullException(nameof(newPeer));
+
+ //Setup client
+ FBMClient client = new(ClientConfig);
+
+ try
+ {
+ log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId);
+
+ //Connect to the server
+ await client.ConnectToCacheAsync(newPeer, CacheConfig, exitToken);
+
+ log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId);
+
+ //Start worker tasks
+ List<Task> workerTasks = new();
+
+ for (int i = 0; i < Environment.ProcessorCount; i++)
+ {
+ Task workerTask = Task.Run(() => ReplicationWorkerDoWorkAsync(client, log, exitToken), exitToken);
+
+ workerTasks.Add(workerTask);
+ }
+
+ //Wait for sync workers to exit
+ await Task.WhenAll(workerTasks);
+
+ log.Debug("All cache worker tasks exited successfully, disconnecting from {server}", newPeer.NodeId);
+
+ //Disconnect client gracefully
+ await client.DisconnectAsync(CancellationToken.None);
+ }
+ catch (InvalidResponseException ie)
+ {
+ //See if the plugin is unloading
+ if (!exitToken.IsCancellationRequested)
+ {
+ log.Debug("Peer {p} responded with invalid response packet, disconnected. reason\n {reason}", newPeer.NodeId, ie);
+ }
+ //Disconnect client gracefully
+ try
+ {
+ await client.DisconnectAsync(CancellationToken.None);
+ }
+ catch (Exception ex)
+ {
+ log.Error(ex);
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ //Plugin unloading, Try to disconnect
+ try
+ {
+ await client.DisconnectAsync(CancellationToken.None);
+ }
+ catch (Exception ex)
+ {
+ log.Error(ex);
+ }
+ }
+ catch (Exception ex)
+ {
+ log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex);
+ }
+ finally
+ {
+ client.Dispose();
+
+ //Notify monitor of disconnect
+ PeerAdapter.OnPeerListenerDetatched(newPeer);
+ }
+ }
+
+ //Wroker task callback method
+ private async Task ReplicationWorkerDoWorkAsync(FBMClient client, ILogProvider log, CancellationToken exitToken)
+ {
+ //Listen for changes
+ while (true)
+ {
+ //Wait for changes
+ WaitForChangeResult changedObject = await client.WaitForChangeAsync(exitToken);
+
+ log.Debug("Object changed {typ} {obj}", changedObject.Status, changedObject.CurrentId);
+
+ switch (changedObject.Status)
+ {
+ case ResponseCodes.NotFound:
+ log.Warn("Server cache not properly configured, worker exiting");
+ return;
+ case "deleted":
+ //Delete the object from the store
+ await CacheStore.DeleteItemAsync(changedObject.CurrentId);
+ break;
+ case "modified":
+ //Reload the record from the store
+ await UpdateRecordAsync(client, log, changedObject.CurrentId, changedObject.NewId, exitToken);
+ break;
+ }
+ }
+ }
+
+ private async Task UpdateRecordAsync(FBMClient client, ILogProvider log, string objectId, string newId, CancellationToken cancellation)
+ {
+ //Get request message
+ FBMRequest modRequest = client.RentRequest();
+ try
+ {
+ //Set action as get/create
+ modRequest.WriteHeader(HeaderCommand.Action, Actions.Get);
+ //Set session-id header
+ modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId);
+
+ //Make request
+ using FBMResponse response = await client.SendAsync(modRequest, GetItemTimeout, cancellation);
+
+ response.ThrowIfNotSet();
+
+ //Check response code
+ string status = response.Headers.First(static s => s.Header == HeaderCommand.Status).Value.ToString();
+ if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal))
+ {
+ //Update the record
+ await CacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation);
+ log.Debug("Updated object {id}", objectId);
+ }
+ else
+ {
+ log.Warn("Object {id} was missing on the remote server", objectId);
+ }
+ }
+ finally
+ {
+ client.ReturnRequest(modRequest);
+ }
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs b/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs
new file mode 100644
index 0000000..82b280c
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs
@@ -0,0 +1,55 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: CachePeerMonitor.cs
+*
+* CachePeerMonitor.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Collections.Generic;
+
+using VNLib.Plugins;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+{
+
+ internal sealed class CachePeerMonitor : IPeerMonitor
+ {
+ public CachePeerMonitor(PluginBase plugin)
+ {
+
+ }
+
+ public IEnumerable<ICachePeer> GetAllPeers()
+ {
+ throw new NotImplementedException();
+ }
+
+ public void OnPeerConnected(ICachePeer peer)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void OnPeerDisconnected(ICachePeer peer)
+ {
+ throw new NotImplementedException();
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs b/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs
new file mode 100644
index 0000000..d029f10
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs
@@ -0,0 +1,49 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: ObjectCacheServerEntry.cs
+*
+* ObjectCacheServerEntry.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using VNLib.Data.Caching.Extensions;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+{
+ internal interface ICachePeerAdapter
+ {
+ /// <summary>
+ /// Gets the peers that have been discovered but not yet connected to
+ /// </summary>
+ /// <returns>A collection of peers that have not been connected to yet</returns>
+ ICachePeerAdvertisment[] GetNewPeers();
+
+ /// <summary>
+ /// Called when a peer has been connected to
+ /// </summary>
+ /// <param name="peer">The peer that has been connected</param>
+ void OnPeerListenerAttached(ICachePeerAdvertisment peer);
+
+ /// <summary>
+ /// Called when a peer has been disconnected from
+ /// </summary>
+ /// <param name="peer">The disconnected peer</param>
+ void OnPeerListenerDetatched(ICachePeerAdvertisment peer);
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryCollection.cs b/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryCollection.cs
new file mode 100644
index 0000000..d69da40
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryCollection.cs
@@ -0,0 +1,58 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: INodeDiscoveryCollection.cs
+*
+* INodeDiscoveryCollection.cs is part of ObjectCacheServer which is
+* part of the larger VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Collections.Generic;
+
+using VNLib.Data.Caching.Extensions;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+{
+ internal interface INodeDiscoveryCollection
+ {
+ /// <summary>
+ /// Begins a new discovery and gets an enumerator for the discovery process
+ /// </summary>
+ /// <returns>An enumerator that simplifies discovery of unique nodes</returns>
+ INodeDiscoveryEnumerator BeginDiscovery();
+
+ /// <summary>
+ /// Begins a new discovery and gets an enumerator for the discovery process
+ /// </summary>
+ /// <param name="initialPeers">An initial collection of peers to add to the enumeration</param>
+ /// <returns>An enumerator that simplifies discovery of unique nodes</returns>
+ INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICachePeerAdvertisment> initialPeers);
+
+ /// <summary>
+ /// Gets a snapshot of all discovered nodes in the current collection.
+ /// </summary>
+ /// <returns>The current collection of notes</returns>
+ ICachePeerAdvertisment[] GetAllNodes();
+
+ /// <summary>
+ /// Completes a discovery process and updates the collection with the results
+ /// </summary>
+ /// <param name="enumerator">The enumerator used to collect discovered nodes</param>
+ void CompleteDiscovery(INodeDiscoveryEnumerator enumerator);
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryEnumerator.cs b/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryEnumerator.cs
new file mode 100644
index 0000000..5cddf9c
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryEnumerator.cs
@@ -0,0 +1,45 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: INodeDiscoveryEnumerator.cs
+*
+* INodeDiscoveryEnumerator.cs is part of ObjectCacheServer which is part
+* of the larger VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Collections.Generic;
+
+using VNLib.Data.Caching.Extensions;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+{
+ internal interface INodeDiscoveryEnumerator
+ {
+ /// <summary>
+ /// Moves the enumerator to the next peer in the discovery process and returns the result
+ /// </summary>
+ /// <returns>The next peer advertisment in the enumeration</returns>
+ ICachePeerAdvertisment? GetNextPeer();
+
+ /// <summary>
+ /// Adds the specified peer to the collection of discovered peers
+ /// </summary>
+ /// <param name="discoveredPeers">The peer collection</param>
+ void OnPeerDiscoveryComplete(IEnumerable<ICachePeerAdvertisment> discoveredPeers);
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs b/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs
new file mode 100644
index 0000000..b4cb840
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs
@@ -0,0 +1,53 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: IPeerMonitor.cs
+*
+* IPeerMonitor.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Collections.Generic;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+{
+ /// <summary>
+ /// Represents a monitor for peer cache servers to advertise their presence
+ /// in the cluster
+ /// </summary>
+ internal interface IPeerMonitor
+ {
+ /// <summary>
+ /// Notifies the monitor that a peer has connected to the cluster
+ /// </summary>
+ /// <param name="peer">The peer that connected</param>
+ void OnPeerConnected(ICachePeer peer);
+
+ /// <summary>
+ /// Notifies the monitor that a peer has disconnected
+ /// </summary>
+ /// <param name="peer">The peer that has disconnected</param>
+ void OnPeerDisconnected(ICachePeer peer);
+
+ /// <summary>
+ /// Gets an enumerable of all peers currently active in the current peer
+ /// </summary>
+ /// <returns></returns>
+ IEnumerable<ICachePeer> GetAllPeers();
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Distribution/NodeDiscoveryCollection.cs b/plugins/ObjectCacheServer/src/Distribution/NodeDiscoveryCollection.cs
new file mode 100644
index 0000000..f773a2e
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Distribution/NodeDiscoveryCollection.cs
@@ -0,0 +1,99 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: PeerDiscoveryManager.cs
+*
+* PeerDiscoveryManager.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Linq;
+using System.Collections.Generic;
+
+using VNLib.Plugins;
+using VNLib.Data.Caching.Extensions;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+{
+ sealed class NodeDiscoveryCollection : INodeDiscoveryCollection
+ {
+ private LinkedList<ICachePeerAdvertisment> _peers;
+
+
+ public NodeDiscoveryCollection(PluginBase plugin)
+ {
+ _peers = new();
+ }
+
+ ///<inheritdoc/>
+ public INodeDiscoveryEnumerator BeginDiscovery()
+ {
+ return new NodeEnumerator(new());
+ }
+
+ ///<inheritdoc/>
+ public INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICachePeerAdvertisment> initialPeers)
+ {
+ //Init new enumerator with the initial peers
+ return new NodeEnumerator(new(initialPeers));
+ }
+
+ ///<inheritdoc/>
+ public void CompleteDiscovery(INodeDiscoveryEnumerator enumerator)
+ {
+ _ = enumerator ?? throw new ArgumentNullException(nameof(enumerator));
+
+ //Capture all nodes from the enumerator and store them as our current peers
+ _peers = (enumerator as NodeEnumerator)!.Peers;
+ }
+
+ ///<inheritdoc/>
+ public ICachePeerAdvertisment[] GetAllNodes()
+ {
+ //Capture all current peers
+ return _peers.ToArray();
+ }
+
+ private sealed record class NodeEnumerator(LinkedList<ICachePeerAdvertisment> Peers) : INodeDiscoveryEnumerator
+ {
+ //Keep track of the current node in the collection so we can move down the list
+ private LinkedListNode<ICachePeerAdvertisment>? _currentNode = Peers.First;
+
+ public ICachePeerAdvertisment? GetNextPeer()
+ {
+ //Move to the next peer in the collection
+ _currentNode = _currentNode?.Next;
+
+ return _currentNode?.Value;
+ }
+
+ public void OnPeerDiscoveryComplete(IEnumerable<ICachePeerAdvertisment> discoveredPeers)
+ {
+ //Get only the peers from the discovery that are not already in the collection
+ IEnumerable<ICachePeerAdvertisment> newPeers = discoveredPeers.Except(Peers);
+
+ //Add them to the end of the collection
+ foreach(ICachePeerAdvertisment ad in newPeers)
+ {
+ Peers.AddLast(ad);
+ }
+ }
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs
new file mode 100644
index 0000000..54e4258
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs
@@ -0,0 +1,168 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: PeerDiscoveryManager.cs
+*
+* PeerDiscoveryManager.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+
+using VNLib.Plugins;
+using VNLib.Utils.Logging;
+using VNLib.Data.Caching.Extensions;
+using VNLib.Plugins.Extensions.Loading;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+{
+
+ sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter
+ {
+ private readonly NodeConfig _config;
+ private readonly IPeerMonitor _monitor;
+ private readonly INodeDiscoveryCollection _peers;
+
+ public PeerDiscoveryManager(PluginBase plugin)
+ {
+ //Get config
+ _config = plugin.GetOrCreateSingleton<NodeConfig>();
+
+ //Get the peer monitor
+ _monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
+
+ //Get the node collection
+ _peers = plugin.GetOrCreateSingleton<NodeDiscoveryCollection>();
+
+ _connectedPeers = new();
+ }
+
+ async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
+ {
+ pluginLog.Information("Node discovery worker started");
+
+ try
+ {
+ while (true)
+ {
+ try
+ {
+ await DiscoverAllNodesAsync(pluginLog, exitToken);
+ }
+ catch(OperationCanceledException)
+ {
+ throw;
+ }
+ catch (Exception ex)
+ {
+ pluginLog.Error(ex, "Failed to discover new peer nodes");
+ }
+
+ //Delay the next discovery
+ await Task.Delay(_config.DiscoveryInterval, exitToken);
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ //Normal exit
+ pluginLog.Information("Node discovery worker exiting");
+ }
+ finally
+ {
+
+ }
+ }
+
+ async Task DiscoverAllNodesAsync(ILogProvider log, CancellationToken cancellation)
+ {
+ //Use the monitor to get the initial peers
+ IEnumerable<ICachePeerAdvertisment> ads = _monitor.GetAllPeers()
+ .Where(static p => p.Advertisment != null)
+ .Select(static p => p.Advertisment!);
+
+ //Init enumerator with initial peers
+ INodeDiscoveryEnumerator enumerator = _peers.BeginDiscovery(ads);
+
+ do
+ {
+ //Load the initial peer
+ ICachePeerAdvertisment? peer = enumerator.GetNextPeer();
+
+ if (peer == null)
+ {
+ break;
+ }
+
+ log.Verbose("Discovering peer nodes from {Peer}", peer.NodeId);
+
+ //Discover nodes from this peer
+ ICachePeerAdvertisment[]? newNodes = await _config.Config.DiscoverClusterNodesAsync(peer, cancellation);
+
+ //Add nodes to the enumerator
+ if (newNodes != null)
+ {
+ enumerator.OnPeerDiscoveryComplete(newNodes);
+ }
+
+ } while (true);
+
+ //Commit peer updates
+ _peers.CompleteDiscovery(enumerator);
+ }
+
+
+ private readonly List<ICachePeerAdvertisment> _connectedPeers;
+
+ ///<inheritdoc/>
+ public ICachePeerAdvertisment[] GetNewPeers()
+ {
+ lock (_connectedPeers)
+ {
+ //Get all discovered peers
+ ICachePeerAdvertisment[] peers = _peers.GetAllNodes();
+
+ //Get the difference between the discovered peers and the connected peers
+ return peers.Except(_connectedPeers).ToArray();
+ }
+ }
+
+ ///<inheritdoc/>
+ public void OnPeerListenerAttached(ICachePeerAdvertisment peer)
+ {
+ lock (_connectedPeers)
+ {
+ //Add to connected peers
+ _connectedPeers.Add(peer);
+ }
+ }
+
+ ///<inheritdoc/>
+ public void OnPeerListenerDetatched(ICachePeerAdvertisment peer)
+ {
+ //remove from connected peers
+ lock (_connectedPeers)
+ {
+ _connectedPeers.Remove(peer);
+ }
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs b/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs
index 97061b3..b9c00e6 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs
@@ -1,11 +1,11 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
-* File: BrokerHeartBeat.cs
+* File: BrokerHeartBeatEndpoint.cs
*
-* BrokerHeartBeat.cs is part of ObjectCacheServer which is part of the larger
+* BrokerHeartBeatEndpoint.cs is part of ObjectCacheServer which is part of the larger
* VNLib collection of libraries and utilities.
*
* ObjectCacheServer is free software: you can redistribute it and/or modify
@@ -26,11 +26,11 @@ using System;
using System.Net;
using System.Linq;
using System.Text.Json;
-using System.Threading;
using System.Threading.Tasks;
-using System.Collections.Generic;
+
using VNLib.Plugins;
+using VNLib.Utils.Logging;
using VNLib.Plugins.Essentials;
using VNLib.Hashing.IdentityUtility;
using VNLib.Plugins.Essentials.Endpoints;
@@ -39,14 +39,11 @@ using VNLib.Plugins.Extensions.Loading;
namespace VNLib.Data.Caching.ObjectCache.Server
{
- internal sealed class BrokerHeartBeat : ResourceEndpointBase
+ internal sealed class BrokerHeartBeatEndpoint : ResourceEndpointBase
{
- public override string Path => "/heartbeat";
-
- private readonly Func<string> Token;
- private readonly ManualResetEvent KeepaliveSet;
+ private readonly IBrokerHeartbeatNotifier _heartBeat;
private readonly Task<IPAddress[]> BrokerIpList;
- private readonly PluginBase Pbase;
+ private readonly bool DebugMode;
///<inheritdoc/>
protected override ProtectionSettings EndpointProtectionSettings { get; } = new()
@@ -55,19 +52,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server
DisableSessionsRequired = true
};
- public BrokerHeartBeat(Func<string> token, ManualResetEvent keepaliveSet, Uri brokerUri, PluginBase pbase)
+ public BrokerHeartBeatEndpoint(PluginBase plugin)
{
- Token = token;
- KeepaliveSet = keepaliveSet;
- BrokerIpList = Dns.GetHostAddressesAsync(brokerUri.DnsSafeHost);
-
- this.Pbase = pbase;
- }
+ //Get debug flag
+ DebugMode = plugin.IsDebug();
- private async Task<ReadOnlyJsonWebKey> GetBrokerPubAsync()
- {
- return await Pbase.TryGetSecretAsync("broker_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : broker_public_key");
+ //Get or create the current node config
+ _heartBeat = plugin.GetOrCreateSingleton<NodeConfig>();
+
+ /*
+ * Resolve the ip address of the broker and store it to verify connections
+ * later
+ */
+ BrokerIpList = Dns.GetHostAddressesAsync(_heartBeat.GetBrokerAddress().DnsSafeHost);
+
+ //Setup endpoint
+ InitPathAndLog("/heartbeat", plugin.Log);
}
+
protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity)
{
@@ -76,13 +78,20 @@ namespace VNLib.Data.Caching.ObjectCache.Server
{
//Load and verify the broker's ip address matches with an address we have stored
IPAddress[] addresses = await BrokerIpList;
+
if (!addresses.Contains(entity.TrustedRemoteIp))
{
+ if (DebugMode)
+ {
+ Log.Debug("Received connection {ip} that was not a DNS safe address for the broker server, access denied");
+ }
+
//Token invalid
entity.CloseResponse(HttpStatusCode.Forbidden);
return VfReturnType.VirtualSkip;
}
}
+
//Get the authorization jwt
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
@@ -97,7 +106,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
using JsonWebToken jwt = JsonWebToken.Parse(jwtAuth);
//Verify the jwt using the broker's public key certificate
- using (ReadOnlyJsonWebKey cert = await GetBrokerPubAsync())
+ using (ReadOnlyJsonWebKey cert = _heartBeat.GetBrokerPublicKey())
{
//Verify the jwt
if (!jwt.VerifyFromJwk(cert))
@@ -114,16 +123,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server
{
auth = doc.RootElement.GetProperty("token").GetString();
}
-
+
+ //Get our stored token used for registration
+ string? selfToken = _heartBeat.GetAuthToken();
+
//Verify token
- if(Token().Equals(auth, StringComparison.Ordinal))
+ if (selfToken != null && selfToken.Equals(auth, StringComparison.Ordinal))
{
//Signal keepalive
- KeepaliveSet.Set();
+ _heartBeat.HearbeatReceived();
entity.CloseResponse(HttpStatusCode.OK);
return VfReturnType.VirtualSkip;
}
-
+
+ if (DebugMode)
+ {
+ Log.Debug("Invalid auth token recieved from broker sever, access denied");
+ }
+
//Token invalid
entity.CloseResponse(HttpStatusCode.Forbidden);
return VfReturnType.VirtualSkip;
diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs
new file mode 100644
index 0000000..67db433
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs
@@ -0,0 +1,91 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: ConnectEndpoint.cs
+*
+* ConnectEndpoint.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using VNLib.Utils.Logging;
+using VNLib.Plugins;
+using VNLib.Plugins.Extensions.Loading;
+
+namespace VNLib.Data.Caching.ObjectCache.Server
+{
+ [ConfigurationName("cache")]
+ sealed class CacheStore : ICacheStore, IDisposable
+ {
+ public BlobCacheListener Listener { get; }
+
+
+ public CacheStore(PluginBase plugin, IConfigScope config)
+ {
+ //Init cache
+ Listener = InitializeCache((ObjectCacheServerEntry)plugin, config);
+ }
+
+ ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token)
+ {
+ return Listener.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token);
+ }
+
+ void ICacheStore.Clear()
+ {
+ throw new NotImplementedException();
+ }
+
+ ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token)
+ {
+ return Listener.Cache.DeleteObjectAsync(id, token);
+ }
+
+ private static BlobCacheListener InitializeCache(ObjectCacheServerEntry plugin, IConfigScope config)
+ {
+ //Deserialize the cache config
+ CacheConfiguration cacheConf = config.Deserialze<CacheConfiguration>();
+
+ if (cacheConf.MaxCacheEntries < 2)
+ {
+ throw new ArgumentException("You must configure a 'max_cache' size larger than 1 item");
+ }
+
+ //Suggestion
+ if (cacheConf.MaxCacheEntries < 200)
+ {
+ plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache");
+ }
+
+ plugin.Log.Verbose("Creating cache store with {bc} buckets, with {mc} items/bucket", cacheConf.BucketCount, cacheConf.MaxCacheEntries);
+
+ //Load the blob cache table system
+ IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, plugin.CacheHeap, cacheConf);
+
+ //Endpoint only allows for a single reader
+ return new(bc, plugin.Log, plugin.CacheHeap, true);
+ }
+
+ public void Dispose()
+ {
+ Listener.Dispose();
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
index 2f896bc..167a7e9 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
@@ -27,40 +27,46 @@ using System.Net;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
-using System.Threading.Channels;
using System.Collections.Generic;
-using System.Collections.Concurrent;
-using VNLib.Plugins;
using VNLib.Hashing;
using VNLib.Net.Http;
using VNLib.Utils.Async;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
+using VNLib.Utils.Extensions;
using VNLib.Data.Caching;
+using VNLib.Data.Caching.Extensions;
using VNLib.Hashing.IdentityUtility;
using VNLib.Net.Messaging.FBM;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Net.Messaging.FBM.Server;
+using VNLib.Plugins;
using VNLib.Plugins.Essentials;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Essentials.Endpoints;
using VNLib.Plugins.Essentials.Extensions;
-
+using VNLib.Plugins.Extensions.Loading.Routing;
+using VNLib.Data.Caching.ObjectCache.Server.Distribution;
namespace VNLib.Data.Caching.ObjectCache.Server
{
- [ConfigurationName("store")]
- internal sealed class ConnectEndpoint : ResourceEndpointBase, IDisposable, IAsyncBackgroundWork
+ [ConfigurationName("connect_endpoint")]
+ internal sealed class ConnectEndpoint : ResourceEndpointBase, IAsyncBackgroundWork
{
- private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
+ private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
+
+
+ private readonly CacheNodeConfiguration NodeConfiguration;
+ private readonly ICacheEventQueueManager PubSubManager;
+ private readonly IPeerMonitor Peers;
- private readonly string AudienceLocalServerId;
private readonly BlobCacheListener Store;
- private readonly PluginBase Pbase;
+ private readonly CacheAuthKeyStore KeyStore;
- private readonly ConcurrentDictionary<string, AsyncQueue<ChangeEvent>> StatefulEventQueue;
+ private readonly bool VerifyIp;
+ private readonly string AudienceLocalServerId;
private uint _connectedClients;
@@ -87,27 +93,26 @@ namespace VNLib.Data.Caching.ObjectCache.Server
string? path = config["path"].GetString();
InitPathAndLog(path, plugin.Log);
-
- Pbase = plugin;
- //Parse cache config or use default
- if(config.TryGetValue("cache", out JsonElement confEl))
- {
- CacheConfig = confEl.Deserialize<CacheConfiguration>()!;
- }
- else
- {
- //Init default config if not fount
- CacheConfig = new();
+ KeyStore = new(plugin);
- Log.Verbose("Loading default cache buffer configuration");
- }
+ //Check for ip-verification flag
+ VerifyIp = config.TryGetValue("verify_ip", out JsonElement vIpEl) && vIpEl.GetBoolean();
- //Create event queue client lookup table
- StatefulEventQueue = new(StringComparer.OrdinalIgnoreCase);
+ //Setup pub/sub manager
+ PubSubManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>();
+
+ //Get node configuration
+ NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>().Config;
+
+ //Get peer monitor
+ Peers = plugin.GetOrCreateSingleton<CachePeerMonitor>();
//Init the cache store
- Store = InitializeCache((ObjectCacheServerEntry)plugin, CacheConfig, config);
+ Store = plugin.GetOrCreateSingleton<CacheStore>().Listener;
+
+ //Get the cache store configuration
+ CacheConfig = plugin.GetConfigForType<CacheStore>().Deserialze<CacheConfiguration>();
/*
* Generate a random guid for the current server when created so we
@@ -118,60 +123,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//Schedule the queue worker to be run
_ = plugin.ObserveWork(this, 100);
}
-
-
- private static BlobCacheListener InitializeCache(ObjectCacheServerEntry plugin, CacheConfiguration cacheConf, IConfigScope config)
- {
- if(cacheConf.MaxCacheEntries < 2)
- {
- throw new ArgumentException("You must configure a 'max_cache' size larger than 1 item");
- }
-
- //Suggestion
- if(cacheConf.MaxCacheEntries < 200)
- {
- plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache");
- }
-
- plugin.Log.Verbose("Creating cache store with {bc} buckets, with {mc} items/bucket", cacheConf.BucketCount, cacheConf.MaxCacheEntries);
-
- //Load the blob cache table system
- IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, plugin.CacheHeap, cacheConf);
-
- //Endpoint only allows for a single reader
- return new (bc, plugin.Log, plugin.CacheHeap, true);
- }
-
-
- /// <summary>
- /// Gets the configured cache store
- /// </summary>
- /// <returns></returns>
- public ICacheStore GetCacheStore() => new CacheStore(Store);
- //Dispose will be called by the host plugin on unload
- void IDisposable.Dispose()
- {
- //Dispose the store on cleanup
- Store.Dispose();
- }
-
-
- private async Task<ReadOnlyJsonWebKey> GetClientPubAsync()
- {
- return await Pbase.TryGetSecretAsync("client_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
- }
- private async Task<ReadOnlyJsonWebKey> GetCachePubAsync()
- {
- return await Pbase.TryGetSecretAsync("cache_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
- }
- private async Task<ReadOnlyJsonWebKey> GetCachePrivateKeyAsync()
- {
- return await Pbase.TryGetSecretAsync("cache_private_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
- }
-
-
/*
* Used as a client negotiation and verification request
*
@@ -183,6 +136,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server
*
* The tokens are very short lived as requests are intended to be made
* directly after verification
+ *
+ * Clients must also sign the entire token with their private key and
+ * set the signature in the x-upgrade-sig header so we can verify they
+ * received the messages properly
*/
protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity)
@@ -205,7 +162,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
bool verified = false;
//Get the client public key certificate to verify the client's message
- using(ReadOnlyJsonWebKey cert = await GetClientPubAsync())
+ using(ReadOnlyJsonWebKey cert = await KeyStore.GetClientPublicKeyAsync())
{
//verify signature for client
if (jwt.VerifyFromJwk(cert))
@@ -215,10 +172,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//May be signed by a cache server
else
{
- using ReadOnlyJsonWebKey cacheCert = await GetCachePubAsync();
-
//Set peer and verified flag since the another cache server signed the request
- isPeer = verified = jwt.VerifyFromJwk(cacheCert);
+ isPeer = verified = NodeConfiguration.VerifyCache(jwt);
}
}
@@ -232,10 +187,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//Recover json body
using JsonDocument doc = jwt.GetPayload();
+
if (doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl))
{
nodeId = servIdEl.GetString();
}
+
if (doc.RootElement.TryGetProperty("chl", out JsonElement challengeEl))
{
challenge = challengeEl.GetString();
@@ -246,133 +203,147 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//Verified, now we can create an auth message with a short expiration
using JsonWebToken auth = new();
+
+ auth.WriteHeader(NodeConfiguration.GetJwtHeader());
+ auth.InitPayloadClaim()
+ .AddClaim("aud", AudienceLocalServerId)
+ .AddClaim("exp", entity.RequestedTimeUtc.Add(AuthTokenExpiration).ToUnixTimeSeconds())
+ .AddClaim("nonce", RandomHash.GetRandomBase32(8))
+ .AddClaim("chl", challenge!)
+ //Set the ispeer flag if the request was signed by a cache server
+ .AddClaim("isPeer", isPeer)
+ //Specify the server's node id if set
+ .AddClaim("sub", nodeId!)
+ //Set ip address
+ .AddClaim("ip", entity.TrustedRemoteIp.ToString())
+ //Add negotiaion args
+ .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, CacheConfig.MaxHeaderBufferSize)
+ .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, CacheConfig.MaxRecvBufferSize)
+ .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, CacheConfig.MaxMessageSize)
+ .CommitClaims();
+
+ //Sign the auth message from our private key
+ NodeConfiguration.SignJwt(auth);
- //Sign the auth message from the cache certificate's private key
- using (ReadOnlyJsonWebKey cert = await GetCachePrivateKeyAsync())
- {
- auth.WriteHeader(cert.JwtHeader);
- auth.InitPayloadClaim()
- .AddClaim("aud", AudienceLocalServerId)
- .AddClaim("exp", entity.RequestedTimeUtc.Add(AuthTokenExpiration).ToUnixTimeSeconds())
- .AddClaim("nonce", RandomHash.GetRandomBase32(8))
- .AddClaim("chl", challenge!)
- //Set the ispeer flag if the request was signed by a cache server
- .AddClaim("isPeer", isPeer)
- //Specify the server's node id if set
- .AddClaim("sub", nodeId!)
- //Add negotiaion args
- .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, CacheConfig.MaxHeaderBufferSize)
- .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, CacheConfig.MaxRecvBufferSize)
- .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, CacheConfig.MaxMessageSize)
- .CommitClaims();
-
- auth.SignFromJwk(cert);
- }
-
//Close response
entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer);
return VfReturnType.VirtualSkip;
}
-
- //Background worker to process event queue items
- async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
+ protected override async ValueTask<VfReturnType> WebsocketRequestedAsync(HttpEntity entity)
{
- try
- {
- //Listen for changes
- while (true)
- {
- ChangeEvent ev = await Store.EventQueue.DequeueAsync(exitToken);
+ //Parse jwt from authorization
+ string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
- //Add event to queues
- foreach (AsyncQueue<ChangeEvent> queue in StatefulEventQueue.Values)
- {
- if (!queue.TryEnque(ev))
- {
- Log.Debug("Listener queue has exeeded capacity, change events will be lost");
- }
- }
- }
- }
- catch (OperationCanceledException)
+ if (string.IsNullOrWhiteSpace(jwtAuth))
{
- //Normal exit
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
}
- }
- private class WsUserState
- {
- public int RecvBufferSize { get; init; }
- public int MaxHeaderBufferSize { get; init; }
- public int MaxMessageSize { get; init; }
- public int MaxResponseBufferSize { get; init; }
- public AsyncQueue<ChangeEvent>? SyncQueue { get; init; }
+ //Get the upgrade signature header
+ string? clientSignature = entity.Server.Headers[FBMDataCacheExtensions.X_UPGRADE_SIG_HEADER];
- public override string ToString()
+ if (string.IsNullOrWhiteSpace(clientSignature))
{
- return
- $"{nameof(RecvBufferSize)}:{RecvBufferSize}, {nameof(MaxHeaderBufferSize)}: {MaxHeaderBufferSize}, {nameof(MaxMessageSize)}:{MaxMessageSize}, {nameof(MaxResponseBufferSize)}:{MaxResponseBufferSize}";
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
}
- }
- protected override async ValueTask<VfReturnType> WebsocketRequestedAsync(HttpEntity entity)
- {
- try
+ string? nodeId = null;
+ ICachePeerAdvertisment? discoveryAd = null;
+
+ //Parse jwt
+ using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
{
- //Parse jwt from authorization
- string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
+ //verify signature against the cache public key, since this server must have signed it
+ if (!NodeConfiguration.VerifyCache(jwt))
+ {
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
+ }
+
+ //Recover json body
+ using JsonDocument doc = jwt.GetPayload();
+
+ //Verify audience, expiration
- if (string.IsNullOrWhiteSpace(jwtAuth))
+ if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl) || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase))
+ {
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
+ }
+
+ if (!doc.RootElement.TryGetProperty("exp", out JsonElement expEl)
+ || DateTimeOffset.FromUnixTimeSeconds(expEl.GetInt64()) < entity.RequestedTimeUtc)
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
}
-
- string? nodeId = null;
- //Parse jwt
- using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
+ //Check node ip address matches if required
+ if (VerifyIp)
{
- //Get the client public key certificate to verify the client's message
- using (ReadOnlyJsonWebKey cert = await GetCachePubAsync())
+ if (!doc.RootElement.TryGetProperty("ip", out JsonElement ipEl))
{
- //verify signature against the cache public key, since this server must have signed it
- if (!jwt.VerifyFromJwk(cert))
- {
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
}
-
- //Recover json body
- using JsonDocument doc = jwt.GetPayload();
-
- //Verify audience, expiration
- if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl) || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase))
+ string? clientIp = ipEl.GetString();
+ //Verify the client ip address matches the one in the token
+ if (clientIp == null || !IPAddress.TryParse(clientIp, out IPAddress? clientIpAddr) || !clientIpAddr.Equals(entity.TrustedRemoteIp))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
}
+ }
+
+ //Check if the client is a peer
+ bool isPeer = doc.RootElement.TryGetProperty("isPeer", out JsonElement isPeerEl) && isPeerEl.GetBoolean();
- if (!doc.RootElement.TryGetProperty("exp", out JsonElement expEl)
- || DateTimeOffset.FromUnixTimeSeconds(expEl.GetInt64()) < entity.RequestedTimeUtc)
+ //The node id is optional and stored in the 'sub' field, ignore if the client is not a peer
+ if (isPeer && doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl))
+ {
+ nodeId = servIdEl.GetString();
+ }
+
+ //Verify the signature the client included of the auth token
+
+ if (isPeer)
+ {
+ //Verify token signature against a fellow cache public key
+ if (!NodeConfiguration.VerifyUpgradeToken(clientSignature, jwtAuth))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
}
- //Check if the client is a peer
- bool isPeer = doc.RootElement.TryGetProperty("isPeer", out JsonElement isPeerEl) && isPeerEl.GetBoolean();
+ //Try to get the node advertisement header
+ string? discoveryHeader = entity.Server.Headers[FBMDataCacheExtensions.X_NODE_DISCOVERY_HEADER];
- //The node id is optional and stored in the 'sub' field, ignore if the client is not a peer
- if (isPeer && doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl))
+ //Verify the node advertisement header and publish it
+ if (!string.IsNullOrWhiteSpace(discoveryHeader))
{
- nodeId = servIdEl.GetString();
+ discoveryAd = NodeConfiguration.VerifyPeerAdvertisment(discoveryHeader);
}
}
-
+ else
+ {
+ //Not a peer, so verify against the client's public key
+ using ReadOnlyJsonWebKey clientPub = await KeyStore.GetClientPublicKeyAsync();
+
+ //Verify token signature
+ if (!FBMDataCacheExtensions.VerifyUpgradeToken(clientSignature, jwtAuth, clientPub))
+ {
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
+ }
+ }
+ }
+
+ try
+ {
//Get query config suggestions from the client
string recvBufCmd = entity.QueryArgs[FBMClient.REQ_RECV_BUF_QUERY_ARG];
string maxHeaderCharCmd = entity.QueryArgs[FBMClient.REQ_HEAD_BUF_QUERY_ARG];
@@ -382,34 +353,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server
int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : CacheConfig.MinRecvBufferSize;
int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : CacheConfig.MinHeaderBufferSize;
int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : CacheConfig.MaxMessageSize;
-
- AsyncQueue<ChangeEvent>? nodeQueue = null;
-
- //The connection may be a caching server node, so get its node-id
- if (!string.IsNullOrWhiteSpace(nodeId))
- {
- /*
- * Store a new async queue, or get an old queue for the current node
- *
- * We should use a bounded queue and disacard LRU items, we also know
- * only a single writer is needed as the queue is processed on a single thread
- * and change events may be processed on mutliple threads.
- */
-
- BoundedChannelOptions queueOptions = new(CacheConfig.MaxEventQueueDepth)
- {
- AllowSynchronousContinuations = true,
- SingleReader = false,
- SingleWriter = true,
- //Drop oldest item in queue if full
- FullMode = BoundedChannelFullMode.DropOldest,
- };
-
- _ = StatefulEventQueue.TryAdd(nodeId, new(queueOptions));
-
- //Get the queue
- nodeQueue = StatefulEventQueue[nodeId];
- }
/*
* Buffer sizing can get messy as the response/resquest sizes can vary
@@ -434,7 +377,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server
*/
MaxResponseBufferSize = (int)MemoryUtil.NearestPage(maxMessageSizeClamp),
- SyncQueue = nodeQueue
+ NodeId = nodeId,
+ Advertisment = discoveryAd
};
Log.Debug("Client recv buffer suggestion {recv}, header buffer size {head}, response buffer size {r}", recvBufCmd, maxHeaderCharCmd, state.MaxResponseBufferSize);
@@ -454,14 +398,19 @@ namespace VNLib.Data.Caching.ObjectCache.Server
private async Task WebsocketAcceptedAsync(WebSocketSession wss)
{
+ WsUserState state = (WsUserState)wss.UserState!;
+
+ //Notify peers of new connection
+ Peers.OnPeerConnected(state);
+
//Inc connected count
Interlocked.Increment(ref _connectedClients);
+
//Register plugin exit token to cancel the connected socket
- CancellationTokenRegistration reg = Pbase.UnloadToken.Register(wss.CancelAll);
+ CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll);
+
try
{
- WsUserState state = (wss.UserState as WsUserState)!;
-
//Init listener args from request
FBMListenerSessionParams args = new()
{
@@ -473,12 +422,33 @@ namespace VNLib.Data.Caching.ObjectCache.Server
HeaderEncoding = Helpers.DefaultEncoding,
};
- //Listen for requests
- await Store.ListenAsync(wss, args, state.SyncQueue);
+ //Check if the client is a peer node, if it is, subscribe to change events
+ if (!string.IsNullOrWhiteSpace(state.NodeId))
+ {
+ //Get the event queue for the current node
+ AsyncQueue<ChangeEvent> queue = PubSubManager.Subscribe(state);
+
+ try
+ {
+ //Begin listening for messages with a queue
+ await Store.ListenAsync(wss, args, queue);
+ }
+ finally
+ {
+ //ALAWYS Detatch listener
+ PubSubManager.Unsubscribe(state);
+ }
+ }
+ else
+ {
+ //Begin listening for messages without a queue
+ await Store.ListenAsync(wss, args, null);
+ }
}
catch (OperationCanceledException)
{
Log.Debug("Websocket connection was canceled");
+
//Disconnect the socket
await wss.CloseSocketOutputAsync(System.Net.WebSockets.WebSocketCloseStatus.NormalClosure, "unload", CancellationToken.None);
}
@@ -490,35 +460,68 @@ namespace VNLib.Data.Caching.ObjectCache.Server
{
//Dec connected count
Interlocked.Decrement(ref _connectedClients);
+
//Unregister the
reg.Unregister();
}
+
+ //Notify monitor of disconnect
+ Peers.OnPeerDisconnected(state);
+
Log.Debug("Server websocket exited");
}
-
- private sealed class CacheStore : ICacheStore
+
+ //Background worker to process event queue items
+ async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
{
- private readonly BlobCacheListener _cache;
+ const int accumulatorSize = 64;
- public CacheStore(BlobCacheListener cache)
+ try
{
- _cache = cache;
- }
+ //Accumulator for events
+ ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize];
+ int ptr = 0;
- ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token)
- {
- return _cache.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token);
- }
+ //Listen for changes
+ while (true)
+ {
+ //Wait for next event
+ accumulator[ptr++] = await Store.EventQueue.DequeueAsync(exitToken);
- void ICacheStore.Clear()
+ //try to accumulate more events until we can't anymore
+ while (Store.EventQueue.TryDequeue(out ChangeEvent? ev) && ptr < accumulatorSize)
+ {
+ accumulator[ptr++] = ev;
+ }
+
+ //Publish all events to subscribers
+ PubSubManager.PublishMultiple(accumulator.AsSpan(0, ptr));
+
+ //Reset pointer
+ ptr = 0;
+ }
+ }
+ catch (OperationCanceledException)
{
- throw new NotImplementedException();
+ //Normal exit
+ pluginLog.Debug("Change queue listener worker exited");
}
+ }
+
+ private class WsUserState : ICachePeer
+ {
+ public int RecvBufferSize { get; init; }
+ public int MaxHeaderBufferSize { get; init; }
+ public int MaxMessageSize { get; init; }
+ public int MaxResponseBufferSize { get; init; }
+ public string? NodeId { get; init; }
+ public ICachePeerAdvertisment? Advertisment { get; init; }
- ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token)
+ public override string ToString()
{
- return _cache.Cache.DeleteObjectAsync(id, token);
+ return
+ $"{nameof(RecvBufferSize)}:{RecvBufferSize}, {nameof(MaxHeaderBufferSize)}: {MaxHeaderBufferSize}, {nameof(MaxMessageSize)}:{MaxMessageSize}, {nameof(MaxResponseBufferSize)}:{MaxResponseBufferSize}";
}
}
}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
new file mode 100644
index 0000000..670d624
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
@@ -0,0 +1,129 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: PeerDiscoveryEndpoint.cs
+*
+* PeerDiscoveryEndpoint.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Linq;
+using System.Net;
+using System.Text.Json;
+using System.Threading.Tasks;
+
+using VNLib.Hashing;
+using VNLib.Hashing.IdentityUtility;
+using VNLib.Plugins;
+using VNLib.Plugins.Essentials;
+using VNLib.Plugins.Essentials.Endpoints;
+using VNLib.Plugins.Essentials.Extensions;
+using VNLib.Plugins.Extensions.Loading;
+using VNLib.Data.Caching.Extensions;
+using VNLib.Data.Caching.ObjectCache.Server.Distribution;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
+{
+ [ConfigurationName("discovery_endpoint")]
+ internal sealed class PeerDiscoveryEndpoint : UnprotectedWebEndpoint
+ {
+ private readonly IPeerMonitor PeerMonitor;
+ private readonly NodeConfig Config;
+
+ public PeerDiscoveryEndpoint(PluginBase plugin, IConfigScope config)
+ {
+ string? path = config["path"].GetString();
+
+ InitPathAndLog(path, plugin.Log);
+
+ //Get the peer monitor
+ PeerMonitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
+
+ //Get the node config
+ Config = plugin.GetOrCreateSingleton<NodeConfig>();
+ }
+
+ protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity)
+ {
+ //Get auth token
+ string? authToken = entity.Server.Headers[HttpRequestHeader.Authorization];
+
+ if(string.IsNullOrWhiteSpace(authToken))
+ {
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
+ }
+
+ string subject = string.Empty;
+
+ //Parse auth token
+ using(JsonWebToken jwt = JsonWebToken.Parse(authToken))
+ {
+ //try to verify against cache node first
+ if (!Config.Config.VerifyCache(jwt))
+ {
+ //failed...
+
+ //try to verify against client key
+ using ReadOnlyJsonWebKey clientPub = await Config.KeyStore.GetClientPublicKeyAsync();
+
+ if (!jwt.VerifyFromJwk(clientPub))
+ {
+ //invalid token
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
+ }
+ }
+
+ using JsonDocument payload = jwt.GetPayload();
+
+ subject = payload.RootElement.GetProperty("sub").GetString() ?? string.Empty;
+ }
+
+ //Valid key, get peer list to send to client
+ ICachePeerAdvertisment[] peers = PeerMonitor.GetAllPeers()
+ .Where(static p => p.Advertisment != null)
+ .Select(static p => p.Advertisment!)
+ .ToArray();
+
+ //Build response jwt
+ using JsonWebToken response = new();
+
+ //set header from cache config
+ response.WriteHeader(Config.Config.GetJwtHeader());
+
+ response.InitPayloadClaim()
+ .AddClaim("iss", Config.Config.NodeId)
+ //Audience is the requestor id
+ .AddClaim("sub", subject)
+ .AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds())
+ //Send all peers as a json array
+ .AddClaim("peers", peers)
+ .AddClaim("nonce", RandomHash.GetRandomBase32(24))
+ .CommitClaims();
+
+ //Sign the response
+ Config.Config.SignJwt(response);
+
+ //Send response to client
+ entity.CloseResponse(HttpStatusCode.OK, Net.Http.ContentType.Text, response.DataBuffer);
+ return VfReturnType.VirtualSkip;
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/ICacheEventQueueManager.cs b/plugins/ObjectCacheServer/src/ICacheEventQueueManager.cs
new file mode 100644
index 0000000..6b07000
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/ICacheEventQueueManager.cs
@@ -0,0 +1,67 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: ICacheEventQueueManager.cs
+*
+* ICacheEventQueueManager.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+
+using VNLib.Utils.Async;
+
+
+namespace VNLib.Data.Caching.ObjectCache.Server
+{
+ /// <summary>
+ /// Represents a managment system for publishing and subscribing to cache change events
+ /// </summary>
+ internal interface ICacheEventQueueManager
+ {
+ /// <summary>
+ /// Publishes a change event to all subscribers
+ /// </summary>
+ /// <param name="change">The change event to publish</param>
+ void PublishSingle(ChangeEvent change);
+
+ /// <summary>
+ /// Publishes multiple change events to all subscribers
+ /// </summary>
+ /// <param name="changes">The span of changes to publish to all subscribers</param>
+ void PublishMultiple(Span<ChangeEvent> changes);
+
+ /// <summary>
+ /// Attatches a subscriber that will receive all published changes
+ /// </summary>
+ /// <param name="nodeId">The id of the node to get the queue for</param>
+ /// <returns>The initilaizes event queue for the single subscriber</returns>
+ AsyncQueue<ChangeEvent> Subscribe(ICachePeer peer);
+
+ /// <summary>
+ /// Detatches a subscriber from the event queue
+ /// </summary>
+ /// <param name="nodeId">The id of the nede to detach</param>
+ void Unsubscribe(ICachePeer peer);
+
+ /// <summary>
+ /// Purges all stale subcriber nodes
+ /// </summary>
+ void PurgeStaleSubscribers();
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/ICachePeer.cs b/plugins/ObjectCacheServer/src/ICachePeer.cs
new file mode 100644
index 0000000..d374400
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/ICachePeer.cs
@@ -0,0 +1,44 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: ICachePeer.cs
+*
+* ICachePeer.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using VNLib.Data.Caching.Extensions;
+
+namespace VNLib.Data.Caching.ObjectCache.Server
+{
+ /// <summary>
+ /// Represents a fellow cache peer in the cluster
+ /// </summary>
+ internal interface ICachePeer
+ {
+ /// <summary>
+ /// The unique identifier of the node
+ /// </summary>
+ string NodeId { get; }
+
+ /// <summary>
+ /// An optional signed advertisment message for other peers
+ /// </summary>
+ ICachePeerAdvertisment? Advertisment { get; }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs b/plugins/ObjectCacheServer/src/ICacheStore.cs
index f911af9..f911af9 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs
+++ b/plugins/ObjectCacheServer/src/ICacheStore.cs
diff --git a/plugins/ObjectCacheServer/src/NodeConfig.cs b/plugins/ObjectCacheServer/src/NodeConfig.cs
new file mode 100644
index 0000000..614f0d6
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/NodeConfig.cs
@@ -0,0 +1,237 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: ObjectCacheServerEntry.cs
+*
+* ObjectCacheServerEntry.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Net;
+using System.Linq;
+using System.Net.Http;
+using System.Text.Json;
+using System.Threading;
+using System.Net.Sockets;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Security.Cryptography;
+
+using VNLib.Plugins;
+using VNLib.Utils;
+using VNLib.Utils.Logging;
+using VNLib.Utils.Extensions;
+using VNLib.Hashing;
+using VNLib.Hashing.IdentityUtility;
+using VNLib.Data.Caching.Extensions;
+using VNLib.Plugins.Extensions.Loading;
+
+
+namespace VNLib.Data.Caching.ObjectCache.Server
+{
+ [ConfigurationName("cluster")]
+ internal sealed class NodeConfig : VnDisposeable, IAsyncConfigurable, IAsyncBackgroundWork, IBrokerHeartbeatNotifier
+ {
+ const string CacheConfigTemplate =
+@"
+ Cluster Configuration:
+ Broker Address: {ba}
+ Heartbeat Timeout: {hb}
+ Node Id: {id}
+ TlsEndabled: {tls},
+ Cache Endpoint: {ep}
+";
+
+ public CacheNodeConfiguration Config { get; }
+ public CacheAuthKeyStore KeyStore { get; }
+
+ private readonly ManualResetEventSlim hearbeatHandle;
+ private readonly TimeSpan _hearbeatTimeout;
+
+ private string? _authToken;
+
+ public NodeConfig(PluginBase plugin, IConfigScope config)
+ {
+ //Server id is just dns name for now
+ string nodeId = Dns.GetHostName();
+ Config = new();
+
+ //Get the heartbeat interval
+ TimeSpan heartBeatDelayMs = config["heartbeat_timeout_sec"].GetTimeSpan(TimeParseType.Seconds);
+
+ string brokerAddr = config["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'");
+
+ //Get the port of the primary webserver
+ int port;
+ bool usingTls;
+ {
+ JsonElement firstHost = plugin.HostConfig.GetProperty("virtual_hosts").EnumerateArray().First();
+
+ port = firstHost.GetProperty("interface")
+ .GetProperty("port")
+ .GetInt32();
+
+ //If the ssl element is present, ssl is enabled for the server
+ usingTls = firstHost.TryGetProperty("ssl", out _);
+ }
+
+ //Get the cache endpoint config
+ IConfigScope cacheEpConfig = plugin.GetConfigForType<ConnectEndpoint>();
+
+ //The endpoint to advertise to cache clients that allows cache connections
+ UriBuilder endpoint = new(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, nodeId, port, cacheEpConfig["path"].GetString());
+
+ //Setup cache node config
+ Config.WithCacheEndpoint(endpoint.Uri)
+ .WithNodeId(nodeId)
+ .WithTls(usingTls)
+ .WithBroker(new(brokerAddr));
+
+ //Check if advertising is enabled
+ if(config.TryGetValue("advertise", out JsonElement adEl) && adEl.GetBoolean())
+ {
+ Config.EnableAdvertisment(true, "");
+ }
+
+ //Init key store
+ KeyStore = new(plugin);
+
+ //Init heartbeat handle unsiganled waiting for first heartbeat
+ hearbeatHandle = new(false);
+
+ //Schedule heartbeat
+ _ = plugin.ObserveWork(this, 500);
+
+ //log the config
+ plugin.Log.Information(CacheConfigTemplate,
+ brokerAddr,
+ heartBeatDelayMs,
+ nodeId,
+ usingTls,
+ endpoint.Uri);
+ }
+
+ async Task IAsyncConfigurable.ConfigureServiceAsync(PluginBase plugin)
+ {
+ //Get cache private key for signing from the key store
+ ReadOnlyJsonWebKey signingKey = await KeyStore.GetCachePrivateAsync();
+
+ Config.WithSigningKey(signingKey);
+
+ //Get broker public key for verifying from the key store
+ ReadOnlyJsonWebKey brokerKey = await KeyStore.GetBrokerPublicAsync();
+
+ Config.WithBrokerVerificationKey(brokerKey);
+ }
+
+ protected override void Free()
+ {
+ //Dispose the heartbeat handle
+ hearbeatHandle.Dispose();
+
+ //cleanup keys
+ Config.SigningKey?.Dispose();
+ Config.VerificationKey?.Dispose();
+ Config.BrokerVerificationKey?.Dispose();
+ }
+
+ ///<inheritdoc/>
+ public void HearbeatReceived()
+ {
+ //Set the heartbeat handle as received
+ hearbeatHandle.Set();
+ }
+
+ ///<inheritdoc/>
+ public string? GetAuthToken() => _authToken;
+
+ ///<inheritdoc/>
+ public Uri GetBrokerAddress() => Config.DiscoveryEndpoint!;
+
+ ///<inheritdoc/>
+ public ReadOnlyJsonWebKey GetBrokerPublicKey() => Config.BrokerVerificationKey!;
+
+
+ /*
+ * Worker loop for registering with the broker and monitoring hearbeat requests
+ */
+ async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
+ {
+ //Listen in loop
+ while (true)
+ {
+ try
+ {
+ //Regen the auth token before registering
+ _authToken = RandomHash.GetRandomBase32(32);
+
+ pluginLog.Information("Registering with cache broker server with id {id}", Config.NodeId);
+
+ //Register with the broker and pass the current auth token
+ await Config.RegisterWithBrokerAsync(_authToken);
+
+ //Enter heartbeat loop
+ while (true)
+ {
+ //Wait for the heartbeat timeout
+ await Task.Delay(_hearbeatTimeout, exitToken);
+
+ //Confrim the hearbeat was received within the timeout period
+ if (!hearbeatHandle.IsSet)
+ {
+ //If the heartbeat handle is not set, the heartbeat was not received, reg-register
+ pluginLog.Information("Broker missed hearbeat request");
+
+ //not received, break out of the heartbeat loop to re-register
+ break;
+ }
+
+ //Reset the handle and continue the heartbeat loop
+ hearbeatHandle.Reset();
+ }
+
+ //Add random delay to prevent all nodes from re-registering at the same time
+ await Task.Delay(RandomNumberGenerator.GetInt32(1000, 5000), exitToken);
+ }
+ catch (OperationCanceledException)
+ {
+ pluginLog.Debug("Registration loop exited on unload");
+ break;
+ }
+ catch (TimeoutException)
+ {
+ pluginLog.Warn("Failed to connect to cache broker server within the specified timeout period");
+ }
+ catch (HttpRequestException re) when (re.InnerException is SocketException)
+ {
+ pluginLog.Warn("Cache broker is unavailable or network is unavailable");
+ }
+ catch(HttpRequestException re) when (re.StatusCode.HasValue)
+ {
+ pluginLog.Warn("Failed to register with cache broker server, received status code {code}", re.StatusCode);
+ }
+ catch (Exception ex)
+ {
+ pluginLog.Warn("Exception occured in registraion loop: {ex}", ex!.Message);
+ }
+ }
+ }
+
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
index 2fa6220..c1a6ad2 100644
--- a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
+++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
@@ -27,7 +27,6 @@ using System.IO;
using System.Net;
using System.Linq;
using System.Net.Http;
-using System.Text.Json;
using System.Threading;
using System.Net.Sockets;
using System.Threading.Tasks;
@@ -38,7 +37,6 @@ using VNLib.Plugins;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
using VNLib.Utils.Memory.Diagnostics;
-using VNLib.Hashing;
using VNLib.Hashing.IdentityUtility;
using VNLib.Data.Caching.Extensions;
using static VNLib.Data.Caching.Constants;
@@ -47,18 +45,64 @@ using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Cache.Broker.Endpoints;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Extensions.Loading.Routing;
-
+using VNLib.Data.Caching.ObjectCache.Server.Endpoints;
namespace VNLib.Data.Caching.ObjectCache.Server
{
+ sealed record class CacheAuthKeyStore(PluginBase Plugin)
+ {
+ public Task<ReadOnlyJsonWebKey> GetCachePublicAsync()
+ {
+ return Plugin.TryGetSecretAsync("cache_private_key").ToJsonWebKey(true);
+ }
+
+ public Task<ReadOnlyJsonWebKey> GetCachePrivateAsync()
+ {
+ return Plugin.TryGetSecretAsync("cache_private_key").ToJsonWebKey(true);
+ }
+
+ public Task<ReadOnlyJsonWebKey> GetBrokerPublicAsync()
+ {
+ return Plugin.TryGetSecretAsync("broker_public_key").ToJsonWebKey(true);
+ }
+
+ public Task<ReadOnlyJsonWebKey> GetClientPublicKeyAsync()
+ {
+ return Plugin.TryGetSecretAsync("client_public_key").ToJsonWebKey(true);
+ }
+ }
+
+ internal interface IBrokerHeartbeatNotifier
+ {
+ /// <summary>
+ /// Called when the heartbeat endpoint receives a heartbeat from the broker
+ /// </summary>
+ void HearbeatReceived();
+
+ /// <summary>
+ /// Gets the current auth token sent to the broker, which is expected to be sent back in the heartbeat
+ /// </summary>
+ /// <returns>The heartbeat auth token if set</returns>
+ string? GetAuthToken();
+
+ /// <summary>
+ /// Gets the address of the broker server
+ /// </summary>
+ /// <returns>The full address of the broker server to connect to</returns>
+ Uri GetBrokerAddress();
+
+ /// <summary>
+ /// Gets the public key of the broker server
+ /// </summary>
+ /// <returns>The broker's public key</returns>
+ ReadOnlyJsonWebKey GetBrokerPublicKey();
+ }
+
public sealed class ObjectCacheServerEntry : PluginBase
{
public override string PluginName => "ObjectCache.Service";
- private readonly Lazy<IUnmangedHeap> _cacheHeap;
- private readonly object ServerLock;
- private readonly HashSet<ActiveServer> ListeningServers;
- private readonly ManualResetEvent BrokerSyncHandle;
+ private readonly Lazy<IUnmangedHeap> _cacheHeap;
/// <summary>
/// Gets the shared heap for the plugin
@@ -69,12 +113,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server
{
//Init heap
_cacheHeap = new Lazy<IUnmangedHeap>(InitializeHeap, LazyThreadSafetyMode.PublicationOnly);
-
- ServerLock = new();
- ListeningServers = new();
-
- //Set sync handle
- BrokerSyncHandle = new(false);
}
private IUnmangedHeap InitializeHeap()
@@ -94,64 +132,30 @@ namespace VNLib.Data.Caching.ObjectCache.Server
}
- private string? BrokerHeartBeatToken;
-
- private void RemoveServer(ActiveServer server)
- {
- lock (ServerLock)
- {
- ListeningServers.Remove(server);
- }
- }
-
- private FBMClientConfig ClientConfig;
-
-
protected override void OnLoad()
{
try
{
- IConfigScope clusterConf = this.GetConfig("cluster");
-
- Uri brokerAddress = new(clusterConf["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'"));
+ //Setup Node config
+ NodeConfig nodeConf = this.GetOrCreateSingleton<NodeConfig>();
//Init connect endpoint
ConnectEndpoint endpoint = this.Route<ConnectEndpoint>();
- //Get the cache store from the connection endpoint
- ICacheStore store = endpoint.GetCacheStore();
+ //Route the broker endpoint
+ this.Route<BrokerHeartBeatEndpoint>();
+
+ //Setup discovery endpoint
+ if(this.HasConfigForType<PeerDiscoveryEndpoint>())
+ {
+ this.Route<PeerDiscoveryEndpoint>();
+ }
ulong maxByteSize = ((ulong)endpoint.CacheConfig.MaxCacheEntries * (ulong)endpoint.CacheConfig.BucketCount * (ulong)endpoint.CacheConfig.MaxMessageSize);
//Log max memory usage
Log.Debug("Maxium memory consumption {mx}Mb", maxByteSize / (ulong)(1024 * 1000));
-
- //Setup broker and regitration
- {
- //Route the broker endpoint
- BrokerHeartBeat brokerEp = new(() => BrokerHeartBeatToken!, BrokerSyncHandle, brokerAddress, this);
- Route(brokerEp);
-
- //start registration
- _ = this.ObserveWork(() => RegisterServerAsync(endpoint.Path), 200);
- }
-
- //Setup cluster worker
- {
- //Get pre-configured fbm client config for caching
- ClientConfig = FBMDataCacheExtensions.GetDefaultConfig(CacheHeap, endpoint.CacheConfig.MaxMessageSize / 2, default, this.IsDebug() ? Log : null);
-
- //Start Client runner
- _ = this.ObserveWork(() => RunClientAsync(store, brokerAddress), 300);
- }
-
- //Load a cache broker to the current server if the config is defined
- {
- if(this.HasConfigForType<BrokerRegistrationEndpoint>())
- {
- this.Route<BrokerRegistrationEndpoint>();
- }
- }
+
Log.Information("Plugin loaded");
}
@@ -169,430 +173,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server
_cacheHeap.Value.Dispose();
}
- //Dispose mre sync handle
- BrokerSyncHandle.Dispose();
-
Log.Information("Plugin unloaded");
}
- #region Registration
-
- private async Task RegisterServerAsync(string connectPath)
- {
- try
- {
- //Get the broker config element
- IConfigScope clusterConfig = this.GetConfig("cluster");
-
- //Server id is just dns name for now
- string serverId = Dns.GetHostName();
-
- int heartBeatDelayMs = clusterConfig["heartbeat_timeout_sec"].GetInt32() * 1000;
-
-
- //Get the port of the primary webserver
- int port;
- bool usingTls;
- {
- JsonElement firstHost = HostConfig.GetProperty("virtual_hosts").EnumerateArray().First();
-
- port = firstHost.GetProperty("interface")
- .GetProperty("port")
- .GetInt32();
-
- //If a certificate is specified, tls is enabled on the port
- usingTls = firstHost.TryGetProperty("cert", out _);
- }
-
- using BrokerRegistrationRequest request = new();
- {
- string addr = clusterConfig["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'");
-
- //Recover the certificate
- ReadOnlyJsonWebKey cacheCert = await GetCachePrivate();
-
- //Init url builder for payload, see if tls is enabled
- Uri connectAddress = new UriBuilder(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, Dns.GetHostName(), port, connectPath).Uri;
-
- request.WithBroker(new(addr))
- .WithRegistrationAddress(connectAddress.ToString())
- .WithNodeId(serverId)
- .WithSigningKey(cacheCert, true);
- }
-
- while (true)
- {
- try
- {
- //Gen a random reg token before registering
- BrokerHeartBeatToken = RandomHash.GetRandomHex(32);
-
- //Assign new hb token
- request.WithHeartbeatToken(BrokerHeartBeatToken);
-
- Log.Information("Registering with cache broker {addr}, with node-id {id}", request.BrokerAddress, serverId);
-
- //Register with the broker
- await FBMDataCacheExtensions.ResgisterWithBrokerAsync(request);
-
- Log.Debug("Successfully registered with cache broker");
-
- /*
- * Wait in a loop for the broker to send a keepalive
- * request with the specified token. When the event
- * is signaled the task will be completed
- */
- while (true)
- {
- await Task.Delay(heartBeatDelayMs, UnloadToken);
-
- //Set the timeout to 0 to it will just check the status without blocking
- if (!BrokerSyncHandle.WaitOne(0))
- {
- //server miseed a keepalive event, time to break the loop and retry
- Log.Debug("Broker missed a heartbeat request, attempting to re-register");
- break;
- }
-
- //Reset the msr
- BrokerSyncHandle.Reset();
- }
- }
- catch (TaskCanceledException)
- {
- throw;
- }
- catch (TimeoutException)
- {
- Log.Warn("Failed to connect to cache broker server within the specified timeout period");
- }
- catch (HttpRequestException re) when (re.InnerException is SocketException)
- {
- Log.Warn("Cache broker is unavailable or network is unavailable");
- }
- catch (Exception ex)
- {
- Log.Warn(ex, "Failed to update broker registration");
- }
-
- //Gen random ms delay
- int randomMsDelay = RandomNumberGenerator.GetInt32(500, 2000);
- //Delay
- await Task.Delay(randomMsDelay, UnloadToken);
- }
- }
- catch (KeyNotFoundException kne)
- {
- Log.Error("Missing required broker configuration variables {ke}", kne.Message);
- }
- catch (TaskCanceledException)
- {
- //Normal unload/exit
- }
- catch (Exception ex)
- {
- Log.Error(ex);
- }
- finally
- {
- BrokerHeartBeatToken = null;
- }
- Log.Debug("Registration worker exited");
- }
-
- #endregion
-
- #region Cluster
-
- private async Task<ReadOnlyJsonWebKey> GetCachePrivate()
- {
- return await this.TryGetSecretAsync("cache_private_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Failed to load the cache private key");
- }
-
- private async Task<ReadOnlyJsonWebKey> GetBrokerPublic()
- {
- return await this.TryGetSecretAsync("broker_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Failed to load the broker's public key");
- }
-
-
- /// <summary>
- /// Starts a self-contained process-long task to discover other cache servers
- /// from a shared broker server
- /// </summary>
- /// <param name="cacheStore">The cache store to synchronize</param>
- /// <param name="brokerAddress">The broker server's address</param>
- /// <param name="serverId">The node-id of the current server</param>
- /// <param name="clientConf">The configuration to use when initializing synchronization clients</param>
- /// <returns>A task that resolves when the plugin unloads</returns>
- private async Task RunClientAsync(ICacheStore cacheStore, Uri brokerAddress)
- {
- TimeSpan noServerDelay = TimeSpan.FromSeconds(10);
-
- //The node id is just the dns hostname of the current machine
- string nodeId = Dns.GetHostName();
-
- ListServerRequest listRequest = new(brokerAddress);
- try
- {
- //Get the broker config element
- IConfigScope clusterConf = this.GetConfig("cluster");
-
- int serverCheckMs = clusterConf["update_interval_sec"].GetInt32() * 1000;
-
- //Setup signing and verification certificates
- ReadOnlyJsonWebKey cacheSig = await GetCachePrivate();
- ReadOnlyJsonWebKey brokerPub = await GetBrokerPublic();
-
- //Import certificates
- listRequest.WithVerificationKey(brokerPub)
- .WithSigningKey(cacheSig);
-
- //Main event loop
- Log.Information("Begining cluster node discovery");
-
- ILogProvider? debugLog = this.IsDebug() ? Log : null;
-
- while (true)
- {
- //Load the server list
- ActiveServer[]? servers;
- while (true)
- {
- try
- {
- debugLog?.Information("[CACHE] Requesting server list from broker");
-
- //Get server list
- servers = await FBMDataCacheExtensions.ListServersAsync(listRequest, UnloadToken);
-
- //Servers are loaded, so continue
- break;
- }
- catch(HttpRequestException he) when(he.InnerException is SocketException)
- {
- Log.Warn("Failed to connect to cache broker, trying again");
- }
- catch (TimeoutException)
- {
- Log.Warn("Failed to connect to cache broker server within the specified timeout period");
- }
- catch (Exception ex)
- {
- Log.Warn(ex, "Failed to get server list from broker");
- }
-
- //Gen random ms delay
- int randomMsDelay = RandomNumberGenerator.GetInt32(1000, 2000);
-
- //Delay
- await Task.Delay(randomMsDelay, UnloadToken);
- }
-
- if(servers == null || servers.Length == 0)
- {
- Log.Information("No cluster nodes found, retrying");
- //Delay
- await Task.Delay(noServerDelay, UnloadToken);
- continue;
- }
-
-
- //Lock on sever set while enumerating
- lock (ServerLock)
- {
- //Select servers that are not the current server and are not already being monitored
- IEnumerable<ActiveServer> serversToConnectTo = servers.Where(s => !nodeId.Equals(s.ServerId, StringComparison.OrdinalIgnoreCase));
-
- //Connect to servers
- foreach (ActiveServer server in serversToConnectTo)
- {
- //Make sure were not currently connected to the server
- if (!ListeningServers.Contains(server))
- {
- //Add the server to the set
- ListeningServers.Add(server);
-
- //Run listener background task
- _ = this.ObserveWork(() => RunSyncTaskAsync(server, cacheStore, nodeId));
- }
- }
- }
-
- //Delay until next check cycle
- await Task.Delay(serverCheckMs, UnloadToken);
- }
- }
- catch (FileNotFoundException)
- {
- Log.Error("Client/cluster private cluster key file was not found or could not be read");
- }
- catch (KeyNotFoundException)
- {
- Log.Error("Missing required cluster configuration varables");
- }
- catch (TaskCanceledException)
- {
- //normal exit/unload
- }
- catch (Exception ex)
- {
- Log.Error(ex);
- }
- finally
- {
- listRequest.Dispose();
- }
- Log.Debug("Cluster sync worker exited");
- }
-
- private async Task RunSyncTaskAsync(ActiveServer server, ICacheStore cacheStore, string nodeId)
- {
- //Setup timeout for get operations to avoid deadlocks
- TimeSpan getTimeout = TimeSpan.FromSeconds(30);
-
- //Setup client
- FBMClient client = new(ClientConfig);
- try
- {
- async Task UpdateRecordAsync(string objectId, string newId)
- {
- //Get request message
- FBMRequest modRequest = client.RentRequest();
- try
- {
- //Set action as get/create
- modRequest.WriteHeader(HeaderCommand.Action, Actions.Get);
- //Set session-id header
- modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId);
-
- //Make request
- using FBMResponse response = await client.SendAsync(modRequest, getTimeout, UnloadToken);
-
- response.ThrowIfNotSet();
-
- //Check response code
- string status = response.Headers.First(static s => s.Header == HeaderCommand.Status).Value.ToString();
- if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal))
- {
- //Update the record
- await cacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response);
- Log.Debug("Updated object {id}", objectId);
- }
- else
- {
- Log.Warn("Object {id} was missing on the remote server", objectId);
- }
- }
- finally
- {
- client.ReturnRequest(modRequest);
- }
- }
-
- {
- //Sign and verify requests with the cache private key since we are a peer
- ReadOnlyJsonWebKey cachePriv = await GetCachePrivate();
-
- //Configure cache
- client.GetCacheConfiguration()
- .WithVerificationKey(cachePriv)
- .WithSigningCertificate(cachePriv)
- .WithNodeId(nodeId) //set nodeid since were listening for changes
- .WithTls(false);
- }
-
- Log.Information("Connecting to {server}...", server.ServerId);
-
- //Connect to the server
- await client.ConnectToCacheAsync(server, UnloadToken);
-
- //Wroker task callback method
- async Task BgWorkerAsync()
- {
- //Listen for changes
- while (true)
- {
- //Wait for changes
- WaitForChangeResult changedObject = await client.WaitForChangeAsync(UnloadToken);
-
- Log.Debug("Object changed {typ} {obj}", changedObject.Status, changedObject.CurrentId);
-
- switch (changedObject.Status)
- {
- case ResponseCodes.NotFound:
- Log.Warn("Server cache not properly configured, worker exiting");
- return;
- case "deleted":
- //Delete the object from the store
- await cacheStore.DeleteItemAsync(changedObject.CurrentId);
- break;
- case "modified":
- //Reload the record from the store
- await UpdateRecordAsync(changedObject.CurrentId, changedObject.NewId);
- break;
- }
- }
- }
-
- Log.Information("Connected to {server}, starting queue listeners", server.ServerId);
-
- //Start worker tasks
- List<Task> workerTasks = new();
- for(int i = 0; i < Environment.ProcessorCount; i++)
- {
- workerTasks.Add(Task.Run(BgWorkerAsync));
- }
-
- //Wait for sync workers to exit
- await Task.WhenAll(workerTasks);
- }
- catch (InvalidResponseException ie)
- {
- //See if the plugin is unloading
- if (!UnloadToken.IsCancellationRequested)
- {
- Log.Debug("Server responded with invalid response packet, disconnected. reason {reason}", ie);
- }
- //Disconnect client gracefully
- try
- {
- await client.DisconnectAsync();
- }
- catch (Exception ex)
- {
- Log.Error(ex);
- }
- }
- catch (OperationCanceledException)
- {
- //Plugin unloading, Try to disconnect
- try
- {
- await client.DisconnectAsync();
- }
- catch(Exception ex)
- {
- Log.Error(ex);
- }
- }
- catch(Exception ex)
- {
- Log.Warn("Lost connection to server {h}, {m}", server.ServerId, ex);
- }
- finally
- {
- //Remove server from active list, since its been disconnected
- RemoveServer(server);
- client.Dispose();
- }
- }
-
protected override void ProcessHostCommand(string cmd)
{
- Log.Debug(cmd);
+ throw new NotImplementedException();
}
-
-
- #endregion
}
}