From 2f674e79d42e7d36225fa9ac7ecefbc5bc62d325 Mon Sep 17 00:00:00 2001 From: vnugent Date: Thu, 13 Jul 2023 13:20:25 -0400 Subject: Checkpoint, kind of working clustering --- plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs | 17 +- .../src/CacheEventQueueManager.cs | 6 +- .../ObjectCacheServer/src/CacheListenerPubQueue.cs | 25 +- plugins/ObjectCacheServer/src/CacheStore.cs | 27 +- plugins/ObjectCacheServer/src/CacheSystemUtil.cs | 2 +- .../src/Clustering/CacheNodeReplicationMaanger.cs | 290 +++++++++++++++++++++ .../src/Clustering/CachePeerMonitor.cs | 98 +++++++ .../src/Clustering/ICachePeerAdapter.cs | 49 ++++ .../src/Clustering/IPeerMonitor.cs | 54 ++++ .../src/Clustering/PeerDiscoveryManager.cs | 268 +++++++++++++++++++ .../Distribution/CacheNodeReplicationMaanger.cs | 250 ------------------ .../src/Distribution/CachePeerMonitor.cs | 70 ----- .../src/Distribution/ICachePeerAdapter.cs | 49 ---- .../src/Distribution/IPeerMonitor.cs | 53 ---- .../src/Distribution/KnownPeerList.cs | 100 ------- .../src/Distribution/PeerDiscoveryManager.cs | 161 ------------ .../src/Endpoints/ConnectEndpoint.cs | 71 +++-- .../src/Endpoints/PeerDiscoveryEndpoint.cs | 29 ++- .../src/Endpoints/WellKnownEndpoint.cs | 111 ++++++++ plugins/ObjectCacheServer/src/ICachePeer.cs | 4 +- plugins/ObjectCacheServer/src/NodeConfig.cs | 63 +++-- .../ObjectCacheServer/src/ObjectCacheServer.csproj | 5 + .../src/ObjectCacheServerEntry.cs | 8 +- 23 files changed, 1020 insertions(+), 790 deletions(-) create mode 100644 plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs create mode 100644 plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs create mode 100644 plugins/ObjectCacheServer/src/Clustering/ICachePeerAdapter.cs create mode 100644 plugins/ObjectCacheServer/src/Clustering/IPeerMonitor.cs create mode 100644 plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs delete mode 100644 plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs delete mode 100644 plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs delete mode 100644 plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs delete mode 100644 plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs delete mode 100644 plugins/ObjectCacheServer/src/Distribution/KnownPeerList.cs delete mode 100644 plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs create mode 100644 plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs (limited to 'plugins/ObjectCacheServer') diff --git a/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs b/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs index 6725fbe..5fc700b 100644 --- a/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs +++ b/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs @@ -32,7 +32,6 @@ using VNLib.Hashing.IdentityUtility; using VNLib.Plugins.Extensions.Loading; using VNLib.Data.Caching.Extensions; - namespace VNLib.Data.Caching.ObjectCache.Server { sealed record class CacheAuthKeyStore : ICacheAuthManager @@ -59,9 +58,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server } /// - public bool VerifyJwt(JsonWebToken jwt) + public bool VerifyJwt(JsonWebToken jwt, bool isPeer) { - return jwt.VerifyFromJwk(_clientPub.Value); + //Verify from peer server or client + return isPeer ? jwt.VerifyFromJwk(_cachePriv.Value) : jwt.VerifyFromJwk(_clientPub.Value); } /// @@ -85,7 +85,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server } //try to get the ecdsa alg for the signing key - using ECDsa? ecdsa = _cachePriv.Value.GetECDsaPublicKey(); + using ECDsa? ecdsa = _cachePriv.Value.GetECDsaPrivateKey(); if (ecdsa != null) { return ecdsa.SignHash(hash); @@ -95,17 +95,20 @@ namespace VNLib.Data.Caching.ObjectCache.Server } /// - public bool VerifyMessageHash(ReadOnlySpan hash, HashAlg alg, ReadOnlySpan signature) + public bool VerifyMessageHash(ReadOnlySpan hash, HashAlg alg, ReadOnlySpan signature, bool isPeer) { + //Determine the key to verify against + ReadOnlyJsonWebKey jwk = isPeer ? _cachePriv.Value : _clientPub.Value; + //try to get the rsa alg for the signing key - using RSA? rsa = _clientPub.Value.GetRSAPublicKey(); + using RSA? rsa = jwk.GetRSAPublicKey(); if (rsa != null) { return rsa.VerifyHash(hash, signature, alg.GetAlgName(), RSASignaturePadding.Pkcs1); } //try to get the ecdsa alg for the signing key - using ECDsa? ecdsa = _clientPub.Value.GetECDsaPublicKey(); + using ECDsa? ecdsa = jwk.GetECDsaPublicKey(); if (ecdsa != null) { return ecdsa.VerifyHash(hash, signature); diff --git a/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs b/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs index 049069e..3827121 100644 --- a/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs +++ b/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs @@ -104,7 +104,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server } //Return the node's queue - return nq.Queue; + return nq; } /// @@ -183,8 +183,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server //Interval to purge stale subscribers Task IIntervalScheduleable.OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken) { - //Purge + log.Debug("Purging stale peer event queues"); + PurgeStaleSubscribers(); + return Task.CompletedTask; } diff --git a/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs b/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs index 52b6abf..9c7388e 100644 --- a/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs +++ b/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs @@ -25,13 +25,13 @@ using System; using System.Threading; using System.Threading.Tasks; +using System.Threading.Channels; using VNLib.Utils.Async; using VNLib.Utils.Logging; using VNLib.Plugins; using VNLib.Plugins.Extensions.Loading; - namespace VNLib.Data.Caching.ObjectCache.Server { internal sealed class CacheListenerPubQueue : ICacheListenerEventQueue, IAsyncBackgroundWork @@ -44,10 +44,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server { _queueManager = plugin.GetOrCreateSingleton(); _logProvider = plugin.Log; - _listenerQueue = new AsyncQueue(false, true); - - //Register processing worker - _ = plugin.ObserveWork(this, 500); + _listenerQueue = new AsyncQueue(new BoundedChannelOptions(10000) + { + AllowSynchronousContinuations = true, + FullMode = BoundedChannelFullMode.DropOldest, + SingleReader = true, + SingleWriter = false, + }); } /// @@ -59,25 +62,25 @@ namespace VNLib.Data.Caching.ObjectCache.Server { //Accumulator for events ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize]; - int ptr = 0; + int index = 0; //Listen for changes while (true) { //Wait for next event - accumulator[ptr++] = await _listenerQueue.DequeueAsync(exitToken); + accumulator[index++] = await _listenerQueue.DequeueAsync(exitToken); //try to accumulate more events until we can't anymore - while (_listenerQueue.TryDequeue(out ChangeEvent? ev) && ptr < accumulatorSize) + while (_listenerQueue.TryDequeue(out ChangeEvent? ev) && index < accumulatorSize) { - accumulator[ptr++] = ev; + accumulator[index++] = ev; } //Publish all events to subscribers - _queueManager.PublishMultiple(accumulator.AsSpan(0, ptr)); + _queueManager.PublishMultiple(accumulator.AsSpan(0, index)); //Reset pointer - ptr = 0; + index = 0; } } catch (OperationCanceledException) diff --git a/plugins/ObjectCacheServer/src/CacheStore.cs b/plugins/ObjectCacheServer/src/CacheStore.cs index e7a7c63..c1d47f6 100644 --- a/plugins/ObjectCacheServer/src/CacheStore.cs +++ b/plugins/ObjectCacheServer/src/CacheStore.cs @@ -30,7 +30,6 @@ using VNLib.Utils.Logging; using VNLib.Plugins; using VNLib.Plugins.Extensions.Loading; - namespace VNLib.Data.Caching.ObjectCache.Server { [ConfigurationName("cache")] @@ -62,6 +61,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server private static BlobCacheListener InitializeCache(ObjectCacheServerEntry plugin, IConfigScope config) { + const string CacheConfigTemplate = +@" +Cache Configuration: + Max memory: {max} Mb + Buckets: {bc} + Entries per-bucket: {mc} +"; + //Deserialize the cache config CacheConfiguration cacheConf = config.Deserialze(); @@ -74,22 +81,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server if (cacheConf.MaxCacheEntries < 200) { plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache"); - } - - plugin.Log.Verbose("Creating cache store with {bc} buckets, with {mc} items/bucket", cacheConf.BucketCount, cacheConf.MaxCacheEntries); + } //calculate the max memory usage ulong maxByteSize = ((ulong)cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize); - //Log max memory usage - plugin.Log.Debug("Maxium memory consumption {mx}Mb", maxByteSize / (ulong)(1024 * 1000)); - - //Load the blob cache table system - IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, plugin.CacheHeap, cacheConf); + //Log the cache config + plugin.Log.Information(CacheConfigTemplate, + maxByteSize / (ulong)(1024 * 1000), + cacheConf.BucketCount, + cacheConf.MaxCacheEntries + ); //Get the event listener ICacheListenerEventQueue queue = plugin.GetOrCreateSingleton(); + //Load the blob cache table system + IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, plugin.CacheHeap, cacheConf); + //Endpoint only allows for a single reader return new(bc, queue, plugin.Log, plugin.CacheHeap); } diff --git a/plugins/ObjectCacheServer/src/CacheSystemUtil.cs b/plugins/ObjectCacheServer/src/CacheSystemUtil.cs index 669b84f..f8aedae 100644 --- a/plugins/ObjectCacheServer/src/CacheSystemUtil.cs +++ b/plugins/ObjectCacheServer/src/CacheSystemUtil.cs @@ -70,7 +70,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server else { //Default type - table = GetInternalBlobCache(heap, cacheConf, pCManager); + table = GetInternalBlobCache(heap, cacheConf, pCManager); } //Initialize the subsystem from the cache table diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs new file mode 100644 index 0000000..ffdd4f4 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs @@ -0,0 +1,290 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: CacheNodeReplicationMaanger.cs +* +* CacheNodeReplicationMaanger.cs is part of ObjectCacheServer which is part +* of the larger VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using System.Collections.Generic; + +using VNLib.Plugins; +using VNLib.Utils.Logging; +using VNLib.Data.Caching.Extensions; +using static VNLib.Data.Caching.Constants; +using VNLib.Net.Messaging.FBM; +using VNLib.Net.Messaging.FBM.Client; +using VNLib.Plugins.Extensions.Loading; +using VNLib.Data.Caching.Extensions.Clustering; + +namespace VNLib.Data.Caching.ObjectCache.Server.Clustering +{ + + /* + * This class is responsible for replicating the cache with other nodes. + * + * It does this by connecting to other nodes and listening for change events. + * When a change event occurs, it takes action against the local cache store, + * to keep it consistent with the other nodes. + * + * Change events are only handled first-hand, meaning that events do not + * propagate to other nodes, they must be connected individually to each node + * and listen for changes. + */ + + internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork + { + private const string LOG_SCOPE_NAME = "REPL"; + + private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10); + private const int MAX_MESSAGE_SIZE = 12 * 1024; + + private readonly PluginBase _plugin; + private readonly ILogProvider _log; + private readonly NodeConfig _nodeConfig; + private readonly ICacheStore _cacheStore; + private readonly ICachePeerAdapter _peerAdapter; + private readonly FBMClientConfig _replicationClientConfig; + + private readonly bool _isDebug; + + private int _openConnections; + + public CacheNodeReplicationMaanger(PluginBase plugin) + { + //Load the node config + _nodeConfig = plugin.GetOrCreateSingleton(); + _cacheStore = plugin.GetOrCreateSingleton(); + _peerAdapter = plugin.GetOrCreateSingleton(); + + //Init fbm config with fixed message size + _replicationClientConfig = FBMDataCacheExtensions.GetDefaultConfig( + (plugin as ObjectCacheServerEntry)!.CacheHeap, + MAX_MESSAGE_SIZE, + debugLog: plugin.IsDebug() ? plugin.Log : null + ); + + _plugin = plugin; + _isDebug = plugin.IsDebug(); + _log = plugin.Log.CreateScope(LOG_SCOPE_NAME); + } + + public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) + { + _log.Information("Initializing node replication worker"); + + try + { + while (true) + { + //Get all new peers + CacheNodeAdvertisment[] peers = _peerAdapter.GetNewPeers(); + + if (peers.Length == 0 && _isDebug) + { + _log.Verbose("No new peers to connect to"); + } + + //Make sure we don't exceed the max connections + if(_openConnections >= _nodeConfig.MaxPeerConnections) + { + if (_isDebug) + { + _log.Verbose("Max peer connections reached, waiting for a connection to close"); + } + } + else + { + //Connect to each peer as a background task + foreach (CacheNodeAdvertisment peer in peers) + { + _ = _plugin.ObserveWork(() => OnNewPeerDoWorkAsync(peer, _log, exitToken)); + } + } + + //Wait for a new peers + await Task.Delay(10000, exitToken); + } + } + catch (OperationCanceledException) + { + //Normal exit + } + catch + { + _log.Error("Node replication worker exited with an error"); + throw; + } + finally + { + + } + + _log.Information("Node replication worker exited"); + } + + private async Task OnNewPeerDoWorkAsync(CacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken) + { + _ = newPeer ?? throw new ArgumentNullException(nameof(newPeer)); + + //Setup client + FBMClient client = new(_replicationClientConfig); + + //Add peer to monitor + _peerAdapter.OnPeerListenerAttached(newPeer); + + Interlocked.Increment(ref _openConnections); + + try + { + log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId); + + //Connect to the server + await client.ConnectToCacheAsync(newPeer, _nodeConfig.Config, exitToken); + + log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId); + + //Start worker tasks + List workerTasks = new(); + + for (int i = 0; i < Environment.ProcessorCount; i++) + { + Task workerTask = Task.Run(() => ReplicationWorkerDoWorkAsync(client, log, exitToken), exitToken); + + workerTasks.Add(workerTask); + } + + //Wait for sync workers to exit + await Task.WhenAll(workerTasks); + + log.Debug("All cache worker tasks exited successfully, disconnecting from {server}", newPeer.NodeId); + + //Disconnect client gracefully + await client.DisconnectAsync(CancellationToken.None); + } + catch (InvalidResponseException ie) + { + //See if the plugin is unloading + if (!exitToken.IsCancellationRequested) + { + log.Debug("Peer {p} responded with invalid response packet, disconnected. reason\n {reason}", newPeer.NodeId, ie); + } + //Disconnect client gracefully + try + { + await client.DisconnectAsync(CancellationToken.None); + } + catch (Exception ex) + { + log.Error(ex); + } + } + catch (OperationCanceledException) + { + //Plugin unloading, Try to disconnect + try + { + await client.DisconnectAsync(CancellationToken.None); + } + catch (Exception ex) + { + log.Error(ex); + } + } + catch (Exception ex) + { + log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex); + } + finally + { + Interlocked.Decrement(ref _openConnections); + + client.Dispose(); + + //Notify monitor of disconnect + _peerAdapter.OnPeerListenerDetatched(newPeer); + } + } + + //Wroker task callback method + private async Task ReplicationWorkerDoWorkAsync(FBMClient client, ILogProvider log, CancellationToken exitToken) + { + //Reusable request message + using FBMRequest request = new(client.Config); + + //Listen for changes + while (true) + { + //Wait for changes + WaitForChangeResult changedObject = await client.WaitForChangeAsync(exitToken); + + log.Debug("Object changed {typ} {obj}", changedObject.Status, changedObject.CurrentId); + + switch (changedObject.Status) + { + case ResponseCodes.NotFound: + log.Error("Server cache not properly configured, worker exiting"); + return; + case "deleted": + //Delete the object from the store + await _cacheStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None); + break; + case "modified": + //Reload the record from the store + await UpdateRecordAsync(client, request, log, changedObject.CurrentId, changedObject.NewId, exitToken); + break; + } + + //Reset request message + request.Reset(); + } + } + + private async Task UpdateRecordAsync(FBMClient client, FBMRequest modRequest, ILogProvider log, string objectId, string newId, CancellationToken cancellation) + { + //Set action as get/create + modRequest.WriteHeader(HeaderCommand.Action, Actions.Get); + //Set session-id header + modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId); + + //Make request + using FBMResponse response = await client.SendAsync(modRequest, GetItemTimeout, cancellation); + + response.ThrowIfNotSet(); + + //Check response code + string status = response.Headers.First(static s => s.Header == HeaderCommand.Status).Value.ToString(); + + if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal)) + { + //Update the record + await _cacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation); + log.Debug("Updated object {id}", objectId); + } + else + { + log.Warn("Object {id} was missing on the remote server", objectId); + } + } + } +} diff --git a/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs new file mode 100644 index 0000000..c49a54b --- /dev/null +++ b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs @@ -0,0 +1,98 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: CachePeerMonitor.cs +* +* CachePeerMonitor.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using System.Collections.Generic; + +using VNLib.Utils; +using VNLib.Utils.Extensions; +using VNLib.Plugins; + +namespace VNLib.Data.Caching.ObjectCache.Server.Clustering +{ + + internal sealed class CachePeerMonitor : VnDisposeable, IPeerMonitor + { + + private readonly LinkedList peers = new(); + private readonly ManualResetEvent newPeerTrigger = new (false); + + public CachePeerMonitor(PluginBase plugin) + { } + + /// + /// Waits for new peers to connect to the server + /// + /// A task that complets when a new peer has connected + public async Task WaitForChangeAsync() + { + await newPeerTrigger.WaitAsync(); + + //Reset the trigger for next call + newPeerTrigger.Reset(); + } + + /// + public IEnumerable GetAllPeers() + { + lock(peers) + { + return peers.ToArray(); + } + } + + /// + public void OnPeerConnected(ICachePeer peer) + { + //When a peer is connected we can add it to the list so the replication manager can see it + lock(peers) + { + peers.AddLast(peer); + } + + //Trigger monitor when change occurs + if(peer.Advertisment != null) + { + newPeerTrigger.Set(); + } + } + + /// + public void OnPeerDisconnected(ICachePeer peer) + { + //When a peer is disconnected we can remove it from the list + lock(peers) + { + peers.Remove(peer); + } + } + + protected override void Free() + { + newPeerTrigger.Dispose(); + } + } +} diff --git a/plugins/ObjectCacheServer/src/Clustering/ICachePeerAdapter.cs b/plugins/ObjectCacheServer/src/Clustering/ICachePeerAdapter.cs new file mode 100644 index 0000000..dd426f6 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Clustering/ICachePeerAdapter.cs @@ -0,0 +1,49 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: ICachePeerAdapter.cs +* +* ICachePeerAdapter.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using VNLib.Data.Caching.Extensions.Clustering; + +namespace VNLib.Data.Caching.ObjectCache.Server.Clustering +{ + internal interface ICachePeerAdapter + { + /// + /// Gets the peers that have been discovered but not yet connected to + /// + /// A collection of peers that have not been connected to yet + CacheNodeAdvertisment[] GetNewPeers(); + + /// + /// Called when a peer has been connected to + /// + /// The peer that has been connected + void OnPeerListenerAttached(CacheNodeAdvertisment peer); + + /// + /// Called when a peer has been disconnected from + /// + /// The disconnected peer + void OnPeerListenerDetatched(CacheNodeAdvertisment peer); + } +} diff --git a/plugins/ObjectCacheServer/src/Clustering/IPeerMonitor.cs b/plugins/ObjectCacheServer/src/Clustering/IPeerMonitor.cs new file mode 100644 index 0000000..d8f358f --- /dev/null +++ b/plugins/ObjectCacheServer/src/Clustering/IPeerMonitor.cs @@ -0,0 +1,54 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: IPeerMonitor.cs +* +* IPeerMonitor.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Collections.Generic; + +namespace VNLib.Data.Caching.ObjectCache.Server.Clustering +{ + /// + /// Represents a monitor for peer cache servers to advertise their presence + /// in the cluster + /// + internal interface IPeerMonitor + { + /// + /// Notifies the monitor that a remote peer has connected to the current node for + /// replication + /// + /// The peer that connected + void OnPeerConnected(ICachePeer peer); + + /// + /// Notifies the monitor that a peer has disconnected + /// + /// The peer that has disconnected + void OnPeerDisconnected(ICachePeer peer); + + /// + /// Gets an enumerable of all peers currently connected to this node + /// + /// The collection of all connected peers + IEnumerable GetAllPeers(); + } +} diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs new file mode 100644 index 0000000..f132cab --- /dev/null +++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs @@ -0,0 +1,268 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: PeerDiscoveryManager.cs +* +* PeerDiscoveryManager.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Net.Http; +using System.Threading; +using System.Net.Sockets; +using System.Threading.Tasks; +using System.Collections.Generic; + +using VNLib.Utils.Logging; +using VNLib.Data.Caching.Extensions; +using VNLib.Plugins; +using VNLib.Plugins.Extensions.Loading; +using VNLib.Data.Caching.Extensions.Clustering; + +namespace VNLib.Data.Caching.ObjectCache.Server.Clustering +{ + + /* + * This class is responsible for resolving and discovering peer nodes in the cluster network. + */ + + internal sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter + { + private const string LOG_SCOPE_NAME = "DISC"; + private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(15); + + private readonly List _connectedPeers; + private readonly NodeConfig _config; + private readonly CachePeerMonitor _monitor; + private readonly bool IsDebug; + private readonly ILogProvider _log; + + public PeerDiscoveryManager(PluginBase plugin) + { + //Get config + _config = plugin.GetOrCreateSingleton(); + + //Get the known peers array from config, its allowed to be null for master nodes + IConfigScope? config = plugin.TryGetConfig("known_peers"); + string[] kownPeers = config?.Deserialze() ?? Array.Empty(); + + //Add known peers to the monitor + _config.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s))); + + plugin.Log.Information("Inital peer nodes: {nodes}", kownPeers); + + //Get the peer monitor + _monitor = plugin.GetOrCreateSingleton(); + + _connectedPeers = new(); + + //Create scoped logger + _log = plugin.Log.CreateScope(LOG_SCOPE_NAME); + + //Setup discovery error handler + _config.Config.WithErrorHandler(new ErrorHandler(_log)); + + IsDebug = plugin.IsDebug(); + } + + async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) + { + /* + * This loop uses the peer monitor to keep track of all connected peers, then gets + * all the advertising peers (including known peers) and resolves all nodes across + * the network. + */ + + //Start the change listener + Task watcher = WatchForPeersAsync(exitToken); + + _log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", InitialDelay); + + try + { + //Wait for the initial delay + await Task.Delay(InitialDelay, exitToken); + + _log.Debug("Begining discovery loop"); + + /* + * To avoid connecting to ourself, we add ourselves to the connected list + * and it should never get removed. This is because the monitor will never + * report our own advertisment. + */ + _connectedPeers.Add(_config.Config.Advertisment); + + while (true) + { + try + { + if (IsDebug) + { + _log.Debug("Begining node discovery"); + } + + //Resolve all known peers + CacheNodeAdvertisment[] wellKnown = await _config.Config.ResolveWellKnownAsync(exitToken); + + //Use the monitor to get the initial peers + IEnumerable ads = GetMonitorAds(); + + //Combine well-known with new connected peers + CacheNodeAdvertisment[] allAds = ads.Union(wellKnown).ToArray(); + + if (allAds.Length > 0) + { + //Discover all kown nodes + await _config.Config.DiscoverNodesAsync(allAds, exitToken); + } + + //Log the discovered nodes if verbose logging is enabled + if (IsDebug) + { + CacheNodeAdvertisment[] found = _config.Config.NodeCollection.GetAllNodes(); + + _log.Debug("Discovered {count} nodes\n\t{nodes}", found.Length, found.Select(static s => s.NodeId)); + } + } + catch(OperationCanceledException) + { + throw; + } + catch (Exception ex) + { + _log.Error(ex, "Failed to discover new peer nodes"); + } + + //Delay the next discovery + await Task.Delay(_config.DiscoveryInterval, exitToken); + } + } + catch (OperationCanceledException) + { + //Normal exit + _log.Information("Node discovery worker exiting"); + } + finally + { + + } + + //Wait for the watcher to exit + await watcher.ConfigureAwait(false); + } + + private IEnumerable GetMonitorAds() + { + return _monitor.GetAllPeers() + .Where(static p => p.Advertisment != null) + //Without us + .Where(n => n.NodeId != _config.Config.NodeId) + .Select(static p => p.Advertisment!); + } + + //Wait for new peers and update the collection + private async Task WatchForPeersAsync(CancellationToken cancellation) + { + try + { + _log.Debug("Discovery worker waiting for new peers to connect"); + + while (true) + { + + //Wait for changes, then get new peers + await _monitor.WaitForChangeAsync().WaitAsync(cancellation); + + _log.Verbose("New peers connected"); + + //Use the monitor to get the initial peers + IEnumerable ads = GetMonitorAds(); + + ((NodeDiscoveryCollection)_config.Config.NodeCollection).AddManualNodes(ads); + } + } + catch (OperationCanceledException) + { + //Normal ext + _log.Debug("Connected peer listener exited"); + } + } + + + /// + public CacheNodeAdvertisment[] GetNewPeers() + { + lock (_connectedPeers) + { + //Get all discovered peers + CacheNodeAdvertisment[] peers = _config.Config.NodeCollection.GetAllNodes(); + + //Get the difference between the discovered peers and the connected peers + return peers.Except(_connectedPeers).ToArray(); + } + } + + /// + public void OnPeerListenerAttached(CacheNodeAdvertisment peer) + { + lock (_connectedPeers) + { + //Add to connected peers + _connectedPeers.Add(peer); + } + } + + /// + public void OnPeerListenerDetatched(CacheNodeAdvertisment peer) + { + //remove from connected peers + lock (_connectedPeers) + { + _connectedPeers.Remove(peer); + } + } + + + private sealed record class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler + { + public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex) + { + + if (ex is HttpRequestException hre) + { + if (hre.InnerException is SocketException se) + { + //traisnport failed + Logger.Warn("Failed to connect to server {serv} because {err}", errorNode, se.Message); + } + else + { + Logger.Error("Failed to connect to node {n}\n{err}", errorNode, hre); + } + } + else + { + Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex); + } + + } + } + } +} diff --git a/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs deleted file mode 100644 index a55e8e2..0000000 --- a/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs +++ /dev/null @@ -1,250 +0,0 @@ -/* -* Copyright (c) 2023 Vaughn Nugent -* -* Library: VNLib -* Package: ObjectCacheServer -* File: ObjectCacheServerEntry.cs -* -* ObjectCacheServerEntry.cs is part of ObjectCacheServer which is part of the larger -* VNLib collection of libraries and utilities. -* -* ObjectCacheServer is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* ObjectCacheServer is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using System.Collections.Generic; - -using VNLib.Plugins; -using VNLib.Utils.Logging; -using VNLib.Data.Caching.Extensions; -using static VNLib.Data.Caching.Constants; -using VNLib.Net.Messaging.FBM; -using VNLib.Net.Messaging.FBM.Client; -using VNLib.Plugins.Extensions.Loading; - -namespace VNLib.Data.Caching.ObjectCache.Server.Distribution -{ - internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork - { - private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10); - - private readonly NodeConfig NodeConfig; - private readonly ICachePeerAdapter PeerAdapter; - private readonly ICacheStore CacheStore; - private readonly FBMClientConfig ClientConfig; - private readonly PluginBase Plugin; - - private CacheNodeConfiguration CacheConfig => NodeConfig.Config; - - public CacheNodeReplicationMaanger(PluginBase plugin) - { - //Load the node config - NodeConfig = plugin.GetOrCreateSingleton(); - - //Get peer adapter - PeerAdapter = plugin.GetOrCreateSingleton(); - - CacheStore = plugin.GetOrCreateSingleton(); - } - - public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) - { - pluginLog.Information("[REPL] Initializing node replication worker"); - - try - { - while (true) - { - //Get all new peers - ICacheNodeAdvertisment[] peers = PeerAdapter.GetNewPeers(); - - if (peers.Length == 0) - { - pluginLog.Verbose("[REPL] No new peers to connect to"); - } - - //Connect to each peer as a background task - foreach (ICacheNodeAdvertisment peer in peers) - { - _ = Plugin.ObserveWork(() => OnNewPeerDoWorkAsync(peer, pluginLog, exitToken)); - } - - //Wait for a new peers - await Task.Delay(10000, exitToken); - } - } - catch (OperationCanceledException) - { - //Normal exit - } - catch - { - pluginLog.Error("[REPL] Node replication worker exited with an error"); - throw; - } - finally - { - - } - - pluginLog.Information("[REPL] Node replication worker exited"); - } - - private async Task OnNewPeerDoWorkAsync(ICacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken) - { - _ = newPeer ?? throw new ArgumentNullException(nameof(newPeer)); - - //Setup client - FBMClient client = new(ClientConfig); - - //Add peer to monitor - PeerAdapter.OnPeerListenerAttached(newPeer); - - try - { - log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId); - - //Connect to the server - await client.ConnectToCacheAsync(newPeer, CacheConfig, exitToken); - - log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId); - - //Start worker tasks - List workerTasks = new(); - - for (int i = 0; i < Environment.ProcessorCount; i++) - { - Task workerTask = Task.Run(() => ReplicationWorkerDoWorkAsync(client, log, exitToken), exitToken); - - workerTasks.Add(workerTask); - } - - //Wait for sync workers to exit - await Task.WhenAll(workerTasks); - - log.Debug("All cache worker tasks exited successfully, disconnecting from {server}", newPeer.NodeId); - - //Disconnect client gracefully - await client.DisconnectAsync(CancellationToken.None); - } - catch (InvalidResponseException ie) - { - //See if the plugin is unloading - if (!exitToken.IsCancellationRequested) - { - log.Debug("Peer {p} responded with invalid response packet, disconnected. reason\n {reason}", newPeer.NodeId, ie); - } - //Disconnect client gracefully - try - { - await client.DisconnectAsync(CancellationToken.None); - } - catch (Exception ex) - { - log.Error(ex); - } - } - catch (OperationCanceledException) - { - //Plugin unloading, Try to disconnect - try - { - await client.DisconnectAsync(CancellationToken.None); - } - catch (Exception ex) - { - log.Error(ex); - } - } - catch (Exception ex) - { - log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex); - } - finally - { - client.Dispose(); - - //Notify monitor of disconnect - PeerAdapter.OnPeerListenerDetatched(newPeer); - } - } - - //Wroker task callback method - private async Task ReplicationWorkerDoWorkAsync(FBMClient client, ILogProvider log, CancellationToken exitToken) - { - //Listen for changes - while (true) - { - //Wait for changes - WaitForChangeResult changedObject = await client.WaitForChangeAsync(exitToken); - - log.Debug("Object changed {typ} {obj}", changedObject.Status, changedObject.CurrentId); - - switch (changedObject.Status) - { - case ResponseCodes.NotFound: - log.Warn("Server cache not properly configured, worker exiting"); - return; - case "deleted": - //Delete the object from the store - await CacheStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None); - break; - case "modified": - //Reload the record from the store - await UpdateRecordAsync(client, log, changedObject.CurrentId, changedObject.NewId, exitToken); - break; - } - } - } - - private async Task UpdateRecordAsync(FBMClient client, ILogProvider log, string objectId, string newId, CancellationToken cancellation) - { - //Get request message - FBMRequest modRequest = client.RentRequest(); - try - { - //Set action as get/create - modRequest.WriteHeader(HeaderCommand.Action, Actions.Get); - //Set session-id header - modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId); - - //Make request - using FBMResponse response = await client.SendAsync(modRequest, GetItemTimeout, cancellation); - - response.ThrowIfNotSet(); - - //Check response code - string status = response.Headers.First(static s => s.Header == HeaderCommand.Status).Value.ToString(); - - if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal)) - { - //Update the record - await CacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation); - log.Debug("Updated object {id}", objectId); - } - else - { - log.Warn("Object {id} was missing on the remote server", objectId); - } - } - finally - { - client.ReturnRequest(modRequest); - } - } - } -} diff --git a/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs b/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs deleted file mode 100644 index f191c9d..0000000 --- a/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs +++ /dev/null @@ -1,70 +0,0 @@ -/* -* Copyright (c) 2023 Vaughn Nugent -* -* Library: VNLib -* Package: ObjectCacheServer -* File: CachePeerMonitor.cs -* -* CachePeerMonitor.cs is part of ObjectCacheServer which is part of the larger -* VNLib collection of libraries and utilities. -* -* ObjectCacheServer is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* ObjectCacheServer is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System.Linq; -using System.Collections.Generic; - -using VNLib.Plugins; - -namespace VNLib.Data.Caching.ObjectCache.Server.Distribution -{ - - internal sealed class CachePeerMonitor : IPeerMonitor - { - - private readonly LinkedList peers = new(); - - public CachePeerMonitor(PluginBase plugin) - { } - - /// - public IEnumerable GetAllPeers() - { - lock(peers) - { - return peers.ToArray(); - } - } - - /// - public void OnPeerConnected(ICachePeer peer) - { - //When a peer is connected we can add it to the list so the replication manager can see it - lock(peers) - { - peers.AddLast(peer); - } - } - - /// - public void OnPeerDisconnected(ICachePeer peer) - { - //When a peer is disconnected we can remove it from the list - lock(peers) - { - peers.Remove(peer); - } - } - } -} diff --git a/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs b/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs deleted file mode 100644 index c3fb022..0000000 --- a/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs +++ /dev/null @@ -1,49 +0,0 @@ -/* -* Copyright (c) 2023 Vaughn Nugent -* -* Library: VNLib -* Package: ObjectCacheServer -* File: ICachePeerAdapter.cs -* -* ICachePeerAdapter.cs is part of ObjectCacheServer which is part of the larger -* VNLib collection of libraries and utilities. -* -* ObjectCacheServer is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* ObjectCacheServer is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using VNLib.Data.Caching.Extensions; - -namespace VNLib.Data.Caching.ObjectCache.Server.Distribution -{ - internal interface ICachePeerAdapter - { - /// - /// Gets the peers that have been discovered but not yet connected to - /// - /// A collection of peers that have not been connected to yet - ICacheNodeAdvertisment[] GetNewPeers(); - - /// - /// Called when a peer has been connected to - /// - /// The peer that has been connected - void OnPeerListenerAttached(ICacheNodeAdvertisment peer); - - /// - /// Called when a peer has been disconnected from - /// - /// The disconnected peer - void OnPeerListenerDetatched(ICacheNodeAdvertisment peer); - } -} diff --git a/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs b/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs deleted file mode 100644 index 028171f..0000000 --- a/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs +++ /dev/null @@ -1,53 +0,0 @@ -/* -* Copyright (c) 2023 Vaughn Nugent -* -* Library: VNLib -* Package: ObjectCacheServer -* File: IPeerMonitor.cs -* -* IPeerMonitor.cs is part of ObjectCacheServer which is part of the larger -* VNLib collection of libraries and utilities. -* -* ObjectCacheServer is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* ObjectCacheServer is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System.Collections.Generic; - -namespace VNLib.Data.Caching.ObjectCache.Server.Distribution -{ - /// - /// Represents a monitor for peer cache servers to advertise their presence - /// in the cluster - /// - internal interface IPeerMonitor - { - /// - /// Notifies the monitor that a peer has connected to the cluster - /// - /// The peer that connected - void OnPeerConnected(ICachePeer peer); - - /// - /// Notifies the monitor that a peer has disconnected - /// - /// The peer that has disconnected - void OnPeerDisconnected(ICachePeer peer); - - /// - /// Gets an enumerable of all peers currently connected to this node - /// - /// The collection of all connected peers - IEnumerable GetAllPeers(); - } -} diff --git a/plugins/ObjectCacheServer/src/Distribution/KnownPeerList.cs b/plugins/ObjectCacheServer/src/Distribution/KnownPeerList.cs deleted file mode 100644 index 74df81f..0000000 --- a/plugins/ObjectCacheServer/src/Distribution/KnownPeerList.cs +++ /dev/null @@ -1,100 +0,0 @@ -/* -* Copyright (c) 2023 Vaughn Nugent -* -* Library: VNLib -* Package: ObjectCacheServer -* File: KnownPeerList.cs -* -* KnownPeerList.cs is part of ObjectCacheServer which is part of the larger -* VNLib collection of libraries and utilities. -* -* ObjectCacheServer is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* ObjectCacheServer is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Linq; -using System.Collections.Generic; -using System.Text.Json.Serialization; - -using VNLib.Plugins; -using VNLib.Data.Caching.Extensions; -using VNLib.Plugins.Extensions.Loading; - - -namespace VNLib.Data.Caching.ObjectCache.Server.Distribution -{ - [ConfigurationName("known_peers")] - internal sealed class KnownPeerList - { - private readonly List _peers; - - public KnownPeerList(PluginBase plugin, IConfigScope config) - { - //Deserialze the known peers into an array - KnownPeer[] peers = config.Deserialze(); - - foreach (KnownPeer peer in peers) - { - //Validate the peer - peer.Validate(); - } - - _peers = peers?.ToList() ?? new(); - } - - public IEnumerable GetPeers() - { - return _peers; - } - - private sealed class KnownPeer : ICacheNodeAdvertisment - { - public Uri? ConnectEndpoint { get; set; } - public Uri? DiscoveryEndpoint { get; set; } - - [JsonPropertyName("node_id")] - public string NodeId { get; set; } - - [JsonPropertyName("connect_url")] - public string? ConnectEpPath - { - get => ConnectEndpoint?.ToString() ?? string.Empty; - set => ConnectEndpoint = new Uri(value ?? string.Empty); - } - - [JsonPropertyName("discovery_url")] - public string? DiscoveryEpPath - { - get => DiscoveryEndpoint?.ToString() ?? string.Empty; - set => DiscoveryEndpoint = new Uri(value ?? string.Empty); - } - - public void Validate() - { - if (string.IsNullOrWhiteSpace(NodeId)) - { - throw new ArgumentException("Node ID cannot be null or whitespace"); - } - if (ConnectEndpoint is null) - { - throw new ArgumentException("Connect endpoint cannot be null"); - } - if (DiscoveryEndpoint is null) - { - throw new ArgumentException("Discovery endpoint cannot be null"); - } - } - } - } -} diff --git a/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs deleted file mode 100644 index 26ec565..0000000 --- a/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs +++ /dev/null @@ -1,161 +0,0 @@ -/* -* Copyright (c) 2023 Vaughn Nugent -* -* Library: VNLib -* Package: ObjectCacheServer -* File: PeerDiscoveryManager.cs -* -* PeerDiscoveryManager.cs is part of ObjectCacheServer which is part of the larger -* VNLib collection of libraries and utilities. -* -* ObjectCacheServer is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* ObjectCacheServer is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using System.Collections.Generic; - -using VNLib.Plugins; -using VNLib.Utils.Logging; -using VNLib.Data.Caching.Extensions; -using VNLib.Plugins.Extensions.Loading; - - -namespace VNLib.Data.Caching.ObjectCache.Server.Distribution -{ - - sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter - { - private readonly List _connectedPeers; - private readonly NodeConfig _config; - private readonly IPeerMonitor _monitor; - private readonly KnownPeerList _knownPeers; - - public PeerDiscoveryManager(PluginBase plugin) - { - //Get config - _config = plugin.GetOrCreateSingleton(); - - //Get the peer monitor - _monitor = plugin.GetOrCreateSingleton(); - - //Get the known peer list - _knownPeers = plugin.GetOrCreateSingleton(); - - _connectedPeers = new(); - - //Setup discovery error handler - _config.Config.WithErrorHandler(new ErrorHandler(plugin.Log)); - } - - async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) - { - /* - * This loop uses the peer monitor to keep track of all connected peers, then gets - * all the advertising peers (including known peers) and resolves all nodes across - * the network. - */ - - pluginLog.Information("Node discovery worker started"); - - try - { - while (true) - { - try - { - //Use the monitor to get the initial peers - IEnumerable ads = _monitor.GetAllPeers() - .Where(static p => p.Advertisment != null) - .Select(static p => p.Advertisment!); - - //Add known peers to the initial list - ads = ads.Union(_knownPeers.GetPeers()); - - //Set initial peers - _config.Config.WithInitialPeers(ads); - - //Discover all nodes - await _config.Config.DiscoverNodesAsync(exitToken); - } - catch(OperationCanceledException) - { - throw; - } - catch (Exception ex) - { - pluginLog.Error(ex, "Failed to discover new peer nodes"); - } - - //Delay the next discovery - await Task.Delay(_config.DiscoveryInterval, exitToken); - } - } - catch (OperationCanceledException) - { - //Normal exit - pluginLog.Information("Node discovery worker exiting"); - } - finally - { - - } - } - - - /// - public ICacheNodeAdvertisment[] GetNewPeers() - { - lock (_connectedPeers) - { - //Get all discovered peers - ICacheNodeAdvertisment[] peers = _config.Config.NodeCollection.GetAllNodes(); - - //Get the difference between the discovered peers and the connected peers - return peers.Except(_connectedPeers).ToArray(); - } - } - - /// - public void OnPeerListenerAttached(ICacheNodeAdvertisment peer) - { - lock (_connectedPeers) - { - //Add to connected peers - _connectedPeers.Add(peer); - } - } - - /// - public void OnPeerListenerDetatched(ICacheNodeAdvertisment peer) - { - //remove from connected peers - lock (_connectedPeers) - { - _connectedPeers.Remove(peer); - } - } - - - private sealed record class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler - { - public void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex) - { - Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex); - } - } - } -} diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs index 8352635..5e794f8 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs @@ -46,7 +46,7 @@ using VNLib.Plugins.Essentials.Endpoints; using VNLib.Plugins.Essentials.Extensions; using VNLib.Plugins.Extensions.Loading.Routing; using VNLib.Data.Caching.ObjectCache.Server.Distribution; - +using VNLib.Data.Caching.Extensions.Clustering; namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { @@ -54,6 +54,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints [ConfigurationName("connect_endpoint")] internal sealed class ConnectEndpoint : ResourceEndpointBase { + private const string LOG_SCOPE_NAME = "CONEP"; + private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30); @@ -90,7 +92,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { string? path = config["path"].GetString(); - InitPathAndLog(path, plugin.Log); + InitPathAndLog(path, plugin.Log.CreateScope(LOG_SCOPE_NAME)); //Check for ip-verification flag VerifyIp = config.TryGetValue("verify_ip", out JsonElement vIpEl) && vIpEl.GetBoolean(); @@ -152,28 +154,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints // Parse jwt using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth)) { - bool verified = false; - //verify signature for client - if (NodeConfiguration.KeyStore.VerifyJwt(jwt)) + if (NodeConfiguration.KeyStore.VerifyJwt(jwt, false)) { - verified = true; + //Validated } //May be signed by a cache server - else + else if(NodeConfiguration.KeyStore.VerifyJwt(jwt, true)) { //Set peer and verified flag since the another cache server signed the request - isPeer = verified = NodeConfiguration.KeyStore.VerifyCachePeer(jwt); + isPeer = true; } - - //Check flag - if (!verified) + else { Log.Information("Client signature verification failed"); entity.CloseResponse(HttpStatusCode.Unauthorized); return VfReturnType.VirtualSkip; } - + //Recover json body using JsonDocument doc = jwt.GetPayload(); @@ -196,6 +194,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints auth.WriteHeader(NodeConfiguration.KeyStore.GetJwtHeader()); auth.InitPayloadClaim() .AddClaim("aud", AudienceLocalServerId) + .AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds()) .AddClaim("exp", entity.RequestedTimeUtc.Add(AuthTokenExpiration).ToUnixTimeSeconds()) .AddClaim("nonce", RandomHash.GetRandomBase32(8)) .AddClaim("chl", challenge!) @@ -240,7 +239,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } string? nodeId = null; - ICacheNodeAdvertisment? discoveryAd = null; + CacheNodeAdvertisment? discoveryAd = null; //Parse jwt using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth)) @@ -299,15 +298,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints //Verify the signature the client included of the auth token - if (isPeer) + //Verify token signature against a fellow cache public key + if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth, isPeer)) { - //Verify token signature against a fellow cache public key - if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth)) - { - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; - } + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; + } + if (isPeer) + { //Try to get the node advertisement header string? discoveryHeader = entity.Server.Headers[FBMDataCacheExtensions.X_NODE_DISCOVERY_HEADER]; @@ -317,15 +316,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(discoveryHeader); } } - else - { - //Not a peer, so verify against the client's public key - if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth)) - { - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; - } - } } try @@ -389,12 +379,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints //Notify peers of new connection Peers.OnPeerConnected(state); - //Inc connected count - Interlocked.Increment(ref _connectedClients); - //Register plugin exit token to cancel the connected socket CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll); + //Inc connected count + Interlocked.Increment(ref _connectedClients); + try { //Init listener args from request @@ -442,14 +432,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { Log.Debug(ex); } - finally - { - //Dec connected count - Interlocked.Decrement(ref _connectedClients); - //Unregister the - reg.Unregister(); - } + //Dec connected count + Interlocked.Decrement(ref _connectedClients); + + //Unregister the token + reg.Unregister(); //Notify monitor of disconnect Peers.OnPeerDisconnected(state); @@ -465,12 +453,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints public int MaxMessageSize { get; init; } public int MaxResponseBufferSize { get; init; } public string? NodeId { get; init; } - public ICacheNodeAdvertisment? Advertisment { get; init; } + public CacheNodeAdvertisment? Advertisment { get; init; } public override string ToString() { return - $"{nameof(RecvBufferSize)}:{RecvBufferSize}, {nameof(MaxHeaderBufferSize)}: {MaxHeaderBufferSize}, {nameof(MaxMessageSize)}:{MaxMessageSize}, {nameof(MaxResponseBufferSize)}:{MaxResponseBufferSize}"; + $"{nameof(RecvBufferSize)}:{RecvBufferSize}, {nameof(MaxHeaderBufferSize)}: {MaxHeaderBufferSize}, " + + $"{nameof(MaxMessageSize)}:{MaxMessageSize}, {nameof(MaxResponseBufferSize)}:{MaxResponseBufferSize}"; } } } diff --git a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs index 90ffca0..77d59dd 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs @@ -26,7 +26,6 @@ using System; using System.Net; using System.Linq; using System.Text.Json; -using System.Threading.Tasks; using VNLib.Hashing; using VNLib.Hashing.IdentityUtility; @@ -35,17 +34,25 @@ using VNLib.Plugins.Essentials; using VNLib.Plugins.Essentials.Endpoints; using VNLib.Plugins.Essentials.Extensions; using VNLib.Plugins.Extensions.Loading; -using VNLib.Data.Caching.Extensions; using VNLib.Data.Caching.ObjectCache.Server.Distribution; +using VNLib.Data.Caching.Extensions.Clustering; namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { [ConfigurationName("discovery_endpoint")] - internal sealed class PeerDiscoveryEndpoint : UnprotectedWebEndpoint + internal sealed class PeerDiscoveryEndpoint : ResourceEndpointBase { private readonly IPeerMonitor PeerMonitor; private readonly NodeConfig Config; + //Loosen up protection settings + /// + protected override ProtectionSettings EndpointProtectionSettings { get; } = new() + { + DisableBrowsersOnly = true, + DisableSessionsRequired = true + }; + public PeerDiscoveryEndpoint(PluginBase plugin, IConfigScope config) { string? path = config["path"].GetString(); @@ -59,7 +66,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints Config = plugin.GetOrCreateSingleton(); } - protected override async ValueTask GetAsync(HttpEntity entity) + protected override VfReturnType Get(HttpEntity entity) { //Get auth token string? authToken = entity.Server.Headers[HttpRequestHeader.Authorization]; @@ -71,17 +78,18 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } string subject = string.Empty; + string challenge = string.Empty; //Parse auth token using(JsonWebToken jwt = JsonWebToken.Parse(authToken)) { //try to verify against cache node first - if (!Config.KeyStore.VerifyCachePeer(jwt)) + if (!Config.KeyStore.VerifyJwt(jwt, true)) { //failed... //try to verify against client key - if (!Config.KeyStore.VerifyJwt(jwt)) + if (!Config.KeyStore.VerifyJwt(jwt, false)) { //invalid token entity.CloseResponse(HttpStatusCode.Unauthorized); @@ -90,12 +98,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } using JsonDocument payload = jwt.GetPayload(); - + + //Get client info to pass back subject = payload.RootElement.GetProperty("sub").GetString() ?? string.Empty; + challenge = payload.RootElement.GetProperty("chl").GetString() ?? string.Empty; } //Valid key, get peer list to send to client - ICacheNodeAdvertisment[] peers = PeerMonitor.GetAllPeers() + CacheNodeAdvertisment[] peers = PeerMonitor.GetAllPeers() .Where(static p => p.Advertisment != null) .Select(static p => p.Advertisment!) .ToArray(); @@ -113,7 +123,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints .AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds()) //Send all peers as a json array .AddClaim("peers", peers) - .AddClaim("nonce", RandomHash.GetRandomBase32(24)) + //Send the challenge back + .AddClaim("chl", challenge) .CommitClaims(); //Sign the response diff --git a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs new file mode 100644 index 0000000..99c7f19 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs @@ -0,0 +1,111 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: WellKnownEndpoint.cs +* +* WellKnownEndpoint.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Net; +using System.Text.Json; + +using VNLib.Data.Caching.Extensions; +using VNLib.Data.Caching.Extensions.Clustering; +using VNLib.Hashing; +using VNLib.Hashing.IdentityUtility; +using VNLib.Plugins; +using VNLib.Plugins.Essentials; +using VNLib.Plugins.Essentials.Endpoints; +using VNLib.Plugins.Essentials.Extensions; +using VNLib.Plugins.Extensions.Loading; + +namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints +{ + + /* + * The well-known endpoint is used to advertise the node's existence to + * the network. Clients need to know the endpoint layout to be able to + * connect and discover other nodes. + */ + + [ConfigurationName("well_known", Required = false)] + internal sealed class WellKnownEndpoint : ResourceEndpointBase + { + //Default path for the well known endpoint + const string DefaultPath = "/.well-known/vncache"; + + //Store serialized advertisment + private readonly CacheNodeAdvertisment _advertisment; + private readonly ICacheAuthManager _keyStore; + + //Loosen up security requirements + protected override ProtectionSettings EndpointProtectionSettings { get; } = new() + { + DisableBrowsersOnly = true, + DisableSessionsRequired = true, + }; + + public WellKnownEndpoint(PluginBase plugin):this(plugin, null) + { } + + public WellKnownEndpoint(PluginBase plugin, IConfigScope? config) + { + //Get the node config + NodeConfig nodeConfig = plugin.GetOrCreateSingleton(); + + //serialize the config, discovery may not be enabled + _advertisment = nodeConfig.Config.Advertisment; + _keyStore = nodeConfig.KeyStore; + + //Default to the well known path + string path = DefaultPath; + + //See if the user configured a path + if(config != null && config.TryGetValue("path", out JsonElement pathEl)) + { + path = pathEl.GetString() ?? DefaultPath; + } + + InitPathAndLog(path, plugin.Log); + } + + protected override VfReturnType Get(HttpEntity entity) + { + string entropy = RandomHash.GetRandomBase32(16); + + //Create jwt signed for the client + using JsonWebToken jwt = new(); + + jwt.WriteHeader(_keyStore.GetJwtHeader()); + //Write the advertisment as the message body + jwt.InitPayloadClaim() + .AddClaim("sub", _advertisment) + .AddClaim("chl", entropy) + .AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds()) + .CommitClaims(); + + _keyStore.SignJwt(jwt); + + //Return the advertisment jwt + entity.CloseResponse(HttpStatusCode.OK, Net.Http.ContentType.Text, jwt.DataBuffer); + return VfReturnType.VirtualSkip; + } + } +} diff --git a/plugins/ObjectCacheServer/src/ICachePeer.cs b/plugins/ObjectCacheServer/src/ICachePeer.cs index 97b406f..897655a 100644 --- a/plugins/ObjectCacheServer/src/ICachePeer.cs +++ b/plugins/ObjectCacheServer/src/ICachePeer.cs @@ -22,7 +22,7 @@ * along with this program. If not, see https://www.gnu.org/licenses/. */ -using VNLib.Data.Caching.Extensions; +using VNLib.Data.Caching.Extensions.Clustering; namespace VNLib.Data.Caching.ObjectCache.Server { @@ -39,6 +39,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server /// /// An optional signed advertisment message for other peers /// - ICacheNodeAdvertisment? Advertisment { get; } + CacheNodeAdvertisment? Advertisment { get; } } } diff --git a/plugins/ObjectCacheServer/src/NodeConfig.cs b/plugins/ObjectCacheServer/src/NodeConfig.cs index 81b8a32..a6c5be9 100644 --- a/plugins/ObjectCacheServer/src/NodeConfig.cs +++ b/plugins/ObjectCacheServer/src/NodeConfig.cs @@ -26,36 +26,43 @@ using System; using System.Net; using System.Linq; using System.Text.Json; -using System.Threading.Tasks; using VNLib.Plugins; -using VNLib.Utils; using VNLib.Utils.Logging; -using VNLib.Data.Caching.Extensions; +using VNLib.Utils.Extensions; using VNLib.Plugins.Extensions.Loading; using VNLib.Data.Caching.ObjectCache.Server.Endpoints; +using VNLib.Data.Caching.Extensions.Clustering; namespace VNLib.Data.Caching.ObjectCache.Server { [ConfigurationName("cluster")] - internal sealed class NodeConfig : VnDisposeable + internal sealed class NodeConfig { const string CacheConfigTemplate = @" - Cluster Configuration: - Node Id: {id} - TlsEndabled: {tls}, - Cache Endpoint: {ep} +Cluster Configuration: + Node Id: {id} + TlsEndabled: {tls} + Cache Endpoint: {ep} + Discovery Endpoint: {dep} + Discovery Interval: {di} + Max Peers: {mpc} "; public CacheNodeConfiguration Config { get; } public CacheAuthKeyStore KeyStore { get; } + public TimeSpan DiscoveryInterval { get; } + + /// + /// The maximum number of peer connections to allow + /// + public uint MaxPeerConnections { get; } = 10; + public NodeConfig(PluginBase plugin, IConfigScope config) - { - //Server id is just dns name for now - string nodeId = Dns.GetHostName(); + { Config = new(); @@ -75,9 +82,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server //If the ssl element is present, ssl is enabled for the server usingTls = firstHost.TryGetProperty("ssl", out _); } + string hostname = Dns.GetHostName(); + + //Server id is just dns name for now + string nodeId = $"{hostname}:{port}"; //The endpoint to advertise to cache clients that allows cache connections - Uri cacheEndpoint = GetEndpointUri(plugin, usingTls, port, nodeId); + Uri cacheEndpoint = GetEndpointUri(plugin, usingTls, port, hostname); //Init key store KeyStore = new(plugin); @@ -89,21 +100,32 @@ namespace VNLib.Data.Caching.ObjectCache.Server .WithTls(usingTls); //Check if advertising is enabled - if(config.TryGetValue("advertise", out JsonElement adEl) && adEl.GetBoolean()) + if(plugin.HasConfigForType()) { //Get the the broadcast endpoint - Uri discoveryEndpoint = GetEndpointUri(plugin, usingTls, port, nodeId); + Uri discoveryEndpoint = GetEndpointUri(plugin, usingTls, port, hostname); //Enable advertising Config.EnableAdvertisment(discoveryEndpoint); } - - + + + DiscoveryInterval = config["discovery_interval_sec"].GetTimeSpan(TimeParseType.Seconds); + + //Get the max peer connections + if(config.TryGetValue("max_peers", out JsonElement maxPeerEl)) + { + MaxPeerConnections = maxPeerEl.GetUInt32(); + } + //log the config plugin.Log.Information(CacheConfigTemplate, nodeId, usingTls, - cacheEndpoint + cacheEndpoint, + Config.DiscoveryEndpoint, + DiscoveryInterval, + MaxPeerConnections ); } @@ -115,12 +137,5 @@ namespace VNLib.Data.Caching.ObjectCache.Server //The endpoint to advertise to cache clients that allows cache connections return new UriBuilder(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, hostName, port, cacheEpConfig["path"].GetString()).Uri; } - - - protected override void Free() - { - //cleanup keys - - } } } diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj index e8d6291..5273b64 100644 --- a/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj +++ b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj @@ -41,4 +41,9 @@ + + + + + diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs index 5d9a50b..1ddf49b 100644 --- a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs +++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs @@ -33,7 +33,7 @@ using VNLib.Utils.Memory.Diagnostics; using VNLib.Plugins.Extensions.Loading; using VNLib.Plugins.Extensions.Loading.Routing; using VNLib.Data.Caching.ObjectCache.Server.Endpoints; - +using VNLib.Data.Caching.ObjectCache.Server.Distribution; namespace VNLib.Data.Caching.ObjectCache.Server { @@ -76,9 +76,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server { try { + //Route well-known endpoint + this.Route(); + //Init connect endpoint this.Route(); + //We must initialize the replication manager + _ = this.GetOrCreateSingleton(); + //Setup discovery endpoint if(this.HasConfigForType()) { -- cgit