diff options
Diffstat (limited to 'plugins/ObjectCacheServer')
5 files changed, 53 insertions, 38 deletions
diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs index e9584b6..f7adeb3 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs @@ -1,11 +1,11 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer -* File: ConnectEndpoint.cs +* File: CacheConfiguration.cs * -* ConnectEndpoint.cs is part of ObjectCacheServer which is part of the larger +* CacheConfiguration.cs is part of ObjectCacheServer which is part of the larger * VNLib collection of libraries and utilities. * * ObjectCacheServer is free software: you can redistribute it and/or modify @@ -49,6 +49,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server [JsonPropertyName("max_cache")] - public int MaxCacheEntries { get; set; } = 10000; + public uint MaxCacheEntries { get; set; } = 10000; + + [JsonPropertyName("buckets")] + public uint BucketCount { get; set; } = 10; } } diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs index 9a1ece0..1a7331d 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs @@ -45,7 +45,6 @@ using VNLib.Plugins.Essentials; using VNLib.Plugins.Extensions.Loading; using VNLib.Plugins.Essentials.Endpoints; using VNLib.Plugins.Essentials.Extensions; -using System.Text.Json.Serialization; namespace VNLib.Data.Caching.ObjectCache.Server { @@ -56,7 +55,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30); private readonly string AudienceLocalServerId; - private readonly ObjectCacheStore Store; + private readonly BlobCacheLIstener Store; private readonly PluginBase Pbase; private readonly ConcurrentDictionary<string, AsyncQueue<ChangeEvent>> StatefulEventQueue; @@ -81,7 +80,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server DisableCrossSiteDenied = true }; - public ConnectEndpoint(PluginBase plugin, IReadOnlyDictionary<string, JsonElement> config) + public ConnectEndpoint(PluginBase plugin, IConfigScope config) { string? path = config["path"].GetString(); @@ -106,7 +105,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server StatefulEventQueue = new(StringComparer.OrdinalIgnoreCase); //Init the cache store - Store = InitializeCache((ObjectCacheServerEntry)plugin, CacheConfig.MaxCacheEntries); + Store = InitializeCache((ObjectCacheServerEntry)plugin, CacheConfig.BucketCount, CacheConfig.MaxCacheEntries); /* * Generate a random guid for the current server when created so we @@ -118,7 +117,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server _ = plugin.ObserveWork(this, 100); } - private static ObjectCacheStore InitializeCache(ObjectCacheServerEntry plugin, int maxCache) + private static BlobCacheLIstener InitializeCache(ObjectCacheServerEntry plugin, uint buckets, uint maxCache) { if(maxCache < 2) { @@ -131,8 +130,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache"); } + plugin.Log.Verbose("Creating cache store with {bc} buckets, with {mc} items/bucket", buckets, maxCache); + //Endpoint only allows for a single reader - return new (maxCache, plugin.Log, plugin.CacheHeap, true); + return new (buckets, maxCache, plugin.Log, plugin.CacheHeap, true); } /// <summary> @@ -491,16 +492,16 @@ namespace VNLib.Data.Caching.ObjectCache.Server private sealed class CacheStore : ICacheStore { - private readonly ObjectCacheStore _cache; + private readonly BlobCacheLIstener _cache; - public CacheStore(ObjectCacheStore cache) + public CacheStore(BlobCacheLIstener cache) { _cache = cache; } ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token) { - return _cache.AddOrUpdateBlobAsync(objectId, alternateId, bodyData, state, token); + return _cache.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token); } void ICacheStore.Clear() @@ -510,7 +511,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token) { - return _cache.DeleteItemAsync(id, token); + return _cache.Cache.DeleteObjectAsync(id, token); } } } diff --git a/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs b/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs index 3776269..f911af9 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs @@ -1,11 +1,11 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer -* File: ConnectEndpoint.cs +* File: ICacheStore.cs * -* ConnectEndpoint.cs is part of ObjectCacheServer which is part of the larger +* ICacheStore.cs is part of ObjectCacheServer which is part of the larger * VNLib collection of libraries and utilities. * * ObjectCacheServer is free software: you can redistribute it and/or modify diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj index 38f5b97..3beb168 100644 --- a/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj +++ b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj @@ -1,23 +1,30 @@ <Project Sdk="Microsoft.NET.Sdk"> <PropertyGroup> - <TargetFramework>net6.0</TargetFramework> - <Nullable>enable</Nullable> + <TargetFramework>net6.0</TargetFramework> <RootNamespace>VNLib.Data.Caching.ObjectCache.Server</RootNamespace> - <Version>1.0.1.1</Version> - <SignAssembly>True</SignAssembly> - <AssemblyOriginatorKeyFile>\\vaughnnugent.com\Internal\Folder Redirection\vman\Documents\Programming\Software\StrongNameingKey.snk</AssemblyOriginatorKeyFile> - <EnableDynamicLoading>true</EnableDynamicLoading> + <AssemblyName>ObjectCacheServer</AssemblyName> + <Nullable>enable</Nullable> <AnalysisLevel>latest-all</AnalysisLevel> <ProduceReferenceAssembly>True</ProduceReferenceAssembly> <GenerateDocumentationFile>False</GenerateDocumentationFile> + <!-- Resolve nuget dll files and store them in the output dir --> + <EnableDynamicLoading>true</EnableDynamicLoading> </PropertyGroup> - - <!-- Resolve nuget dll files and store them in the output dir --> + <PropertyGroup> <Authors>Vaughn Nugent</Authors> + <Company>Vaughn Nugent</Company> + <Product>VNLib.Data.Caching.ObjectCache.Server</Product> + <PackageId>VNLib.Data.Caching.ObjectCache.Server</PackageId> + <Description> + An Essentials framework plugin for implementing a distributed in-memory data caching server disoverable in cache brokers. It provides + automatic live replication between cache server nodes. + </Description> <Copyright>Copyright © 2023 Vaughn Nugent</Copyright> - <PackageProjectUrl>https://www.vaughnnugent.com/resources/software</PackageProjectUrl> + <PackageProjectUrl>https://www.vaughnnugent.com/resources/software/modules/VNLib.Data.Caching</PackageProjectUrl> + <RepositoryUrl>https://github.com/VnUgE/VNLib.Data.Caching/tree/master/plugins/ObjectCacheServer</RepositoryUrl> </PropertyGroup> + <ItemGroup> <PackageReference Include="ErrorProne.NET.CoreAnalyzers" Version="0.1.2"> <PrivateAssets>all</PrivateAssets> diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs index d6dbd9b..0171064 100644 --- a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs +++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -111,10 +111,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server { try { - IReadOnlyDictionary<string, JsonElement> clusterConf = this.GetConfig("cluster"); + IConfigScope clusterConf = this.GetConfig("cluster"); - Uri brokerAddress = new(clusterConf["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'")); - + Uri brokerAddress = new(clusterConf["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'")); //Init connect endpoint ConnectEndpoint endpoint = this.Route<ConnectEndpoint>(); @@ -122,27 +121,30 @@ namespace VNLib.Data.Caching.ObjectCache.Server //Get the cache store from the connection endpoint ICacheStore store = endpoint.GetCacheStore(); + ulong maxByteSize = ((ulong)endpoint.CacheConfig.MaxCacheEntries * (ulong)endpoint.CacheConfig.BucketCount * (ulong)endpoint.CacheConfig.MaxMessageSize); + //Log max memory usage - Log.Debug("Maxium memory consumption {mx}Mb", ((ulong)endpoint.CacheConfig.MaxCacheEntries * (ulong)endpoint.CacheConfig.MaxMessageSize) / (ulong)(1024 * 1000)); + Log.Debug("Maxium memory consumption {mx}Mb", maxByteSize / (ulong)(1024 * 1000)); //Setup broker and regitration { - //Route the broker endpoint BrokerHeartBeat brokerEp = new(() => BrokerHeartBeatToken!, BrokerSyncHandle, brokerAddress, this); Route(brokerEp); //start registration - _ = this.ObserveTask(() => RegisterServerAsync(endpoint.Path), 200); + _ = this.ObserveWork(() => RegisterServerAsync(endpoint.Path), 200); } //Setup cluster worker { + TimeSpan timeout = TimeSpan.FromSeconds(10); + //Get pre-configured fbm client config for caching - ClientConfig = FBMDataCacheExtensions.GetDefaultConfig(CacheHeap, endpoint.CacheConfig.MaxMessageSize / 2, this.IsDebug() ? Log : null); + ClientConfig = FBMDataCacheExtensions.GetDefaultConfig(CacheHeap, endpoint.CacheConfig.MaxMessageSize / 2, timeout, this.IsDebug() ? Log : null); //Start Client runner - _ = this.ObserveTask(() => RunClientAsync(store, brokerAddress), 300); + _ = this.ObserveWork(() => RunClientAsync(store, brokerAddress), 300); } //Load a cache broker to the current server if the config is defined @@ -182,7 +184,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server try { //Get the broker config element - IReadOnlyDictionary<string, JsonElement> clusterConfig = this.GetConfig("cluster"); + IConfigScope clusterConfig = this.GetConfig("cluster"); //Server id is just dns name for now string serverId = Dns.GetHostName(); @@ -226,6 +228,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server { //Gen a random reg token before registering BrokerHeartBeatToken = RandomHash.GetRandomHex(32); + //Assign new hb token request.WithHeartbeatToken(BrokerHeartBeatToken); @@ -334,7 +337,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server try { //Get the broker config element - IReadOnlyDictionary<string, JsonElement> clusterConf = this.GetConfig("cluster"); + IConfigScope clusterConf = this.GetConfig("cluster"); int serverCheckMs = clusterConf["update_interval_sec"].GetInt32() * 1000; @@ -412,7 +415,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server ListeningServers.Add(server); //Run listener background task - _ = this.ObserveTask(() => RunSyncTaskAsync(server, cacheStore, nodeId)); + _ = this.ObserveWork(() => RunSyncTaskAsync(server, cacheStore, nodeId)); } } } @@ -431,6 +434,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server } catch (TaskCanceledException) { + //normal exit/unload } catch (Exception ex) { |