diff options
Diffstat (limited to 'plugins')
4 files changed, 163 insertions, 62 deletions
diff --git a/plugins/VNLib.Data.Caching.Providers.Redis/src/RedisClientCacheEntry.cs b/plugins/VNLib.Data.Caching.Providers.Redis/src/RedisClientCacheEntry.cs index a0a94c9..72e6020 100644 --- a/plugins/VNLib.Data.Caching.Providers.Redis/src/RedisClientCacheEntry.cs +++ b/plugins/VNLib.Data.Caching.Providers.Redis/src/RedisClientCacheEntry.cs @@ -53,7 +53,7 @@ namespace VNLib.Data.Caching.Providers.Redis * decision making where possible, such as protobufs */ - [ExternService] + [ServiceExport] [ConfigurationName("cache")] public sealed class RedisClientCacheEntry : IGlobalCacheProvider { diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs index a6e264c..a8fb0e1 100644 --- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs @@ -22,54 +22,123 @@ * along with this program. If not, see https://www.gnu.org/licenses/. */ +using System; using System.Threading; +using System.Text.Json; +using System.Reflection; using System.Threading.Tasks; using VNLib.Utils.Logging; +using VNLib.Utils.Resources; using VNLib.Data.Caching.Extensions; using VNLib.Data.Caching.Extensions.Clustering; using VNLib.Plugins.Extensions.Loading.Events; namespace VNLib.Data.Caching.Providers.VNCache.Clustering { - internal sealed class ClusterNodeIndex : IClusterNodeIndex, IIntervalScheduleable + internal static class ClusterNodeIndex { - private readonly CacheClientConfiguration _config; - private Task _currentUpdate; + const string APP_DOMAIN_KEY = "vnlib.data.caching.providers.vncache"; + /* + * Safeley determines if an instance of a node index already exists in the app domain + * if so it returns that instance, otherwise it creates a new index instance and stores + * it in the app domain. + */ - public ClusterNodeIndex(CacheClientConfiguration config) + public static IClusterNodeIndex CreateIndex(CacheClientConfiguration config) { - _config = config; - _currentUpdate = Task.CompletedTask; - } + //Create a named semaphore to ensure only one index is created per app domain + using Semaphore sm = new (1, 1, APP_DOMAIN_KEY, out _); - ///<inheritdoc/> - public CacheNodeAdvertisment? GetNextNode() - { - //Get all nodes - CacheNodeAdvertisment[] ads = _config.NodeCollection.GetAllNodes(); - //Just get a random node from the collection for now - return ads.Length > 0 ? ads.SelectRandom() : null; + if (!sm.WaitOne(500)) + { + throw new TimeoutException("Failed to access the Cluster index shared semaphore"); + } + + try + { + //Try to get an existing index from the app domain + object? remoteIndex = AppDomain.CurrentDomain.GetData(APP_DOMAIN_KEY); + if (remoteIndex == null) + { + //Create a new index and store it in the app domain + IClusterNodeIndex index = new LocalHandler(config); + AppDomain.CurrentDomain.SetData(APP_DOMAIN_KEY, index); + return index; + } + else + { + //Use the existing index + return new RemoteHandler(remoteIndex); + } + } + finally + { + sm.Release(); + } } + - ///<inheritdoc/> - public Task WaitForDiscoveryAsync(CancellationToken cancellationToken) + record class LocalHandler(CacheClientConfiguration Config) : IClusterNodeIndex, IIntervalScheduleable { - return _currentUpdate.WaitAsync(cancellationToken); + private Task _currentUpdate = Task.CompletedTask; + + ///<inheritdoc/> + public CacheNodeAdvertisment? GetNextNode() + { + //Get all nodes + CacheNodeAdvertisment[] ads = Config.NodeCollection.GetAllNodes(); + //Just get a random node from the collection for now + return ads.Length > 0 ? ads.SelectRandom() : null; + } + + ///<inheritdoc/> + public Task WaitForDiscoveryAsync(CancellationToken cancellationToken) => _currentUpdate; + + ///<inheritdoc/> + public Task OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken) + { + //Run discovery operation and update the task + _currentUpdate = Config.DiscoverNodesAsync(cancellationToken); + return Task.CompletedTask; + } + + /* + * Private methods that are called via reflection + * by remote instances of the index + */ + internal string? SerializeNextNode() + { + CacheNodeAdvertisment? nextNode = GetNextNode(); + return nextNode == null ? null : JsonSerializer.Serialize(nextNode); + } } - /// <summary> - /// Runs the discovery process and updates the current update task - /// </summary> - /// <param name="log"></param> - /// <param name="cancellationToken">A token to cancel the operation</param> - /// <returns>A task that completes when the discovery process is complete</returns> - public Task OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken) + class RemoteHandler : IClusterNodeIndex { - //Run discovery operation and update the task - _currentUpdate = _config.DiscoverNodesAsync(cancellationToken); - return Task.CompletedTask; + private readonly Func<string?> _remoteSerializer; + private readonly Func<CancellationToken, Task> _waitTask; + + public RemoteHandler(object RemoteIndex) + { + //get the serializer method + _remoteSerializer = ManagedLibrary.GetMethod<Func<string?>>(RemoteIndex, nameof(LocalHandler.SerializeNextNode), BindingFlags.NonPublic); + //get the wait task method + _waitTask = ManagedLibrary.GetMethod<Func<CancellationToken, Task>>(RemoteIndex, nameof(WaitForDiscoveryAsync), BindingFlags.Public); + } + + ///<inheritdoc/> + public CacheNodeAdvertisment? GetNextNode() + { + //Deserialize the next node from the remote index + string? nexNode = _remoteSerializer(); + return nexNode == null ? null : JsonSerializer.Deserialize<CacheNodeAdvertisment>(nexNode); + } + + ///<inheritdoc/> + public Task WaitForDiscoveryAsync(CancellationToken cancellationToken) => _waitTask(cancellationToken); + } } }
\ No newline at end of file diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs index 7132212..0d266e8 100644 --- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs @@ -51,14 +51,14 @@ namespace VNLib.Data.Caching.Providers.VNCache /// A base class that manages /// </summary> [ConfigurationName(VNCacheClient.CACHE_CONFIG_KEY)] - internal class FBMCacheClient : VNCacheBase, IAsyncBackgroundWork + internal sealed class FBMCacheClient : VNCacheBase, IAsyncBackgroundWork { private const string LOG_NAME = "CLIENT"; private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(10); private static readonly TimeSpan NoNodeDelay = TimeSpan.FromSeconds(10); private readonly VnCacheClientConfig _config; - private readonly ClusterNodeIndex _index; + private readonly IClusterNodeIndex _index; private bool _isConnected; @@ -85,15 +85,19 @@ namespace VNLib.Data.Caching.Providers.VNCache .WithAuthenticator(new AuthManager(plugin)) .WithErrorHandler(new DiscoveryErrHAndler(scoped)); - //Schedule discovery interval - plugin.ScheduleInterval(_index, _config.DiscoveryInterval); - - //Run discovery after initial delay if interval is greater than initial delay - if (_config.DiscoveryInterval > InitialDelay) + //Only the master index is schedulable + if(_index is IIntervalScheduleable sch) { - //Run a manual initial load - scoped.Information("Running initial discovery in {delay}", InitialDelay); - _ = plugin.ObserveWork(() => _index.OnIntervalAsync(scoped, plugin.UnloadToken), (int)InitialDelay.TotalMilliseconds); + //Schedule discovery interval + plugin.ScheduleInterval(sch, _config.DiscoveryInterval); + + //Run discovery after initial delay if interval is greater than initial delay + if (_config.DiscoveryInterval > InitialDelay) + { + //Run a manual initial load + scoped.Information("Running initial discovery in {delay}", InitialDelay); + _ = plugin.ObserveWork(() => sch.OnIntervalAsync(scoped, plugin.UnloadToken), (int)InitialDelay.TotalMilliseconds); + } } } @@ -115,7 +119,7 @@ namespace VNLib.Data.Caching.Providers.VNCache .WithInitialPeers(config.GetInitialNodeUris()); //Init index - _index = new ClusterNodeIndex(Client.GetCacheConfiguration()); + _index = ClusterNodeIndex.CreateIndex(Client.GetCacheConfiguration()); } /* @@ -133,37 +137,65 @@ namespace VNLib.Data.Caching.Providers.VNCache pluginLog.Debug("Worker started, waiting for startup delay"); await Task.Delay((int)InitialDelay.TotalMilliseconds + 1000, exitToken); + CacheNodeAdvertisment? node = null; + while (true) { - try + //Check for master index + if (_index is IIntervalScheduleable sch) { - //Wait for a discovery to complete - await _index.WaitForDiscoveryAsync(exitToken); - } - catch (CacheDiscoveryFailureException cdfe) - { - pluginLog.Error("Failed to discover nodes, will try again\n{err}", cdfe.Message); - //Continue - } - - //Get the next node to connect to - CacheNodeAdvertisment? node = _index.GetNextNode(); + try + { + //Wait for a discovery to complete + await _index.WaitForDiscoveryAsync(exitToken); + } + catch (CacheDiscoveryFailureException cdfe) + { + pluginLog.Error("Failed to discover nodes, will try again\n{err}", cdfe.Message); + //Continue + } - if (node is null) - { - pluginLog.Warn("No nodes available to connect to, trying again in {delay}", NoNodeDelay); - await Task.Delay(NoNodeDelay, exitToken); + //Get the next node to connect to + node = _index.GetNextNode(); - //Run another manual discovery if the interval is greater than the delay - if (_config.DiscoveryInterval > NoNodeDelay) + if (node is null) { - pluginLog.Debug("Forcing a manual discovery"); + pluginLog.Warn("No nodes available to connect to, trying again in {delay}", NoNodeDelay); + await Task.Delay(NoNodeDelay, exitToken); + + //Run another manual discovery if the interval is greater than the delay + if (_config.DiscoveryInterval > NoNodeDelay) + { + pluginLog.Debug("Forcing a manual discovery"); + + //We dont need to await this because it is awaited at the top of the loop + _ = sch.OnIntervalAsync(pluginLog, exitToken); + } - //We dont need to await this because it is awaited at the top of the loop - _ = _index.OnIntervalAsync(pluginLog, exitToken); + continue; + } + } + else + { + try + { + //Wait for a discovery to complete + await _index.WaitForDiscoveryAsync(exitToken); + } + catch (CacheDiscoveryFailureException) + { + //Ignore as master instance will handle this error } - continue; + //Get the next node to connect to + node = _index.GetNextNode(); + + //Again master instance will handle this condition, we just need to wait + if(node is null) + { + await Task.Delay(NoNodeDelay, exitToken); + continue; + } } //Ready to connect diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheClient.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheClient.cs index 3d66c9c..9807939 100644 --- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheClient.cs +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheClient.cs @@ -52,7 +52,7 @@ namespace VNLib.Data.Caching.Providers.VNCache /// methods. /// </para> /// </summary> - [ExternService] + [ServiceExport] [ConfigurationName(CACHE_CONFIG_KEY)] public sealed class VNCacheClient : IGlobalCacheProvider { |