From 5d4192880654fd6e00e587814169415b42621327 Mon Sep 17 00:00:00 2001 From: vnugent Date: Sat, 9 Mar 2024 19:13:21 -0500 Subject: chore: #2 Minor fixes and polish before release --- .../src/ObjectCacheSystemState.cs | 126 +++++++++++++++++++-- 1 file changed, 114 insertions(+), 12 deletions(-) (limited to 'plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs') diff --git a/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs index 6183956..970e832 100644 --- a/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs +++ b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs @@ -37,14 +37,19 @@ using VNLib.Plugins.Extensions.Loading; using VNLib.Data.Caching.Extensions.Clustering; using VNLib.Data.Caching.ObjectCache.Server.Cache; using VNLib.Data.Caching.ObjectCache.Server.Clustering; +using System.Threading.Tasks; +using System.Threading; namespace VNLib.Data.Caching.ObjectCache.Server { + /* + * The purpose of this class is to manage the state of the entire cache server. + * All configuration and state should be creatd and managed by this class. To make it + * easier to manage. + */ [ConfigurationName("cache")] internal sealed class ObjectCacheSystemState(PluginBase plugin, IConfigScope config) : IDisposable { - const string LISTENER_LOG_SCOPE = "CacheListener"; - public BlobCacheListener Listener { get; private set; } = null!; public ICacheStore InternalStore { get; private set; } = null!; @@ -57,7 +62,17 @@ namespace VNLib.Data.Caching.ObjectCache.Server /// /// The plugin-wide, shared node configuration /// - public NodeConfig Configuration { get; } = plugin.GetOrCreateSingleton(); + public ServerClusterConfig ClusterConfig { get; } = plugin.GetOrCreateSingleton(); + + /// + /// The system wide cache authenticator + /// + public CacheAuthKeyStore KeyStore { get; } = new(plugin); + + /// + /// The system cache node configuration + /// + public CacheNodeConfiguration NodeConfig { get; private set; } /// /// The peer discovery manager @@ -76,6 +91,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server /// public PeerEventQueueManager PeerEventQueue { get; private set; } + private ICacheMemoryManagerFactory _cacheMemManager; + void IDisposable.Dispose() { SharedCacheHeap.Dispose(); @@ -104,11 +121,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server new TrackedHeapWrapper(MemoryUtil.InitializeNewHeapForProcess(), true) : MemoryUtil.InitializeNewHeapForProcess(); + //Load node configuration first + (NodeConfig = ClusterConfig.BuildNodeConfig()) + .WithAuthenticator(KeyStore); //Also pass the key store to the node config + ConfigurePeerDiscovery(); ConfigureCacheListener(); - PeerEventQueue = new(plugin, Configuration); + PeerEventQueue = new(plugin, ClusterConfig); } private void ConfigurePeerDiscovery() @@ -117,15 +138,16 @@ namespace VNLib.Data.Caching.ObjectCache.Server IConfigScope? config = plugin.TryGetConfig("known_peers"); string[] kownPeers = config?.Deserialze() ?? []; - ILogProvider discLogger = plugin.Log.CreateScope(PeerDiscoveryManager.LOG_SCOPE_NAME); + ILogProvider discLogger = plugin.Log.CreateScope(CacheConstants.LogScopes.PeerDiscovery); - Configuration.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s))) + NodeConfig.WithInitialPeers(kownPeers.Select(static s => new Uri(s))) .WithErrorHandler(new ErrorHandler(discLogger)); discLogger.Information("Inital peer nodes: {nodes}", kownPeers); PeerDiscovery = new PeerDiscoveryManager( - Configuration, + NodeConfig, + ClusterConfig, discLogger, plugin.IsDebug(), kownPeers.Length > 0 @@ -152,11 +174,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server manager = plugin.CreateServiceExternal(MemoryConfiguration.ExternLibPath); } + _cacheMemManager = manager; + //Endpoint only allows for a single reader Listener = new( plugin.LoadMemoryCacheSystem(config, manager, MemoryConfiguration), - plugin.GetOrCreateSingleton(), - plugin.Log.CreateScope(LISTENER_LOG_SCOPE), + new CacheListenerPubQueue(plugin, PeerEventQueue), + plugin.Log.CreateScope(CacheConstants.LogScopes.BlobCacheListener), new SharedHeapFBMMemoryManager(SharedCacheHeap) ); @@ -189,27 +213,105 @@ Cache Configuration: ); } + public void LogMemoryStats() + { + if(SharedCacheHeap is TrackedHeapWrapper thw) + { + const string shStatTemplate = +@" VNCache shared heap stats: + Current: {cur}kB + Blocks: {blks} + Max size: {max}kB +"; + HeapStatistics stats = thw.GetCurrentStats(); + plugin.Log.Debug( + shStatTemplate, + stats.AllocatedBytes / 1024, + stats.AllocatedBlocks, + stats.MaxHeapSize / 1024 + ); + + } + + //Also print logs for the bucket local managers if they are enabled + if(_cacheMemManager is BucketLocalManagerFactory blmf) + { + blmf.LogHeapStats(); + } + } + private sealed class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler { public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex) + => LogError(ex, errorNode.NodeId, errorNode.ConnectEndpoint); + + public void OnDiscoveryError(Uri errorAddress, Exception ex) + => LogError(ex, null, errorAddress); + + private void LogError(Exception ex, string? nodId, Uri? connectAddress) { + //For logging purposes, use the node id if its available, otherwise use the address + if(nodId == null && connectAddress != null) + { + nodId = connectAddress.ToString(); + } + if (ex is HttpRequestException hre) { if (hre.InnerException is SocketException se) { //transport failed - Logger.Warn("Failed to connect to server {serv} because {err}", errorNode, se.Message); + Logger.Warn("Failed to connect to server {serv} because {err}", nodId, se.Message); } else { - Logger.Error("Failed to connect to node {n}\n{err}", errorNode, hre); + Logger.Error("Failed to connect to node {n}\n{err}", nodId, hre); } } + if (ex is OperationCanceledException) + { + Logger.Error("Failed to discover nodes from nodeid {nid}, because the operation was canceled"); + } + else if (ex is TimeoutException) + { + //Only log exception stack when in debug logging mode + Logger.Warn("Failed to discover nodes from nodeid {nid}, because a timeout occured", nodId); + } else { - Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex); + //Only log exception stack when in debug logging mode + if (Logger.IsEnabled(LogLevel.Debug)) + { + Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", nodId, ex); + } + else + { + Logger.Error("Failed to discover nodes from nodeid {nid}, with error: {err}", nodId, ex.Message); + } } } } + + internal sealed class CacheStore(IBlobCacheTable table) : ICacheStore + { + + /// + ValueTask ICacheStore.AddOrUpdateBlobAsync(string objectId, string? alternateId, ObjectDataGet bodyData, T state, CancellationToken token) + { + return table.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token); + } + + /// + void ICacheStore.Clear() + { + throw new NotImplementedException(); + } + + /// + ValueTask ICacheStore.DeleteItemAsync(string id, CancellationToken token) + { + return table.DeleteObjectAsync(id, token); + } + } } } -- cgit