aboutsummaryrefslogtreecommitdiff
path: root/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-11-06 23:48:57 -0500
committerLibravatar vnugent <public@vaughnnugent.com>2023-11-06 23:48:57 -0500
commit97706d04dcb4c9a03ceb98290afbbdf05e3524d5 (patch)
tree22e4d8d9ff359a3b54242701d17d29ee92e9217f /plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering
parent7de39cdbd9aeabb30ea5b4fc986fc796d686929c (diff)
mostly some cache stuff
Diffstat (limited to 'plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering')
-rw-r--r--plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs123
1 files changed, 96 insertions, 27 deletions
diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs
index a6e264c..a8fb0e1 100644
--- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs
+++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs
@@ -22,54 +22,123 @@
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
+using System;
using System.Threading;
+using System.Text.Json;
+using System.Reflection;
using System.Threading.Tasks;
using VNLib.Utils.Logging;
+using VNLib.Utils.Resources;
using VNLib.Data.Caching.Extensions;
using VNLib.Data.Caching.Extensions.Clustering;
using VNLib.Plugins.Extensions.Loading.Events;
namespace VNLib.Data.Caching.Providers.VNCache.Clustering
{
- internal sealed class ClusterNodeIndex : IClusterNodeIndex, IIntervalScheduleable
+ internal static class ClusterNodeIndex
{
- private readonly CacheClientConfiguration _config;
- private Task _currentUpdate;
+ const string APP_DOMAIN_KEY = "vnlib.data.caching.providers.vncache";
+ /*
+ * Safeley determines if an instance of a node index already exists in the app domain
+ * if so it returns that instance, otherwise it creates a new index instance and stores
+ * it in the app domain.
+ */
- public ClusterNodeIndex(CacheClientConfiguration config)
+ public static IClusterNodeIndex CreateIndex(CacheClientConfiguration config)
{
- _config = config;
- _currentUpdate = Task.CompletedTask;
- }
+ //Create a named semaphore to ensure only one index is created per app domain
+ using Semaphore sm = new (1, 1, APP_DOMAIN_KEY, out _);
- ///<inheritdoc/>
- public CacheNodeAdvertisment? GetNextNode()
- {
- //Get all nodes
- CacheNodeAdvertisment[] ads = _config.NodeCollection.GetAllNodes();
- //Just get a random node from the collection for now
- return ads.Length > 0 ? ads.SelectRandom() : null;
+ if (!sm.WaitOne(500))
+ {
+ throw new TimeoutException("Failed to access the Cluster index shared semaphore");
+ }
+
+ try
+ {
+ //Try to get an existing index from the app domain
+ object? remoteIndex = AppDomain.CurrentDomain.GetData(APP_DOMAIN_KEY);
+ if (remoteIndex == null)
+ {
+ //Create a new index and store it in the app domain
+ IClusterNodeIndex index = new LocalHandler(config);
+ AppDomain.CurrentDomain.SetData(APP_DOMAIN_KEY, index);
+ return index;
+ }
+ else
+ {
+ //Use the existing index
+ return new RemoteHandler(remoteIndex);
+ }
+ }
+ finally
+ {
+ sm.Release();
+ }
}
+
- ///<inheritdoc/>
- public Task WaitForDiscoveryAsync(CancellationToken cancellationToken)
+ record class LocalHandler(CacheClientConfiguration Config) : IClusterNodeIndex, IIntervalScheduleable
{
- return _currentUpdate.WaitAsync(cancellationToken);
+ private Task _currentUpdate = Task.CompletedTask;
+
+ ///<inheritdoc/>
+ public CacheNodeAdvertisment? GetNextNode()
+ {
+ //Get all nodes
+ CacheNodeAdvertisment[] ads = Config.NodeCollection.GetAllNodes();
+ //Just get a random node from the collection for now
+ return ads.Length > 0 ? ads.SelectRandom() : null;
+ }
+
+ ///<inheritdoc/>
+ public Task WaitForDiscoveryAsync(CancellationToken cancellationToken) => _currentUpdate;
+
+ ///<inheritdoc/>
+ public Task OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken)
+ {
+ //Run discovery operation and update the task
+ _currentUpdate = Config.DiscoverNodesAsync(cancellationToken);
+ return Task.CompletedTask;
+ }
+
+ /*
+ * Private methods that are called via reflection
+ * by remote instances of the index
+ */
+ internal string? SerializeNextNode()
+ {
+ CacheNodeAdvertisment? nextNode = GetNextNode();
+ return nextNode == null ? null : JsonSerializer.Serialize(nextNode);
+ }
}
- /// <summary>
- /// Runs the discovery process and updates the current update task
- /// </summary>
- /// <param name="log"></param>
- /// <param name="cancellationToken">A token to cancel the operation</param>
- /// <returns>A task that completes when the discovery process is complete</returns>
- public Task OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken)
+ class RemoteHandler : IClusterNodeIndex
{
- //Run discovery operation and update the task
- _currentUpdate = _config.DiscoverNodesAsync(cancellationToken);
- return Task.CompletedTask;
+ private readonly Func<string?> _remoteSerializer;
+ private readonly Func<CancellationToken, Task> _waitTask;
+
+ public RemoteHandler(object RemoteIndex)
+ {
+ //get the serializer method
+ _remoteSerializer = ManagedLibrary.GetMethod<Func<string?>>(RemoteIndex, nameof(LocalHandler.SerializeNextNode), BindingFlags.NonPublic);
+ //get the wait task method
+ _waitTask = ManagedLibrary.GetMethod<Func<CancellationToken, Task>>(RemoteIndex, nameof(WaitForDiscoveryAsync), BindingFlags.Public);
+ }
+
+ ///<inheritdoc/>
+ public CacheNodeAdvertisment? GetNextNode()
+ {
+ //Deserialize the next node from the remote index
+ string? nexNode = _remoteSerializer();
+ return nexNode == null ? null : JsonSerializer.Deserialize<CacheNodeAdvertisment>(nexNode);
+ }
+
+ ///<inheritdoc/>
+ public Task WaitForDiscoveryAsync(CancellationToken cancellationToken) => _waitTask(cancellationToken);
+
}
}
} \ No newline at end of file