aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--plugins/VNLib.Data.Caching.Providers.Redis/src/RedisClientCacheEntry.cs2
-rw-r--r--plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs123
-rw-r--r--plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs98
-rw-r--r--plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheClient.cs2
4 files changed, 163 insertions, 62 deletions
diff --git a/plugins/VNLib.Data.Caching.Providers.Redis/src/RedisClientCacheEntry.cs b/plugins/VNLib.Data.Caching.Providers.Redis/src/RedisClientCacheEntry.cs
index a0a94c9..72e6020 100644
--- a/plugins/VNLib.Data.Caching.Providers.Redis/src/RedisClientCacheEntry.cs
+++ b/plugins/VNLib.Data.Caching.Providers.Redis/src/RedisClientCacheEntry.cs
@@ -53,7 +53,7 @@ namespace VNLib.Data.Caching.Providers.Redis
* decision making where possible, such as protobufs
*/
- [ExternService]
+ [ServiceExport]
[ConfigurationName("cache")]
public sealed class RedisClientCacheEntry : IGlobalCacheProvider
{
diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs
index a6e264c..a8fb0e1 100644
--- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs
+++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs
@@ -22,54 +22,123 @@
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
+using System;
using System.Threading;
+using System.Text.Json;
+using System.Reflection;
using System.Threading.Tasks;
using VNLib.Utils.Logging;
+using VNLib.Utils.Resources;
using VNLib.Data.Caching.Extensions;
using VNLib.Data.Caching.Extensions.Clustering;
using VNLib.Plugins.Extensions.Loading.Events;
namespace VNLib.Data.Caching.Providers.VNCache.Clustering
{
- internal sealed class ClusterNodeIndex : IClusterNodeIndex, IIntervalScheduleable
+ internal static class ClusterNodeIndex
{
- private readonly CacheClientConfiguration _config;
- private Task _currentUpdate;
+ const string APP_DOMAIN_KEY = "vnlib.data.caching.providers.vncache";
+ /*
+ * Safeley determines if an instance of a node index already exists in the app domain
+ * if so it returns that instance, otherwise it creates a new index instance and stores
+ * it in the app domain.
+ */
- public ClusterNodeIndex(CacheClientConfiguration config)
+ public static IClusterNodeIndex CreateIndex(CacheClientConfiguration config)
{
- _config = config;
- _currentUpdate = Task.CompletedTask;
- }
+ //Create a named semaphore to ensure only one index is created per app domain
+ using Semaphore sm = new (1, 1, APP_DOMAIN_KEY, out _);
- ///<inheritdoc/>
- public CacheNodeAdvertisment? GetNextNode()
- {
- //Get all nodes
- CacheNodeAdvertisment[] ads = _config.NodeCollection.GetAllNodes();
- //Just get a random node from the collection for now
- return ads.Length > 0 ? ads.SelectRandom() : null;
+ if (!sm.WaitOne(500))
+ {
+ throw new TimeoutException("Failed to access the Cluster index shared semaphore");
+ }
+
+ try
+ {
+ //Try to get an existing index from the app domain
+ object? remoteIndex = AppDomain.CurrentDomain.GetData(APP_DOMAIN_KEY);
+ if (remoteIndex == null)
+ {
+ //Create a new index and store it in the app domain
+ IClusterNodeIndex index = new LocalHandler(config);
+ AppDomain.CurrentDomain.SetData(APP_DOMAIN_KEY, index);
+ return index;
+ }
+ else
+ {
+ //Use the existing index
+ return new RemoteHandler(remoteIndex);
+ }
+ }
+ finally
+ {
+ sm.Release();
+ }
}
+
- ///<inheritdoc/>
- public Task WaitForDiscoveryAsync(CancellationToken cancellationToken)
+ record class LocalHandler(CacheClientConfiguration Config) : IClusterNodeIndex, IIntervalScheduleable
{
- return _currentUpdate.WaitAsync(cancellationToken);
+ private Task _currentUpdate = Task.CompletedTask;
+
+ ///<inheritdoc/>
+ public CacheNodeAdvertisment? GetNextNode()
+ {
+ //Get all nodes
+ CacheNodeAdvertisment[] ads = Config.NodeCollection.GetAllNodes();
+ //Just get a random node from the collection for now
+ return ads.Length > 0 ? ads.SelectRandom() : null;
+ }
+
+ ///<inheritdoc/>
+ public Task WaitForDiscoveryAsync(CancellationToken cancellationToken) => _currentUpdate;
+
+ ///<inheritdoc/>
+ public Task OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken)
+ {
+ //Run discovery operation and update the task
+ _currentUpdate = Config.DiscoverNodesAsync(cancellationToken);
+ return Task.CompletedTask;
+ }
+
+ /*
+ * Private methods that are called via reflection
+ * by remote instances of the index
+ */
+ internal string? SerializeNextNode()
+ {
+ CacheNodeAdvertisment? nextNode = GetNextNode();
+ return nextNode == null ? null : JsonSerializer.Serialize(nextNode);
+ }
}
- /// <summary>
- /// Runs the discovery process and updates the current update task
- /// </summary>
- /// <param name="log"></param>
- /// <param name="cancellationToken">A token to cancel the operation</param>
- /// <returns>A task that completes when the discovery process is complete</returns>
- public Task OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken)
+ class RemoteHandler : IClusterNodeIndex
{
- //Run discovery operation and update the task
- _currentUpdate = _config.DiscoverNodesAsync(cancellationToken);
- return Task.CompletedTask;
+ private readonly Func<string?> _remoteSerializer;
+ private readonly Func<CancellationToken, Task> _waitTask;
+
+ public RemoteHandler(object RemoteIndex)
+ {
+ //get the serializer method
+ _remoteSerializer = ManagedLibrary.GetMethod<Func<string?>>(RemoteIndex, nameof(LocalHandler.SerializeNextNode), BindingFlags.NonPublic);
+ //get the wait task method
+ _waitTask = ManagedLibrary.GetMethod<Func<CancellationToken, Task>>(RemoteIndex, nameof(WaitForDiscoveryAsync), BindingFlags.Public);
+ }
+
+ ///<inheritdoc/>
+ public CacheNodeAdvertisment? GetNextNode()
+ {
+ //Deserialize the next node from the remote index
+ string? nexNode = _remoteSerializer();
+ return nexNode == null ? null : JsonSerializer.Deserialize<CacheNodeAdvertisment>(nexNode);
+ }
+
+ ///<inheritdoc/>
+ public Task WaitForDiscoveryAsync(CancellationToken cancellationToken) => _waitTask(cancellationToken);
+
}
}
} \ No newline at end of file
diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
index 7132212..0d266e8 100644
--- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
+++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
@@ -51,14 +51,14 @@ namespace VNLib.Data.Caching.Providers.VNCache
/// A base class that manages
/// </summary>
[ConfigurationName(VNCacheClient.CACHE_CONFIG_KEY)]
- internal class FBMCacheClient : VNCacheBase, IAsyncBackgroundWork
+ internal sealed class FBMCacheClient : VNCacheBase, IAsyncBackgroundWork
{
private const string LOG_NAME = "CLIENT";
private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(10);
private static readonly TimeSpan NoNodeDelay = TimeSpan.FromSeconds(10);
private readonly VnCacheClientConfig _config;
- private readonly ClusterNodeIndex _index;
+ private readonly IClusterNodeIndex _index;
private bool _isConnected;
@@ -85,15 +85,19 @@ namespace VNLib.Data.Caching.Providers.VNCache
.WithAuthenticator(new AuthManager(plugin))
.WithErrorHandler(new DiscoveryErrHAndler(scoped));
- //Schedule discovery interval
- plugin.ScheduleInterval(_index, _config.DiscoveryInterval);
-
- //Run discovery after initial delay if interval is greater than initial delay
- if (_config.DiscoveryInterval > InitialDelay)
+ //Only the master index is schedulable
+ if(_index is IIntervalScheduleable sch)
{
- //Run a manual initial load
- scoped.Information("Running initial discovery in {delay}", InitialDelay);
- _ = plugin.ObserveWork(() => _index.OnIntervalAsync(scoped, plugin.UnloadToken), (int)InitialDelay.TotalMilliseconds);
+ //Schedule discovery interval
+ plugin.ScheduleInterval(sch, _config.DiscoveryInterval);
+
+ //Run discovery after initial delay if interval is greater than initial delay
+ if (_config.DiscoveryInterval > InitialDelay)
+ {
+ //Run a manual initial load
+ scoped.Information("Running initial discovery in {delay}", InitialDelay);
+ _ = plugin.ObserveWork(() => sch.OnIntervalAsync(scoped, plugin.UnloadToken), (int)InitialDelay.TotalMilliseconds);
+ }
}
}
@@ -115,7 +119,7 @@ namespace VNLib.Data.Caching.Providers.VNCache
.WithInitialPeers(config.GetInitialNodeUris());
//Init index
- _index = new ClusterNodeIndex(Client.GetCacheConfiguration());
+ _index = ClusterNodeIndex.CreateIndex(Client.GetCacheConfiguration());
}
/*
@@ -133,37 +137,65 @@ namespace VNLib.Data.Caching.Providers.VNCache
pluginLog.Debug("Worker started, waiting for startup delay");
await Task.Delay((int)InitialDelay.TotalMilliseconds + 1000, exitToken);
+ CacheNodeAdvertisment? node = null;
+
while (true)
{
- try
+ //Check for master index
+ if (_index is IIntervalScheduleable sch)
{
- //Wait for a discovery to complete
- await _index.WaitForDiscoveryAsync(exitToken);
- }
- catch (CacheDiscoveryFailureException cdfe)
- {
- pluginLog.Error("Failed to discover nodes, will try again\n{err}", cdfe.Message);
- //Continue
- }
-
- //Get the next node to connect to
- CacheNodeAdvertisment? node = _index.GetNextNode();
+ try
+ {
+ //Wait for a discovery to complete
+ await _index.WaitForDiscoveryAsync(exitToken);
+ }
+ catch (CacheDiscoveryFailureException cdfe)
+ {
+ pluginLog.Error("Failed to discover nodes, will try again\n{err}", cdfe.Message);
+ //Continue
+ }
- if (node is null)
- {
- pluginLog.Warn("No nodes available to connect to, trying again in {delay}", NoNodeDelay);
- await Task.Delay(NoNodeDelay, exitToken);
+ //Get the next node to connect to
+ node = _index.GetNextNode();
- //Run another manual discovery if the interval is greater than the delay
- if (_config.DiscoveryInterval > NoNodeDelay)
+ if (node is null)
{
- pluginLog.Debug("Forcing a manual discovery");
+ pluginLog.Warn("No nodes available to connect to, trying again in {delay}", NoNodeDelay);
+ await Task.Delay(NoNodeDelay, exitToken);
+
+ //Run another manual discovery if the interval is greater than the delay
+ if (_config.DiscoveryInterval > NoNodeDelay)
+ {
+ pluginLog.Debug("Forcing a manual discovery");
+
+ //We dont need to await this because it is awaited at the top of the loop
+ _ = sch.OnIntervalAsync(pluginLog, exitToken);
+ }
- //We dont need to await this because it is awaited at the top of the loop
- _ = _index.OnIntervalAsync(pluginLog, exitToken);
+ continue;
+ }
+ }
+ else
+ {
+ try
+ {
+ //Wait for a discovery to complete
+ await _index.WaitForDiscoveryAsync(exitToken);
+ }
+ catch (CacheDiscoveryFailureException)
+ {
+ //Ignore as master instance will handle this error
}
- continue;
+ //Get the next node to connect to
+ node = _index.GetNextNode();
+
+ //Again master instance will handle this condition, we just need to wait
+ if(node is null)
+ {
+ await Task.Delay(NoNodeDelay, exitToken);
+ continue;
+ }
}
//Ready to connect
diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheClient.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheClient.cs
index 3d66c9c..9807939 100644
--- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheClient.cs
+++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheClient.cs
@@ -52,7 +52,7 @@ namespace VNLib.Data.Caching.Providers.VNCache
/// methods.
/// </para>
/// </summary>
- [ExternService]
+ [ServiceExport]
[ConfigurationName(CACHE_CONFIG_KEY)]
public sealed class VNCacheClient : IGlobalCacheProvider
{