From 2f674e79d42e7d36225fa9ac7ecefbc5bc62d325 Mon Sep 17 00:00:00 2001 From: vnugent Date: Thu, 13 Jul 2023 13:20:25 -0400 Subject: Checkpoint, kind of working clustering --- .../src/Clustering/CacheNodeReplicationMaanger.cs | 290 +++++++++++++++++++++ .../src/Clustering/CachePeerMonitor.cs | 98 +++++++ .../src/Clustering/ICachePeerAdapter.cs | 49 ++++ .../src/Clustering/IPeerMonitor.cs | 54 ++++ .../src/Clustering/PeerDiscoveryManager.cs | 268 +++++++++++++++++++ 5 files changed, 759 insertions(+) create mode 100644 plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs create mode 100644 plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs create mode 100644 plugins/ObjectCacheServer/src/Clustering/ICachePeerAdapter.cs create mode 100644 plugins/ObjectCacheServer/src/Clustering/IPeerMonitor.cs create mode 100644 plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs (limited to 'plugins/ObjectCacheServer/src/Clustering') diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs new file mode 100644 index 0000000..ffdd4f4 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs @@ -0,0 +1,290 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: CacheNodeReplicationMaanger.cs +* +* CacheNodeReplicationMaanger.cs is part of ObjectCacheServer which is part +* of the larger VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using System.Collections.Generic; + +using VNLib.Plugins; +using VNLib.Utils.Logging; +using VNLib.Data.Caching.Extensions; +using static VNLib.Data.Caching.Constants; +using VNLib.Net.Messaging.FBM; +using VNLib.Net.Messaging.FBM.Client; +using VNLib.Plugins.Extensions.Loading; +using VNLib.Data.Caching.Extensions.Clustering; + +namespace VNLib.Data.Caching.ObjectCache.Server.Clustering +{ + + /* + * This class is responsible for replicating the cache with other nodes. + * + * It does this by connecting to other nodes and listening for change events. + * When a change event occurs, it takes action against the local cache store, + * to keep it consistent with the other nodes. + * + * Change events are only handled first-hand, meaning that events do not + * propagate to other nodes, they must be connected individually to each node + * and listen for changes. + */ + + internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork + { + private const string LOG_SCOPE_NAME = "REPL"; + + private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10); + private const int MAX_MESSAGE_SIZE = 12 * 1024; + + private readonly PluginBase _plugin; + private readonly ILogProvider _log; + private readonly NodeConfig _nodeConfig; + private readonly ICacheStore _cacheStore; + private readonly ICachePeerAdapter _peerAdapter; + private readonly FBMClientConfig _replicationClientConfig; + + private readonly bool _isDebug; + + private int _openConnections; + + public CacheNodeReplicationMaanger(PluginBase plugin) + { + //Load the node config + _nodeConfig = plugin.GetOrCreateSingleton(); + _cacheStore = plugin.GetOrCreateSingleton(); + _peerAdapter = plugin.GetOrCreateSingleton(); + + //Init fbm config with fixed message size + _replicationClientConfig = FBMDataCacheExtensions.GetDefaultConfig( + (plugin as ObjectCacheServerEntry)!.CacheHeap, + MAX_MESSAGE_SIZE, + debugLog: plugin.IsDebug() ? plugin.Log : null + ); + + _plugin = plugin; + _isDebug = plugin.IsDebug(); + _log = plugin.Log.CreateScope(LOG_SCOPE_NAME); + } + + public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) + { + _log.Information("Initializing node replication worker"); + + try + { + while (true) + { + //Get all new peers + CacheNodeAdvertisment[] peers = _peerAdapter.GetNewPeers(); + + if (peers.Length == 0 && _isDebug) + { + _log.Verbose("No new peers to connect to"); + } + + //Make sure we don't exceed the max connections + if(_openConnections >= _nodeConfig.MaxPeerConnections) + { + if (_isDebug) + { + _log.Verbose("Max peer connections reached, waiting for a connection to close"); + } + } + else + { + //Connect to each peer as a background task + foreach (CacheNodeAdvertisment peer in peers) + { + _ = _plugin.ObserveWork(() => OnNewPeerDoWorkAsync(peer, _log, exitToken)); + } + } + + //Wait for a new peers + await Task.Delay(10000, exitToken); + } + } + catch (OperationCanceledException) + { + //Normal exit + } + catch + { + _log.Error("Node replication worker exited with an error"); + throw; + } + finally + { + + } + + _log.Information("Node replication worker exited"); + } + + private async Task OnNewPeerDoWorkAsync(CacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken) + { + _ = newPeer ?? throw new ArgumentNullException(nameof(newPeer)); + + //Setup client + FBMClient client = new(_replicationClientConfig); + + //Add peer to monitor + _peerAdapter.OnPeerListenerAttached(newPeer); + + Interlocked.Increment(ref _openConnections); + + try + { + log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId); + + //Connect to the server + await client.ConnectToCacheAsync(newPeer, _nodeConfig.Config, exitToken); + + log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId); + + //Start worker tasks + List workerTasks = new(); + + for (int i = 0; i < Environment.ProcessorCount; i++) + { + Task workerTask = Task.Run(() => ReplicationWorkerDoWorkAsync(client, log, exitToken), exitToken); + + workerTasks.Add(workerTask); + } + + //Wait for sync workers to exit + await Task.WhenAll(workerTasks); + + log.Debug("All cache worker tasks exited successfully, disconnecting from {server}", newPeer.NodeId); + + //Disconnect client gracefully + await client.DisconnectAsync(CancellationToken.None); + } + catch (InvalidResponseException ie) + { + //See if the plugin is unloading + if (!exitToken.IsCancellationRequested) + { + log.Debug("Peer {p} responded with invalid response packet, disconnected. reason\n {reason}", newPeer.NodeId, ie); + } + //Disconnect client gracefully + try + { + await client.DisconnectAsync(CancellationToken.None); + } + catch (Exception ex) + { + log.Error(ex); + } + } + catch (OperationCanceledException) + { + //Plugin unloading, Try to disconnect + try + { + await client.DisconnectAsync(CancellationToken.None); + } + catch (Exception ex) + { + log.Error(ex); + } + } + catch (Exception ex) + { + log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex); + } + finally + { + Interlocked.Decrement(ref _openConnections); + + client.Dispose(); + + //Notify monitor of disconnect + _peerAdapter.OnPeerListenerDetatched(newPeer); + } + } + + //Wroker task callback method + private async Task ReplicationWorkerDoWorkAsync(FBMClient client, ILogProvider log, CancellationToken exitToken) + { + //Reusable request message + using FBMRequest request = new(client.Config); + + //Listen for changes + while (true) + { + //Wait for changes + WaitForChangeResult changedObject = await client.WaitForChangeAsync(exitToken); + + log.Debug("Object changed {typ} {obj}", changedObject.Status, changedObject.CurrentId); + + switch (changedObject.Status) + { + case ResponseCodes.NotFound: + log.Error("Server cache not properly configured, worker exiting"); + return; + case "deleted": + //Delete the object from the store + await _cacheStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None); + break; + case "modified": + //Reload the record from the store + await UpdateRecordAsync(client, request, log, changedObject.CurrentId, changedObject.NewId, exitToken); + break; + } + + //Reset request message + request.Reset(); + } + } + + private async Task UpdateRecordAsync(FBMClient client, FBMRequest modRequest, ILogProvider log, string objectId, string newId, CancellationToken cancellation) + { + //Set action as get/create + modRequest.WriteHeader(HeaderCommand.Action, Actions.Get); + //Set session-id header + modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId); + + //Make request + using FBMResponse response = await client.SendAsync(modRequest, GetItemTimeout, cancellation); + + response.ThrowIfNotSet(); + + //Check response code + string status = response.Headers.First(static s => s.Header == HeaderCommand.Status).Value.ToString(); + + if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal)) + { + //Update the record + await _cacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation); + log.Debug("Updated object {id}", objectId); + } + else + { + log.Warn("Object {id} was missing on the remote server", objectId); + } + } + } +} diff --git a/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs new file mode 100644 index 0000000..c49a54b --- /dev/null +++ b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs @@ -0,0 +1,98 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: CachePeerMonitor.cs +* +* CachePeerMonitor.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using System.Collections.Generic; + +using VNLib.Utils; +using VNLib.Utils.Extensions; +using VNLib.Plugins; + +namespace VNLib.Data.Caching.ObjectCache.Server.Clustering +{ + + internal sealed class CachePeerMonitor : VnDisposeable, IPeerMonitor + { + + private readonly LinkedList peers = new(); + private readonly ManualResetEvent newPeerTrigger = new (false); + + public CachePeerMonitor(PluginBase plugin) + { } + + /// + /// Waits for new peers to connect to the server + /// + /// A task that complets when a new peer has connected + public async Task WaitForChangeAsync() + { + await newPeerTrigger.WaitAsync(); + + //Reset the trigger for next call + newPeerTrigger.Reset(); + } + + /// + public IEnumerable GetAllPeers() + { + lock(peers) + { + return peers.ToArray(); + } + } + + /// + public void OnPeerConnected(ICachePeer peer) + { + //When a peer is connected we can add it to the list so the replication manager can see it + lock(peers) + { + peers.AddLast(peer); + } + + //Trigger monitor when change occurs + if(peer.Advertisment != null) + { + newPeerTrigger.Set(); + } + } + + /// + public void OnPeerDisconnected(ICachePeer peer) + { + //When a peer is disconnected we can remove it from the list + lock(peers) + { + peers.Remove(peer); + } + } + + protected override void Free() + { + newPeerTrigger.Dispose(); + } + } +} diff --git a/plugins/ObjectCacheServer/src/Clustering/ICachePeerAdapter.cs b/plugins/ObjectCacheServer/src/Clustering/ICachePeerAdapter.cs new file mode 100644 index 0000000..dd426f6 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Clustering/ICachePeerAdapter.cs @@ -0,0 +1,49 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: ICachePeerAdapter.cs +* +* ICachePeerAdapter.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using VNLib.Data.Caching.Extensions.Clustering; + +namespace VNLib.Data.Caching.ObjectCache.Server.Clustering +{ + internal interface ICachePeerAdapter + { + /// + /// Gets the peers that have been discovered but not yet connected to + /// + /// A collection of peers that have not been connected to yet + CacheNodeAdvertisment[] GetNewPeers(); + + /// + /// Called when a peer has been connected to + /// + /// The peer that has been connected + void OnPeerListenerAttached(CacheNodeAdvertisment peer); + + /// + /// Called when a peer has been disconnected from + /// + /// The disconnected peer + void OnPeerListenerDetatched(CacheNodeAdvertisment peer); + } +} diff --git a/plugins/ObjectCacheServer/src/Clustering/IPeerMonitor.cs b/plugins/ObjectCacheServer/src/Clustering/IPeerMonitor.cs new file mode 100644 index 0000000..d8f358f --- /dev/null +++ b/plugins/ObjectCacheServer/src/Clustering/IPeerMonitor.cs @@ -0,0 +1,54 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: IPeerMonitor.cs +* +* IPeerMonitor.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Collections.Generic; + +namespace VNLib.Data.Caching.ObjectCache.Server.Clustering +{ + /// + /// Represents a monitor for peer cache servers to advertise their presence + /// in the cluster + /// + internal interface IPeerMonitor + { + /// + /// Notifies the monitor that a remote peer has connected to the current node for + /// replication + /// + /// The peer that connected + void OnPeerConnected(ICachePeer peer); + + /// + /// Notifies the monitor that a peer has disconnected + /// + /// The peer that has disconnected + void OnPeerDisconnected(ICachePeer peer); + + /// + /// Gets an enumerable of all peers currently connected to this node + /// + /// The collection of all connected peers + IEnumerable GetAllPeers(); + } +} diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs new file mode 100644 index 0000000..f132cab --- /dev/null +++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs @@ -0,0 +1,268 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: PeerDiscoveryManager.cs +* +* PeerDiscoveryManager.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Net.Http; +using System.Threading; +using System.Net.Sockets; +using System.Threading.Tasks; +using System.Collections.Generic; + +using VNLib.Utils.Logging; +using VNLib.Data.Caching.Extensions; +using VNLib.Plugins; +using VNLib.Plugins.Extensions.Loading; +using VNLib.Data.Caching.Extensions.Clustering; + +namespace VNLib.Data.Caching.ObjectCache.Server.Clustering +{ + + /* + * This class is responsible for resolving and discovering peer nodes in the cluster network. + */ + + internal sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter + { + private const string LOG_SCOPE_NAME = "DISC"; + private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(15); + + private readonly List _connectedPeers; + private readonly NodeConfig _config; + private readonly CachePeerMonitor _monitor; + private readonly bool IsDebug; + private readonly ILogProvider _log; + + public PeerDiscoveryManager(PluginBase plugin) + { + //Get config + _config = plugin.GetOrCreateSingleton(); + + //Get the known peers array from config, its allowed to be null for master nodes + IConfigScope? config = plugin.TryGetConfig("known_peers"); + string[] kownPeers = config?.Deserialze() ?? Array.Empty(); + + //Add known peers to the monitor + _config.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s))); + + plugin.Log.Information("Inital peer nodes: {nodes}", kownPeers); + + //Get the peer monitor + _monitor = plugin.GetOrCreateSingleton(); + + _connectedPeers = new(); + + //Create scoped logger + _log = plugin.Log.CreateScope(LOG_SCOPE_NAME); + + //Setup discovery error handler + _config.Config.WithErrorHandler(new ErrorHandler(_log)); + + IsDebug = plugin.IsDebug(); + } + + async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) + { + /* + * This loop uses the peer monitor to keep track of all connected peers, then gets + * all the advertising peers (including known peers) and resolves all nodes across + * the network. + */ + + //Start the change listener + Task watcher = WatchForPeersAsync(exitToken); + + _log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", InitialDelay); + + try + { + //Wait for the initial delay + await Task.Delay(InitialDelay, exitToken); + + _log.Debug("Begining discovery loop"); + + /* + * To avoid connecting to ourself, we add ourselves to the connected list + * and it should never get removed. This is because the monitor will never + * report our own advertisment. + */ + _connectedPeers.Add(_config.Config.Advertisment); + + while (true) + { + try + { + if (IsDebug) + { + _log.Debug("Begining node discovery"); + } + + //Resolve all known peers + CacheNodeAdvertisment[] wellKnown = await _config.Config.ResolveWellKnownAsync(exitToken); + + //Use the monitor to get the initial peers + IEnumerable ads = GetMonitorAds(); + + //Combine well-known with new connected peers + CacheNodeAdvertisment[] allAds = ads.Union(wellKnown).ToArray(); + + if (allAds.Length > 0) + { + //Discover all kown nodes + await _config.Config.DiscoverNodesAsync(allAds, exitToken); + } + + //Log the discovered nodes if verbose logging is enabled + if (IsDebug) + { + CacheNodeAdvertisment[] found = _config.Config.NodeCollection.GetAllNodes(); + + _log.Debug("Discovered {count} nodes\n\t{nodes}", found.Length, found.Select(static s => s.NodeId)); + } + } + catch(OperationCanceledException) + { + throw; + } + catch (Exception ex) + { + _log.Error(ex, "Failed to discover new peer nodes"); + } + + //Delay the next discovery + await Task.Delay(_config.DiscoveryInterval, exitToken); + } + } + catch (OperationCanceledException) + { + //Normal exit + _log.Information("Node discovery worker exiting"); + } + finally + { + + } + + //Wait for the watcher to exit + await watcher.ConfigureAwait(false); + } + + private IEnumerable GetMonitorAds() + { + return _monitor.GetAllPeers() + .Where(static p => p.Advertisment != null) + //Without us + .Where(n => n.NodeId != _config.Config.NodeId) + .Select(static p => p.Advertisment!); + } + + //Wait for new peers and update the collection + private async Task WatchForPeersAsync(CancellationToken cancellation) + { + try + { + _log.Debug("Discovery worker waiting for new peers to connect"); + + while (true) + { + + //Wait for changes, then get new peers + await _monitor.WaitForChangeAsync().WaitAsync(cancellation); + + _log.Verbose("New peers connected"); + + //Use the monitor to get the initial peers + IEnumerable ads = GetMonitorAds(); + + ((NodeDiscoveryCollection)_config.Config.NodeCollection).AddManualNodes(ads); + } + } + catch (OperationCanceledException) + { + //Normal ext + _log.Debug("Connected peer listener exited"); + } + } + + + /// + public CacheNodeAdvertisment[] GetNewPeers() + { + lock (_connectedPeers) + { + //Get all discovered peers + CacheNodeAdvertisment[] peers = _config.Config.NodeCollection.GetAllNodes(); + + //Get the difference between the discovered peers and the connected peers + return peers.Except(_connectedPeers).ToArray(); + } + } + + /// + public void OnPeerListenerAttached(CacheNodeAdvertisment peer) + { + lock (_connectedPeers) + { + //Add to connected peers + _connectedPeers.Add(peer); + } + } + + /// + public void OnPeerListenerDetatched(CacheNodeAdvertisment peer) + { + //remove from connected peers + lock (_connectedPeers) + { + _connectedPeers.Remove(peer); + } + } + + + private sealed record class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler + { + public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex) + { + + if (ex is HttpRequestException hre) + { + if (hre.InnerException is SocketException se) + { + //traisnport failed + Logger.Warn("Failed to connect to server {serv} because {err}", errorNode, se.Message); + } + else + { + Logger.Error("Failed to connect to node {n}\n{err}", errorNode, hre); + } + } + else + { + Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex); + } + + } + } + } +} -- cgit