diff options
author | vnugent <public@vaughnnugent.com> | 2023-01-27 21:13:16 -0500 |
---|---|---|
committer | vnugent <public@vaughnnugent.com> | 2023-01-27 21:13:16 -0500 |
commit | 892bbaaa5c1f62631070cc74820f349c4c80f55d (patch) | |
tree | c04dfa8c5a3ead1522502635d4bc9696102d28dc /plugins/ObjectCacheServer | |
parent | 0ea612dde50e82d722b0654e0e569fd4e7469978 (diff) |
Object cache overhaul and logger updates
Diffstat (limited to 'plugins/ObjectCacheServer')
6 files changed, 378 insertions, 137 deletions
diff --git a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs b/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs index bd1233e..3930f90 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs @@ -30,12 +30,14 @@ using System.Threading; using System.Threading.Tasks; using System.Collections.Generic; +using VNLib.Plugins; +using VNLib.Plugins.Essentials; using VNLib.Hashing.IdentityUtility; using VNLib.Plugins.Essentials.Endpoints; using VNLib.Plugins.Essentials.Extensions; using VNLib.Plugins.Extensions.Loading; -namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints +namespace VNLib.Data.Caching.ObjectCache.Server { internal sealed class BrokerHeartBeat : ResourceEndpointBase { diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs new file mode 100644 index 0000000..e9584b6 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs @@ -0,0 +1,54 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: ConnectEndpoint.cs +* +* ConnectEndpoint.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Text.Json.Serialization; + +namespace VNLib.Data.Caching.ObjectCache.Server +{ + internal sealed class CacheConfiguration + { + [JsonPropertyName("buffer_recv_max")] + public int MaxRecvBufferSize { get; set; } = 1000 * 1024; + [JsonPropertyName("buffer_recv_min")] + public int MinRecvBufferSize { get; set; } = 8 * 1024; + + + [JsonPropertyName("buffer_header_max")] + public int MaxHeaderBufferSize { get; set; } = 2 * 1024; + [JsonPropertyName("buffer_header_min")] + public int MinHeaderBufferSize { get; set; } = 128; + + + [JsonPropertyName("max_message_size")] + public int MaxMessageSize { get; set; } = 1000 * 1024; + + + [JsonPropertyName("change_queue_max_depth")] + public int MaxEventQueueDepth { get; set; } = 10 * 1000; + + + [JsonPropertyName("max_cache")] + public int MaxCacheEntries { get; set; } = 10000; + } +} diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs index 15cc086..9a1ece0 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs @@ -31,34 +31,29 @@ using System.Threading.Channels; using System.Collections.Generic; using System.Collections.Concurrent; -using VNLib.Net.Http; +using VNLib.Plugins; using VNLib.Hashing; +using VNLib.Net.Http; using VNLib.Utils.Async; +using VNLib.Utils.Memory; using VNLib.Utils.Logging; using VNLib.Hashing.IdentityUtility; using VNLib.Net.Messaging.FBM; using VNLib.Net.Messaging.FBM.Client; using VNLib.Net.Messaging.FBM.Server; -using VNLib.Data.Caching.ObjectCache; +using VNLib.Plugins.Essentials; using VNLib.Plugins.Extensions.Loading; using VNLib.Plugins.Essentials.Endpoints; using VNLib.Plugins.Essentials.Extensions; +using System.Text.Json.Serialization; - -namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints +namespace VNLib.Data.Caching.ObjectCache.Server { - internal sealed class ConnectEndpoint : ResourceEndpointBase - { - const int MAX_RECV_BUF_SIZE = 1000 * 1024; - const int MIN_RECV_BUF_SIZE = 8 * 1024; - const int MAX_HEAD_BUF_SIZE = 2048; - const int MIN_MESSAGE_SIZE = 10 * 1024; - const int MAX_MESSAGE_SIZE = 1000 * 1024; - const int MIN_HEAD_BUF_SIZE = 128; - const int MAX_EVENT_QUEUE_SIZE = 10000; - const int MAX_RESPONSE_BUFFER_SIZE = 10 * 1024; - private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30); + [ConfigurationName("store")] + internal sealed class ConnectEndpoint : ResourceEndpointBase, IDisposable, IAsyncBackgroundWork + { + private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30); private readonly string AudienceLocalServerId; private readonly ObjectCacheStore Store; @@ -68,8 +63,16 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints private uint _connectedClients; + /// <summary> + /// Gets the number of active connections + /// </summary> public uint ConnectedClients => _connectedClients; + /// <summary> + /// The cache store configuration + /// </summary> + public CacheConfiguration CacheConfig { get; } + //Loosen up protection settings protected override ProtectionSettings EndpointProtectionSettings { get; } = new() { @@ -78,20 +81,89 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints DisableCrossSiteDenied = true }; - public ConnectEndpoint(string path, ObjectCacheStore store, PluginBase pbase) + public ConnectEndpoint(PluginBase plugin, IReadOnlyDictionary<string, JsonElement> config) { - InitPathAndLog(path, pbase.Log); - Store = store;//Load client public key to verify signed messages - Pbase = pbase; + string? path = config["path"].GetString(); + InitPathAndLog(path, plugin.Log); + + Pbase = plugin; + + //Parse cache config or use default + if(config.TryGetValue("cache", out JsonElement confEl)) + { + CacheConfig = confEl.Deserialize<CacheConfiguration>()!; + } + else + { + //Init default config if not fount + CacheConfig = new(); + + Log.Verbose("Loading default cache buffer configuration"); + } + + //Create event queue client lookup table StatefulEventQueue = new(StringComparer.OrdinalIgnoreCase); - //Start the queue worker - _ = pbase.DeferTask(() => ChangeWorkerAsync(pbase.UnloadToken), 10); + //Init the cache store + Store = InitializeCache((ObjectCacheServerEntry)plugin, CacheConfig.MaxCacheEntries); + /* + * Generate a random guid for the current server when created so we + * know client tokens belong to us when singed by the same key + */ AudienceLocalServerId = Guid.NewGuid().ToString("N"); + + //Schedule the queue worker to be run + _ = plugin.ObserveWork(this, 100); + } + + private static ObjectCacheStore InitializeCache(ObjectCacheServerEntry plugin, int maxCache) + { + if(maxCache < 2) + { + throw new ArgumentException("You must configure a 'max_cache' size larger than 1 item"); + } + + //Suggestion + if(maxCache < 200) + { + plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache"); + } + + //Endpoint only allows for a single reader + return new (maxCache, plugin.Log, plugin.CacheHeap, true); } + /// <summary> + /// Gets the configured cache store + /// </summary> + /// <returns></returns> + public ICacheStore GetCacheStore() => new CacheStore(Store); + + + //Dispose will be called by the host plugin on unload + void IDisposable.Dispose() + { + //Dispose the store on cleanup + Store.Dispose(); + } + + + private async Task<ReadOnlyJsonWebKey> GetClientPubAsync() + { + return await Pbase.TryGetSecretAsync("client_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); + } + private async Task<ReadOnlyJsonWebKey> GetCachePubAsync() + { + return await Pbase.TryGetSecretAsync("cache_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); + } + private async Task<ReadOnlyJsonWebKey> GetCachePrivateKeyAsync() + { + return await Pbase.TryGetSecretAsync("cache_private_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); + } + + /* * Used as a client negotiation and verification request * @@ -132,7 +204,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints { verified = true; } - //May be signed by a cahce server + //May be signed by a cache server else { using ReadOnlyJsonWebKey cacheCert = await GetCachePubAsync(); @@ -163,8 +235,10 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints } Log.Debug("Received negotiation request from node {node}", nodeId); + //Verified, now we can create an auth message with a short expiration using JsonWebToken auth = new(); + //Sign the auth message from the cache certificate's private key using (ReadOnlyJsonWebKey cert = await GetCachePrivateKeyAsync()) { @@ -179,9 +253,9 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints //Specify the server's node id if set .AddClaim("sub", nodeId!) //Add negotiaion args - .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, MAX_HEAD_BUF_SIZE) - .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, MAX_RECV_BUF_SIZE) - .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, MAX_MESSAGE_SIZE) + .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, CacheConfig.MaxHeaderBufferSize) + .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, CacheConfig.MaxRecvBufferSize) + .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, CacheConfig.MaxMessageSize) .CommitClaims(); auth.SignFromJwk(cert); @@ -192,27 +266,17 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints return VfReturnType.VirtualSkip; } - private async Task<ReadOnlyJsonWebKey> GetClientPubAsync() - { - return await Pbase.TryGetSecretAsync("client_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); - } - private async Task<ReadOnlyJsonWebKey> GetCachePubAsync() - { - return await Pbase.TryGetSecretAsync("cache_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); - } - private async Task<ReadOnlyJsonWebKey> GetCachePrivateKeyAsync() - { - return await Pbase.TryGetSecretAsync("cache_private_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); - } - private async Task ChangeWorkerAsync(CancellationToken cancellation) + //Background worker to process event queue items + async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) { try { //Listen for changes while (true) { - ChangeEvent ev = await Store.EventQueue.DequeueAsync(cancellation); + ChangeEvent ev = await Store.EventQueue.DequeueAsync(exitToken); + //Add event to queues foreach (AsyncQueue<ChangeEvent> queue in StatefulEventQueue.Values) { @@ -224,10 +288,8 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints } } catch (OperationCanceledException) - { } - catch (Exception ex) { - Log.Error(ex); + //Normal exit } } @@ -238,6 +300,12 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints public int MaxMessageSize { get; init; } public int MaxResponseBufferSize { get; init; } public AsyncQueue<ChangeEvent>? SyncQueue { get; init; } + + public override string ToString() + { + return + $"{nameof(RecvBufferSize)}:{RecvBufferSize}, {nameof(MaxHeaderBufferSize)}: {MaxHeaderBufferSize}, {nameof(MaxMessageSize)}:{MaxMessageSize}, {nameof(MaxResponseBufferSize)}:{MaxResponseBufferSize}"; + } } protected override async ValueTask<VfReturnType> WebsocketRequestedAsync(HttpEntity entity) @@ -246,6 +314,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints { //Parse jwt from authorization string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; + if (string.IsNullOrWhiteSpace(jwtAuth)) { entity.CloseResponse(HttpStatusCode.Unauthorized); @@ -253,6 +322,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints } string? nodeId = null; + //Parse jwt using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth)) { @@ -301,11 +371,12 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints string maxMessageSizeCmd = entity.QueryArgs[FBMClient.REQ_MAX_MESS_QUERY_ARG]; //Parse recv buffer size - int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : MIN_RECV_BUF_SIZE; - int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : MIN_HEAD_BUF_SIZE; - int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : MIN_MESSAGE_SIZE; + int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : CacheConfig.MinRecvBufferSize; + int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : CacheConfig.MinHeaderBufferSize; + int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : CacheConfig.MaxMessageSize; AsyncQueue<ChangeEvent>? nodeQueue = null; + //The connection may be a caching server node, so get its node-id if (!string.IsNullOrWhiteSpace(nodeId)) { @@ -317,7 +388,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints * and change events may be processed on mutliple threads. */ - BoundedChannelOptions queueOptions = new(MAX_EVENT_QUEUE_SIZE) + BoundedChannelOptions queueOptions = new(CacheConfig.MaxEventQueueDepth) { AllowSynchronousContinuations = true, SingleReader = false, @@ -327,21 +398,41 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints }; _ = StatefulEventQueue.TryAdd(nodeId, new(queueOptions)); + //Get the queue nodeQueue = StatefulEventQueue[nodeId]; } - + + /* + * Buffer sizing can get messy as the response/resquest sizes can vary + * and will include headers, this is a drawback of the FBM protocol + * so we need to properly calculate efficient buffer sizes as + * negotiated with the client. + */ + + int maxMessageSizeClamp = Math.Clamp(maxMessageSize, CacheConfig.MinRecvBufferSize, CacheConfig.MaxRecvBufferSize); + //Init new ws state object and clamp the suggested buffer sizes WsUserState state = new() { - RecvBufferSize = Math.Clamp(recvBufSize, MIN_RECV_BUF_SIZE, MAX_RECV_BUF_SIZE), - MaxHeaderBufferSize = Math.Clamp(maxHeadBufSize, MIN_HEAD_BUF_SIZE, MAX_HEAD_BUF_SIZE), - MaxMessageSize = Math.Clamp(maxMessageSize, MIN_MESSAGE_SIZE, MAX_MESSAGE_SIZE), - MaxResponseBufferSize = Math.Min(maxMessageSize, MAX_RESPONSE_BUFFER_SIZE), + RecvBufferSize = Math.Clamp(recvBufSize, CacheConfig.MinRecvBufferSize, CacheConfig.MaxRecvBufferSize), + MaxHeaderBufferSize = Math.Clamp(maxHeadBufSize, CacheConfig.MinHeaderBufferSize, CacheConfig.MaxHeaderBufferSize), + + MaxMessageSize = maxMessageSizeClamp, + + /* + * Response buffer needs to be large enough to store a max message + * as a response along with all response headers + */ + MaxResponseBufferSize = (int)MemoryUtil.NearestPage(maxMessageSizeClamp), + SyncQueue = nodeQueue }; Log.Debug("Client recv buffer suggestion {recv}, header buffer size {head}, response buffer size {r}", recvBufCmd, maxHeaderCharCmd, state.MaxResponseBufferSize); + + //Print state message to console + Log.Verbose("Client buffer state {state}", state); //Accept socket and pass state object entity.AcceptWebSocket(WebsocketAcceptedAsync, state); @@ -370,6 +461,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints RecvBufferSize = state.RecvBufferSize, ResponseBufferSize = state.MaxResponseBufferSize, MaxHeaderBufferSize = state.MaxHeaderBufferSize, + HeaderEncoding = Helpers.DefaultEncoding, }; @@ -395,5 +487,31 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints } Log.Debug("Server websocket exited"); } + + + private sealed class CacheStore : ICacheStore + { + private readonly ObjectCacheStore _cache; + + public CacheStore(ObjectCacheStore cache) + { + _cache = cache; + } + + ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token) + { + return _cache.AddOrUpdateBlobAsync(objectId, alternateId, bodyData, state, token); + } + + void ICacheStore.Clear() + { + throw new NotImplementedException(); + } + + ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token) + { + return _cache.DeleteItemAsync(id, token); + } + } } } diff --git a/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs b/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs new file mode 100644 index 0000000..3776269 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs @@ -0,0 +1,56 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: ConnectEndpoint.cs +* +* ConnectEndpoint.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Threading; +using System.Threading.Tasks; + +namespace VNLib.Data.Caching.ObjectCache.Server +{ + internal interface ICacheStore + { + /// <summary> + /// Asynchronously adds or updates an object in the store and optionally update's its id + /// </summary> + /// <param name="objectId">The current (or old) id of the object</param> + /// <param name="alternateId">An optional id to update the blob to</param> + /// <param name="bodyData">A callback that returns the data for the blob</param> + /// <param name="state">The state parameter to pass to the data callback</param> + /// <param name="token">A token to cancel the async operation</param> + /// <returns>A value task that represents the async operation</returns> + ValueTask AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token = default); + + /// <summary> + /// Clears all items from the store + /// </summary> + void Clear(); + + /// <summary> + /// Asynchronously deletes a previously stored item + /// </summary> + /// <param name="id">The id of the object to delete</param> + /// <param name="token">A token to cancel the async lock await</param> + /// <returns>A task that completes when the item has been deleted</returns> + ValueTask<bool> DeleteItemAsync(string id, CancellationToken token = default); + } +} diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj index 672597b..38f5b97 100644 --- a/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj +++ b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj @@ -2,32 +2,23 @@ <PropertyGroup> <TargetFramework>net6.0</TargetFramework> <Nullable>enable</Nullable> - <Authors>Vaughn Nugent</Authors> + <RootNamespace>VNLib.Data.Caching.ObjectCache.Server</RootNamespace> <Version>1.0.1.1</Version> - <RootNamespace>VNLib.Plugins.Essentials.Sessions.Server</RootNamespace> - <Copyright>Copyright © 2023 Vaughn Nugent</Copyright> <SignAssembly>True</SignAssembly> <AssemblyOriginatorKeyFile>\\vaughnnugent.com\Internal\Folder Redirection\vman\Documents\Programming\Software\StrongNameingKey.snk</AssemblyOriginatorKeyFile> + <EnableDynamicLoading>true</EnableDynamicLoading> + <AnalysisLevel>latest-all</AnalysisLevel> + <ProduceReferenceAssembly>True</ProduceReferenceAssembly> + <GenerateDocumentationFile>False</GenerateDocumentationFile> </PropertyGroup> <!-- Resolve nuget dll files and store them in the output dir --> <PropertyGroup> - <EnableDynamicLoading>true</EnableDynamicLoading> - <GenerateDocumentationFile>False</GenerateDocumentationFile> - <PackageProjectUrl>https://www.vaughnnugent.com/resources</PackageProjectUrl> - <AnalysisLevel>latest-all</AnalysisLevel> - <ProduceReferenceAssembly>True</ProduceReferenceAssembly> - + <Authors>Vaughn Nugent</Authors> + <Copyright>Copyright © 2023 Vaughn Nugent</Copyright> + <PackageProjectUrl>https://www.vaughnnugent.com/resources/software</PackageProjectUrl> </PropertyGroup> <ItemGroup> - <Compile Remove="liveplugin2\**" /> - <Compile Remove="liveplugin\**" /> - <EmbeddedResource Remove="liveplugin2\**" /> - <EmbeddedResource Remove="liveplugin\**" /> - <None Remove="liveplugin2\**" /> - <None Remove="liveplugin\**" /> - </ItemGroup> - <ItemGroup> <PackageReference Include="ErrorProne.NET.CoreAnalyzers" Version="0.1.2"> <PrivateAssets>all</PrivateAssets> <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> @@ -42,10 +33,4 @@ <ProjectReference Include="..\..\..\lib\VNLib.Data.Caching.ObjectCache\src\VNLib.Data.Caching.ObjectCache.csproj" /> <ProjectReference Include="..\..\CacheBroker\src\CacheBroker.csproj" /> </ItemGroup> - <ItemGroup> - <None Update="ObjectCacheServer.json"> - <CopyToOutputDirectory>Always</CopyToOutputDirectory> - </None> - </ItemGroup> - </Project> diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs index 7b7b9fb..d6dbd9b 100644 --- a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs +++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs @@ -34,34 +34,67 @@ using System.Threading.Tasks; using System.Collections.Generic; using System.Security.Cryptography; +using VNLib.Plugins; using VNLib.Utils.Memory; using VNLib.Utils.Logging; -using VNLib.Utils.Extensions; +using VNLib.Utils.Memory.Diagnostics; using VNLib.Hashing; using VNLib.Hashing.IdentityUtility; -using VNLib.Data.Caching; using VNLib.Data.Caching.Extensions; -using VNLib.Data.Caching.ObjectCache; using static VNLib.Data.Caching.Constants; using VNLib.Net.Messaging.FBM; using VNLib.Net.Messaging.FBM.Client; using VNLib.Plugins.Cache.Broker.Endpoints; using VNLib.Plugins.Extensions.Loading; using VNLib.Plugins.Extensions.Loading.Routing; -using VNLib.Plugins.Essentials.Sessions.Server.Endpoints; -namespace VNLib.Plugins.Essentials.Sessions.Server +namespace VNLib.Data.Caching.ObjectCache.Server { public sealed class ObjectCacheServerEntry : PluginBase { public override string PluginName => "ObjectCache.Service"; - private string? BrokerHeartBeatToken; + private readonly Lazy<IUnmangedHeap> _cacheHeap; + private readonly object ServerLock; + private readonly HashSet<ActiveServer> ListeningServers; + private readonly ManualResetEvent BrokerSyncHandle; + + /// <summary> + /// Gets the shared heap for the plugin + /// </summary> + internal IUnmangedHeap CacheHeap => _cacheHeap.Value; - private readonly object ServerLock = new(); - private readonly HashSet<ActiveServer> ListeningServers = new(); + public ObjectCacheServerEntry() + { + //Init heap + _cacheHeap = new Lazy<IUnmangedHeap>(InitializeHeap, LazyThreadSafetyMode.PublicationOnly); + + ServerLock = new(); + ListeningServers = new(); + //Set sync handle + BrokerSyncHandle = new(false); + } + + private IUnmangedHeap InitializeHeap() + { + //Create default heap + IUnmangedHeap _heap = MemoryUtil.InitializeNewHeapForProcess(); + try + { + //If the plugin is in debug mode enable heap tracking + return this.IsDebug() ? new TrackedHeapWrapper(_heap) : _heap; + } + catch + { + _heap.Dispose(); + throw; + } + } + + + private string? BrokerHeartBeatToken; private void RemoveServer(ActiveServer server) { @@ -71,55 +104,45 @@ namespace VNLib.Plugins.Essentials.Sessions.Server } } + private FBMClientConfig ClientConfig; + + protected override void OnLoad() { - //Create default heap - IUnmangedHeap CacheHeap = MemoryUtil.InitializeNewHeapForProcess(); try { IReadOnlyDictionary<string, JsonElement> clusterConf = this.GetConfig("cluster"); - string brokerAddress = clusterConf["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'"); - - string swapDir = PluginConfig.GetProperty("swap_dir").GetString() ?? throw new KeyNotFoundException("Missing required key 'swap_dir' for config"); - int cacheSize = PluginConfig.GetProperty("max_cache").GetInt32(); - string connectPath = PluginConfig.GetProperty("connect_path").GetString() ?? throw new KeyNotFoundException("Missing required element 'connect_path' for config 'cluster'"); - //TimeSpan cleanupInterval = PluginConfig.GetProperty("cleanup_interval_sec").GetTimeSpan(TimeParseType.Seconds); - //TimeSpan validFor = PluginConfig.GetProperty("valid_for_sec").GetTimeSpan(TimeParseType.Seconds); - int maxMessageSize = PluginConfig.GetProperty("max_blob_size").GetInt32(); + Uri brokerAddress = new(clusterConf["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'")); - //Init dir - DirectoryInfo dir = new(swapDir); - dir.Create(); - //Init cache listener, single threaded reader - ObjectCacheStore CacheListener = new(dir, cacheSize, Log, CacheHeap, true); - + //Init connect endpoint - { - //Init connect endpoint - ConnectEndpoint endpoint = new(connectPath, CacheListener, this); - Route(endpoint); - } - + ConnectEndpoint endpoint = this.Route<ConnectEndpoint>(); + + //Get the cache store from the connection endpoint + ICacheStore store = endpoint.GetCacheStore(); + + //Log max memory usage + Log.Debug("Maxium memory consumption {mx}Mb", ((ulong)endpoint.CacheConfig.MaxCacheEntries * (ulong)endpoint.CacheConfig.MaxMessageSize) / (ulong)(1024 * 1000)); + //Setup broker and regitration { - //init mre to pass the broker heartbeat signal to the registration worker - ManualResetEvent mre = new(false); + //Route the broker endpoint - BrokerHeartBeat brokerEp = new(() => BrokerHeartBeatToken!, mre, new Uri(brokerAddress), this); + BrokerHeartBeat brokerEp = new(() => BrokerHeartBeatToken!, BrokerSyncHandle, brokerAddress, this); Route(brokerEp); //start registration - _ = this.DeferTask(() => RegisterServerAsync(mre), 200); + _ = this.ObserveTask(() => RegisterServerAsync(endpoint.Path), 200); } //Setup cluster worker { //Get pre-configured fbm client config for caching - FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(CacheHeap, maxMessageSize, this.IsDebug() ? Log : null); + ClientConfig = FBMDataCacheExtensions.GetDefaultConfig(CacheHeap, endpoint.CacheConfig.MaxMessageSize / 2, this.IsDebug() ? Log : null); //Start Client runner - _ = this.DeferTask(() => RunClientAsync(CacheListener, new Uri(brokerAddress), conf), 300); + _ = this.ObserveTask(() => RunClientAsync(store, brokerAddress), 300); } //Load a cache broker to the current server if the config is defined @@ -129,38 +152,32 @@ namespace VNLib.Plugins.Essentials.Sessions.Server this.Route<BrokerRegistrationEndpoint>(); } } - - void Cleanup() - { - CacheHeap.Dispose(); - CacheListener.Dispose(); - } - - //Regsiter cleanup - _ = UnloadToken.RegisterUnobserved(Cleanup); Log.Information("Plugin loaded"); } catch (KeyNotFoundException kne) { - CacheHeap.Dispose(); Log.Error("Missing required configuration variables {m}", kne.Message); } - catch - { - CacheHeap.Dispose(); - throw; - } } protected override void OnUnLoad() { + //dispose heap if initialized + if(_cacheHeap.IsValueCreated) + { + _cacheHeap.Value.Dispose(); + } + + //Dispose mre sync handle + BrokerSyncHandle.Dispose(); + Log.Information("Plugin unloaded"); } #region Registration - private async Task RegisterServerAsync(ManualResetEvent keepaliveWait) + private async Task RegisterServerAsync(string connectPath) { try { @@ -169,9 +186,9 @@ namespace VNLib.Plugins.Essentials.Sessions.Server //Server id is just dns name for now string serverId = Dns.GetHostName(); - int heartBeatDelayMs = clusterConfig["heartbeat_timeout_sec"].GetInt32() * 1000; - string? connectPath = PluginConfig.GetProperty("connect_path").GetString(); + int heartBeatDelayMs = clusterConfig["heartbeat_timeout_sec"].GetInt32() * 1000; + //Get the port of the primary webserver int port; @@ -227,15 +244,17 @@ namespace VNLib.Plugins.Essentials.Sessions.Server while (true) { await Task.Delay(heartBeatDelayMs, UnloadToken); + //Set the timeout to 0 to it will just check the status without blocking - if (!keepaliveWait.WaitOne(0)) + if (!BrokerSyncHandle.WaitOne(0)) { //server miseed a keepalive event, time to break the loop and retry Log.Debug("Broker missed a heartbeat request, attempting to re-register"); break; } + //Reset the msr - keepaliveWait.Reset(); + BrokerSyncHandle.Reset(); } } catch (TaskCanceledException) @@ -275,7 +294,6 @@ namespace VNLib.Plugins.Essentials.Sessions.Server } finally { - keepaliveWait.Dispose(); BrokerHeartBeatToken = null; } Log.Debug("Registration worker exited"); @@ -305,20 +323,25 @@ namespace VNLib.Plugins.Essentials.Sessions.Server /// <param name="serverId">The node-id of the current server</param> /// <param name="clientConf">The configuration to use when initializing synchronization clients</param> /// <returns>A task that resolves when the plugin unloads</returns> - private async Task RunClientAsync(ObjectCacheStore cacheStore, Uri brokerAddress, FBMClientConfig clientConf) + private async Task RunClientAsync(ICacheStore cacheStore, Uri brokerAddress) { TimeSpan noServerDelay = TimeSpan.FromSeconds(10); + + //The node id is just the dns hostname of the current machine string nodeId = Dns.GetHostName(); + ListServerRequest listRequest = new(brokerAddress); try { //Get the broker config element IReadOnlyDictionary<string, JsonElement> clusterConf = this.GetConfig("cluster"); + int serverCheckMs = clusterConf["update_interval_sec"].GetInt32() * 1000; //Setup signing and verification certificates ReadOnlyJsonWebKey cacheSig = await GetCachePrivate(); ReadOnlyJsonWebKey brokerPub = await GetBrokerPublic(); + //Import certificates listRequest.WithVerificationKey(brokerPub) .WithSigningKey(cacheSig); @@ -340,6 +363,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server //Get server list servers = await FBMDataCacheExtensions.ListServersAsync(listRequest, UnloadToken); + //Servers are loaded, so continue break; } @@ -355,8 +379,10 @@ namespace VNLib.Plugins.Essentials.Sessions.Server { Log.Warn(ex, "Failed to get server list from broker"); } + //Gen random ms delay int randomMsDelay = RandomNumberGenerator.GetInt32(1000, 2000); + //Delay await Task.Delay(randomMsDelay, UnloadToken); } @@ -386,7 +412,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server ListeningServers.Add(server); //Run listener background task - _ = this.DeferTask(() => RunSyncTaskAsync(server, cacheStore, clientConf, nodeId)); + _ = this.ObserveTask(() => RunSyncTaskAsync(server, cacheStore, nodeId)); } } } @@ -417,10 +443,10 @@ namespace VNLib.Plugins.Essentials.Sessions.Server Log.Debug("Cluster sync worker exited"); } - private async Task RunSyncTaskAsync(ActiveServer server, ObjectCacheStore cacheStore, FBMClientConfig conf, string nodeId) + private async Task RunSyncTaskAsync(ActiveServer server, ICacheStore cacheStore, string nodeId) { //Setup client - FBMClient client = new(conf); + FBMClient client = new(ClientConfig); try { async Task UpdateRecordAsync(string objectId, string newId) @@ -440,7 +466,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server response.ThrowIfNotSet(); //Check response code - string status = response.Headers.First(static s => s.Key == HeaderCommand.Status).Value.ToString(); + string status = response.Headers.First(static s => s.Header == HeaderCommand.Status).Value.ToString(); if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal)) { //Update the record |