From cd1daadaeaa6ffbaaef3ed25452decd90d01fdfc Mon Sep 17 00:00:00 2001 From: vnugent Date: Thu, 9 Mar 2023 01:48:38 -0500 Subject: Omega cache, session, and account provider complete overhaul --- .../src/AddOrUpdateBuffer.cs | 97 +++++++ .../src/DataModel/EntityCacheExtensions.cs | 159 ++++++++++++ .../src/DataModel/ICacheEntity.cs | 39 +++ .../src/DataModel/ICacheExpirationStrategy.cs | 48 ++++ .../src/DataModel/ICacheKeyGenerator.cs | 40 +++ .../src/DataModel/IExpirableCacheEntity.cs | 39 +++ .../src/DataModel/ScopedCache.cs | 64 +++++ .../src/MemoryCache.cs | 168 ++++++++++++ .../src/MemoryCacheConfig.cs | 65 +++++ .../src/RemoteBackedMemoryCache.cs | 258 +++++++++++++++++++ .../src/VNCacheExtensions.cs | 135 +++++----- .../src/VNLib.Plugins.Extensions.VNCache.csproj | 18 +- .../src/VnCacheClient.cs | 283 +++++++++++---------- .../src/VnGlobalCache.cs | 102 ++++++++ 14 files changed, 1318 insertions(+), 197 deletions(-) create mode 100644 lib/VNLib.Plugins.Extensions.VNCache/src/AddOrUpdateBuffer.cs create mode 100644 lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityCacheExtensions.cs create mode 100644 lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/ICacheEntity.cs create mode 100644 lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/ICacheExpirationStrategy.cs create mode 100644 lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/ICacheKeyGenerator.cs create mode 100644 lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/IExpirableCacheEntity.cs create mode 100644 lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/ScopedCache.cs create mode 100644 lib/VNLib.Plugins.Extensions.VNCache/src/MemoryCache.cs create mode 100644 lib/VNLib.Plugins.Extensions.VNCache/src/MemoryCacheConfig.cs create mode 100644 lib/VNLib.Plugins.Extensions.VNCache/src/RemoteBackedMemoryCache.cs create mode 100644 lib/VNLib.Plugins.Extensions.VNCache/src/VnGlobalCache.cs (limited to 'lib/VNLib.Plugins.Extensions.VNCache/src') diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/AddOrUpdateBuffer.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/AddOrUpdateBuffer.cs new file mode 100644 index 0000000..a1fe2b5 --- /dev/null +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/AddOrUpdateBuffer.cs @@ -0,0 +1,97 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Plugins.Extensions.VNCache +* File: AddOrUpdateBuffer.cs +* +* AddOrUpdateBuffer.cs is part of VNLib.Plugins.Extensions.VNCache +* which is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Buffers; + +using VNLib.Utils; +using VNLib.Utils.Memory; +using VNLib.Utils.Extensions; +using VNLib.Data.Caching; + +namespace VNLib.Plugins.Extensions.VNCache +{ + /// + /// Implements a buffer writer that serves to serialize object data and + /// store the object data for use by the memory cache store, and the + /// remote cache store + /// + class AddOrUpdateBuffer : VnDisposeable, IBufferWriter, IObjectData + { + private int _count; + private readonly IUnmangedHeap _heap; + private MemoryHandle? _buffer; + + public AddOrUpdateBuffer(IUnmangedHeap heap) + { + _heap = heap; + } + + public void Advance(int count) + { + //Update count + _count += count; + } + + public Memory GetMemory(int sizeHint = 0) + { + throw new NotImplementedException(); + } + + public Span GetSpan(int sizeHint = 0) + { + //Round to nearest page for new size + nint newSize = MemoryUtil.NearestPage(sizeHint + _count); + + //Alloc buffer it not yet allocated + if (_buffer == null) + { + _buffer = _heap.Alloc(newSize); + } + else + { + //check for resize if allocated + _buffer.ResizeIfSmaller(newSize); + } + + return _buffer.AsSpan(_count); + } + + public void SetData(ReadOnlySpan data) + { + throw new NotSupportedException(); + } + + public ReadOnlySpan GetData() + { + //Get stored data from within handle + return _buffer!.AsSpan(0, _count); + } + + protected override void Free() + { + _buffer?.Dispose(); + } + } +} \ No newline at end of file diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityCacheExtensions.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityCacheExtensions.cs new file mode 100644 index 0000000..79bb4fc --- /dev/null +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityCacheExtensions.cs @@ -0,0 +1,159 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Plugins.Extensions.VNCache +* File: EntityCacheExtensions.cs +* +* EntityCacheExtensions.cs is part of VNLib.Plugins.Extensions.VNCache +* which is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Threading; +using System.Threading.Tasks; +using System.Runtime.CompilerServices; + +using VNLib.Data.Caching; + +namespace VNLib.Plugins.Extensions.VNCache.DataModel +{ + /// + /// Provides cache extensions for entity caching + /// + public static class EntityCacheExtensions + { + /// + /// Gets a new that is backed by the current cache provider + /// and generates 1:1 keys from the + /// + /// + /// The instance that generates unique keys for a given entity id + /// The new instance + public static ScopedCache GetScopedCache(this IGlobalCacheProvider cache, ICacheKeyGenerator cacheKeyGenerator) => new ScopedCacheImpl(cache, cacheKeyGenerator); + + /// + /// Deletes an from the cache from its id + /// + /// + /// + /// The entity to delete from the store + /// A token to cancel the operation + /// A task that completes when the delete operation has compelted + /// + public static Task DeleteAsync(this IGlobalCacheProvider cache, T entity, CancellationToken cancellation) where T: class, ICacheEntity + { + _ = entity ?? throw new ArgumentNullException(nameof(entity)); + _ = cache ?? throw new ArgumentNullException(nameof(entity)); + //Delete by its id + return cache.DeleteAsync(entity.Id, cancellation); + } + + /// + /// Asynchronously sets (or updates) a cached value in the backing cache store + /// + /// + /// + /// A token to cancel the async operation + /// The entity to set at the given key + /// A task that completes when the add/update operation has compelted + /// + public static Task AddOrUpdateAsync(this IGlobalCacheProvider cache, T entity, CancellationToken cancellation) where T: class, ICacheEntity + { + _ = entity ?? throw new ArgumentNullException(nameof(entity)); + _ = cache ?? throw new ArgumentNullException(nameof(cache)); + + //Add/update with its id + return cache.AddOrUpdateAsync(entity.Id, null, entity, cancellation); + } + + + private sealed class ScopedCacheImpl: ScopedCache + { + private readonly IGlobalCacheProvider cache; + + public override bool IsConnected + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + get => cache.IsConnected; + } + + + protected override ICacheKeyGenerator KeyGen { get; } + + public ScopedCacheImpl(IGlobalCacheProvider cache, ICacheKeyGenerator keyGen) + { + this.cache = cache; + KeyGen = keyGen; + } + + public override Task AddOrUpdateAsync(string key, string? newKey, T value, CancellationToken cancellation) + { + _ = key ?? throw new ArgumentNullException(nameof(key)); + + //Compute primary key from id + string primary = KeyGen.ComputedKey(key); + + //If newkey exists, compute the secondary key + string? secondary = newKey != null ? KeyGen.ComputedKey(newKey) : null; + + return cache.AddOrUpdateAsync(primary, secondary, value, cancellation); + } + + public override Task DeleteAsync(string key, CancellationToken cancellation) + { + _ = key ?? throw new ArgumentNullException(nameof(key)); + //Compute the key for the id + string scoped = KeyGen.ComputedKey(key); + return cache.DeleteAsync(scoped, cancellation); + } + + public override Task GetAsync(string key, CancellationToken cancellation) + { + _ = key ?? throw new ArgumentNullException(nameof(key)); + + //Compute the key for the id + string scoped = KeyGen.ComputedKey(key); + + return cache.GetAsync(scoped, cancellation); + } + + public override Task GetAsync(string key, ICacheObjectDeserialzer deserializer, CancellationToken cancellation) + { + _ = key ?? throw new ArgumentNullException(nameof(key)); + + //Compute the key for the id + string scoped = KeyGen.ComputedKey(key); + + return cache.GetAsync(scoped, deserializer, cancellation); + } + + public override Task AddOrUpdateAsync(string key, string? newKey, T value, ICacheObjectSerialzer serialzer, CancellationToken cancellation) + { + _ = key ?? throw new ArgumentNullException(nameof(key)); + + //Compute primary key from id + string primary = KeyGen.ComputedKey(key); + + //If newkey exists, compute the secondary key + string? secondary = newKey != null ? KeyGen.ComputedKey(newKey) : null; + + return cache.AddOrUpdateAsync(primary, secondary, value, serialzer, cancellation); + } + } + } + +} diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/ICacheEntity.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/ICacheEntity.cs new file mode 100644 index 0000000..77f0667 --- /dev/null +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/ICacheEntity.cs @@ -0,0 +1,39 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Plugins.Extensions.VNCache +* File: ICacheEntity.cs +* +* ICacheEntity.cs is part of VNLib.Plugins.Extensions.VNCache +* which is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +namespace VNLib.Plugins.Extensions.VNCache.DataModel +{ + + /// + /// Represents a uniquely cachable item + /// + public interface ICacheEntity + { + /// + /// The unique ID of the item within the store + /// + string Id { get; } + } + +} diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/ICacheExpirationStrategy.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/ICacheExpirationStrategy.cs new file mode 100644 index 0000000..f9ff54c --- /dev/null +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/ICacheExpirationStrategy.cs @@ -0,0 +1,48 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Plugins.Extensions.VNCache +* File: ICacheExpirationStrategy.cs +* +* ICacheExpirationStrategy.cs is part of VNLib.Plugins.Extensions.VNCache +* which is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; + +namespace VNLib.Plugins.Extensions.VNCache.DataModel +{ + /// + /// An interface that provides an object caching expiration + /// instructions + /// + public interface ICacheExpirationStrategy + { + /// + /// The maxium age of a given entity + /// + TimeSpan CacheMaxAge { get; } + + /// + /// Invoked when a record is retrieved and determined to be expired + /// + /// + /// + void OnExpired(T expired) where T : IExpirableCacheEntity; + } + +} diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/ICacheKeyGenerator.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/ICacheKeyGenerator.cs new file mode 100644 index 0000000..2e558b0 --- /dev/null +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/ICacheKeyGenerator.cs @@ -0,0 +1,40 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Plugins.Extensions.VNCache +* File: ICacheKeyGenerator.cs +* +* ICacheKeyGenerator.cs is part of VNLib.Plugins.Extensions.VNCache +* which is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +namespace VNLib.Plugins.Extensions.VNCache.DataModel +{ + /// + /// An interface that provides 1:1 entity to cache key mapping + /// + public interface ICacheKeyGenerator + { + /// + /// Computes the unique key identifying the item within + /// the cache store, unique to the store. + /// + /// The id of the entity to get the key for + /// The unique key identifying the item + string ComputedKey(string entityId); + } +} diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/IExpirableCacheEntity.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/IExpirableCacheEntity.cs new file mode 100644 index 0000000..a47d3ca --- /dev/null +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/IExpirableCacheEntity.cs @@ -0,0 +1,39 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Plugins.Extensions.VNCache +* File: IExpirableCacheEntity.cs +* +* IExpirableCacheEntity.cs is part of VNLib.Plugins.Extensions.VNCache +* which is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +namespace VNLib.Plugins.Extensions.VNCache.DataModel +{ + /// + /// A cache entity that has a controllable expiration + /// + public interface IExpirableCacheEntity : ICacheEntity + { + /// + /// A serializable value set by the cache subsystem to + /// handle stale cache entires + /// + long Expires { get; set; } + } + +} diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/ScopedCache.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/ScopedCache.cs new file mode 100644 index 0000000..da2f78a --- /dev/null +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/ScopedCache.cs @@ -0,0 +1,64 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Plugins.Extensions.VNCache +* File: ScopedCache.cs +* +* ScopedCache.cs is part of VNLib.Plugins.Extensions.VNCache +* which is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Data.Caching; + +namespace VNLib.Plugins.Extensions.VNCache.DataModel +{ + /// + /// A cache that stores objects with 1:1 keys unique to this instance. That is, a unique entity + /// that is stored in this cache instance may only be retrieved, deleted, or updated, by the + /// same instance. This is an abstract class. + /// + public abstract class ScopedCache : IGlobalCacheProvider + { + + /// + /// The to provide unique + /// cache keys + /// + protected abstract ICacheKeyGenerator KeyGen { get; } + + /// + public abstract bool IsConnected { get; } + + /// + public abstract Task AddOrUpdateAsync(string key, string? newKey, T value, CancellationToken cancellation); + + /// + public abstract Task DeleteAsync(string key, CancellationToken cancellation); + + /// + public abstract Task GetAsync(string key, CancellationToken cancellation); + + /// + public abstract Task GetAsync(string key, ICacheObjectDeserialzer deserializer, CancellationToken cancellation); + + /// + public abstract Task AddOrUpdateAsync(string key, string? newKey, T value, ICacheObjectSerialzer serialzer, CancellationToken cancellation); + } +} diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/MemoryCache.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/MemoryCache.cs new file mode 100644 index 0000000..7b0fe72 --- /dev/null +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/MemoryCache.cs @@ -0,0 +1,168 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Plugins.Extensions.VNCache +* File: RemoteBackedMemoryCache.cs +* +* RemoteBackedMemoryCache.cs is part of VNLib.Plugins.Extensions.VNCache +* which is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Utils; +using VNLib.Utils.Memory; +using VNLib.Utils.Logging; +using VNLib.Utils.Memory.Diagnostics; +using VNLib.Data.Caching; +using VNLib.Data.Caching.ObjectCache; +using VNLib.Plugins.Extensions.Loading; + +namespace VNLib.Plugins.Extensions.VNCache +{ + [ConfigurationName(VNCacheExtensions.CACHE_CONFIG_KEY)] + internal sealed class MemoryCache : VnDisposeable, IGlobalCacheProvider + { + const int MB_DIVISOR = 1000 * 1024; + + const string DEBUG_TEMPLATE =@"Configuring Memory-Only Cache + | ----------------------------- + | Configuration: + | Table Size: {ts} + | Bucket Size: {bs} + | Max Objects: {obj} + | Memory Estimations: + | 4K blocks: {4k}Mb + | 8K blocks: {8k}Mb + | 16K blocks: {16K}Mb + | ----------------------------- +"; + + private readonly ICacheObjectSerialzer _serialzer; + private readonly ICacheObjectDeserialzer _deserialzer; + private readonly IBlobCacheTable _memCache; + private readonly IUnmangedHeap _bufferHeap; + + public MemoryCache(PluginBase pbase, IConfigScope config) + { + //Get nested memory cache config + MemoryCacheConfig memCacheConfig = config[VNCacheExtensions.MEMORY_CACHE_CONFIG_KEY].Deserialize()!; + + if (pbase.IsDebug()) + { + //Use the debug heap + IUnmangedHeap newHeap = MemoryUtil.InitializeNewHeapForProcess(); + + //Wrap in diag heap + _bufferHeap = new TrackedHeapWrapper(newHeap); + } + else + { + //Init new "private" heap to alloc buffer from + _bufferHeap = MemoryUtil.InitializeNewHeapForProcess(); + } + + //Setup cache table + _memCache = new BlobCacheTable(memCacheConfig.TableSize, memCacheConfig.BucketSize, _bufferHeap); + + /* + * Default to json serialization by using the default + * serializer and JSON options + */ + + JsonCacheObjectSerializer defaultSerializer = new(); + _serialzer = defaultSerializer; + _deserialzer = defaultSerializer; + + PrintDebug(pbase.Log, memCacheConfig); + } + + private static void PrintDebug(ILogProvider log, MemoryCacheConfig config) + { + long maxObjects = config.BucketSize * config.TableSize; + + long size4kMb = (maxObjects * 4096)/MB_DIVISOR; + long size8kMb = (maxObjects * 8128)/MB_DIVISOR; + long size16kMb = (maxObjects * 16384)/MB_DIVISOR; + + log.Debug(DEBUG_TEMPLATE, config.TableSize, config.BucketSize, maxObjects, size4kMb, size8kMb, size16kMb); + } + + /// + public bool IsConnected { get; } = true; + + protected override void Free() + { + _memCache.Dispose(); + _bufferHeap.Dispose(); + } + + /// + public Task AddOrUpdateAsync(string key, string? newKey, T value, CancellationToken cancellation) + { + return AddOrUpdateAsync(key, newKey, value, _serialzer, cancellation); + } + + /// + public async Task AddOrUpdateAsync(string key, string? newKey, T value, ICacheObjectSerialzer serialzer, CancellationToken cancellation) + { + Check(); + + //Alloc serialzation buffer + using AddOrUpdateBuffer buffer = new (_bufferHeap); + + //Serialze the value + serialzer.Serialize(value, buffer); + + //Update object data + await _memCache.AddOrUpdateObjectAsync(key, newKey, static b => b.GetData(), buffer, default, cancellation); + } + + /// + public Task DeleteAsync(string key, CancellationToken cancellation) + { + Check(); + return _memCache.DeleteObjectAsync(key, cancellation).AsTask(); + } + + /// + public Task GetAsync(string key, CancellationToken cancellation) => GetAsync(key, _deserialzer, cancellation); + + /// + public async Task GetAsync(string key, ICacheObjectDeserialzer deserializer, CancellationToken cancellation) + { + Check(); + + IBlobCacheBucket bucket = _memCache.GetBucket(key); + + //Obtain cache handle + using (CacheBucketHandle handle = await bucket.WaitAsync(cancellation)) + { + //Try to read the value + if (handle.Cache.TryGetValue(key, out CacheEntry entry)) + { + return (T?)deserializer.Deserialze(typeof(T), entry.GetDataSegment()); + } + } + + return default; + } + } +} \ No newline at end of file diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/MemoryCacheConfig.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/MemoryCacheConfig.cs new file mode 100644 index 0000000..bcd821b --- /dev/null +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/MemoryCacheConfig.cs @@ -0,0 +1,65 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Plugins.Extensions.VNCache +* File: MemoryCacheConfig.cs +* +* MemoryCacheConfig.cs is part of VNLib.Plugins.Extensions.VNCache +* which is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Text.Json.Serialization; + +namespace VNLib.Plugins.Extensions.VNCache +{ + internal sealed class MemoryCacheConfig : ICacheRefreshPolicy + { + [JsonPropertyName("buckets")] + public uint TableSize { get; set; } = 10; + + [JsonPropertyName("bucket_size")] + public uint BucketSize { get; set; } = 5000; + + [JsonPropertyName("max_object_size")] + public uint MaxBlobSize { get; set; } = 16 * 1024; + + [JsonIgnore] + public TimeSpan MaxCacheAge { get; set; } = TimeSpan.FromMinutes(1); + + [JsonPropertyName("max_age_sec")] + public uint MaxAgeSec + { + get => (uint)MaxCacheAge.TotalSeconds; + set => MaxCacheAge = TimeSpan.FromSeconds(value); + } + /* + * Default disable cache + */ + public TimeSpan RefreshInterval { get; set; } = TimeSpan.Zero; + + [JsonPropertyName("refresh_interval_sec")] + public uint RefreshSec + { + get => (uint)RefreshInterval.TotalSeconds; + set => RefreshInterval = TimeSpan.FromSeconds(value); + } + + [JsonPropertyName("write_through")] + public bool WriteThrough { get; set; } = true; + } +} \ No newline at end of file diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/RemoteBackedMemoryCache.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/RemoteBackedMemoryCache.cs new file mode 100644 index 0000000..67fb550 --- /dev/null +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/RemoteBackedMemoryCache.cs @@ -0,0 +1,258 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Plugins.Extensions.VNCache +* File: RemoteBackedMemoryCache.cs +* +* RemoteBackedMemoryCache.cs is part of VNLib.Plugins.Extensions.VNCache +* which is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Buffers; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using System.Runtime.CompilerServices; + +using VNLib.Utils; +using VNLib.Utils.Memory; +using VNLib.Utils.Logging; +using VNLib.Utils.Extensions; +using VNLib.Data.Caching; +using VNLib.Data.Caching.ObjectCache; +using VNLib.Plugins.Extensions.Loading; +using VNLib.Plugins.Extensions.Loading.Events; + +namespace VNLib.Plugins.Extensions.VNCache +{ + + [ConfigurationName(VNCacheExtensions.CACHE_CONFIG_KEY)] + internal sealed class RemoteBackedMemoryCache : VnCacheClient, IIntervalScheduleable + { + private readonly MemoryCacheConfig _cacheConfig; + private readonly ICacheObjectSerialzer _serialzer; + private readonly ICacheObjectDeserialzer _deserialzer; + private readonly IBlobCacheTable _memCache; + + public RemoteBackedMemoryCache(PluginBase plugin, IConfigScope config) : base(plugin, config) + { + //Get nested memory cache config + MemoryCacheConfig memCacheConfig = config[VNCacheExtensions.MEMORY_CACHE_CONFIG_KEY].Deserialize()!; + + //Setup cache table + _memCache = new BlobCacheTable(memCacheConfig.TableSize, memCacheConfig.BucketSize, Client.Config.BufferHeap ?? MemoryUtil.Shared); + + _cacheConfig = memCacheConfig; + + /* + * Default to json serialization by using the default + * serializer and JSON options + */ + + JsonCacheObjectSerializer defaultSerializer = new(); + _serialzer = defaultSerializer; + _deserialzer = defaultSerializer; + + //Schedule cache purge + if (memCacheConfig.RefreshInterval > TimeSpan.Zero) + { + plugin.ScheduleInterval(this, memCacheConfig.RefreshInterval); + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void CheckConnected() + { + if (!IsConnected) + { + throw new InvalidOperationException("The client is not connected to the remote cache"); + } + } + + public override async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) + { + //Cleanup + try + { + await base.DoWorkAsync(pluginLog, exitToken); + } + finally + { + _memCache.Dispose(); + } + } + + /// + public override Task AddOrUpdateAsync(string key, string? newKey, T value, CancellationToken cancellation) + => AddOrUpdateAsync(key, newKey, value, _serialzer, cancellation); + + /// + public override Task DeleteAsync(string key, CancellationToken cancellation) + { + CheckConnected(); + + //Delete the object from + Task local = _memCache.DeleteObjectAsync(key, cancellation).AsTask(); + Task remote = Client.DeleteObjectAsync(key, cancellation); + + //task when both complete + return Task.WhenAll(local, remote); + } + + /// + public override Task GetAsync(string key, CancellationToken cancellation) => GetAsync(key, _deserialzer, cancellation); + + /// + public override async Task GetAsync(string key, ICacheObjectDeserialzer deserializer, CancellationToken cancellation) + { + CheckConnected(); + + Type objType = typeof(T); + + IBlobCacheBucket bucket = _memCache.GetBucket(key); + + //Obtain cache handle + using (CacheBucketHandle handle = await bucket.WaitAsync(cancellation)) + { + //Try to read the value + if (handle.Cache.TryGetValue(key, out CacheEntry entry)) + { + return (T?)deserializer.Deserialze(objType, entry.GetDataSegment()); + } + } + + //Alloc buffer from client heap + using ObjectGetBuffer getBuffer = new(Client.Config.BufferHeap); + + //Get the object from the server + await Client.GetObjectAsync(key, getBuffer, cancellation); + + //See if object data was set + if (getBuffer.GetData().IsEmpty) + { + return default; + } + + //Update local cache + await _memCache.AddOrUpdateObjectAsync(key, null, static b => b.GetData(), getBuffer, DateTime.UtcNow, CancellationToken.None); + + //Deserialze the entity + return (T)deserializer.Deserialze(objType, getBuffer.GetData()); + } + + /// + public override async Task AddOrUpdateAsync(string key, string? newKey, T value, ICacheObjectSerialzer serialzer, CancellationToken cancellation) + { + CheckConnected(); + + DateTime currentTime = DateTime.UtcNow; + + //Alloc serialzation buffer + using AddOrUpdateBuffer buffer = new (Client.Config.BufferHeap); + + //Serialze the value + serialzer.Serialize(value, buffer); + + try + { + //Update remote first, and if exceptions are raised, do not update local cache + await Client.AddOrUpdateObjectAsync(key, newKey, (IObjectData)buffer, cancellation); + + await _memCache.AddOrUpdateObjectAsync(key, newKey, static b => b.GetData(), buffer, currentTime, CancellationToken.None); + } + catch + { + //Remove local cache if exception occurs + await _memCache.DeleteObjectAsync(key, CancellationToken.None); + throw; + } + } + + async Task IIntervalScheduleable.OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken) + { + if(!IsConnected) + { + return; + } + + //Get buckets + IBlobCacheBucket[] buckets = _memCache.ToArray(); + + foreach (IBlobCacheBucket bucket in buckets) + { + //enter bucket lock + using CacheBucketHandle handle = await bucket.WaitAsync(cancellationToken); + + //Prune expired entires + PruneExpired(handle.Cache); + } + } + + private void PruneExpired(IBlobCache cache) + { + DateTime current = DateTime.UtcNow; + + //Enumerate all cache entires to determine if they have expired + string[] expired = (from ec in cache + where ec.Value.GetTime().Add(_cacheConfig.MaxCacheAge) < current + select ec.Key) + .ToArray(); + + //Remove expired entires + for(int i = 0; i < expired.Length; i++) + { + cache.Remove(expired[i]); + } + + Client.Config.DebugLog?.Debug("Cleaned {mc} expired memory cache elements", expired.Length); + } + + /* + * A buffer to store object data on a cache get + */ + private sealed class ObjectGetBuffer : VnDisposeable, IObjectData + { + private IMemoryHandle? _buffer; + private readonly IUnmangedHeap _heap; + + public ObjectGetBuffer(IUnmangedHeap heap) + { + _heap = heap; + } + + public ReadOnlySpan GetData() + { + return _buffer == null ? ReadOnlySpan.Empty : _buffer.Span; + } + + public void SetData(ReadOnlySpan data) + { + //Alloc a buffer from the supplied data + _buffer = data.IsEmpty ? null : _heap.AllocAndCopy(data); + } + + protected override void Free() + { + //Free buffer + _buffer?.Dispose(); + } + } + + } +} \ No newline at end of file diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs index 75b9bd4..995b71a 100644 --- a/lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Plugins.Extensions.VNCache @@ -23,14 +23,13 @@ */ using System; -using System.Text.Json; -using System.Threading.Tasks; -using System.Collections.Generic; +using System.Runtime.CompilerServices; -using VNLib.Utils.Logging; +using VNLib.Hashing; +using VNLib.Utils.Memory; +using VNLib.Utils.Extensions; using VNLib.Data.Caching; -using VNLib.Data.Caching.Extensions; -using VNLib.Plugins.Extensions.Loading; +using VNLib.Plugins.Extensions.VNCache.DataModel; namespace VNLib.Plugins.Extensions.VNCache { @@ -40,72 +39,84 @@ namespace VNLib.Plugins.Extensions.VNCache /// public static class VNCacheExtensions { + internal const string CACHE_CONFIG_KEY = "vncache"; + internal const string MEMORY_CACHE_CONFIG_KEY = "memory_cache"; + internal const string MEMORY_CACHE_ONLY_KEY = "memory_only"; + + /// - /// Loads the shared cache provider for the current plugin + /// Gets a simple scoped cache based on an entity prefix. The prefix is appended + /// to the object id on each cache operation /// - /// - /// A localized log provider to write cache logging information to - /// The shared - /// - /// The returned instance, background work, logging, and its lifetime - /// are managed by the current plugin. Beware when calling this method - /// network connections may be spawend and managed in the background by - /// this library. - /// - public static VnCacheClient GetGlobalCache(this PluginBase pbase, ILogProvider? localized = null) - => LoadingExtensions.GetOrCreateSingleton(pbase, localized == null ? LoadCacheClient : (pbase) => LoadCacheClient(pbase, localized)); - - private static VnCacheClient LoadCacheClient(PluginBase pbase) => LoadCacheClient(pbase, pbase.Log); - - private static VnCacheClient LoadCacheClient(PluginBase pbase, ILogProvider localized) + /// + /// The simple prefix string to append to object ids before computing hashes + /// The algorithm used to hash the combined object-ids + /// The string encoding method used to encode the hash output + /// The instance that will use the prefix to compute object ids + /// + public static ScopedCache GetPrefixedCache(this IGlobalCacheProvider cache, string prefix, HashAlg digest = HashAlg.SHA1, HashEncodingMode encoding = HashEncodingMode.Base64) { - //Get config for client - IReadOnlyDictionary config = pbase.GetConfigForType(); - - //Init client - ILogProvider? debugLog = pbase.IsDebug() ? pbase.Log : null; - VnCacheClient client = new(debugLog); - - //Begin cache connections by scheduling a task on the plugin's scheduler - _ = pbase.ObserveTask(() => RunClientAsync(pbase, config, localized, client), 250); - - return client; + _ = cache ?? throw new ArgumentNullException(nameof(cache)); + _ = prefix ?? throw new ArgumentNullException(nameof(prefix)); + //Create simple cache key generator + SimpleCacheKeyImpl keyProv = new(prefix, digest, encoding); + //Create the scoped cache from the simple provider + return cache.GetScopedCache(keyProv); } - private static async Task RunClientAsync(PluginBase pbase, IReadOnlyDictionary config, ILogProvider localized, VnCacheClient client) + private sealed class SimpleCacheKeyImpl : ICacheKeyGenerator { - ILogProvider Log = localized; + private readonly string Prefix; + private readonly HashAlg Digest; + private readonly HashEncodingMode Encoding; - try + public SimpleCacheKeyImpl(string prefix, HashAlg digest, HashEncodingMode encoding) { - //Try loading config - await client.LoadConfigAsync(pbase, config); + Prefix = prefix; + Digest = digest; + Encoding = encoding; + } - Log.Verbose("VNCache client configration loaded successfully"); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private int ComputeBufferSize(string id) => id.Length + Prefix.Length; - //Run and wait for exit - await client.RunAsync(Log, pbase.UnloadToken); - } - catch (OperationCanceledException) - { } - catch (KeyNotFoundException e) - { - Log.Error("Missing required configuration variable for VnCache client: {0}", e.Message); - } - catch (FBMServerNegiationException fne) - { - Log.Error("Failed to negotiate connection with cache server {reason}", fne.Message); - } - catch (Exception ex) - { - Log.Error(ex, "Unhandled exception occured in background cache client listening task"); - } - finally + + string ICacheKeyGenerator.ComputedKey(string entityId) { - client.Dispose(); - } + //Compute the required character buffer size + int bufferSize = ComputeBufferSize(entityId); + + if(bufferSize < 128) + { + //Stack alloc a buffer + Span buffer = stackalloc char[bufferSize]; + + //Writer to accumulate data + ForwardOnlyWriter writer = new(buffer); + + //Append prefix and entity id + writer.Append(Prefix); + writer.Append(entityId); - Log.Information("Cache client exited"); - } + //Compute the simple hash of the combined values + return ManagedHash.ComputeHash(writer.AsSpan(), Digest, Encoding); + } + else + { + //Alloc heap buffer for string concatination + using UnsafeMemoryHandle buffer = MemoryUtil.UnsafeAlloc(bufferSize, true); + + //Writer to accumulate data + ForwardOnlyWriter writer = new(buffer); + + //Append prefix and entity id + writer.Append(Prefix); + writer.Append(entityId); + + //Compute the simple hash of the combined values + return ManagedHash.ComputeHash(writer.AsSpan(), Digest, Encoding); + } + } + } } } diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VNLib.Plugins.Extensions.VNCache.csproj b/lib/VNLib.Plugins.Extensions.VNCache/src/VNLib.Plugins.Extensions.VNCache.csproj index f8f3e29..1308c4b 100644 --- a/lib/VNLib.Plugins.Extensions.VNCache/src/VNLib.Plugins.Extensions.VNCache.csproj +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VNLib.Plugins.Extensions.VNCache.csproj @@ -4,9 +4,6 @@ net6.0 VNLib.Plugins.Extensions.VNCache VNLib.Plugins.Extensions.VNCache - 1.0.1.1 - True - \\vaughnnugent.com\Internal\Folder Redirection\vman\Documents\Programming\Software\StrongNameingKey.snk enable latest-all True @@ -14,14 +11,27 @@ Vaughn Nugent + Vaughn Nugent + VNLib.Plugins.Extensions.VNCache + VNLib.Plugins.Extensions.VNCache + + An Essentials framework extension library for integrating VNCache global caching provider into applications + for distributed data caching, and extensions for entity caching. + Copyright © 2023 Vaughn Nugent - https://www.vaughnnugent.com/resources/software + https://www.vaughnnugent.com/resources/software/modules/VNLib.Data.Caching + https://github.com/VnUgE/VNLib.Data.Caching/tree/master/lib/VNLib.Plugins.Extensions.VNCache + + + + + diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs index a216f18..1e1b74c 100644 --- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Plugins.Extensions.VNCache @@ -24,7 +24,6 @@ using System; using System.Net.Http; -using System.Text.Json; using System.Threading; using System.Threading.Tasks; using System.Net.Sockets; @@ -32,71 +31,74 @@ using System.Net.WebSockets; using System.Collections.Generic; using System.Security.Cryptography; -using VNLib.Utils; using VNLib.Utils.Memory; using VNLib.Utils.Logging; using VNLib.Utils.Extensions; using VNLib.Hashing.IdentityUtility; using VNLib.Data.Caching; using VNLib.Data.Caching.Extensions; +using VNLib.Data.Caching.ObjectCache; using VNLib.Net.Messaging.FBM.Client; using VNLib.Plugins.Extensions.Loading; namespace VNLib.Plugins.Extensions.VNCache { - /// - /// A wrapper to simplify a shared global cache client - /// - [ConfigurationName("vncache")] - public sealed class VnCacheClient : VnDisposeable, IGlobalCacheProvider + public interface ICacheRefreshPolicy { - FBMClient? _client; + TimeSpan MaxCacheAge { get; } - private TimeSpan RetryInterval; + TimeSpan RefreshInterval { get; } + } - private readonly ILogProvider? DebugLog; - private readonly IUnmangedHeap? ClientHeap; + /// + /// A base class that manages + /// + [ConfigurationName(VNCacheExtensions.CACHE_CONFIG_KEY)] + internal class VnCacheClient : IGlobalCacheProvider, IAsyncBackgroundWork, IAsyncConfigurable + { + private readonly TimeSpan RetryInterval; /// - /// Initializes an emtpy client wrapper that still requires - /// configuration loading + /// The internal client /// - /// An optional debugging log - /// An optional for buffers - internal VnCacheClient(ILogProvider? debugLog, IUnmangedHeap? heap = null) - { - DebugLog = debugLog; - //Default to 10 seconds - RetryInterval = TimeSpan.FromSeconds(10); - - ClientHeap = heap; - } - - /// - protected override void Free() - { - _client?.Dispose(); - _client = null; - } - + public FBMClient Client { get; } /// - /// Loads required configuration variables from the config store and - /// intializes the interal client + /// Gets a value that determines if the client is currently connected to a server /// - /// - /// A dictionary of configuration varables - /// - internal async Task LoadConfigAsync(PluginBase pbase, IReadOnlyDictionary config) + public bool IsConnected { get; private set; } + + public VnCacheClient(PluginBase pbase, IConfigScope config) { + //Get required configuration variables int maxMessageSize = config["max_message_size"].GetInt32(); string? brokerAddress = config["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required configuration variable broker_address"); + RetryInterval = config["retry_interval_sec"].GetTimeSpan(TimeParseType.Seconds); + TimeSpan timeout = config["request_timeout_sec"].GetTimeSpan(TimeParseType.Seconds); + + Uri brokerUri = new(brokerAddress); + + //Setup debug log if the plugin is in debug mode + ILogProvider? debugLog = pbase.IsDebug() ? pbase.Log : null; + + //Init the client with default settings + FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(MemoryUtil.Shared, maxMessageSize, timeout, debugLog); + + Client = new(conf); + //Add the configuration to the client + Client.GetCacheConfiguration() + .WithBroker(brokerUri) + .WithTls(brokerUri.Scheme == Uri.UriSchemeHttps); + } + + public virtual async Task ConfigureServiceAsync(PluginBase plugin) + { //Get keys async - Task clientPrivTask = pbase.TryGetSecretAsync("client_private_key").ToJsonWebKey(); - Task brokerPubTask = pbase.TryGetSecretAsync("broker_public_key").ToJsonWebKey(); - Task cachePubTask = pbase.TryGetSecretAsync("cache_public_key").ToJsonWebKey(); + Task clientPrivTask = plugin.TryGetSecretAsync("client_private_key").ToJsonWebKey(); + Task brokerPubTask = plugin.TryGetSecretAsync("broker_public_key").ToJsonWebKey(); + Task cachePubTask = plugin.TryGetSecretAsync("cache_public_key").ToJsonWebKey(); //Wait for all tasks to complete _ = await Task.WhenAll(clientPrivTask, brokerPubTask, cachePubTask); @@ -105,132 +107,151 @@ namespace VNLib.Plugins.Extensions.VNCache ReadOnlyJsonWebKey brokerPub = await brokerPubTask ?? throw new KeyNotFoundException("Missing required secret broker_public_key"); ReadOnlyJsonWebKey cachePub = await cachePubTask ?? throw new KeyNotFoundException("Missing required secret cache_public_key"); - RetryInterval = config["retry_interval_sec"].GetTimeSpan(TimeParseType.Seconds); - - Uri brokerUri = new(brokerAddress); - - //Init the client with default settings - FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(ClientHeap ?? MemoryUtil.Shared, maxMessageSize, DebugLog); - - _client = new(conf); - - //Add the configuration to the client - _client.GetCacheConfiguration() - .WithBroker(brokerUri) + //Connection authentication methods + Client.GetCacheConfiguration() .WithVerificationKey(cachePub) .WithSigningCertificate(clientPriv) - .WithBrokerVerificationKey(brokerPub) - .WithTls(brokerUri.Scheme == Uri.UriSchemeHttps); + .WithBrokerVerificationKey(brokerPub); } - /// - /// Discovers nodes in the configured cluster and connects to a random node - /// - /// A to write log events to - /// A token to cancel the operation - /// A task that completes when the operation has been cancelled or an unrecoverable error occured - /// - /// - internal async Task RunAsync(ILogProvider Log, CancellationToken cancellationToken) + /* + * Background work method manages the remote cache connection + * to the cache cluster + */ + public virtual async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) { - _ = _client ?? throw new InvalidOperationException("Client configuration not loaded, cannot connect to cache servers"); - - while (true) - { - //Load the server list - ActiveServer[]? servers; + try + { while (true) { - try + //Load the server list + ActiveServer[]? servers; + while (true) { - Log.Debug("Discovering cluster nodes in broker"); - //Get server list - servers = await _client.DiscoverCacheNodesAsync(cancellationToken); - break; + try + { + pluginLog.Debug("Discovering cluster nodes in broker"); + //Get server list + servers = await Client.DiscoverCacheNodesAsync(exitToken); + break; + } + catch (HttpRequestException re) when (re.InnerException is SocketException) + { + pluginLog.Warn("Broker server is unreachable"); + } + catch (Exception ex) + { + pluginLog.Warn("Failed to get server list from broker, reason {r}", ex.Message); + } + + //Gen random ms delay + int randomMsDelay = RandomNumberGenerator.GetInt32(1000, 2000); + await Task.Delay(randomMsDelay, exitToken); } - catch (HttpRequestException re) when (re.InnerException is SocketException) - { - Log.Warn("Broker server is unreachable"); - } - catch (Exception ex) + + if (servers?.Length == 0) { - Log.Warn("Failed to get server list from broker, reason {r}", ex.Message); + pluginLog.Warn("No cluster nodes found, retrying"); + await Task.Delay(RetryInterval, exitToken); + continue; } - //Gen random ms delay - int randomMsDelay = RandomNumberGenerator.GetInt32(1000, 2000); - await Task.Delay(randomMsDelay, cancellationToken); - } - - if (servers?.Length == 0) - { - Log.Warn("No cluster nodes found, retrying"); - await Task.Delay(RetryInterval, cancellationToken); - continue; - } - - try - { - Log.Debug("Connecting to random cache server"); + try + { + pluginLog.Debug("Connecting to random cache server"); - //Connect to a random server - ActiveServer selected = await _client.ConnectToRandomCacheAsync(cancellationToken); - Log.Debug("Connected to cache server {s}", selected.ServerId); + //Connect to a random server + ActiveServer selected = await Client.ConnectToRandomCacheAsync(exitToken); + pluginLog.Debug("Connected to cache server {s}", selected.ServerId); - //Set connection status flag - IsConnected = true; + //Set connection status flag + IsConnected = true; - //Wait for disconnect - await _client.WaitForExitAsync(cancellationToken); + //Wait for disconnect + await Client.WaitForExitAsync(exitToken); - Log.Debug("Cache server disconnected"); - } - catch (WebSocketException wse) - { - Log.Warn("Failed to connect to cache server {reason}", wse.Message); - continue; - } - catch (HttpRequestException he) when (he.InnerException is SocketException) - { - Log.Debug("Failed to connect to random cache server server"); - //Continue next loop - continue; - } - finally - { - IsConnected = false; + pluginLog.Debug("Cache server disconnected"); + } + catch (WebSocketException wse) + { + pluginLog.Warn("Failed to connect to cache server {reason}", wse.Message); + continue; + } + catch (HttpRequestException he) when (he.InnerException is SocketException) + { + pluginLog.Debug("Failed to connect to random cache server server"); + //Continue next loop + continue; + } + finally + { + IsConnected = false; + } } } + catch (OperationCanceledException) + { + //Normal exit from listening loop + } + catch (KeyNotFoundException e) + { + pluginLog.Error("Missing required configuration variable for VnCache client: {0}", e.Message); + } + catch (FBMServerNegiationException fne) + { + pluginLog.Error("Failed to negotiate connection with cache server {reason}", fne.Message); + } + catch (Exception ex) + { + pluginLog.Error(ex, "Unhandled exception occured in background cache client listening task"); + } + finally + { + //Dispose the client on exit + Client.Dispose(); + } + pluginLog.Information("Cache client exited"); } /// - public bool IsConnected { get; private set; } - - - /// - public Task AddOrUpdateAsync(string key, string? newKey, T value, CancellationToken cancellation) + public virtual Task AddOrUpdateAsync(string key, string? newKey, T value, CancellationToken cancellation) { return !IsConnected ? throw new InvalidOperationException("The underlying client is not connected to a cache node") - : _client!.AddOrUpdateObjectAsync(key, newKey, value, cancellation); + : Client!.AddOrUpdateObjectAsync(key, newKey, value, cancellation); } /// - public Task DeleteAsync(string key, CancellationToken cancellation) + public virtual Task DeleteAsync(string key, CancellationToken cancellation) { return !IsConnected ? throw new InvalidOperationException("The underlying client is not connected to a cache node") - : _client!.DeleteObjectAsync(key, cancellation); + : Client!.DeleteObjectAsync(key, cancellation); } - - + /// - public Task GetAsync(string key, CancellationToken cancellation) + public virtual Task GetAsync(string key, CancellationToken cancellation) { return !IsConnected ? throw new InvalidOperationException("The underlying client is not connected to a cache node") - : _client!.GetObjectAsync(key, cancellation); + : Client!.GetObjectAsync(key, cancellation); + } + + /// + public virtual Task GetAsync(string key, ICacheObjectDeserialzer deserializer, CancellationToken cancellation) + { + return !IsConnected + ? throw new InvalidOperationException("The underlying client is not connected to a cache node") + : Client!.GetObjectAsync(key, deserializer, cancellation); + } + + /// + public virtual Task AddOrUpdateAsync(string key, string? newKey, T value, ICacheObjectSerialzer serialzer, CancellationToken cancellation) + { + return !IsConnected + ? throw new InvalidOperationException("The underlying client is not connected to a cache node") + : Client!.AddOrUpdateObjectAsync(key, newKey, value, serialzer, cancellation); } } } \ No newline at end of file diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnGlobalCache.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnGlobalCache.cs new file mode 100644 index 0000000..3cdebe3 --- /dev/null +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnGlobalCache.cs @@ -0,0 +1,102 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Plugins.Extensions.VNCache +* File: VnGlobalCache.cs +* +* VnGlobalCache.cs is part of VNLib.Plugins.Extensions.VNCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Data.Caching; +using VNLib.Plugins.Extensions.Loading; + +namespace VNLib.Plugins.Extensions.VNCache +{ + /// + /// A wrapper to simplify a shared global cache client + /// + [ConfigurationName(VNCacheExtensions.CACHE_CONFIG_KEY)] + public sealed class VnGlobalCache : IGlobalCacheProvider + { + private readonly IGlobalCacheProvider _client; + + /// + /// Initializes an emtpy client wrapper that still requires + /// configuration loading + /// + public VnGlobalCache(PluginBase pbase, IConfigScope config) + { + if (config.TryGetValue(VNCacheExtensions.MEMORY_CACHE_CONFIG_KEY, out _)) + { + //Check for memory only flag + if (config.TryGetValue(VNCacheExtensions.MEMORY_CACHE_ONLY_KEY, out JsonElement memOnly) && memOnly.GetBoolean()) + { + //Create a memory-only cache + _client = pbase.GetOrCreateSingleton(); + } + else + { + //Remote-backed memory cache + _client = pbase.GetOrCreateSingleton(); + } + } + else + { + //Setup non-memory backed cache client + _client = pbase.GetOrCreateSingleton(); + } + } + + /// + public bool IsConnected => _client.IsConnected; + + /// + public Task AddOrUpdateAsync(string key, string? newKey, T value, CancellationToken cancellation) + { + return _client.AddOrUpdateAsync(key, newKey, value, cancellation); + } + + /// + public Task AddOrUpdateAsync(string key, string? newKey, T value, ICacheObjectSerialzer serialzer, CancellationToken cancellation) + { + return _client.AddOrUpdateAsync(key, newKey, value, serialzer, cancellation); + } + + /// + public Task DeleteAsync(string key, CancellationToken cancellation) + { + return _client.DeleteAsync(key, cancellation); + } + + /// + public Task GetAsync(string key, CancellationToken cancellation) + { + return _client.GetAsync(key, cancellation); + } + + /// + public Task GetAsync(string key, ICacheObjectDeserialzer deserializer, CancellationToken cancellation) + { + return _client.GetAsync(key, deserializer, cancellation); + } + } +} \ No newline at end of file -- cgit