aboutsummaryrefslogtreecommitdiff
path: root/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs')
-rw-r--r--plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs80
1 files changed, 45 insertions, 35 deletions
diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
index 5fbebcd..e21cf4a 100644
--- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
+++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
@@ -59,13 +59,16 @@ namespace VNLib.Data.Caching.Providers.VNCache
private readonly VnCacheClientConfig _config;
private readonly IClusterNodeIndex _index;
+ private readonly FBMClientFactory _clientFactory;
+ private readonly TimeSpan _initNodeDelay;
private bool _isConnected;
+ private FBMClient? _client;
/// <summary>
- /// The internal client
+ /// The internal heap used for FBMClients
/// </summary>
- public FBMClient Client { get; }
+ public IUnmangedHeap BufferHeap { get; } = MemoryUtil.Shared;
/// <summary>
/// Gets a value that determines if the client is currently connected to a server
@@ -73,30 +76,30 @@ namespace VNLib.Data.Caching.Providers.VNCache
public override bool IsConnected => _isConnected;
public FBMCacheClient(PluginBase plugin, IConfigScope config)
- : this(
- config.Deserialze<VnCacheClientConfig>(),
- plugin.IsDebug() ? plugin.Log : null
- )
+ : this(
+ config.Deserialze<VnCacheClientConfig>(),
+ plugin.IsDebug() ? plugin.Log : null
+ )
{
ILogProvider scoped = plugin.Log.CreateScope(LOG_NAME);
//Set authenticator and error handler
- Client.GetCacheConfiguration()
+ _clientFactory.GetCacheConfiguration()
.WithAuthenticator(new AuthManager(plugin))
.WithErrorHandler(new DiscoveryErrHAndler(scoped));
//Only the master index is schedulable
- if(_index is IIntervalScheduleable sch)
+ if (_index is IIntervalScheduleable sch)
{
//Schedule discovery interval
plugin.ScheduleInterval(sch, _config.DiscoveryInterval);
//Run discovery after initial delay if interval is greater than initial delay
- if (_config.DiscoveryInterval > InitialDelay)
+ if (_config.DiscoveryInterval > _initNodeDelay)
{
//Run a manual initial load
- scoped.Information("Running initial discovery in {delay}", InitialDelay);
- _ = plugin.ObserveWork(() => sch.OnIntervalAsync(scoped, plugin.UnloadToken), (int)InitialDelay.TotalMilliseconds);
+ scoped.Information("Running initial discovery in {delay}", _initNodeDelay);
+ _ = plugin.ObserveWork(() => sch.OnIntervalAsync(scoped, plugin.UnloadToken), (int)_initNodeDelay.TotalMilliseconds);
}
}
}
@@ -108,18 +111,22 @@ namespace VNLib.Data.Caching.Providers.VNCache
_config = config;
+ //Set a default node delay if null
+ _initNodeDelay = _config.InitialNodeDelay.HasValue ? TimeSpan.FromSeconds(_config.InitialNodeDelay.Value) : InitialDelay;
+
//Init the client with default settings
- FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(MemoryUtil.Shared, (int)config.MaxBlobSize, config.RequestTimeout, debugLog);
+ FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(BufferHeap, (int)config.MaxBlobSize, config.RequestTimeout, debugLog);
- Client = new(conf);
+ FBMFallbackClientWsFactory wsFactory = new();
+ _clientFactory = new(in conf, wsFactory);
//Add the configuration to the client
- Client.GetCacheConfiguration()
+ _clientFactory.GetCacheConfiguration()
.WithTls(config.UseTls)
.WithInitialPeers(config.GetInitialNodeUris());
//Init index
- _index = ClusterNodeIndex.CreateIndex(Client.GetCacheConfiguration());
+ _index = ClusterNodeIndex.CreateIndex(_clientFactory.GetCacheConfiguration());
}
/*
@@ -135,13 +142,17 @@ namespace VNLib.Data.Caching.Providers.VNCache
{
//Initial delay
pluginLog.Debug("Worker started, waiting for startup delay");
- await Task.Delay((int)InitialDelay.TotalMilliseconds + 1000, exitToken);
+ await Task.Delay(_initNodeDelay, exitToken);
CacheNodeAdvertisment? node = null;
while (true)
{
- //Check for master index
+ /*
+ * The cache node index is shared across plugin boundries. If the current
+ * instance is holding the master index, it will be scheduleable, and
+ * can be manually invoked if no nodes are found
+ */
if (_index is IIntervalScheduleable sch)
{
try
@@ -204,8 +215,8 @@ namespace VNLib.Data.Caching.Providers.VNCache
{
pluginLog.Debug("Connecting to {node}", node);
- //Connect to the node
- await Client.ConnectToCacheAsync(node, exitToken);
+ //Connect to the node and save new client
+ _client = await _clientFactory.ConnectToCacheAsync(node, exitToken);
if (pluginLog.IsEnabled(LogLevel.Debug))
{
@@ -220,7 +231,7 @@ namespace VNLib.Data.Caching.Providers.VNCache
_isConnected = true;
//Wait for disconnect
- await Client.WaitForExitAsync(exitToken);
+ await _client.WaitForExitAsync(exitToken);
pluginLog.Information("Cache server disconnected");
}
@@ -248,6 +259,9 @@ namespace VNLib.Data.Caching.Providers.VNCache
finally
{
_isConnected = false;
+
+ //Cleanup client
+ _client?.Dispose();
}
//Loop again
@@ -265,11 +279,7 @@ namespace VNLib.Data.Caching.Providers.VNCache
{
pluginLog.Error(ex, "Unhandled exception occured in background cache client listening task");
}
- finally
- {
- //Dispose the client on exit
- Client.Dispose();
- }
+
pluginLog.Information("Cache client exited");
}
@@ -279,43 +289,43 @@ namespace VNLib.Data.Caching.Providers.VNCache
{
return !IsConnected
? throw new InvalidOperationException("The underlying client is not connected to a cache node")
- : Client!.DeleteObjectAsync(key, cancellation);
+ : _client!.DeleteObjectAsync(key, cancellation);
}
///<inheritdoc/>
public override Task<T> GetAsync<T>(string key, ICacheObjectDeserializer deserializer, CancellationToken cancellation)
{
return !IsConnected
- ? throw new InvalidOperationException("The underlying client is not connected to a cache node")
- : Client!.GetObjectAsync<T>(key, deserializer, cancellation);
+ ? throw new InvalidOperationException("The underlying client is not connected to a cache node")
+ : _client!.GetObjectAsync<T>(key, deserializer, cancellation);
}
///<inheritdoc/>
public override Task AddOrUpdateAsync<T>(string key, string? newKey, T value, ICacheObjectSerializer serialzer, CancellationToken cancellation)
{
return !IsConnected
- ? throw new InvalidOperationException("The underlying client is not connected to a cache node")
- : Client!.AddOrUpdateObjectAsync(key, newKey, value, serialzer, cancellation);
+ ? throw new InvalidOperationException("The underlying client is not connected to a cache node")
+ : _client!.AddOrUpdateObjectAsync(key, newKey, value, serialzer, cancellation);
}
///<inheritdoc/>
public override Task GetAsync<T>(string key, ObjectDataSet<T> callback, T state, CancellationToken cancellation)
{
return !IsConnected
- ? throw new InvalidOperationException("The underlying client is not connected to a cache node")
- : Client!.GetObjectAsync(key, callback, state, cancellation);
+ ? throw new InvalidOperationException("The underlying client is not connected to a cache node")
+ : _client!.GetObjectAsync(key, callback, state, cancellation);
}
///<inheritdoc/>
public override Task AddOrUpdateAsync<T>(string key, string? newKey, ObjectDataReader<T> callback, T state, CancellationToken cancellation)
{
return !IsConnected
- ? throw new InvalidOperationException("The underlying client is not connected to a cache node")
- : Client!.AddOrUpdateObjectAsync(key, newKey, callback, state, cancellation);
+ ? throw new InvalidOperationException("The underlying client is not connected to a cache node")
+ : _client!.AddOrUpdateObjectAsync(key, newKey, callback, state, cancellation);
}
///<inheritdoc/>
- public override object GetUnderlyingStore() => Client; //Client is the underlying "store"
+ public override object GetUnderlyingStore() => _client ?? throw new InvalidOperationException("The client is not currently connected");
private sealed class AuthManager : ICacheAuthManager
{