diff options
Diffstat (limited to 'plugins/VNLib.Data.Caching.Providers.VNCache/src')
14 files changed, 2032 insertions, 0 deletions
diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/AddOrUpdateBuffer.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/AddOrUpdateBuffer.cs new file mode 100644 index 0000000..cf3cf63 --- /dev/null +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/AddOrUpdateBuffer.cs @@ -0,0 +1,96 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Providers.VNCache +* File: AddOrUpdateBuffer.cs +* +* AddOrUpdateBuffer.cs is part of VNLib.Data.Caching.Providers.VNCache +* which is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Providers.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Providers.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Buffers; + +using VNLib.Utils; +using VNLib.Utils.Memory; +using VNLib.Utils.Extensions; + +namespace VNLib.Data.Caching.Providers.VNCache +{ + /// <summary> + /// Implements a buffer writer that serves to serialize object data and + /// store the object data for use by the memory cache store, and the + /// remote cache store + /// </summary> + internal sealed class AddOrUpdateBuffer : VnDisposeable, IBufferWriter<byte>, IObjectData + { + private int _count; + private readonly IUnmangedHeap _heap; + private MemoryHandle<byte>? _buffer; + + public AddOrUpdateBuffer(IUnmangedHeap heap) + { + _heap = heap; + } + + ///<inheritdoc/> + public void Advance(int count) + { + //Update count + _count += count; + } + + ///<inheritdoc/> + Memory<byte> IBufferWriter<byte>.GetMemory(int sizeHint = 0) + { + throw new NotImplementedException(); + } + + ///<inheritdoc/> + Span<byte> IBufferWriter<byte>.GetSpan(int sizeHint = 0) + { + //Round to nearest page for new size + nint newSize = MemoryUtil.NearestPage(sizeHint + _count); + + //Alloc buffer it not yet allocated + if (_buffer == null) + { + _buffer = _heap.Alloc<byte>(newSize); + } + else + { + //check for resize if allocated + _buffer.ResizeIfSmaller(newSize); + } + + return _buffer.AsSpan(_count); + } + + void IObjectData.SetData(ReadOnlySpan<byte> data) + { + throw new NotSupportedException(); + } + + ///<inheritdoc/> + public ReadOnlySpan<byte> GetData() => _buffer!.AsSpan(0, _count); + + protected override void Free() + { + _buffer?.Dispose(); + } + } +}
\ No newline at end of file diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/BucketLocalManagerFactory.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/BucketLocalManagerFactory.cs new file mode 100644 index 0000000..0f49849 --- /dev/null +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/BucketLocalManagerFactory.cs @@ -0,0 +1,155 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: BucketLocalManagerFactory.cs +* +* BucketLocalManagerFactory.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Buffers; +using System.Text.Json; +using System.Collections.Generic; +using System.Runtime.CompilerServices; + +using VNLib.Plugins; +using VNLib.Utils; +using VNLib.Utils.Memory; +using VNLib.Utils.Extensions; +using VNLib.Data.Caching.ObjectCache; +using VNLib.Plugins.Extensions.Loading; + +namespace VNLib.Data.Caching.Providers.VNCache +{ + [ConfigurationName("memory_manager", Required = false)] + internal sealed class BucketLocalManagerFactory : VnDisposeable, ICacheMemoryManagerFactory + { + private readonly LinkedList<BucketLocalManager> _managers = new (); + private readonly bool _zeroAll; + + ///<inheritdoc/> + public ICacheEntryMemoryManager CreateForBucket(uint bucketId) + { + //Init a new heap for a individual bucket + IUnmangedHeap localHeap = MemoryUtil.InitializeNewHeapForProcess(); + + BucketLocalManager manager = new (localHeap, bucketId, _zeroAll); + _managers.AddLast(manager); + + return manager; + } + + /// <summary> + /// Creates a new <see cref="BucketLocalManagerFactory"/> with the specified zero all flag + /// that is not managed by a plugin instance + /// </summary> + /// <param name="zeroAll">Forces all allocations to be zeroed before being returned to callers</param> + /// <returns></returns> + public static BucketLocalManagerFactory Create(bool zeroAll) => new(zeroAll); + + private BucketLocalManagerFactory(bool zeroAll) + { + _zeroAll = zeroAll; + } + + public BucketLocalManagerFactory(PluginBase plugin) : this(plugin, null) + { } + + public BucketLocalManagerFactory(PluginBase plugin, IConfigScope? config) + { + if (config != null) + { + //Try to get the zero all flag + if (config.TryGetValue("zero_all", out JsonElement zeroEl)) + { + _zeroAll = zeroEl.GetBoolean(); + } + } + } + + protected override void Free() + { + //Free heaps on exit + foreach (BucketLocalManager manager in _managers) + { + manager.Heap.Dispose(); + } + } + + /* + * Buckets are mutually exclusive, so we can use a single heap for each bucket + * to get a little more performance on memory operations + */ + + private sealed record class BucketLocalManager(IUnmangedHeap Heap, uint BucketId, bool Zero) : ICacheEntryMemoryManager + { + + ///<inheritdoc/> + public object AllocHandle(uint size) => Heap.Alloc<byte>(size, Zero); + + ///<inheritdoc/> + public void FreeHandle(object handle) + { + _ = handle ?? throw new ArgumentNullException(nameof(handle)); + MemoryHandle<byte> _handle = Unsafe.As<object, MemoryHandle<byte>>(ref handle); + + //Free the handle + _handle.Dispose(); + } + + ///<inheritdoc/> + public uint GetHandleSize(object handle) + { + _ = handle ?? throw new ArgumentNullException(nameof(handle)); + MemoryHandle<byte> _handle = Unsafe.As<object, MemoryHandle<byte>>(ref handle); + + return (uint)_handle.Length; + } + + ///<inheritdoc/> + public Span<byte> GetSpan(object handle, uint offset, uint length) + { + _ = handle ?? throw new ArgumentNullException(nameof(handle)); + MemoryHandle<byte> _handle = Unsafe.As<object, MemoryHandle<byte>>(ref handle); + + return _handle.GetOffsetSpan(offset, checked((int)length)); + } + + ///<inheritdoc/> + public MemoryHandle PinHandle(object handle, int offset) + { + _ = handle ?? throw new ArgumentNullException(nameof(handle)); + MemoryHandle<byte> _handle = Unsafe.As<object, MemoryHandle<byte>>(ref handle); + + //Pin the handle + return _handle.Pin(offset); + } + + ///<inheritdoc/> + public void ResizeHandle(object handle, uint newSize) + { + _ = handle ?? throw new ArgumentNullException(nameof(handle)); + MemoryHandle<byte> _handle = Unsafe.As<object, MemoryHandle<byte>>(ref handle); + + //Resize the handle + _handle.ResizeIfSmaller(newSize); + } + } + } +} diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs new file mode 100644 index 0000000..a6e264c --- /dev/null +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs @@ -0,0 +1,75 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Providers.VNCache +* File: ClusterNodeIndex.cs +* +* ClusterNodeIndex.cs is part of VNLib.Data.Caching.Providers.VNCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Providers.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Providers.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Utils.Logging; +using VNLib.Data.Caching.Extensions; +using VNLib.Data.Caching.Extensions.Clustering; +using VNLib.Plugins.Extensions.Loading.Events; + +namespace VNLib.Data.Caching.Providers.VNCache.Clustering +{ + internal sealed class ClusterNodeIndex : IClusterNodeIndex, IIntervalScheduleable + { + private readonly CacheClientConfiguration _config; + private Task _currentUpdate; + + + public ClusterNodeIndex(CacheClientConfiguration config) + { + _config = config; + _currentUpdate = Task.CompletedTask; + } + + ///<inheritdoc/> + public CacheNodeAdvertisment? GetNextNode() + { + //Get all nodes + CacheNodeAdvertisment[] ads = _config.NodeCollection.GetAllNodes(); + //Just get a random node from the collection for now + return ads.Length > 0 ? ads.SelectRandom() : null; + } + + ///<inheritdoc/> + public Task WaitForDiscoveryAsync(CancellationToken cancellationToken) + { + return _currentUpdate.WaitAsync(cancellationToken); + } + + /// <summary> + /// Runs the discovery process and updates the current update task + /// </summary> + /// <param name="log"></param> + /// <param name="cancellationToken">A token to cancel the operation</param> + /// <returns>A task that completes when the discovery process is complete</returns> + public Task OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken) + { + //Run discovery operation and update the task + _currentUpdate = _config.DiscoverNodesAsync(cancellationToken); + return Task.CompletedTask; + } + } +}
\ No newline at end of file diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/IClusterNodeIndex.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/IClusterNodeIndex.cs new file mode 100644 index 0000000..285f405 --- /dev/null +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/IClusterNodeIndex.cs @@ -0,0 +1,49 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Providers.VNCache +* File: IClusterNodeIndex.cs +* +* IClusterNodeIndex.cs is part of VNLib.Data.Caching.Providers.VNCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Providers.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Providers.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Data.Caching.Extensions.Clustering; + +namespace VNLib.Data.Caching.Providers.VNCache.Clustering +{ + internal interface IClusterNodeIndex + { + /// <summary> + /// Gets the next available node using the configured balancing policy + /// or null if no nodes are available + /// </summary> + /// <returns>The next available node to connect to if any are available</returns> + CacheNodeAdvertisment? GetNextNode(); + + /// <summary> + /// Waits for the discovery process to complete. This is just incase a + /// connection wants to happen while a long discovery is processing. + /// </summary> + /// <param name="cancellationToken">A token to cancel the operation</param> + /// <returns>A task that resolves when the discovery process completes</returns> + Task WaitForDiscoveryAsync(CancellationToken cancellationToken); + } +}
\ No newline at end of file diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs new file mode 100644 index 0000000..f952bcb --- /dev/null +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs @@ -0,0 +1,394 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Providers.VNCache +* File: FBMCacheClient.cs +* +* FBMCacheClient.cs is part of VNLib.Data.Caching.Providers.VNCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Providers.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Providers.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Net.Http; +using System.Threading; +using System.Threading.Tasks; +using System.Net.Sockets; +using System.Net.WebSockets; +using System.Collections.Generic; +using System.Security.Cryptography; + +using VNLib.Hashing; +using VNLib.Hashing.IdentityUtility; +using VNLib.Utils.Memory; +using VNLib.Utils.Logging; +using VNLib.Net.Messaging.FBM.Client; +using VNLib.Data.Caching.Extensions; +using VNLib.Data.Caching.Extensions.Clustering; +using VNLib.Plugins; +using VNLib.Plugins.Extensions.Loading; +using VNLib.Plugins.Extensions.Loading.Events; + +using VNLib.Data.Caching.Providers.VNCache.Clustering; + +namespace VNLib.Data.Caching.Providers.VNCache +{ + + /// <summary> + /// A base class that manages + /// </summary> + [ConfigurationName(VNCacheClient.CACHE_CONFIG_KEY)] + internal class FBMCacheClient : IGlobalCacheProvider, IAsyncBackgroundWork + { + private const string LOG_NAME = "CLIENT"; + private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(10); + private static readonly TimeSpan NoNodeDelay = TimeSpan.FromSeconds(10); + + private readonly VnCacheClientConfig _config; + private readonly ClusterNodeIndex _index; + + /// <summary> + /// The internal client + /// </summary> + public FBMClient Client { get; } + + /// <summary> + /// Gets a value that determines if the client is currently connected to a server + /// </summary> + public bool IsConnected { get; private set; } + + public FBMCacheClient(PluginBase plugin, IConfigScope config) + : this( + config.Deserialze<VnCacheClientConfig>(), + plugin.IsDebug() ? plugin.Log : null + ) + { + ILogProvider scoped = plugin.Log.CreateScope(LOG_NAME); + + //Set authenticator and error handler + Client.GetCacheConfiguration() + .WithAuthenticator(new AuthManager(plugin)) + .WithErrorHandler(new DiscoveryErrHAndler(scoped)); + + //Schedule discovery interval + plugin.ScheduleInterval(_index, _config.DiscoveryInterval); + + //Run discovery after initial delay if interval is greater than initial delay + if (_config.DiscoveryInterval > InitialDelay) + { + //Run a manual initial load + scoped.Information("Running initial discovery in {delay}", InitialDelay); + _ = plugin.ObserveWork(() => _index.OnIntervalAsync(scoped, plugin.UnloadToken), (int)InitialDelay.TotalMilliseconds); + } + } + + public FBMCacheClient(VnCacheClientConfig config, ILogProvider? debugLog) + { + //Validate config + (config as IOnConfigValidation).Validate(); + + _config = config; + + //Init the client with default settings + FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(MemoryUtil.Shared, config.MaxMessageSize!.Value, config.RequestTimeout, debugLog); + + Client = new(conf); + + //Add the configuration to the client + Client.GetCacheConfiguration() + .WithTls(config.UseTls) + .WithInitialPeers(config.GetInitialNodeUris()); + + //Init index + _index = new ClusterNodeIndex(Client.GetCacheConfiguration()); + } + + /* + * Background work method manages the remote cache connection + * to the cache cluster + */ + public virtual async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) + { + //Scope log + pluginLog = pluginLog.CreateScope(LOG_NAME); + + try + { + //Initial delay + pluginLog.Debug("Worker started, waiting for startup delay"); + await Task.Delay((int)InitialDelay.TotalMilliseconds + 1000, exitToken); + + while (true) + { + try + { + //Wait for a discovery to complete + await _index.WaitForDiscoveryAsync(exitToken); + } + catch (CacheDiscoveryFailureException cdfe) + { + pluginLog.Error("Failed to discover nodes, will try again\n{err}", cdfe.Message); + //Continue + } + + //Get the next node to connect to + CacheNodeAdvertisment? node = _index.GetNextNode(); + + if (node is null) + { + pluginLog.Warn("No nodes available to connect to, trying again in {delay}", NoNodeDelay); + await Task.Delay(NoNodeDelay, exitToken); + + //Run another manual discovery if the interval is greater than the delay + if (_config.DiscoveryInterval > NoNodeDelay) + { + pluginLog.Debug("Forcing a manual discovery"); + + //We dont need to await this because it is awaited at the top of the loop + _ = _index.OnIntervalAsync(pluginLog, exitToken); + } + + continue; + } + + //Ready to connect + + try + { + pluginLog.Debug("Connecting to {node}", node); + + //Connect to the node + await Client.ConnectToCacheAsync(node, exitToken); + + if (pluginLog.IsEnabled(LogLevel.Debug)) + { + pluginLog.Debug("Connected server: {s}", node); + } + else + { + pluginLog.Information("Successfully connected to cache node"); + } + + //Set connection status flag + IsConnected = true; + + //Wait for disconnect + await Client.WaitForExitAsync(exitToken); + + pluginLog.Information("Cache server disconnected"); + } + catch (TimeoutException) + { + pluginLog.Warn("Failed to establish a websocket connection to cache server within the timeout period"); + } + catch (WebSocketException wse) + { + pluginLog.Warn("Failed to establish a websocket connection to cache server {reason}", wse.Message); + pluginLog.Verbose("Stack trace: {re}", wse); + } + //SEs may be raised when the server is not available + catch (HttpRequestException he) when (he.InnerException is SocketException) + { + pluginLog.Debug("Failed to connect to random cache server because a TCP connection could not be established"); + pluginLog.Verbose("Stack trace: {re}", he.InnerException); + } + catch (HttpRequestException he) when (he.StatusCode.HasValue) + { + pluginLog.Warn("Failed to negotiate with cache server {reason}", he.Message); + pluginLog.Verbose("Stack trace: {re}", he); + await Task.Delay(1000, exitToken); + } + finally + { + IsConnected = false; + } + + //Loop again + } + } + catch (OperationCanceledException) + { + //Normal exit from listening loop + } + catch (FBMServerNegiationException fne) + { + pluginLog.Error("Failed to negotiate connection with cache server. Please check your configuration\n {reason}", fne.Message); + } + catch (Exception ex) + { + pluginLog.Error(ex, "Unhandled exception occured in background cache client listening task"); + } + finally + { + //Dispose the client on exit + Client.Dispose(); + } + pluginLog.Information("Cache client exited"); + } + + + ///<inheritdoc/> + public virtual Task AddOrUpdateAsync<T>(string key, string? newKey, T value, CancellationToken cancellation) + { + return !IsConnected + ? throw new InvalidOperationException("The underlying client is not connected to a cache node") + : Client!.AddOrUpdateObjectAsync(key, newKey, value, cancellation); + } + + ///<inheritdoc/> + public virtual Task<bool> DeleteAsync(string key, CancellationToken cancellation) + { + return !IsConnected + ? throw new InvalidOperationException("The underlying client is not connected to a cache node") + : Client!.DeleteObjectAsync(key, cancellation); + } + + ///<inheritdoc/> + public virtual Task<T?> GetAsync<T>(string key, CancellationToken cancellation) + { + return !IsConnected + ? throw new InvalidOperationException("The underlying client is not connected to a cache node") + : Client!.GetObjectAsync<T>(key, cancellation); + } + + ///<inheritdoc/> + public virtual Task<T?> GetAsync<T>(string key, ICacheObjectDeserializer deserializer, CancellationToken cancellation) + { + return !IsConnected + ? throw new InvalidOperationException("The underlying client is not connected to a cache node") + : Client!.GetObjectAsync<T>(key, deserializer, cancellation); + } + + ///<inheritdoc/> + public virtual Task AddOrUpdateAsync<T>(string key, string? newKey, T value, ICacheObjectSerializer serialzer, CancellationToken cancellation) + { + return !IsConnected + ? throw new InvalidOperationException("The underlying client is not connected to a cache node") + : Client!.AddOrUpdateObjectAsync(key, newKey, value, serialzer, cancellation); + } + + ///<inheritdoc/> + public virtual Task GetAsync<T>(string key, ObjectDataSet<T> callback, T state, CancellationToken cancellation) + { + return !IsConnected + ? throw new InvalidOperationException("The underlying client is not connected to a cache node") + : Client!.GetObjectAsync(key, callback, state, cancellation); + } + + ///<inheritdoc/> + public virtual Task AddOrUpdateAsync<T>(string key, string? newKey, ObjectDataReader<T> callback, T state, CancellationToken cancellation) + { + return !IsConnected + ? throw new InvalidOperationException("The underlying client is not connected to a cache node") + : Client!.AddOrUpdateObjectAsync(key, newKey, callback, state, cancellation); + } + + ///<inheritdoc/> + public object GetUnderlyingStore() => Client; //Client is the underlying "store" + + private sealed class AuthManager : ICacheAuthManager + { + + private IAsyncLazy<ReadOnlyJsonWebKey> _sigKey; + private IAsyncLazy<ReadOnlyJsonWebKey> _verKey; + + public AuthManager(PluginBase plugin) + { + //Lazy load keys + + //Get the signing key + _sigKey = plugin.GetSecretAsync("client_private_key").ToLazy(static r => r.GetJsonWebKey()); + + //Lazy load cache public key + _verKey = plugin.GetSecretAsync("cache_public_key").ToLazy(static r => r.GetJsonWebKey()); + } + + public async Task AwaitLazyKeyLoad() + { + await _sigKey; + await _verKey; + } + + ///<inheritdoc/> + public IReadOnlyDictionary<string, string?> GetJwtHeader() + { + //Get the signing key jwt header + return _sigKey.Value.JwtHeader; + } + + ///<inheritdoc/> + public void SignJwt(JsonWebToken jwt) + { + //Sign the jwt with signing key + jwt.SignFromJwk(_sigKey.Value); + } + + ///<inheritdoc/> + public byte[] SignMessageHash(byte[] hash, HashAlg alg) + { + //try to get the rsa alg for the signing key + using RSA? rsa = _sigKey.Value.GetRSAPrivateKey(); + if (rsa != null) + { + return rsa.SignHash(hash, alg.GetAlgName(), RSASignaturePadding.Pkcs1); + } + + //try to get the ecdsa alg for the signing key + using ECDsa? ecdsa = _sigKey.Value.GetECDsaPrivateKey(); + if (ecdsa != null) + { + return ecdsa.SignHash(hash); + } + + throw new NotSupportedException("The signing key is not a valid RSA or ECDSA key"); + } + + ///<inheritdoc/> + public bool VerifyJwt(JsonWebToken jwt, bool isPeer) + { + return jwt.VerifyFromJwk(_verKey.Value); + } + + ///<inheritdoc/> + public bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature, bool isPeer) + { + //try to get the rsa alg for the signing key + using RSA? rsa = _verKey.Value.GetRSAPublicKey(); + if (rsa != null) + { + return rsa.VerifyHash(hash, signature, alg.GetAlgName(), RSASignaturePadding.Pkcs1); + } + + //try to get the ecdsa alg for the signing key + using ECDsa? ecdsa = _verKey.Value.GetECDsaPublicKey(); + if (ecdsa != null) + { + return ecdsa.VerifyHash(hash, signature); + } + + throw new NotSupportedException("The current key is not an RSA or ECDSA key and is not supported"); + } + } + + private sealed record class DiscoveryErrHAndler(ILogProvider Logger) : ICacheDiscoveryErrorHandler + { + public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex) + { + Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", errorNode?.NodeId, ex); + } + } + } +}
\ No newline at end of file diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/ICacheRefreshPolicy.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/ICacheRefreshPolicy.cs new file mode 100644 index 0000000..a37465d --- /dev/null +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/ICacheRefreshPolicy.cs @@ -0,0 +1,35 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Providers.VNCache +* File: ICacheRefreshPolicy.cs +* +* ICacheRefreshPolicy.cs is part of VNLib.Data.Caching.Providers.VNCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Providers.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Providers.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; + +namespace VNLib.Data.Caching.Providers.VNCache +{ + public interface ICacheRefreshPolicy + { + TimeSpan MaxCacheAge { get; } + + TimeSpan RefreshInterval { get; } + } +}
\ No newline at end of file diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/MemoryCache.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/MemoryCache.cs new file mode 100644 index 0000000..7d03918 --- /dev/null +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/MemoryCache.cs @@ -0,0 +1,228 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Providers.VNCache +* File: MemoryCache.cs +* +* MemoryCache.cs is part of VNLib.Data.Caching.Providers.VNCache +* which is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Providers.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Providers.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Utils; +using VNLib.Utils.Memory; +using VNLib.Utils.Logging; +using VNLib.Utils.Memory.Diagnostics; +using VNLib.Data.Caching; +using VNLib.Data.Caching.ObjectCache; +using VNLib.Plugins; +using VNLib.Plugins.Extensions.Loading; + +namespace VNLib.Data.Caching.Providers.VNCache +{ + [ConfigurationName(VNCacheClient.CACHE_CONFIG_KEY)] + internal sealed class MemoryCache : VnDisposeable, IGlobalCacheProvider + { + const int MB_DIVISOR = 1000 * 1024; + + const string DEBUG_TEMPLATE =@"Configuring Memory-Only Cache + | ----------------------------- + | Configuration: + | Table Size: {ts} + | Bucket Size: {bs} + | Max Objects: {obj} + | Max Memory Estimations: + | 4K blocks: {4k}Mb + | 8K blocks: {8k}Mb + | 16K blocks: {16K}Mb + | ----------------------------- +"; + + private readonly ICacheObjectSerializer _serialzer; + private readonly ICacheObjectDeserializer _deserialzer; + private readonly IBlobCacheTable _memCache; + private readonly IUnmangedHeap _bufferHeap; + private readonly BucketLocalManagerFactory? _blobCacheMemManager; + + public MemoryCache(PluginBase pbase, IConfigScope config) + : this( + config[VNCacheClient.MEMORY_CACHE_CONFIG_KEY].Deserialize<MemoryCacheConfig>()!, + pbase.IsDebug(), + pbase.Log, + pbase.GetOrCreateSingleton<BucketLocalManagerFactory>() + ) + { } + + public MemoryCache(MemoryCacheConfig config) : this(config, false, null, null) + { } + + private MemoryCache(MemoryCacheConfig config, bool isDebug, ILogProvider? log, BucketLocalManagerFactory? factory) + { + //Validate config + config.Validate(); + + if (isDebug) + { + //Use the debug heap + IUnmangedHeap newHeap = MemoryUtil.InitializeNewHeapForProcess(); + + //Wrap in diag heap + _bufferHeap = new TrackedHeapWrapper(newHeap, true); + } + else + { + //Init new "private" heap to alloc buffer from + _bufferHeap = MemoryUtil.InitializeNewHeapForProcess(); + } + + //Fallback to creating a local/single instance of the manager + factory ??= _blobCacheMemManager = BucketLocalManagerFactory.Create(config.ZeroAllAllocations); + + //Setup cache table + _memCache = new BlobCacheTable(config.TableSize, config.BucketSize, factory, null); + + /* + * Default to json serialization by using the default + * serializer and JSON options + */ + + JsonCacheObjectSerializer defaultSerializer = new(); + _serialzer = defaultSerializer; + _deserialzer = defaultSerializer; + + PrintDebug(log, config); + } + + private static void PrintDebug(ILogProvider? log, MemoryCacheConfig config) + { + long maxObjects = config.BucketSize * config.TableSize; + + long size4kMb = maxObjects * 4096/MB_DIVISOR; + long size8kMb = maxObjects * 8128/MB_DIVISOR; + long size16kMb = maxObjects * 16384/MB_DIVISOR; + + log?.Debug(DEBUG_TEMPLATE, config.TableSize, config.BucketSize, maxObjects, size4kMb, size8kMb, size16kMb); + } + + ///<inheritdoc/> + public bool IsConnected { get; } = true; + + protected override void Free() + { + _memCache.Dispose(); + _bufferHeap.Dispose(); + _blobCacheMemManager?.Dispose(); + } + + ///<inheritdoc/> + public Task AddOrUpdateAsync<T>(string key, string? newKey, T value, CancellationToken cancellation) => AddOrUpdateAsync(key, newKey, value, _serialzer, cancellation); + + ///<inheritdoc/> + public async Task AddOrUpdateAsync<T>(string key, string? newKey, T value, ICacheObjectSerializer serialzer, CancellationToken cancellation) + { + Check(); + + //Alloc serialzation buffer + using AddOrUpdateBuffer buffer = new (_bufferHeap); + + //Serialze the value + serialzer.Serialize(value, buffer); + + //Update object data + await _memCache.AddOrUpdateObjectAsync(key, newKey, static b => b.GetData(), buffer, default, cancellation); + } + + ///<inheritdoc/> + public Task<bool> DeleteAsync(string key, CancellationToken cancellation) + { + Check(); + return _memCache.DeleteObjectAsync(key, cancellation).AsTask(); + } + + ///<inheritdoc/> + public Task<T?> GetAsync<T>(string key, CancellationToken cancellation) => GetAsync<T>(key, _deserialzer, cancellation); + + ///<inheritdoc/> + public async Task<T?> GetAsync<T>(string key, ICacheObjectDeserializer deserializer, CancellationToken cancellation) + { + Check(); + + IBlobCacheBucket bucket = _memCache.GetBucket(key); + + //Obtain lock + IBlobCache cache = await bucket.ManualWaitAsync(cancellation); + + try + { + //Try to read the value + if (cache.TryGetValue(key, out CacheEntry entry)) + { + return deserializer.Deserialize<T>(entry.GetDataSegment()); + } + + return default; + } + finally + { + bucket.Release(); + } + } + + ///<inheritdoc/> + public async Task GetAsync<T>(string key, ObjectDataSet<T> callback, T state, CancellationToken cancellation) + { + Check(); + + //Get the bucket from the desired key + IBlobCacheBucket bucket = _memCache.GetBucket(key); + + //Obtain lock + IBlobCache cache = await bucket.ManualWaitAsync(cancellation); + + try + { + //Try to read the value + if (cache.TryGetValue(key, out CacheEntry entry)) + { + //Set result data + callback(state, entry.GetDataSegment()); + } + } + finally + { + bucket.Release(); + } + } + + ///<inheritdoc/> + public Task AddOrUpdateAsync<T>(string key, string? newKey, ObjectDataReader<T> callback, T state, CancellationToken cancellation) + { + Check(); + + //Update object data + return _memCache.AddOrUpdateObjectAsync(key, newKey, callback, state, default, cancellation).AsTask(); + } + + ///<inheritdoc/> + public object GetUnderlyingStore() => _memCache; + + } +}
\ No newline at end of file diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/MemoryCacheConfig.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/MemoryCacheConfig.cs new file mode 100644 index 0000000..176333f --- /dev/null +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/MemoryCacheConfig.cs @@ -0,0 +1,109 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Providers.VNCache +* File: MemoryCacheConfig.cs +* +* MemoryCacheConfig.cs is part of VNLib.Data.Caching.Providers.VNCache +* which is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Providers.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Providers.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Text.Json.Serialization; + +using VNLib.Plugins.Extensions.Loading; + +namespace VNLib.Data.Caching.Providers.VNCache +{ + /// <summary> + /// Memorycache configuration object. Json-(de)serializable + /// </summary> + public sealed class MemoryCacheConfig : ICacheRefreshPolicy, IOnConfigValidation + { + /// <summary> + /// The number of buckets within the cache table + /// </summary> + [JsonPropertyName("buckets")] + public uint TableSize { get; set; } = 10; + + /// <summary> + /// The number of cache entries within each bucket + /// </summary> + [JsonPropertyName("bucket_size")] + public uint BucketSize { get; set; } = 5000; + + /// <summary> + /// The maxium size (in bytes) of each cache entry within any bucket + /// </summary> + [JsonPropertyName("max_object_size")] + public uint MaxBlobSize { get; set; } = 16 * 1024; + + [JsonIgnore] + public TimeSpan MaxCacheAge { get; set; } = TimeSpan.FromMinutes(1); + + /// <summary> + /// When refresh intervals are configured, The maxium cache entry age in seconds. + /// </summary> + [JsonPropertyName("max_age_sec")] + public uint MaxAgeSeconds + { + get => (uint)MaxCacheAge.TotalSeconds; + set => MaxCacheAge = TimeSpan.FromSeconds(value); + } + /* + * Default disable cache + */ + [JsonIgnore] + public TimeSpan RefreshInterval { get; set; } = TimeSpan.Zero; + + /// <summary> + /// The time (in seconds) a cache entry refresh interval will occur + /// if scheduled on a plugin + /// </summary> + [JsonPropertyName("refresh_interval_sec")] + public uint RefreshIntervalSeconds + { + get => (uint)RefreshInterval.TotalSeconds; + set => RefreshInterval = TimeSpan.FromSeconds(value); + } + + /// <summary> + /// Zeros all cache entry memory allocations before they are used + /// </summary> + [JsonPropertyName("zero_all")] + public bool ZeroAllAllocations { get; set; } + + ///<inheritdoc/> + public void Validate() + { + if (TableSize == 0) + { + throw new ArgumentException("You must specify a cache bucket table size", "buckets"); + } + + if (BucketSize == 0) + { + throw new ArgumentException("You must specify the maxium number of entires allowed in each bucket ", "bucket_size"); + } + + if (MaxBlobSize < 16) + { + throw new ArgumentException("You must configure a maximum object size", "max_object_size"); + } + } + } +}
\ No newline at end of file diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/MemoryCacheOperator.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/MemoryCacheOperator.cs new file mode 100644 index 0000000..739ab71 --- /dev/null +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/MemoryCacheOperator.cs @@ -0,0 +1,54 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Providers.VNCache +* File: MemoryCacheOperator.cs +* +* MemoryCacheOperator.cs is part of VNLib.Data.Caching.Providers.VNCache which is +* part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Providers.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Providers.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using VNLib.Utils; +using VNLib.Plugins; + +namespace VNLib.Data.Caching.Providers.VNCache +{ + /// <summary> + /// A disposable memory cache operator handle. When cache use is complete, you should + /// dispose this handle. You may want to schedule it for cleanup on a <see cref="PluginBase"/> + /// </summary> + public sealed class MemoryCacheOperator : VnDisposeable + { + private readonly MemoryCache _cache; + + internal MemoryCacheOperator(MemoryCache cache) + { + _cache = cache; + } + + /// <summary> + /// The configured global cache instance + /// </summary> + public IGlobalCacheProvider Cache => _cache; + + ///<inheritdoc/> + protected override void Free() + { + _cache.Dispose(); + } + } +}
\ No newline at end of file diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/RemoteBackedMemoryCache.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/RemoteBackedMemoryCache.cs new file mode 100644 index 0000000..c14ddb9 --- /dev/null +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/RemoteBackedMemoryCache.cs @@ -0,0 +1,350 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Providers.VNCache +* File: RemoteBackedMemoryCache.cs +* +* RemoteBackedMemoryCache.cs is part of VNLib.Data.Caching.Providers.VNCache +* which is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Providers.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Providers.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Buffers; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using System.Runtime.CompilerServices; + +using VNLib.Utils; +using VNLib.Utils.Memory; +using VNLib.Utils.Logging; +using VNLib.Utils.Extensions; +using VNLib.Data.Caching; +using VNLib.Data.Caching.ObjectCache; +using VNLib.Plugins; +using VNLib.Plugins.Extensions.Loading; +using VNLib.Plugins.Extensions.Loading.Events; + +namespace VNLib.Data.Caching.Providers.VNCache +{ + + /* + * A combined cache object that uses the blob cache data structures + * from the ObjectCache server library to implement similar memory cache + * features. All update operations are write-through operations, and a timer + * may be scheduled to refresh memorycache against the server (eventually) + * + * Memory cache is destroyed when the connection to the cache server is + * lost or is exiting + */ + + + [ConfigurationName(VNCacheClient.CACHE_CONFIG_KEY)] + internal sealed class RemoteBackedMemoryCache : IDisposable, IGlobalCacheProvider, IIntervalScheduleable + { + private readonly MemoryCacheConfig _cacheConfig; + private readonly ICacheObjectSerializer _fallbackSerializer; + private readonly ICacheObjectDeserializer _fallbackDeserializer; + private readonly IBlobCacheTable _memCache; + private readonly IGlobalCacheProvider _backing; + private readonly IUnmangedHeap _bufferHeap; + private readonly BucketLocalManagerFactory? _bucketFactory; + + public RemoteBackedMemoryCache(PluginBase plugin, IConfigScope config) + : this( + config.GetRequiredProperty(VNCacheClient.MEMORY_CACHE_CONFIG_KEY, p => p.Deserialize<MemoryCacheConfig>()!), + plugin.GetOrCreateSingleton<FBMCacheClient>(), //Cache client is backing store + plugin.GetOrCreateSingleton<BucketLocalManagerFactory>() + ) + { + + //Schedule cache purge interval + if (_cacheConfig.RefreshInterval > TimeSpan.Zero) + { + plugin.ScheduleInterval(this, _cacheConfig.RefreshInterval); + } + } + + + public RemoteBackedMemoryCache(MemoryCacheConfig memCache, IGlobalCacheProvider backingStore) : this(memCache, backingStore, null) + { } + + public RemoteBackedMemoryCache(MemoryCacheConfig memCache, IGlobalCacheProvider backingStore, BucketLocalManagerFactory? factory) + { + _ = memCache ?? throw new ArgumentNullException(nameof(memCache)); + _ = backingStore ?? throw new ArgumentNullException(nameof(backingStore)); + + memCache.Validate(); + + /* + * If no buffer factory was supplied, we can create one, but it has to be + * disposed manually on exit. If one was supplied, we can use it but we do not + * manage it's lifetime + */ + + factory ??= _bucketFactory = BucketLocalManagerFactory.Create(memCache.ZeroAllAllocations); + + //Setup mem cache table + _memCache = new BlobCacheTable(memCache.TableSize, memCache.BucketSize, factory, null); + + //If backing store is a VnCacheClient, steal it's buffer heap + _bufferHeap = backingStore is FBMCacheClient client && client.Client.Config.MemoryManager.TryGetHeap(out IUnmangedHeap? heap) ? heap : MemoryUtil.Shared; + + + _cacheConfig = memCache; + _backing = backingStore; + + /* + * Default to json serialization by using the default + * serializer and JSON options + */ + + JsonCacheObjectSerializer defaultSerializer = new(); + _fallbackSerializer = defaultSerializer; + _fallbackDeserializer = defaultSerializer; + } + + void IDisposable.Dispose() + { + //Dispose of the memory cache + _memCache.Dispose(); + _bucketFactory?.Dispose(); + + if (_backing is IDisposable disposable) + { + disposable.Dispose(); + } + } + + ///<inheritdoc/> + object IGlobalCacheProvider.GetUnderlyingStore() => _backing.GetUnderlyingStore(); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void CheckConnected() + { + if (!_backing.IsConnected) + { + throw new InvalidOperationException("The client is not connected to the remote cache"); + } + } + + ///<inheritdoc/> + public bool IsConnected => _backing.IsConnected; + + ///<inheritdoc/> + public Task AddOrUpdateAsync<T>(string key, string? newKey, T value, CancellationToken cancellation) + => AddOrUpdateAsync(key, newKey, value, _fallbackSerializer, cancellation); + + ///<inheritdoc/> + public Task<bool> DeleteAsync(string key, CancellationToken cancellation) + { + CheckConnected(); + + //Delete the object from + Task<bool> local = _memCache.DeleteObjectAsync(key, cancellation).AsTask(); + Task<bool> remote = _backing.DeleteAsync(key, cancellation); + + //task when both complete + return Task.WhenAll(local, remote).ContinueWith(static p => p.Result.First(), TaskScheduler.Default); + } + + ///<inheritdoc/> + public Task<T?> GetAsync<T>(string key, CancellationToken cancellation) => GetAsync<T>(key, _fallbackDeserializer, cancellation); + + ///<inheritdoc/> + public async Task<T?> GetAsync<T>(string key, ICacheObjectDeserializer deserializer, CancellationToken cancellation) + { + _ = deserializer ?? throw new ArgumentNullException(nameof(deserializer)); + + GetStateResult<T?> state = new() + { + Deserialzer = deserializer, + Value = default! + }; + + //Try to the object from the cache and if found, deserialize it and store the result + await GetAsync(key, static (r, data) => r.SetState(data), state, cancellation); + + return state.Value; + } + + ///<inheritdoc/> + public async Task GetAsync<T>(string key, ObjectDataSet<T> setter, T state, CancellationToken cancellation) + { + _ = key ?? throw new ArgumentNullException(nameof(key)); + _ = setter ?? throw new ArgumentNullException(nameof(setter)); + + CheckConnected(); + + IBlobCacheBucket bucket = _memCache.GetBucket(key); + + //Obtain cache handle + using (CacheBucketHandle handle = await bucket.WaitAsync(cancellation)) + { + //Try to read the value + if (handle.Cache.TryGetValue(key, out CacheEntry entry)) + { + setter(state, entry.GetDataSegment()); + return; + } + } + /* + * Can't avoid a double copy because we need to read the data from cache in order to store + * a local copy to update memcache + */ + + //Alloc buffer from client heap + using ObjectGetBuffer getBuffer = new(_bufferHeap); + + //Get the object from the server + await _backing.GetAsync(key, static (b, data) => b.SetData(data), getBuffer, cancellation); + + //See if object data was set + if (!getBuffer.GetData().IsEmpty) + { + //Update local cache + await _memCache.AddOrUpdateObjectAsync(key, null, static b => b.GetData(), getBuffer, DateTime.UtcNow, CancellationToken.None); + + //Invoket the setter + setter(state, getBuffer.GetData()); + } + } + + ///<inheritdoc/> + public async Task AddOrUpdateAsync<T>(string key, string? newKey, T value, ICacheObjectSerializer serialzer, CancellationToken cancellation) + { + CheckConnected(); + + //Alloc serialzation buffer + using AddOrUpdateBuffer buffer = new (_bufferHeap); + + //Serialze the value + serialzer.Serialize(value, buffer); + + await AddOrUpdateAsync(key, newKey, static p => p.GetData(), buffer, cancellation); + } + + ///<inheritdoc/> + public async Task AddOrUpdateAsync<T>(string key, string? newKey, ObjectDataReader<T> callback, T state, CancellationToken cancellation) + { + CheckConnected(); + + DateTime currentTime = DateTime.UtcNow; + + try + { + //Update remote first, and if exceptions are raised, do not update local cache + await _backing.AddOrUpdateAsync(key, newKey, callback, state, cancellation); + + //Safe to update local cache + await _memCache.AddOrUpdateObjectAsync(key, newKey, callback, state, currentTime, CancellationToken.None); + } + catch + { + //Remove local cache if exception occurs + await _memCache.DeleteObjectAsync(key, CancellationToken.None); + throw; + } + } + + async Task IIntervalScheduleable.OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken) + { + if (!IsConnected) + { + return; + } + + //Get buckets + IBlobCacheBucket[] buckets = _memCache.ToArray(); + + foreach (IBlobCacheBucket bucket in buckets) + { + //enter bucket lock + using CacheBucketHandle handle = await bucket.WaitAsync(cancellationToken); + + //Prune expired entires + PruneExpired(handle.Cache); + } + } + + private void PruneExpired(IBlobCache cache) + { + DateTime current = DateTime.UtcNow; + + //Enumerate all cache entires to determine if they have expired + string[] expired = (from ec in cache + where ec.Value.GetTime().Add(_cacheConfig.MaxCacheAge) < current + select ec.Key) + .ToArray(); + + //Remove expired entires + for (int i = 0; i < expired.Length; i++) + { + cache.Remove(expired[i]); + } + } + + /* + * Stores temporary state for a cache get operation + * that requires a deserializer to return it to + * object form + */ + private sealed class GetStateResult<T> + { + public T? Value; + public ICacheObjectDeserializer? Deserialzer; + + public void SetState(ReadOnlySpan<byte> data) + { + Value = Deserialzer!.Deserialize<T>(data); + } + } + + /* + * A buffer to store object data on a cache get + */ + private sealed class ObjectGetBuffer : VnDisposeable + { + private IMemoryHandle<byte>? _buffer; + private readonly IUnmangedHeap _heap; + + public ObjectGetBuffer(IUnmangedHeap heap) + { + _heap = heap; + } + + public ReadOnlySpan<byte> GetData() + { + return _buffer == null ? ReadOnlySpan<byte>.Empty : _buffer.Span; + } + + public void SetData(ReadOnlySpan<byte> data) + { + //Alloc a buffer from the supplied data + _buffer = data.IsEmpty ? null : _heap.AllocAndCopy(data); + } + + protected override void Free() + { + //Free buffer + _buffer?.Dispose(); + } + } + + } +}
\ No newline at end of file diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/RemoteCacheOperator.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/RemoteCacheOperator.cs new file mode 100644 index 0000000..fdf1c5e --- /dev/null +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/RemoteCacheOperator.cs @@ -0,0 +1,85 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Providers.VNCache +* File: RemoteCacheOperator.cs +* +* RemoteCacheOperator.cs is part of VNLib.Data.Caching.Providers.VNCache which is +* part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Providers.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Providers.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Utils.Logging; +using VNLib.Plugins; +using VNLib.Plugins.Extensions.Loading; + +namespace VNLib.Data.Caching.Providers.VNCache +{ + /// <summary> + /// Represents a handle to a VNCache cache client, that exposes a cancellable + /// <see cref="IAsyncBackgroundWork"/> to run inside a <see cref="PluginBase"/> + /// or standlone in your own background work handler + /// </summary> + /// <remarks> + /// The background work method must be sheduled for the cache client to be + /// connected to the backing store + /// </remarks> + public sealed class RemoteCacheOperator : IAsyncBackgroundWork + { + private readonly FBMCacheClient _client; + private CancellationTokenSource? _tokenSource; + + internal RemoteCacheOperator(FBMCacheClient client, RemoteBackedMemoryCache? memCache) + { + //Store the client to be used in the background work + _client = client; + Cache = memCache ?? (IGlobalCacheProvider)client; //Cache is the remote backing store + } + + /// <summary> + /// The configured global cache instance + /// </summary> + public IGlobalCacheProvider Cache { get; } + + ///<inheritdoc/> + ///<exception cref="ArgumentNullException"></exception> + public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) + { + _ = pluginLog ?? throw new ArgumentNullException(nameof(pluginLog)); + + //Create cts linked to the exit token to allow user cancellation of the listener + using (_tokenSource = CancellationTokenSource.CreateLinkedTokenSource(exitToken)) + { + //Do work with linked source + await _client.DoWorkAsync(pluginLog, _tokenSource.Token) + .ConfigureAwait(false); + } + + //Remove cts + _tokenSource = null; + } + + /// <summary> + /// Cancels the background cache client listener + /// </summary> + public void CancelListener() => _tokenSource?.Cancel(); + + } +}
\ No newline at end of file diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheClient.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheClient.cs new file mode 100644 index 0000000..a832b41 --- /dev/null +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheClient.cs @@ -0,0 +1,211 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Providers.VNCache +* File: VNCacheClient.cs +* +* VNCacheClient.cs is part of VNLib.Data.Caching.Providers.VNCache which is +* part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Providers.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Providers.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Utils.Logging; +using VNLib.Plugins; +using VNLib.Plugins.Extensions.Loading; + +/* + * This package exports an IGlobalCacheProvider that is intended to be packaged by + * application distributors that want to use VNCache as a global cache for their + * application. + * + * This package allows for memory only caching, write-through memory cache, and + * direct remote caching using VNCache as the backend. + */ + +namespace VNLib.Data.Caching.Providers.VNCache +{ + + /// <summary> + /// The VNCache global cache provider client, that is intended to be loaded + /// using <see cref="LoadingExtensions.GetOrCreateSingleton{T}(PluginBase)"/> directly + /// on the plugin loading a cache client. + /// <para> + /// Users may also create cache instances outside of plugin context using static + /// methods. + /// </para> + /// </summary> + [ExternService] + [ConfigurationName(CACHE_CONFIG_KEY)] + public sealed class VNCacheClient : IGlobalCacheProvider + { + internal const string CACHE_CONFIG_KEY = "cache"; + internal const string MEMORY_CACHE_CONFIG_KEY = "memory_cache"; + internal const string MEMORY_CACHE_ONLY_KEY = "memory_only"; + + private readonly IGlobalCacheProvider _client; + + public VNCacheClient(PluginBase plugin, IConfigScope config) + { + if (config.TryGetValue(MEMORY_CACHE_CONFIG_KEY, out _)) + { + //Check for memory only flag + if (config.TryGetValue(MEMORY_CACHE_ONLY_KEY, out JsonElement memOnly) && memOnly.GetBoolean()) + { + //Create a memory-only cache + _client = plugin.GetOrCreateSingleton<MemoryCache>(); + } + else + { + //Remote-backed memory cache + _client = plugin.GetOrCreateSingleton<RemoteBackedMemoryCache>(); + } + } + else + { + //Setup non-memory backed cache client + _client = plugin.GetOrCreateSingleton<FBMCacheClient>(); + } + } + + + /// <summary> + /// Allows you to programatically create a remote-only VNCache instance + /// </summary> + /// <param name="config">The remote cache configuration, required for VNCache remote cache servers</param> + /// <param name="debugLog">An optional FBMClient debugging log provider, should be null unless debug logging is desired </param> + /// <returns>An opreator handle that can schedule the remote cache worker task</returns> + /// <exception cref="ArgumentNullException"></exception> + /// <remarks> + /// The returned <see cref="RemoteCacheOperator"/> implements the <see cref="IAsyncBackgroundWork"/> + /// interface and must be scheduled in order to maintain a connection with the remote cache store. + /// </remarks> + public static RemoteCacheOperator CreateRemoteCache(VnCacheClientConfig config, ILogProvider? debugLog = null) + { + _ = config ?? throw new ArgumentNullException(nameof(config)); + + //Init client + FBMCacheClient client = new(config, debugLog); + + //Return single handle + return new(client, null); + } + + /// <summary> + /// Allows you to programatically create your own instance if a VNCache remote server backed + /// memory cache programatically. + /// </summary> + /// <param name="remote">The remote cache configuration, required for VNCache remote cache servers</param> + /// <param name="memory">The local memory backed configuration</param> + /// <param name="debugLog">An optional FBMClient debugging log provider, should be null unless debug logging is desired </param> + /// <returns>An opreator handle that can schedule the remote cache worker task</returns> + /// <exception cref="ArgumentNullException"></exception> + /// <remarks> + /// The returned <see cref="RemoteCacheOperator"/> implements the <see cref="IAsyncBackgroundWork"/> + /// interface and must be scheduled in order to maintain a connection with the remote cache store. The memory cache + /// resources are released when the worker task exits. + /// </remarks> + public static RemoteCacheOperator CreateRemoteBackedMemoryCache(VnCacheClientConfig remote, MemoryCacheConfig memory, ILogProvider? debugLog) + { + _ = remote ?? throw new ArgumentNullException(nameof(remote)); + _ = memory ?? throw new ArgumentNullException(nameof(memory)); + + FBMCacheClient client = new(remote, debugLog); + + //Init client + RemoteBackedMemoryCache memCache = new(memory, client); + + //Return single handle + return new(client, memCache); + } + + /// <summary> + /// Allows you to programatically create a memory only <see cref="IGlobalCacheProvider"/> + /// cache instance. + /// </summary> + /// <param name="config">The memory cache configuration</param> + /// <returns> + /// A <see cref="MemoryCacheOperator"/> handle that holds a ready-to use cache instance. + /// This operator must be disposed to release held resources. + /// </returns> + /// <exception cref="ArgumentNullException"></exception> + public static MemoryCacheOperator CreateMemoryCache(MemoryCacheConfig config) + { + _ = config ?? throw new ArgumentNullException(nameof(config)); + + //Init client + MemoryCache cache = new(config); + + //Return single handle + return new(cache); + } + + ///<inheritdoc/> + public bool IsConnected => _client.IsConnected; + + ///<inheritdoc/> + public Task AddOrUpdateAsync<T>(string key, string? newKey, T value, CancellationToken cancellation) + { + return _client.AddOrUpdateAsync(key, newKey, value, cancellation); + } + + ///<inheritdoc/> + public Task AddOrUpdateAsync<T>(string key, string? newKey, T value, ICacheObjectSerializer serialzer, CancellationToken cancellation) + { + return _client.AddOrUpdateAsync(key, newKey, value, serialzer, cancellation); + } + + ///<inheritdoc/> + public Task AddOrUpdateAsync<T>(string key, string? newKey, ObjectDataReader<T> callback, T state, CancellationToken cancellation) + { + return _client.AddOrUpdateAsync(key, newKey, callback, state, cancellation); + } + + ///<inheritdoc/> + public Task<bool> DeleteAsync(string key, CancellationToken cancellation) + { + return _client.DeleteAsync(key, cancellation); + } + + ///<inheritdoc/> + public Task<T?> GetAsync<T>(string key, CancellationToken cancellation) + { + return _client.GetAsync<T>(key, cancellation); + } + + ///<inheritdoc/> + public Task<T?> GetAsync<T>(string key, ICacheObjectDeserializer deserializer, CancellationToken cancellation) + { + return _client.GetAsync<T>(key, deserializer, cancellation); + } + + ///<inheritdoc/> + public Task GetAsync<T>(string key, ObjectDataSet<T> callback, T state, CancellationToken cancellation) + { + return _client.GetAsync(key, callback, state, cancellation); + } + + ///<inheritdoc/> + public object GetUnderlyingStore() + { + return _client.GetUnderlyingStore(); + } + } +} diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNLib.Data.Caching.Providers.VNCache.csproj b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNLib.Data.Caching.Providers.VNCache.csproj new file mode 100644 index 0000000..93825a5 --- /dev/null +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNLib.Data.Caching.Providers.VNCache.csproj @@ -0,0 +1,52 @@ +<Project Sdk="Microsoft.NET.Sdk"> + + <PropertyGroup> + <TargetFramework>net6.0</TargetFramework> + <RootNamespace>VNLib.Data.Caching.Providers.VNCache</RootNamespace> + <AssemblyName>VNLib.Data.Caching.Providers.VNCache</AssemblyName> + <Nullable>enable</Nullable> + <AnalysisLevel>latest-all</AnalysisLevel> + <ProduceReferenceAssembly>True</ProduceReferenceAssembly> + <GenerateDocumentationFile>False</GenerateDocumentationFile> + <!-- Resolve nuget dll files and store them in the output dir --> + <EnableDynamicLoading>true</EnableDynamicLoading> + </PropertyGroup> + + <PropertyGroup> + <Authors>Vaughn Nugent</Authors> + <Company>Vaughn Nugent</Company> + <Product>VNLib.Data.Caching.Providers.VNCache</Product> + <PackageId>VNLib.Data.Caching.Providers.VNCache</PackageId> + <Description>A runtime asset VNCache client library that exposes an IGlobalCacheProvider that works with VNCache clusters, write through memory-cache, and memory only cache</Description> + <Copyright>Copyright © 2023 Vaughn Nugent</Copyright> + <PackageProjectUrl>https://www.vaughnnugent.com/resources/software/modules/VNLib.Data.Caching</PackageProjectUrl> + <RepositoryUrl>https://github.com/VnUgE/VNLib.Data.Caching/tree/master/plugins/VNLib.Data.Caching.Providers.VNCache</RepositoryUrl> + <PackageReadmeFile>README.md</PackageReadmeFile> + <PackageLicenseFile>LICENSE</PackageLicenseFile> + <PackageRequireLicenseAcceptance>True</PackageRequireLicenseAcceptance> + </PropertyGroup> + + <ItemGroup> + <None Include="..\..\..\LICENSE"> + <Pack>True</Pack> + <PackagePath>\</PackagePath> + <CopyToOutputDirectory>Always</CopyToOutputDirectory> + </None> + <None Include="..\README.md"> + <Pack>True</Pack> + <PackagePath>\</PackagePath> + </None> + </ItemGroup> + + <ItemGroup> + <ProjectReference Include="..\..\..\..\Extensions\lib\VNLib.Plugins.Extensions.Loading\src\VNLib.Plugins.Extensions.Loading.csproj" /> + <ProjectReference Include="..\..\..\lib\VNLib.Data.Caching.Extensions\src\VNLib.Data.Caching.Extensions.csproj" /> + <ProjectReference Include="..\..\..\lib\VNLib.Data.Caching.ObjectCache\src\VNLib.Data.Caching.ObjectCache.csproj" /> + <ProjectReference Include="..\..\..\lib\VNLib.Data.Caching\src\VNLib.Data.Caching.csproj" /> + </ItemGroup> + + <Target Condition="'$(BuildingInsideVisualStudio)' == true" Name="PostBuild" AfterTargets="PostBuildEvent"> + <Exec Command="start xcopy "$(TargetDir)" "..\..\..\..\..\devplugins\runtimeassets\$(TargetName)" /E /Y /R" /> + </Target> + +</Project> diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs new file mode 100644 index 0000000..f84fe55 --- /dev/null +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs @@ -0,0 +1,139 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Providers.VNCache +* File: VnCacheClientConfig.cs +* +* VnCacheClientConfig.cs is part of VNLib.Data.Caching.Providers.VNCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Providers.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.Providers.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Text.Json.Serialization; + +using VNLib.Plugins.Extensions.Loading; + +namespace VNLib.Data.Caching.Providers.VNCache +{ + /// <summary> + /// Represents a remote VNCache client configuration + /// </summary> + public class VnCacheClientConfig : IOnConfigValidation + { + /// <summary> + /// The maximum size (in bytes) of messages sent to the + /// cache server. This value will be negotiated with the server + /// during a connection upgrade + /// </summary> + [JsonPropertyName("max_object_size")] + public int? MaxMessageSize { get; set; } + + /// <summary> + /// The broker server address + /// </summary> + [JsonPropertyName("use_tls")] + public bool UseTls { get; set; } = true; + + /// <summary> + /// The time (in seconds) to randomly delay polling the broker server + /// for available servers + /// </summary> + [JsonPropertyName("discovery_interval_sec")] + public int? DiscoveryIntervalSeconds { get; set; } + + /// <summary> + /// The maximum time (in seconds) for FBM cache operations are allowed + /// to take before timing out. + /// </summary> + /// <remarks> + /// NOTE: You should set this value to something reasonable as FBM messages can + /// be lost and cause deadlocks if your cache implementation does not rely on + /// CancellationTokens + /// </remarks> + [JsonPropertyName("request_timeout_sec")] + public int? RequestTimeoutSeconds { get; set; } + + /// <summary> + /// Retry interval in a timespan + /// </summary> + internal TimeSpan DiscoveryInterval => TimeSpan.FromSeconds(DiscoveryIntervalSeconds!.Value); + + /// <summary> + /// FBM Request timeout + /// </summary> + internal TimeSpan RequestTimeout => TimeSpan.FromSeconds(RequestTimeoutSeconds!.Value); + + /// <summary> + /// The initial peers to connect to + /// </summary> + [JsonPropertyName("initial_nodes")] + public string[]? InitialNodes { get; set; } + + /// <summary> + /// Gets the initial nodes as a collection of URIs + /// </summary> + /// <returns>The nodes as a collection of URIs</returns> + /// <exception cref="InvalidOperationException"></exception> + public Uri[] GetInitialNodeUris() + { + _ = InitialNodes ?? throw new InvalidOperationException("Initial nodes have not been set"); + return InitialNodes.Select(static x => new Uri(x, UriKind.Absolute)).ToArray(); + } + + void IOnConfigValidation.Validate() + { + if (!MaxMessageSize.HasValue || MaxMessageSize.Value < 1) + { + throw new ArgumentException("Your maxium message size should be a reasonable value greater than 0", "max_message_size"); + } + + if (!DiscoveryIntervalSeconds.HasValue || DiscoveryIntervalSeconds.Value < 1) + { + throw new ArgumentException("You must specify a discovery interval period greater than 0", "retry_interval_sec"); + } + + //Allow a 0 timeout to disable timeouts, not recommended, but allowed + if (!RequestTimeoutSeconds.HasValue || RequestTimeoutSeconds.Value < 0) + { + throw new ArgumentException("You must specify a positive integer FBM message timoeut", "request_timeout_sec"); + } + + //Validate initial nodes + if (InitialNodes == null || InitialNodes.Length == 0) + { + throw new ArgumentException("You must specify at least one initial peer", "initial_peers"); + } + + //Validate initial nodes + foreach (Uri peer in GetInitialNodeUris()) + { + if (!peer.IsAbsoluteUri) + { + throw new ArgumentException("You must specify an absolute URI for each initial node", "initial_nodes"); + } + + //Verify http connection + if (peer.Scheme != Uri.UriSchemeHttp && peer.Scheme != Uri.UriSchemeHttps) + { + throw new ArgumentException("You must specify an HTTP or HTTPS URI for each initial node", "initial_nodes"); + } + } + } + + } +}
\ No newline at end of file |