diff options
author | vnugent <public@vaughnnugent.com> | 2023-11-06 23:48:57 -0500 |
---|---|---|
committer | vnugent <public@vaughnnugent.com> | 2023-11-06 23:48:57 -0500 |
commit | 97706d04dcb4c9a03ceb98290afbbdf05e3524d5 (patch) | |
tree | 22e4d8d9ff359a3b54242701d17d29ee92e9217f /plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering | |
parent | 7de39cdbd9aeabb30ea5b4fc986fc796d686929c (diff) |
mostly some cache stuff
Diffstat (limited to 'plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering')
-rw-r--r-- | plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs | 123 |
1 files changed, 96 insertions, 27 deletions
diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs index a6e264c..a8fb0e1 100644 --- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs @@ -22,54 +22,123 @@ * along with this program. If not, see https://www.gnu.org/licenses/. */ +using System; using System.Threading; +using System.Text.Json; +using System.Reflection; using System.Threading.Tasks; using VNLib.Utils.Logging; +using VNLib.Utils.Resources; using VNLib.Data.Caching.Extensions; using VNLib.Data.Caching.Extensions.Clustering; using VNLib.Plugins.Extensions.Loading.Events; namespace VNLib.Data.Caching.Providers.VNCache.Clustering { - internal sealed class ClusterNodeIndex : IClusterNodeIndex, IIntervalScheduleable + internal static class ClusterNodeIndex { - private readonly CacheClientConfiguration _config; - private Task _currentUpdate; + const string APP_DOMAIN_KEY = "vnlib.data.caching.providers.vncache"; + /* + * Safeley determines if an instance of a node index already exists in the app domain + * if so it returns that instance, otherwise it creates a new index instance and stores + * it in the app domain. + */ - public ClusterNodeIndex(CacheClientConfiguration config) + public static IClusterNodeIndex CreateIndex(CacheClientConfiguration config) { - _config = config; - _currentUpdate = Task.CompletedTask; - } + //Create a named semaphore to ensure only one index is created per app domain + using Semaphore sm = new (1, 1, APP_DOMAIN_KEY, out _); - ///<inheritdoc/> - public CacheNodeAdvertisment? GetNextNode() - { - //Get all nodes - CacheNodeAdvertisment[] ads = _config.NodeCollection.GetAllNodes(); - //Just get a random node from the collection for now - return ads.Length > 0 ? ads.SelectRandom() : null; + if (!sm.WaitOne(500)) + { + throw new TimeoutException("Failed to access the Cluster index shared semaphore"); + } + + try + { + //Try to get an existing index from the app domain + object? remoteIndex = AppDomain.CurrentDomain.GetData(APP_DOMAIN_KEY); + if (remoteIndex == null) + { + //Create a new index and store it in the app domain + IClusterNodeIndex index = new LocalHandler(config); + AppDomain.CurrentDomain.SetData(APP_DOMAIN_KEY, index); + return index; + } + else + { + //Use the existing index + return new RemoteHandler(remoteIndex); + } + } + finally + { + sm.Release(); + } } + - ///<inheritdoc/> - public Task WaitForDiscoveryAsync(CancellationToken cancellationToken) + record class LocalHandler(CacheClientConfiguration Config) : IClusterNodeIndex, IIntervalScheduleable { - return _currentUpdate.WaitAsync(cancellationToken); + private Task _currentUpdate = Task.CompletedTask; + + ///<inheritdoc/> + public CacheNodeAdvertisment? GetNextNode() + { + //Get all nodes + CacheNodeAdvertisment[] ads = Config.NodeCollection.GetAllNodes(); + //Just get a random node from the collection for now + return ads.Length > 0 ? ads.SelectRandom() : null; + } + + ///<inheritdoc/> + public Task WaitForDiscoveryAsync(CancellationToken cancellationToken) => _currentUpdate; + + ///<inheritdoc/> + public Task OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken) + { + //Run discovery operation and update the task + _currentUpdate = Config.DiscoverNodesAsync(cancellationToken); + return Task.CompletedTask; + } + + /* + * Private methods that are called via reflection + * by remote instances of the index + */ + internal string? SerializeNextNode() + { + CacheNodeAdvertisment? nextNode = GetNextNode(); + return nextNode == null ? null : JsonSerializer.Serialize(nextNode); + } } - /// <summary> - /// Runs the discovery process and updates the current update task - /// </summary> - /// <param name="log"></param> - /// <param name="cancellationToken">A token to cancel the operation</param> - /// <returns>A task that completes when the discovery process is complete</returns> - public Task OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken) + class RemoteHandler : IClusterNodeIndex { - //Run discovery operation and update the task - _currentUpdate = _config.DiscoverNodesAsync(cancellationToken); - return Task.CompletedTask; + private readonly Func<string?> _remoteSerializer; + private readonly Func<CancellationToken, Task> _waitTask; + + public RemoteHandler(object RemoteIndex) + { + //get the serializer method + _remoteSerializer = ManagedLibrary.GetMethod<Func<string?>>(RemoteIndex, nameof(LocalHandler.SerializeNextNode), BindingFlags.NonPublic); + //get the wait task method + _waitTask = ManagedLibrary.GetMethod<Func<CancellationToken, Task>>(RemoteIndex, nameof(WaitForDiscoveryAsync), BindingFlags.Public); + } + + ///<inheritdoc/> + public CacheNodeAdvertisment? GetNextNode() + { + //Deserialize the next node from the remote index + string? nexNode = _remoteSerializer(); + return nexNode == null ? null : JsonSerializer.Deserialize<CacheNodeAdvertisment>(nexNode); + } + + ///<inheritdoc/> + public Task WaitForDiscoveryAsync(CancellationToken cancellationToken) => _waitTask(cancellationToken); + } } }
\ No newline at end of file |