From e25666bbf408ff33c09dc8e2c5fe2d052363595f Mon Sep 17 00:00:00 2001 From: vnugent Date: Wed, 29 Nov 2023 00:17:13 -0500 Subject: Integrate FBM core changes for immutable client instances --- .../ObjectCacheServer/src/Cache/CacheSystemUtil.cs | 180 ++++----------------- .../src/Clustering/CacheNodeReplicationMaanger.cs | 10 +- .../src/RedisClientCacheEntry.cs | 4 +- .../src/FBMCacheClient.cs | 80 +++++---- .../src/MemoryCache.cs | 1 + .../src/RemoteBackedMemoryCache.cs | 2 +- .../src/VNCacheBase.cs | 4 +- .../src/VnCacheClientConfig.cs | 6 + 8 files changed, 95 insertions(+), 192 deletions(-) (limited to 'plugins') diff --git a/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs index a02fe75..b7bf83f 100644 --- a/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs +++ b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs @@ -25,10 +25,8 @@ using System; using System.IO; using System.Text.Json; -using System.Collections; -using System.Collections.Generic; -using System.Runtime.CompilerServices; +using VNLib.Utils.Resources; using VNLib.Plugins; using VNLib.Plugins.Extensions.Loading; @@ -38,8 +36,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache { const string PERSISTANT_ASM_CONFIF_KEY = "persistant_cache_asm"; const string USER_CACHE_ASM_CONFIG_KEY = "custom_cache_impl_asm"; - const string LOAD_METHOD_NAME = "OnRuntimeLoad"; - const string TEARDOWN_METHOD_NAME = "OnSystemDetach"; + const string INITIALIZE_METHOD_NAME = "OnInitializeForBucket"; + const string LOAD_METHOD_NAME = "OnRuntimeLoad"; /// /// Loads the implementation (dynamic or default) into the process @@ -53,8 +51,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache /// public static IBlobCacheTable LoadMemoryCacheSystem(this PluginBase plugin, IConfigScope config, ICacheMemoryManagerFactory heap, CacheConfiguration cacheConf) { +#pragma warning disable CA2000 // Dispose objects before losing scope + //First, try to load persitant cache store - PersistantCacheManager? pCManager = GetPersistantStore(plugin, config); + IPersistantCacheStore? pCManager = GetPersistantStore(plugin, config); + +#pragma warning restore CA2000 // Dispose objects before losing scope IBlobCacheTable table; @@ -64,46 +66,42 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache string asmName = customEl.GetString() ?? throw new FileNotFoundException("User defined a custom blob cache assembly but the file name was null"); //Return the runtime loaded table - table = LoadCustomMemCacheTable(plugin, asmName, pCManager); + table = plugin.CreateServiceExternal(asmName); + + //Try to get the load method and pass the persistant cache instance + ManagedLibrary.TryGetMethod>(table, LOAD_METHOD_NAME)?.Invoke(plugin, pCManager); } else { //Default type - table = GetInternalBlobCache(heap, cacheConf, pCManager); + table = new BlobCacheTable(cacheConf.BucketCount, cacheConf.MaxCacheEntries, heap, pCManager); } - //Initialize the subsystem from the cache table - pCManager?.InitializeSubsystem(table); + if(pCManager != null) + { + //Initialize the subsystem from the cache table + InitializeSubsystem(pCManager, table); + } return table; } - private static IBlobCacheTable GetInternalBlobCache(ICacheMemoryManagerFactory heap, CacheConfiguration config, IPersistantCacheStore? store) - { - return new BlobCacheTable(config.BucketCount, config.MaxCacheEntries, heap, store); - } - - private static IBlobCacheTable LoadCustomMemCacheTable(PluginBase plugin, string asmName, IPersistantCacheStore? store) + private static void InitializeSubsystem(IPersistantCacheStore store, IBlobCacheTable table) { - //Load the custom assembly - AssemblyLoader customTable = plugin.LoadAssembly(asmName); + //Try to get the Initialize method + Action? initMethod = ManagedLibrary.TryGetMethod>(store, INITIALIZE_METHOD_NAME); - try + if(initMethod != null) { - //Try get onload method and pass the persistant cache instance - Action? onLoad = customTable.TryGetMethod>(LOAD_METHOD_NAME); - onLoad?.Invoke(plugin, store); - } - catch - { - customTable.Dispose(); - throw; + //Itterate all buckets + foreach (IBlobCacheBucket bucket in table) + { + initMethod.Invoke(bucket.Id); + } } - - return new RuntimeBlobCacheTable(customTable); } - private static PersistantCacheManager? GetPersistantStore(PluginBase plugin, IConfigScope config) + private static IPersistantCacheStore? GetPersistantStore(PluginBase plugin, IConfigScope config) { //Get the persistant assembly if (!config.TryGetValue(PERSISTANT_ASM_CONFIF_KEY, out JsonElement asmEl)) @@ -112,130 +110,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache } string? asmName = asmEl.GetString(); - if (asmName == null) + if (string.IsNullOrWhiteSpace(asmName)) { return null; } - //Load the dynamic assembly into the alc - AssemblyLoader loader = plugin.LoadAssembly(asmName); - try - { - //Call the OnLoad method - Action? loadMethod = loader.TryGetMethod>(LOAD_METHOD_NAME); - - loadMethod?.Invoke(plugin, config); - } - catch - { - loader.Dispose(); - throw; - } - //Return the - return new(loader); - } - - - private sealed class RuntimeBlobCacheTable : IBlobCacheTable - { - - private readonly IBlobCacheTable _table; - private readonly Action? OnDetatch; - - public RuntimeBlobCacheTable(AssemblyLoader loader) - { - OnDetatch = loader.TryGetMethod(TEARDOWN_METHOD_NAME); - _table = loader.Resource; - } - - public void Dispose() - { - //We can let the loader dispose the cache table, but we can notify of detatch - OnDetatch?.Invoke(); - } - - - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - IBlobCacheBucket IBlobCacheTable.GetBucket(ReadOnlySpan objectId) => _table.GetBucket(objectId); - - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public IEnumerator GetEnumerator() => _table.GetEnumerator(); - - /// - [MethodImpl(MethodImplOptions.AggressiveInlining)] - IEnumerator IEnumerable.GetEnumerator() => ((IEnumerable)_table).GetEnumerator(); - } - - internal sealed class PersistantCacheManager : IPersistantCacheStore - { - const string INITIALIZE_METHOD_NAME = "OnInitializeForBucket"; - - - /* - * Our referrence can be technically unloaded, but so will - * this instance, since its loaded into the current ALC, so - * this referrence may exist for the lifetime of this instance. - * - * It also implements IDisposable, which the assembly loader class - * will call when this plugin is unloaded, we dont need to call - * it here, but we can signal a detach. - * - * Since the store implements IDisposable, its likely going to - * check for dispose on each call, so we don't need to add - * and additional disposed check since the method calls must be fast. - */ - - private readonly IPersistantCacheStore store; - - private readonly Action? InitMethod; - private readonly Action? OnServiceDetatch; - - public PersistantCacheManager(AssemblyLoader loader) - { - //Try to get the Initialize method - InitMethod = loader.TryGetMethod>(INITIALIZE_METHOD_NAME); - - //Get the optional detatch method - OnServiceDetatch = loader.TryGetMethod(TEARDOWN_METHOD_NAME); - - store = loader.Resource; - } - - /// - /// Optionally initializes the backing store by publishing the table's bucket - /// id's so it's made aware of the memory cache bucket system. - /// - /// The table containing buckets to publish - public void InitializeSubsystem(IBlobCacheTable table) - { - //Itterate all buckets - foreach (IBlobCacheBucket bucket in table) - { - InitMethod?.Invoke(bucket.Id); - } - } - - void IDisposable.Dispose() - { - //Assembly loader will dispose the type, we can just signal a detach - - OnServiceDetatch?.Invoke(); - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - bool IPersistantCacheStore.OnCacheMiss(uint bucketId, string key, IMemoryCacheEntryFactory factory, out CacheEntry entry) - { - return store.OnCacheMiss(bucketId, key, factory, out entry); - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - void IPersistantCacheStore.OnEntryDeleted(uint bucketId, string key) => store.OnEntryDeleted(bucketId, key); - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - void IPersistantCacheStore.OnEntryEvicted(uint bucketId, string key, in CacheEntry entry) => store.OnEntryEvicted(bucketId, key, in entry); + return plugin.CreateServiceExternal(asmName); } + } } diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs index dbfd091..a240dde 100644 --- a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs +++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs @@ -65,7 +65,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering private readonly NodeConfig _nodeConfig; private readonly ICacheStore _cacheStore; private readonly ICachePeerAdapter _peerAdapter; - private readonly FBMClientConfig _replicationClientConfig; + private readonly FBMClientFactory _clientFactory; private readonly bool _isDebug; @@ -79,12 +79,16 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering _peerAdapter = plugin.GetOrCreateSingleton(); //Init fbm config with fixed message size - _replicationClientConfig = FBMDataCacheExtensions.GetDefaultConfig( + FBMClientConfig clientConfig = FBMDataCacheExtensions.GetDefaultConfig( (plugin as ObjectCacheServerEntry)!.ListenerHeap, MAX_MESSAGE_SIZE, debugLog: plugin.IsDebug() ? plugin.Log : null ); + //Init ws fallback factory and client factory + FBMFallbackClientWsFactory wsFactory = new(); + _clientFactory = new(in clientConfig, wsFactory); + _plugin = plugin; _isDebug = plugin.IsDebug(); _log = plugin.Log.CreateScope(LOG_SCOPE_NAME); @@ -149,7 +153,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering _ = newPeer ?? throw new ArgumentNullException(nameof(newPeer)); //Setup client - FBMClient client = new(_replicationClientConfig); + FBMClient client = _clientFactory.CreateClient(); //Add peer to monitor _peerAdapter.OnPeerListenerAttached(newPeer); diff --git a/plugins/VNLib.Data.Caching.Providers.Redis/src/RedisClientCacheEntry.cs b/plugins/VNLib.Data.Caching.Providers.Redis/src/RedisClientCacheEntry.cs index 72e6020..7f2f3ed 100644 --- a/plugins/VNLib.Data.Caching.Providers.Redis/src/RedisClientCacheEntry.cs +++ b/plugins/VNLib.Data.Caching.Providers.Redis/src/RedisClientCacheEntry.cs @@ -69,8 +69,8 @@ namespace VNLib.Data.Caching.Providers.Redis public RedisClientCacheEntry(PluginBase plugin, IConfigScope config) { _defaultHeap = MemoryUtil.Shared; - DefaultDeserializer = new JsonCacheObjectSerializer(); - DefaultSerializer = new JsonCacheObjectSerializer(); + DefaultDeserializer = new JsonCacheObjectSerializer(256); + DefaultSerializer = new JsonCacheObjectSerializer(256); ILogProvider redisLog = plugin.Log.CreateScope("REDIS"); diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs index 5fbebcd..e21cf4a 100644 --- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs @@ -59,13 +59,16 @@ namespace VNLib.Data.Caching.Providers.VNCache private readonly VnCacheClientConfig _config; private readonly IClusterNodeIndex _index; + private readonly FBMClientFactory _clientFactory; + private readonly TimeSpan _initNodeDelay; private bool _isConnected; + private FBMClient? _client; /// - /// The internal client + /// The internal heap used for FBMClients /// - public FBMClient Client { get; } + public IUnmangedHeap BufferHeap { get; } = MemoryUtil.Shared; /// /// Gets a value that determines if the client is currently connected to a server @@ -73,30 +76,30 @@ namespace VNLib.Data.Caching.Providers.VNCache public override bool IsConnected => _isConnected; public FBMCacheClient(PluginBase plugin, IConfigScope config) - : this( - config.Deserialze(), - plugin.IsDebug() ? plugin.Log : null - ) + : this( + config.Deserialze(), + plugin.IsDebug() ? plugin.Log : null + ) { ILogProvider scoped = plugin.Log.CreateScope(LOG_NAME); //Set authenticator and error handler - Client.GetCacheConfiguration() + _clientFactory.GetCacheConfiguration() .WithAuthenticator(new AuthManager(plugin)) .WithErrorHandler(new DiscoveryErrHAndler(scoped)); //Only the master index is schedulable - if(_index is IIntervalScheduleable sch) + if (_index is IIntervalScheduleable sch) { //Schedule discovery interval plugin.ScheduleInterval(sch, _config.DiscoveryInterval); //Run discovery after initial delay if interval is greater than initial delay - if (_config.DiscoveryInterval > InitialDelay) + if (_config.DiscoveryInterval > _initNodeDelay) { //Run a manual initial load - scoped.Information("Running initial discovery in {delay}", InitialDelay); - _ = plugin.ObserveWork(() => sch.OnIntervalAsync(scoped, plugin.UnloadToken), (int)InitialDelay.TotalMilliseconds); + scoped.Information("Running initial discovery in {delay}", _initNodeDelay); + _ = plugin.ObserveWork(() => sch.OnIntervalAsync(scoped, plugin.UnloadToken), (int)_initNodeDelay.TotalMilliseconds); } } } @@ -108,18 +111,22 @@ namespace VNLib.Data.Caching.Providers.VNCache _config = config; + //Set a default node delay if null + _initNodeDelay = _config.InitialNodeDelay.HasValue ? TimeSpan.FromSeconds(_config.InitialNodeDelay.Value) : InitialDelay; + //Init the client with default settings - FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(MemoryUtil.Shared, (int)config.MaxBlobSize, config.RequestTimeout, debugLog); + FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(BufferHeap, (int)config.MaxBlobSize, config.RequestTimeout, debugLog); - Client = new(conf); + FBMFallbackClientWsFactory wsFactory = new(); + _clientFactory = new(in conf, wsFactory); //Add the configuration to the client - Client.GetCacheConfiguration() + _clientFactory.GetCacheConfiguration() .WithTls(config.UseTls) .WithInitialPeers(config.GetInitialNodeUris()); //Init index - _index = ClusterNodeIndex.CreateIndex(Client.GetCacheConfiguration()); + _index = ClusterNodeIndex.CreateIndex(_clientFactory.GetCacheConfiguration()); } /* @@ -135,13 +142,17 @@ namespace VNLib.Data.Caching.Providers.VNCache { //Initial delay pluginLog.Debug("Worker started, waiting for startup delay"); - await Task.Delay((int)InitialDelay.TotalMilliseconds + 1000, exitToken); + await Task.Delay(_initNodeDelay, exitToken); CacheNodeAdvertisment? node = null; while (true) { - //Check for master index + /* + * The cache node index is shared across plugin boundries. If the current + * instance is holding the master index, it will be scheduleable, and + * can be manually invoked if no nodes are found + */ if (_index is IIntervalScheduleable sch) { try @@ -204,8 +215,8 @@ namespace VNLib.Data.Caching.Providers.VNCache { pluginLog.Debug("Connecting to {node}", node); - //Connect to the node - await Client.ConnectToCacheAsync(node, exitToken); + //Connect to the node and save new client + _client = await _clientFactory.ConnectToCacheAsync(node, exitToken); if (pluginLog.IsEnabled(LogLevel.Debug)) { @@ -220,7 +231,7 @@ namespace VNLib.Data.Caching.Providers.VNCache _isConnected = true; //Wait for disconnect - await Client.WaitForExitAsync(exitToken); + await _client.WaitForExitAsync(exitToken); pluginLog.Information("Cache server disconnected"); } @@ -248,6 +259,9 @@ namespace VNLib.Data.Caching.Providers.VNCache finally { _isConnected = false; + + //Cleanup client + _client?.Dispose(); } //Loop again @@ -265,11 +279,7 @@ namespace VNLib.Data.Caching.Providers.VNCache { pluginLog.Error(ex, "Unhandled exception occured in background cache client listening task"); } - finally - { - //Dispose the client on exit - Client.Dispose(); - } + pluginLog.Information("Cache client exited"); } @@ -279,43 +289,43 @@ namespace VNLib.Data.Caching.Providers.VNCache { return !IsConnected ? throw new InvalidOperationException("The underlying client is not connected to a cache node") - : Client!.DeleteObjectAsync(key, cancellation); + : _client!.DeleteObjectAsync(key, cancellation); } /// public override Task GetAsync(string key, ICacheObjectDeserializer deserializer, CancellationToken cancellation) { return !IsConnected - ? throw new InvalidOperationException("The underlying client is not connected to a cache node") - : Client!.GetObjectAsync(key, deserializer, cancellation); + ? throw new InvalidOperationException("The underlying client is not connected to a cache node") + : _client!.GetObjectAsync(key, deserializer, cancellation); } /// public override Task AddOrUpdateAsync(string key, string? newKey, T value, ICacheObjectSerializer serialzer, CancellationToken cancellation) { return !IsConnected - ? throw new InvalidOperationException("The underlying client is not connected to a cache node") - : Client!.AddOrUpdateObjectAsync(key, newKey, value, serialzer, cancellation); + ? throw new InvalidOperationException("The underlying client is not connected to a cache node") + : _client!.AddOrUpdateObjectAsync(key, newKey, value, serialzer, cancellation); } /// public override Task GetAsync(string key, ObjectDataSet callback, T state, CancellationToken cancellation) { return !IsConnected - ? throw new InvalidOperationException("The underlying client is not connected to a cache node") - : Client!.GetObjectAsync(key, callback, state, cancellation); + ? throw new InvalidOperationException("The underlying client is not connected to a cache node") + : _client!.GetObjectAsync(key, callback, state, cancellation); } /// public override Task AddOrUpdateAsync(string key, string? newKey, ObjectDataReader callback, T state, CancellationToken cancellation) { return !IsConnected - ? throw new InvalidOperationException("The underlying client is not connected to a cache node") - : Client!.AddOrUpdateObjectAsync(key, newKey, callback, state, cancellation); + ? throw new InvalidOperationException("The underlying client is not connected to a cache node") + : _client!.AddOrUpdateObjectAsync(key, newKey, callback, state, cancellation); } /// - public override object GetUnderlyingStore() => Client; //Client is the underlying "store" + public override object GetUnderlyingStore() => _client ?? throw new InvalidOperationException("The client is not currently connected"); private sealed class AuthManager : ICacheAuthManager { diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/MemoryCache.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/MemoryCache.cs index 79348f0..98f6a3d 100644 --- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/MemoryCache.cs +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/MemoryCache.cs @@ -26,6 +26,7 @@ using System; using System.Text.Json; using System.Threading; using System.Threading.Tasks; + using VNLib.Utils.Memory; using VNLib.Utils.Logging; using VNLib.Utils.Memory.Diagnostics; diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/RemoteBackedMemoryCache.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/RemoteBackedMemoryCache.cs index 2068805..c7952b4 100644 --- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/RemoteBackedMemoryCache.cs +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/RemoteBackedMemoryCache.cs @@ -100,7 +100,7 @@ namespace VNLib.Data.Caching.Providers.VNCache _memCache = new BlobCacheTable(memCache.TableSize, memCache.BucketSize, factory, null); //If backing store is a VnCacheClient, steal it's buffer heap - _bufferHeap = backingStore is FBMCacheClient client && client.Client.Config.MemoryManager.TryGetHeap(out IUnmangedHeap? heap) ? heap : MemoryUtil.Shared; + _bufferHeap = backingStore is FBMCacheClient client ? client.BufferHeap : MemoryUtil.Shared; _cacheConfig = memCache; _backing = backingStore; diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheBase.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheBase.cs index fde2e44..f8a9ca6 100644 --- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheBase.cs +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheBase.cs @@ -41,8 +41,8 @@ namespace VNLib.Data.Caching.Providers.VNCache protected VNCacheBase(VNCacheConfig config) { //Set default serializers - DefaultDeserializer = config.CacheObjectDeserializer ?? new JsonCacheObjectSerializer(); - DefaultSerializer = config.CacheObjectSerializer ?? new JsonCacheObjectSerializer(); + DefaultDeserializer = config.CacheObjectDeserializer ?? new JsonCacheObjectSerializer(256); + DefaultSerializer = config.CacheObjectSerializer ?? new JsonCacheObjectSerializer(256); } /// diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs index 9a21c79..383c979 100644 --- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs @@ -68,6 +68,12 @@ namespace VNLib.Data.Caching.Providers.VNCache /// internal TimeSpan RequestTimeout => TimeSpan.FromSeconds(RequestTimeoutSeconds!.Value); + /// + /// The time in milliseconds for the initial node delay + /// + [JsonPropertyName("intial_delay_ms")] + public uint? InitialNodeDelay { get; set; } + /// /// The initial peers to connect to /// -- cgit