aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2024-03-09 19:13:21 -0500
committerLibravatar vnugent <public@vaughnnugent.com>2024-03-09 19:13:21 -0500
commit5d4192880654fd6e00e587814169415b42621327 (patch)
treef35e2e41e346c5067f0195e7b0f7197e9729e940 /plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs
parenta4b3504bb891829074d1efde0433eae010862181 (diff)
chore: #2 Minor fixes and polish before release
Diffstat (limited to 'plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs')
-rw-r--r--plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs126
1 files changed, 114 insertions, 12 deletions
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs
index 6183956..970e832 100644
--- a/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs
+++ b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs
@@ -37,14 +37,19 @@ using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.Extensions.Clustering;
using VNLib.Data.Caching.ObjectCache.Server.Cache;
using VNLib.Data.Caching.ObjectCache.Server.Clustering;
+using System.Threading.Tasks;
+using System.Threading;
namespace VNLib.Data.Caching.ObjectCache.Server
{
+ /*
+ * The purpose of this class is to manage the state of the entire cache server.
+ * All configuration and state should be creatd and managed by this class. To make it
+ * easier to manage.
+ */
[ConfigurationName("cache")]
internal sealed class ObjectCacheSystemState(PluginBase plugin, IConfigScope config) : IDisposable
{
- const string LISTENER_LOG_SCOPE = "CacheListener";
-
public BlobCacheListener<IPeerEventQueue> Listener { get; private set; } = null!;
public ICacheStore InternalStore { get; private set; } = null!;
@@ -57,7 +62,17 @@ namespace VNLib.Data.Caching.ObjectCache.Server
/// <summary>
/// The plugin-wide, shared node configuration
/// </summary>
- public NodeConfig Configuration { get; } = plugin.GetOrCreateSingleton<NodeConfig>();
+ public ServerClusterConfig ClusterConfig { get; } = plugin.GetOrCreateSingleton<ServerClusterConfig>();
+
+ /// <summary>
+ /// The system wide cache authenticator
+ /// </summary>
+ public CacheAuthKeyStore KeyStore { get; } = new(plugin);
+
+ /// <summary>
+ /// The system cache node configuration
+ /// </summary>
+ public CacheNodeConfiguration NodeConfig { get; private set; }
/// <summary>
/// The peer discovery manager
@@ -76,6 +91,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server
/// </summary>
public PeerEventQueueManager PeerEventQueue { get; private set; }
+ private ICacheMemoryManagerFactory _cacheMemManager;
+
void IDisposable.Dispose()
{
SharedCacheHeap.Dispose();
@@ -104,11 +121,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server
new TrackedHeapWrapper(MemoryUtil.InitializeNewHeapForProcess(), true)
: MemoryUtil.InitializeNewHeapForProcess();
+ //Load node configuration first
+ (NodeConfig = ClusterConfig.BuildNodeConfig())
+ .WithAuthenticator(KeyStore); //Also pass the key store to the node config
+
ConfigurePeerDiscovery();
ConfigureCacheListener();
- PeerEventQueue = new(plugin, Configuration);
+ PeerEventQueue = new(plugin, ClusterConfig);
}
private void ConfigurePeerDiscovery()
@@ -117,15 +138,16 @@ namespace VNLib.Data.Caching.ObjectCache.Server
IConfigScope? config = plugin.TryGetConfig("known_peers");
string[] kownPeers = config?.Deserialze<string[]>() ?? [];
- ILogProvider discLogger = plugin.Log.CreateScope(PeerDiscoveryManager.LOG_SCOPE_NAME);
+ ILogProvider discLogger = plugin.Log.CreateScope(CacheConstants.LogScopes.PeerDiscovery);
- Configuration.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s)))
+ NodeConfig.WithInitialPeers(kownPeers.Select(static s => new Uri(s)))
.WithErrorHandler(new ErrorHandler(discLogger));
discLogger.Information("Inital peer nodes: {nodes}", kownPeers);
PeerDiscovery = new PeerDiscoveryManager(
- Configuration,
+ NodeConfig,
+ ClusterConfig,
discLogger,
plugin.IsDebug(),
kownPeers.Length > 0
@@ -152,11 +174,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server
manager = plugin.CreateServiceExternal<ICacheMemoryManagerFactory>(MemoryConfiguration.ExternLibPath);
}
+ _cacheMemManager = manager;
+
//Endpoint only allows for a single reader
Listener = new(
plugin.LoadMemoryCacheSystem(config, manager, MemoryConfiguration),
- plugin.GetOrCreateSingleton<CacheListenerPubQueue>(),
- plugin.Log.CreateScope(LISTENER_LOG_SCOPE),
+ new CacheListenerPubQueue(plugin, PeerEventQueue),
+ plugin.Log.CreateScope(CacheConstants.LogScopes.BlobCacheListener),
new SharedHeapFBMMemoryManager(SharedCacheHeap)
);
@@ -189,27 +213,105 @@ Cache Configuration:
);
}
+ public void LogMemoryStats()
+ {
+ if(SharedCacheHeap is TrackedHeapWrapper thw)
+ {
+ const string shStatTemplate =
+@" VNCache shared heap stats:
+ Current: {cur}kB
+ Blocks: {blks}
+ Max size: {max}kB
+";
+ HeapStatistics stats = thw.GetCurrentStats();
+ plugin.Log.Debug(
+ shStatTemplate,
+ stats.AllocatedBytes / 1024,
+ stats.AllocatedBlocks,
+ stats.MaxHeapSize / 1024
+ );
+
+ }
+
+ //Also print logs for the bucket local managers if they are enabled
+ if(_cacheMemManager is BucketLocalManagerFactory blmf)
+ {
+ blmf.LogHeapStats();
+ }
+ }
+
private sealed class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
{
public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
+ => LogError(ex, errorNode.NodeId, errorNode.ConnectEndpoint);
+
+ public void OnDiscoveryError(Uri errorAddress, Exception ex)
+ => LogError(ex, null, errorAddress);
+
+ private void LogError(Exception ex, string? nodId, Uri? connectAddress)
{
+ //For logging purposes, use the node id if its available, otherwise use the address
+ if(nodId == null && connectAddress != null)
+ {
+ nodId = connectAddress.ToString();
+ }
+
if (ex is HttpRequestException hre)
{
if (hre.InnerException is SocketException se)
{
//transport failed
- Logger.Warn("Failed to connect to server {serv} because {err}", errorNode, se.Message);
+ Logger.Warn("Failed to connect to server {serv} because {err}", nodId, se.Message);
}
else
{
- Logger.Error("Failed to connect to node {n}\n{err}", errorNode, hre);
+ Logger.Error("Failed to connect to node {n}\n{err}", nodId, hre);
}
}
+ if (ex is OperationCanceledException)
+ {
+ Logger.Error("Failed to discover nodes from nodeid {nid}, because the operation was canceled");
+ }
+ else if (ex is TimeoutException)
+ {
+ //Only log exception stack when in debug logging mode
+ Logger.Warn("Failed to discover nodes from nodeid {nid}, because a timeout occured", nodId);
+ }
else
{
- Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex);
+ //Only log exception stack when in debug logging mode
+ if (Logger.IsEnabled(LogLevel.Debug))
+ {
+ Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", nodId, ex);
+ }
+ else
+ {
+ Logger.Error("Failed to discover nodes from nodeid {nid}, with error: {err}", nodId, ex.Message);
+ }
}
}
}
+
+ internal sealed class CacheStore(IBlobCacheTable table) : ICacheStore
+ {
+
+ ///<inheritdoc/>
+ ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, ObjectDataGet<T> bodyData, T state, CancellationToken token)
+ {
+ return table.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token);
+ }
+
+ ///<inheritdoc/>
+ void ICacheStore.Clear()
+ {
+ throw new NotImplementedException();
+ }
+
+ ///<inheritdoc/>
+ ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token)
+ {
+ return table.DeleteObjectAsync(id, token);
+ }
+ }
}
}