diff options
36 files changed, 2502 insertions, 765 deletions
diff --git a/lib/VNLib.Data.Caching.Extensions/src/BrokerRegistrationRequest.cs b/lib/VNLib.Data.Caching.Extensions/src/BrokerRegistrationRequest.cs index 8e37014..9ec559a 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/BrokerRegistrationRequest.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/BrokerRegistrationRequest.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Data.Caching.Extensions diff --git a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs index 6e79b8c..32f9a0e 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs @@ -30,6 +30,7 @@ using System.Security; using System.Text.Json; using System.Threading; using System.Threading.Tasks; +using System.Collections.Generic; using System.Security.Cryptography; using System.Runtime.CompilerServices; @@ -44,7 +45,7 @@ using VNLib.Utils.Extensions; using VNLib.Net.Rest.Client; using VNLib.Net.Messaging.FBM; using VNLib.Net.Messaging.FBM.Client; -using System.Collections.Generic; + namespace VNLib.Data.Caching.Extensions { @@ -226,10 +227,7 @@ namespace VNLib.Data.Caching.Extensions using ClientContract cc = ClientPool.Lease(); //Exec the regitration request RestResponse response = await cc.Resource.ExecutePutAsync(regRequest); - if(!response.IsSuccessful) - { - throw response.ErrorException!; - } + response.ThrowIfError(); } @@ -386,12 +384,8 @@ namespace VNLib.Data.Caching.Extensions { //Execute the request RestResponse response = await clientContract.Resource.ExecuteGetAsync(negotation, token); - - //Check verify the response - if (!response.IsSuccessful) - { - throw response.ErrorException!; - } + + response.ThrowIfError(); if (response.Content == null) { diff --git a/lib/VNLib.Data.Caching.Extensions/src/ListServerRequest.cs b/lib/VNLib.Data.Caching.Extensions/src/ListServerRequest.cs index caa53ad..fd25925 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/ListServerRequest.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/ListServerRequest.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Data.Caching.Extensions diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs index fbb2dcc..440981a 100644 --- a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs +++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Data.Caching.ObjectCache @@ -24,30 +24,38 @@ using System; using System.Collections.Generic; +using System.Diagnostics; +using VNLib.Utils.Memory; using VNLib.Utils.Memory.Caching; -namespace VNLib.Data.Caching +namespace VNLib.Data.Caching.ObjectCache { /// <summary> /// A general purpose binary data storage /// </summary> - public class BlobCache : LRUCache<string, CacheEntry> + public sealed class BlobCache : LRUCache<string, CacheEntry>, IBlobCache { + private bool disposedValue; + ///<inheritdoc/> public override bool IsReadOnly { get; } ///<inheritdoc/> protected override int MaxCapacity { get; } + + ///<inheritdoc/> + public IUnmangedHeap CacheHeap { get; } /// <summary> /// Initializes a new <see cref="BlobCache"/> store /// </summary> /// <param name="maxCapacity">The maximum number of items to keep in memory</param> + /// <param name="heap">The unmanaged heap used to allocate cache entry buffers from</param> /// <exception cref="ArgumentException"></exception> - public BlobCache(int maxCapacity) + public BlobCache(int maxCapacity, IUnmangedHeap heap) :base(StringComparer.Ordinal) { if(maxCapacity < 1) @@ -55,6 +63,8 @@ namespace VNLib.Data.Caching throw new ArgumentException("The maxium capacity of the store must be a positive integer larger than 0", nameof(maxCapacity)); } + CacheHeap = heap; + MaxCapacity = maxCapacity; //Update the lookup table size @@ -75,18 +85,11 @@ namespace VNLib.Data.Caching evicted.Value.Dispose(); } - /// <summary> - /// If the <see cref="CacheEntry"/> is found in the store, changes the key - /// that referrences the blob. - /// </summary> - /// <param name="currentKey">The key that currently referrences the blob in the store</param> - /// <param name="newKey">The new key that will referrence the blob</param> - /// <param name="blob">The <see cref="CacheEntry"/> if its found in the store</param> - /// <returns>True if the record was found and the key was changes</returns> - public bool TryChangeKey(string currentKey, string newKey, out CacheEntry blob) + ///<inheritdoc/> + public bool TryChangeKey(string objectId, string newId, out CacheEntry blob) { //Try to get the node at the current key - if (LookupTable.Remove(currentKey, out LinkedListNode<KeyValuePair<string, CacheEntry>> ? node)) + if (LookupTable.Remove(objectId, out LinkedListNode<KeyValuePair<string, CacheEntry>> ? node)) { //Remove the node from the ll List.Remove(node); @@ -95,13 +98,13 @@ namespace VNLib.Data.Caching blob = node.ValueRef.Value; //Update the - node.Value = new KeyValuePair<string, CacheEntry>(newKey, blob); + node.Value = new KeyValuePair<string, CacheEntry>(newId, blob); //Add to end of list List.AddLast(node); //Re-add to lookup table with new key - LookupTable.Add(newKey, node); + LookupTable.Add(newId, node); return true; } @@ -110,11 +113,7 @@ namespace VNLib.Data.Caching return false; } - /// <summary> - /// Removes the <see cref="CacheEntry"/> from the store, and frees its resources - /// </summary> - /// <param name="key">The key that referrences the <see cref="CacheEntry"/> in the store</param> - /// <returns>A value indicating if the blob was removed</returns> + ///<inheritdoc/> public override bool Remove(string key) { //Remove the item from the lookup table and if it exists, remove the node from the list @@ -129,6 +128,7 @@ namespace VNLib.Data.Caching //Remove the node from the list List.Remove(node); } + return true; } @@ -153,5 +153,44 @@ namespace VNLib.Data.Caching //empty all cache entires in the store base.Clear(); } + + ///<inheritdoc/> + public bool Remove(string objectId, out CacheEntry entry) + { + //Try to get the stored object + if(TryGetValue(objectId, out entry)) + { + //remove the entry and bypass the disposal + bool result = base.Remove(objectId); +#if DEBUG + Debug.Assert(result == true); +#endif + return true; + } + + entry = default; + return false; + } + + ///<inheritdoc/> + void Dispose(bool disposing) + { + if (!disposedValue) + { + if (disposing) + { + Clear(); + } + disposedValue = true; + } + } + + ///<inheritdoc/> + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); + } } } diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheBucket.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheBucket.cs new file mode 100644 index 0000000..f79db3f --- /dev/null +++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheBucket.cs @@ -0,0 +1,95 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.ObjectCache +* File: BlobCacheBucket.cs +* +* BlobCacheBucket.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Utils.Memory; + +namespace VNLib.Data.Caching.ObjectCache +{ + public sealed class BlobCacheBucket : IBlobCacheBucket + { + private readonly IBlobCache _cacheTable; + private readonly SemaphoreSlim _lock; + + /// <summary> + /// Initialzies a new <see cref="BlobCacheBucket"/> and its underlying + /// <see cref="IBlobCache"/> + /// </summary> + /// <param name="bucketCapacity"> + /// The maxium number of entries allowed in the LRU cache + /// before LRU overflow happens. + /// </param> + /// <param name="heap">The heap to allocate object cache buffers</param> + public BlobCacheBucket(int bucketCapacity, IUnmangedHeap heap) + { + _lock = new(1, 1); + _cacheTable = new BlobCache(bucketCapacity, heap); + } + + ///<inheritdoc/> + public void Dispose() + { + _cacheTable.Dispose(); + _lock.Dispose(); + } + + ///<inheritdoc/> + public async ValueTask<IBlobCache> ManualWaitAsync(CancellationToken cancellation) + { + //try to enter the lock synchronously + if (_lock.Wait(0, CancellationToken.None)) + { + return _cacheTable; + } + else + { + await _lock.WaitAsync(cancellation).ConfigureAwait(false); + return _cacheTable; + } + } + + ///<inheritdoc/> + public void Release() + { + _lock.Release(); + } + + ///<inheritdoc/> + public async ValueTask<CacheBucketHandle> WaitAsync(CancellationToken cancellation) + { + //try to enter the lock synchronously + if (_lock.Wait(0, CancellationToken.None)) + { + return new(this, _cacheTable); + } + else + { + await _lock.WaitAsync(cancellation).ConfigureAwait(false); + return new(this, _cacheTable); + } + } + } +} diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheExtensions.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheExtensions.cs new file mode 100644 index 0000000..4a8692d --- /dev/null +++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheExtensions.cs @@ -0,0 +1,197 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.ObjectCache +* File: BlobCacheExtensions.cs +* +* BlobCacheExtensions.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + + +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace VNLib.Data.Caching.ObjectCache +{ + /// <summary> + /// Provides blob cache extension methods + /// </summary> + public static class BlobCacheExtensions + { + internal static CacheEntry CreateEntry(this IBlobCache cache, string objectId, ReadOnlySpan<byte> initialData, DateTime time) + { + CacheEntry entry = CacheEntry.Create(initialData, cache.CacheHeap); + try + { + //try to add the entry, but if exists, let it throw + cache.Add(objectId, entry); + entry.SetTime(time); + return entry; + } + catch + { + entry.Dispose(); + throw; + } + } + + internal static CacheEntry AddOrUpdateEntry(this IBlobCache cache, string objectId, ReadOnlySpan<byte> data, DateTime time) + { + //See if blob exists + if (cache.TryGetValue(objectId, out CacheEntry entry)) + { + //Update the entry since it exists + entry.UpdateData(data); + + entry.SetTime(time); + } + else + { + //Create the new entry + entry = cache.CreateEntry(objectId, data, time); + } + + return entry; + } + + internal static CacheEntry TryChangeKey(this IBlobCache cache, string objectId, string alternateId, ReadOnlySpan<byte> data, DateTime time) + { + //Change the key of the blob item and update its data + if (cache.TryChangeKey(objectId, alternateId, out CacheEntry entry)) + { + //If date is 0 length do not overwrite the old entry if found + if (data.IsEmpty) + { + return entry; + } + + //Otherwise update the entry + entry.UpdateData(data); + entry.SetTime(time); + + return entry; + } + else + { + //entry does not exist at the old id, so we can create a new one at the alternate id + return cache.CreateEntry(objectId, data, time); + } + } + + + /// <summary> + /// Asynchronously adds or updates an object in the store and optionally update's it's id + /// </summary> + /// <param name="table"></param> + /// <param name="objectId">The current (or old) id of the object</param> + /// <param name="alternateId">An optional id to update the blob to</param> + /// <param name="bodyData">A callback that returns the data for the blob</param> + /// <param name="state">The state parameter to pass to the data callback</param> + /// <param name="time">The time to set on the cache record</param> + /// <param name="cancellation">A token to cancel the async operation</param> + /// <returns>A value task that represents the async operation</returns> + public static async ValueTask AddOrUpdateObjectAsync<T>( + this IBlobCacheTable table, + string objectId, + string? alternateId, + GetBodyDataCallback<T> bodyData, + T state, + DateTime time, + CancellationToken cancellation = default) + { + //See if an id change is required + if (string.IsNullOrWhiteSpace(alternateId)) + { + //safe to get the bucket for the primary id + IBlobCacheBucket bucket = table.GetBucket(objectId); + + //Wait for the bucket + using CacheBucketHandle handle = await bucket.WaitAsync(cancellation); + + //add/update for single entity + _ = handle.Cache.AddOrUpdateEntry(objectId, bodyData(state), time); + } + else + { + //Buckets for each id need to be obtained + IBlobCacheBucket primary = table.GetBucket(objectId); + IBlobCacheBucket alternate = table.GetBucket(alternateId); + + //Same bucket + if (ReferenceEquals(primary, alternate)) + { + //wait for lock on only one bucket otherwise dealock + using CacheBucketHandle handle = await primary.WaitAsync(cancellation); + + //Update the entry for the single bucket + _ = handle.Cache.TryChangeKey(objectId, alternateId, bodyData(state), time); + } + else + { + //Buckets are different must be awaited individually + using CacheBucketHandle primaryHandle = await primary.WaitAsync(cancellation); + using CacheBucketHandle alternateHandle = await alternate.WaitAsync(cancellation); + + //Get the entry from the primary hande + if (primaryHandle.Cache.Remove(objectId, out CacheEntry entry)) + { + try + { + //Update the handle data and reuse the entry + entry.UpdateData(bodyData(state)); + + //Add the updated entry to the alternate table + alternateHandle.Cache.Add(alternateId, entry); + } + catch + { + //Cleanup handle if error adding + entry.Dispose(); + throw; + } + } + else + { + //Old entry did not exist, we need to create a new entry for the alternate bucket + _ = alternateHandle.Cache.CreateEntry(alternateId, bodyData(state), time); + } + } + } + } + + /// <summary> + /// Asynchronously deletes a previously stored item + /// </summary> + /// <param name="table"></param> + /// <param name="objectId">The id of the object to delete</param> + /// <param name="cancellation">A token to cancel the async lock await</param> + /// <returns>A task that completes when the item has been deleted</returns> + public static async ValueTask<bool> DeleteObjectAsync(this IBlobCacheTable table, string objectId, CancellationToken cancellation = default) + { + //Try to get the bucket that the id should belong to + IBlobCacheBucket bucket = table.GetBucket(objectId); + + //Wait for lock on bucket async + using CacheBucketHandle handle = await bucket.WaitAsync(cancellation); + + //Remove the object from the blob store + return handle.Cache.Remove(objectId); + } + } +} diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs new file mode 100644 index 0000000..818dfcf --- /dev/null +++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs @@ -0,0 +1,285 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.ObjectCache +* File: BlobCacheLIstener.cs +* +* BlobCacheLIstener.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + + +/* + * The latest bucket based cache store relys on bucket level locking + * to distribute locking across threads and reduce contention. + * + * This design relys on holding the bucket lock for the entire duration + * of the CacheEntry manipulation, its id, movment, and reading/writing + * the entirie's contents. + * + * Some drawbacks are the basics with key-derrived bucket systems: + * bucket imbalance due to key distribtion. + * + * Design perfers average speed, but will need to be tested heavily per + * use-case. + */ + +using System; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Utils.Async; +using VNLib.Utils.Memory; +using VNLib.Utils.Logging; +using VNLib.Net.Messaging.FBM.Server; +using static VNLib.Data.Caching.Constants; + +namespace VNLib.Data.Caching.ObjectCache +{ + public delegate ReadOnlySpan<byte> GetBodyDataCallback<T>(T state); + + /// <summary> + /// A <see cref="FBMListener"/> implementation of a <see cref="CacheListener"/> + /// </summary> + public class BlobCacheLIstener : FBMListenerBase, IDisposable + { + private bool disposedValue; + + ///<inheritdoc/> + protected override ILogProvider Log { get; } + + /// <summary> + /// A queue that stores update and delete events + /// </summary> + public AsyncQueue<ChangeEvent> EventQueue { get; } + + /// <summary> + /// The Cache store to access data blobs + /// </summary> + public IBlobCacheTable Cache { get; } + + + /// <summary> + /// Initialzies a new <see cref="BlobCacheLIstener"/> + /// </summary> + /// <param name="cacheMax">The maxium number of items per bucket</param> + /// <param name="buckets">The number of cache store buckets</param> + /// <param name="log"></param> + /// <param name="heap">The heap to alloc FBM buffers and <see cref="CacheEntry"/> cache buffers from</param> + /// <param name="singleReader">A value that indicates if a single thread is processing events</param> + public BlobCacheLIstener(uint buckets, uint cacheMax, ILogProvider log, IUnmangedHeap heap, bool singleReader) + { + Log = log; + + //Writes may happen from multple threads with bucket design and no lock + EventQueue = new(false, singleReader); + + Cache = new BlobCacheTable(buckets, cacheMax, heap); + InitListener(heap); + } + + ///<inheritdoc/> + protected override async Task ProcessAsync(FBMContext context, object? userState, CancellationToken exitToken) + { + try + { + //Get the action header + string action = context.Method(); + + //Optional newid header + string? alternateId = context.NewObjectId(); + + switch (action) + { + case Actions.Get: + { + //Get the object-id header + string objectId = context.ObjectId(); + + //Process read + await ReadEntryAsync(context, objectId, exitToken); + return; + } + case Actions.AddOrUpdate: + { + //Get the object-id header + string objectId = context.ObjectId(); + + //Create change event for the object + ChangeEvent change = new(objectId, alternateId, false); + + await AddOrUpdateAsync(context, change, exitToken); + return; + } + case Actions.Delete: + { + //Get the object-id header + string objectId = context.ObjectId(); + + //Create change event + ChangeEvent change = new(objectId, alternateId, true); + + await DeleteEntryAsync(context, change, exitToken); + return; + } + // event queue dequeue request + case Actions.Dequeue: + { + static void SetResponse(ChangeEvent change, FBMContext context) + { + if (change.Deleted) + { + context.CloseResponse("deleted"); + context.Response.WriteHeader(ObjectId, change.CurrentId); + } + else + { + //Changed + context.CloseResponse("modified"); + context.Response.WriteHeader(ObjectId, change.CurrentId); + + //Set old id if an old id is set + if (change.CurrentId != null) + { + context.Response.WriteHeader(NewObjectId, change.AlternateId); + } + } + } + + static async Task DequeAsync(AsyncQueue<ChangeEvent> queue, FBMContext context, CancellationToken exitToken) + { + //Wait for a new message to process + ChangeEvent ev = await queue.DequeueAsync(exitToken); + + //Set the response + SetResponse(ev, context); + } + + //If no event bus is registered, then this is not a legal command + if (userState is not AsyncQueue<ChangeEvent> eventBus) + { + context.CloseResponse(ResponseCodes.NotFound); + + return; + } + + //try to deq without awaiting + if (eventBus.TryDequeue(out ChangeEvent? change)) + { + SetResponse(change, context); + } + else + { + //Process async + await DequeAsync(eventBus, context, exitToken); + } + + return; + } + + } + + Log.Error("Unhandled cache event for session {id}", context.Request.ConnectionId); + context.CloseResponse(ResponseCodes.Error); + } + catch (OperationCanceledException) + { + throw; + } + catch(Exception ex) + { + //Log error and set error status code + Log.Error(ex); + context.CloseResponse(ResponseCodes.Error); + } + } + + private async ValueTask ReadEntryAsync(FBMContext context, string objectId, CancellationToken cancellation) + { + //Try to get the bucket that the id should belong to + IBlobCacheBucket bucket = Cache.GetBucket(objectId); + + //Wait for lock on bucket async + using CacheBucketHandle handle = await bucket.WaitAsync(cancellation); + + if (handle.Cache.TryGetValue(objectId, out CacheEntry data)) + { + //Set the status code and write the buffered data to the response buffer + context.CloseResponse(ResponseCodes.Okay); + + //Copy data to response buffer + context.Response.WriteBody(data.GetDataSegment()); + } + else + { + context.CloseResponse(ResponseCodes.NotFound); + } + } + + private async ValueTask DeleteEntryAsync(FBMContext context, ChangeEvent change, CancellationToken cancellation) + { + //Remove the object from the blob store + bool found = await Cache.DeleteObjectAsync(change.CurrentId, cancellation); + + context.CloseResponse(found ? ResponseCodes.Okay : ResponseCodes.NotFound); + + //Enque change if item was successfully deleted + if (found) + { + EnqueEvent(change); + } + } + + private async ValueTask AddOrUpdateAsync(FBMContext context, ChangeEvent change, CancellationToken cancellation) + { + //Run add/update and get the valuetask + await Cache.AddOrUpdateObjectAsync(change.CurrentId, change.AlternateId, static r => r.BodyData, context.Request, default, cancellation); + + EnqueEvent(change); + + context.CloseResponse(ResponseCodes.Okay); + } + + private void EnqueEvent(ChangeEvent change) + { + if (!EventQueue.TryEnque(change)) + { + Log.Warn("Change event {ev} was not enqued because the event queue is overflowing!", change.CurrentId); + } + } + + + ///<inheritdoc/> + protected virtual void Dispose(bool disposing) + { + if (!disposedValue) + { + Cache.Dispose(); + + disposedValue = true; + } + } + + ///<inheritdoc/> + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + } +} diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheTable.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheTable.cs new file mode 100644 index 0000000..270cf1e --- /dev/null +++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheTable.cs @@ -0,0 +1,139 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.ObjectCache +* File: BlobCacheTable.cs +* +* BlobCacheTable.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Collections; +using System.Collections.Generic; + +using VNLib.Utils; +using VNLib.Utils.Memory; + +namespace VNLib.Data.Caching.ObjectCache +{ + /// <summary> + /// A concrete implementation of a <see cref="IBlobCacheTable"/> + /// </summary> + public sealed class BlobCacheTable : VnDisposeable, IBlobCacheTable + { + private readonly uint _tableSize; + private readonly IBlobCacheBucket[] _buckets; + + /// <summary> + /// Initializes a new <see cref="BlobCacheTable"/> + /// </summary> + /// <param name="bucketSize">The number of elements in each bucket</param> + /// <param name="tableSize">The number of buckets within the table</param> + /// <param name="heap">The heap used to allocate cache entry buffers from</param> + /// <exception cref="ArgumentNullException"></exception> + /// <exception cref="ArgumentException"></exception> + public BlobCacheTable(uint tableSize, uint bucketSize, IUnmangedHeap heap) + { + _ = heap ?? throw new ArgumentNullException(nameof(heap)); + + if(tableSize == 0) + { + throw new ArgumentException("Cache table must have atleast 1 bucket"); + } + + //Init bucket table + _tableSize = tableSize; + _buckets = new IBlobCacheBucket[tableSize]; + + //Init buckets + InitBuckets(tableSize, bucketSize, _buckets, heap); + } + + + private static void InitBuckets(uint size, uint bucketSize, IBlobCacheBucket[] table, IUnmangedHeap heap) + { + for(int i = 0; i < size; i++) + { + table[i] = new BlobCacheBucket((int)bucketSize, heap); + } + } + + /* + * A very simple algorithm that captures unique values + * from an object id and builds an unsigned 32bit integer + * used to determine the bucked index within the table. + * + * This method will alawys result in the same index for + * for a given object-id + */ + + private uint FastGetBucketIndexFromId(ReadOnlySpan<char> objectId) + { + if (objectId.Length < 4) + { + throw new ArgumentException("Object id must be larger than 3 characters"); + } + + Span<byte> buffer = stackalloc byte[4]; + + //cast the characters + buffer[0] = (byte)objectId[0]; + buffer[1] = (byte)objectId[objectId.Length / 2]; + buffer[2] = (byte)objectId[1]; + buffer[3] = (byte)objectId[^1]; + + //Read the buffer back to a uint and mod by the table size to get the bucket index + return BitConverter.ToUInt32(buffer) % _tableSize; + } + + + ///<inheritdoc/> + ///<exception cref="ObjectDisposedException"></exception> + public IBlobCacheBucket GetBucket(ReadOnlySpan<char> objectId) + { + Check(); + + //If tablesize is 1, skip lookup, otherwise perform bucket index lookup + uint index = _tableSize == 1 ? 0 : FastGetBucketIndexFromId(objectId); + + return _buckets[index]; + } + + ///<inheritdoc/> + protected sealed override void Free() + { + //Dispose buckets + Array.ForEach(_buckets, static b => b.Dispose()); + } + + ///<inheritdoc/> + public IEnumerator<IBlobCacheBucket> GetEnumerator() + { + Check(); + return _buckets.AsEnumerable().GetEnumerator(); + } + + ///<inheritdoc/> + IEnumerator IEnumerable.GetEnumerator() + { + Check(); + return _buckets.AsEnumerable().GetEnumerator(); + } + } +} diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/CacheBucketHandle.cs b/lib/VNLib.Data.Caching.ObjectCache/src/CacheBucketHandle.cs new file mode 100644 index 0000000..f9c1f17 --- /dev/null +++ b/lib/VNLib.Data.Caching.ObjectCache/src/CacheBucketHandle.cs @@ -0,0 +1,111 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.ObjectCache +* File: CacheBucketHandle.cs +* +* CacheBucketHandle.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Diagnostics.CodeAnalysis; + +namespace VNLib.Data.Caching.ObjectCache +{ + /// <summary> + /// Holds an exclusive lock on a <see cref="IBlobCacheBucket"/>, and exposes + /// acess to its internal <see cref="IBlobCache"/> + /// </summary> + public readonly struct CacheBucketHandle : IDisposable, IEquatable<CacheBucketHandle> + { + private readonly IBlobCacheBucket? _bucket; + + /// <summary> + /// The <see cref="IBlobCache"/> held by the current handle + /// </summary> + public readonly IBlobCache Cache { get; } + + /// <summary> + /// Initializes an empty blobcache handle + /// </summary> + public CacheBucketHandle() + { + _bucket = null; + Cache = null!; + } + + /// <summary> + /// Creates a new bucket lock handle to be released on dispose + /// </summary> + /// <param name="bucket">The bucket to release access to on dispose</param> + /// <param name="cache">The underlying <see cref="IBlobCache"/> provide exclusive access to</param> + public CacheBucketHandle(IBlobCacheBucket bucket, IBlobCache cache) + { + _bucket = bucket; + Cache = cache; + } + + /// <summary> + /// Releases the exlusive lock held on the bucket + /// </summary> + public void Dispose() + { + //Release the bucket when disposed + _bucket?.Release(); + } + + /// <summary> + /// Determines if the other handle instance is equal to the current. Handles are + /// equal iff the underlying bucket referrence is equal. + /// </summary> + /// <param name="other">The other handle to compare</param> + /// <returns>True if the handles hold a referrence to the same bucket</returns> + public bool Equals(CacheBucketHandle other) => _bucket?.Equals(other._bucket) ?? false; + /// <summary> + /// Determines if the other handle instance is equal to the current. Handles are + /// equal iff the underlying bucket referrence is equal. + /// </summary> + /// <param name="obj">The other handle to compare</param> + /// <returns>True if the handles hold a referrence to the same bucket</returns> + public override bool Equals([NotNullWhen(true)] object? obj) => obj is CacheBucketHandle other && Equals(other); + + /// <summary> + /// Gets the hashcode of the underlying bucket + /// </summary> + /// <returns></returns> + public override int GetHashCode() => _bucket?.GetHashCode() ?? -1; + + /// <summary> + /// Determines if the handles are equal by the <see cref="Equals(CacheBucketHandle)"/> + /// method. + /// </summary> + /// <param name="left"></param> + /// <param name="right"></param> + /// <returns>True if the internal bucket references are equal</returns> + public static bool operator ==(CacheBucketHandle left, CacheBucketHandle right) => left.Equals(right); + + /// <summary> + /// Determines if the handles are equal by the <see cref="Equals(CacheBucketHandle)"/> + /// method. + /// </summary> + /// <param name="left"></param> + /// <param name="right"></param> + /// <returns>True if the internal bucket references are NOT equal</returns> + public static bool operator !=(CacheBucketHandle left, CacheBucketHandle right) => !(left == right); + } +} diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs b/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs index 8644d1d..3d61790 100644 --- a/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs +++ b/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs @@ -32,7 +32,8 @@ using VNLib.Utils.Extensions; namespace VNLib.Data.Caching { /// <summary> - /// A structure that represents an item in cache + /// A structure that represents an item in cache. It contains the binary content + /// of a cache entry by its internal memory handle /// </summary> public readonly struct CacheEntry : IDisposable, IEquatable<CacheEntry> { @@ -89,7 +90,7 @@ namespace VNLib.Data.Caching ///<inheritdoc/> - public readonly void Dispose() => _handle.Dispose(); + public readonly void Dispose() => _handle?.Dispose(); [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -116,7 +117,7 @@ namespace VNLib.Data.Caching /// </summary> /// <returns>The last date stored</returns> /// <exception cref="ObjectDisposedException"></exception> - public readonly DateTime GetCreatedTime() + public readonly DateTime GetTime() { //Get the time segment and write the value in big endian ReadOnlySpan<byte> segment = GetTimeSegment(); diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/IBlobCache.cs b/lib/VNLib.Data.Caching.ObjectCache/src/IBlobCache.cs new file mode 100644 index 0000000..52d53ff --- /dev/null +++ b/lib/VNLib.Data.Caching.ObjectCache/src/IBlobCache.cs @@ -0,0 +1,87 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.ObjectCache +* File: IBlobCache.cs +* +* IBlobCache.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Collections.Generic; + +using VNLib.Utils.Memory; + +namespace VNLib.Data.Caching.ObjectCache +{ + /// <summary> + /// Represents a binary data cache store + /// </summary> + public interface IBlobCache : IEnumerable<KeyValuePair<string, CacheEntry>>, IDisposable + { + /// <summary> + /// The internal heap used to allocate <see cref="CacheEntry"/> buffers + /// </summary> + IUnmangedHeap CacheHeap { get; } + + /// <summary> + /// Attempts to retreive the entry at the given id. + /// </summary> + /// <param name="objectId">The id of the object to locate</param> + /// <param name="entry">The cache entry if found, default otherwise</param> + /// <returns>True if the entry was assigned</returns> + bool TryGetValue(string objectId, out CacheEntry entry); + + /// <summary> + /// Attempts to relocate the entry in the table by its new id. + /// </summary> + /// <param name="objectId">The original id of the entry to modify</param> + /// <param name="newId">The new id of the entry</param> + /// <param name="entry">The original entry if found, default otherwise</param> + /// <returns>True if the item was located and successfully updated, false if the operation failed</returns> + bool TryChangeKey(string objectId, string newId, out CacheEntry entry); + + /// <summary> + /// Adds the entry to the table by the id + /// </summary> + /// <param name="objectId"></param> + /// <param name="entry">The entry to store in the table</param> + void Add(string objectId, CacheEntry entry); + + /// <summary> + /// Attempts to remove the entry at the given id, and returns the + /// entry if located. + /// </summary> + /// <param name="objectId">The id of the entry to remove</param> + /// <param name="entry">The entry if found, default otherwise</param> + /// <returns>True if the entry existed in the store, false otherwise</returns> + /// <remarks> + /// NOTE: If the return value is true, the store no longer maintains the lifetime + /// of the returned <see cref="CacheEntry"/>. You must manually dispose the entry + /// to avoid memory leaks. + /// </remarks> + bool Remove(string objectId, out CacheEntry entry); + + /// <summary> + /// Attempts to remove the entry at the given id, and release its memory. + /// </summary> + /// <param name="objectId">The id of the entry to remove</param> + /// <returns>True if the entry was found and disposed</returns> + bool Remove(string objectId); + } +} diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/IBlobCacheBucket.cs b/lib/VNLib.Data.Caching.ObjectCache/src/IBlobCacheBucket.cs new file mode 100644 index 0000000..4876c5f --- /dev/null +++ b/lib/VNLib.Data.Caching.ObjectCache/src/IBlobCacheBucket.cs @@ -0,0 +1,58 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.ObjectCache +* File: ObjectCacheStore.cs +* +* ObjectCacheStore.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace VNLib.Data.Caching.ObjectCache +{ + /// <summary> + /// Represents a singe cache bucket that maintains cache concurrent <see cref="IBlobCache"/> + /// operations. + /// </summary> + public interface IBlobCacheBucket : IDisposable + { + /// <summary> + /// Gets a <see cref="CacheBucketHandle"/> that holds an exclusive lock + /// for the current bucekt and holds a referrence to the stored + /// <see cref="IBlobCache"/> + /// </summary> + /// <param name="cancellation">A token to cancel the wait operation</param> + /// <returns>A <see cref="CacheBucketHandle"/> that holds the <see cref="IBlobCache"/> referrence</returns> + ValueTask<CacheBucketHandle> WaitAsync(CancellationToken cancellation); + + /// <summary> + /// Allows for waiting for the cache directly, IE without receiving a lock handle + /// </summary> + /// <param name="cancellation"></param> + /// <returns>The underlying <see cref="IBlobCache"/> that now has exlcusive access</returns> + ValueTask<IBlobCache> ManualWaitAsync(CancellationToken cancellation); + + /// <summary> + /// Releases an exlcusive lock on the current bucket, DO NOT CALL BY USER CODE + /// </summary> + void Release(); + } +} diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/IBlobCacheTable.cs b/lib/VNLib.Data.Caching.ObjectCache/src/IBlobCacheTable.cs new file mode 100644 index 0000000..d84aecf --- /dev/null +++ b/lib/VNLib.Data.Caching.ObjectCache/src/IBlobCacheTable.cs @@ -0,0 +1,43 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.ObjectCache +* File: IBlobCacheTable.cs +* +* IBlobCacheTable.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Collections.Generic; + +namespace VNLib.Data.Caching.ObjectCache +{ + /// <summary> + /// A table that contains a collection of blob cache buckets + /// for improved cache concurrency + /// </summary> + public interface IBlobCacheTable : IEnumerable<IBlobCacheBucket>, IDisposable + { + /// <summary> + /// Gets a bucket that should contain the object by its id + /// </summary> + /// <param name="objectId">The id of the object to get the bucket for</param> + /// <returns>The bucket that should contain the object</returns> + IBlobCacheBucket GetBucket(ReadOnlySpan<char> objectId); + } +} diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs b/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs deleted file mode 100644 index af1e730..0000000 --- a/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs +++ /dev/null @@ -1,463 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Data.Caching.ObjectCache -* File: ObjectCacheStore.cs -* -* ObjectCacheStore.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.IO; -using System.Threading; -using System.Threading.Tasks; - -using VNLib.Utils.Async; -using VNLib.Utils.Memory; -using VNLib.Utils.Logging; -using VNLib.Utils.Extensions; -using VNLib.Net.Messaging.FBM.Server; -using static VNLib.Data.Caching.Constants; - - -#pragma warning disable CA1849 // Call async methods when in an async method - -namespace VNLib.Data.Caching.ObjectCache -{ - public delegate ReadOnlySpan<byte> GetBodyDataCallback<T>(T state); - - /// <summary> - /// A <see cref="FBMListener"/> implementation of a <see cref="CacheListener"/> - /// </summary> - public class ObjectCacheStore : FBMListenerBase, IDisposable - { - private readonly SemaphoreSlim StoreLock; - private bool disposedValue; - - ///<inheritdoc/> - protected override ILogProvider Log { get; } - - /// <summary> - /// A queue that stores update and delete events - /// </summary> - public AsyncQueue<ChangeEvent> EventQueue { get; } - - /// <summary> - /// The Cache store to access data blobs - /// </summary> - private readonly BlobCache Cache; - - private readonly IUnmangedHeap Heap; - - - /// <summary> - /// Initialzies a new <see cref="ObjectCacheStore"/> - /// </summary> - /// <param name="cacheMax"></param> - /// <param name="log"></param> - /// <param name="heap"></param> - /// <param name="singleReader">A value that indicates if a single thread is processing events</param> - public ObjectCacheStore(int cacheMax, ILogProvider log, IUnmangedHeap heap, bool singleReader) - { - Log = log; - //We can use a single writer and single reader in this context - EventQueue = new(true, singleReader); - Cache = new(cacheMax); - Heap = heap; - InitListener(heap); - StoreLock = new(1,1); - } - - ///<inheritdoc/> - protected override Task ProcessAsync(FBMContext context, object? userState, CancellationToken exitToken) - { - try - { - //Get the action header - string action = context.Method(); - - //Optional newid header - string? alternateId = context.NewObjectId(); - - switch (action) - { - case Actions.Get: - { - //Get the object-id header - string objectId = context.ObjectId(); - - //Try read sync - if (StoreLock.Wait(0)) - { - try - { - UnsafeReadEntry(context, objectId); - } - finally - { - StoreLock.Release(); - } - - return Task.CompletedTask; - } - else - { - //Read entry async - return InternalReadEntryAsync(context, objectId, exitToken); - } - } - - case Actions.AddOrUpdate: - { - //Get the object-id header - string objectId = context.ObjectId(); - - //Create change event for the object - ChangeEvent change = new(objectId, alternateId, false); - - //Attempt to aquire lock sync - if (StoreLock.Wait(0)) - { - //aquired sync - try - { - //Update the item - UnsafeAddOrUpdate(objectId, alternateId, GetBodyData, context); - } - finally - { - StoreLock.Release(); - } - - //Add to event queue - EnqueEvent(change); - - //Set status code - context.CloseResponse(ResponseCodes.Okay); - - return Task.CompletedTask; - } - else - { - //Lock will be awaited async and - return InternalAddOrUpdateAsync(context, change, exitToken); - } - } - case Actions.Delete: - { - //Get the object-id header - string objectId = context.ObjectId(); - - //Create change event - ChangeEvent change = new(objectId, alternateId, true); - - //See if lock can be entered without waiting - if (StoreLock.Wait(0)) - { - bool found = false; - - try - { - //Sync - found = UnsafeDeleteEntry(objectId); - } - finally - { - StoreLock.Release(); - } - - //Notify change - EnqueEvent(change); - - //Set status code if found - context.CloseResponse(found ? ResponseCodes.Okay : ResponseCodes.NotFound); - - return Task.CompletedTask; - } - else - { - //lock will yeild async - return InternalDeleteAsync(context, change, exitToken); - } - } - // event queue dequeue request - case Actions.Dequeue: - { - static void SetResponse(ChangeEvent change, FBMContext context) - { - if (change.Deleted) - { - context.CloseResponse("deleted"); - context.Response.WriteHeader(ObjectId, change.CurrentId); - } - else - { - //Changed - context.CloseResponse("modified"); - context.Response.WriteHeader(ObjectId, change.CurrentId); - - //Set old id if an old id is set - if (change.CurrentId != null) - { - context.Response.WriteHeader(NewObjectId, change.AlternateId); - } - } - } - - static async Task DequeAsync(AsyncQueue<ChangeEvent> queue, FBMContext context, CancellationToken exitToken) - { - //Wait for a new message to process - ChangeEvent ev = await queue.DequeueAsync(exitToken); - - //Set the response - SetResponse(ev, context); - } - - //If no event bus is registered, then this is not a legal command - if (userState is not AsyncQueue<ChangeEvent> eventBus) - { - context.CloseResponse(ResponseCodes.NotFound); - - return Task.CompletedTask; - } - - //try to deq without awaiting - if (eventBus.TryDequeue(out ChangeEvent? change)) - { - SetResponse(change, context); - - return Task.CompletedTask; - } - else - { - //Process async - return DequeAsync(eventBus, context, exitToken); - } - } - - } - - Log.Error("Unhandled cache event!"); - } - catch (OperationCanceledException) - { - throw; - } - catch(Exception ex) - { - //Log error and set error status code - Log.Error(ex); - context.CloseResponse(ResponseCodes.Error); - } - - return Task.CompletedTask; - } - - - private static ReadOnlySpan<byte> GetBodyData(FBMContext ctx) => ctx.Request.BodyData; - - private void EnqueEvent(ChangeEvent change) - { - if (!EventQueue.TryEnque(change)) - { - Log.Warn("Change event {ev} was not enqued because the event queue is overflowing!", change.CurrentId); - } - } - - private void UnsafeReadEntry(FBMContext context, string objectId) - { - if (Cache!.TryGetValue(objectId, out CacheEntry data)) - { - //Set the status code and write the buffered data to the response buffer - context.CloseResponse(ResponseCodes.Okay); - - //Copy data to response buffer - context.Response.WriteBody(data.GetDataSegment()); - } - else - { - context.CloseResponse(ResponseCodes.NotFound); - } - } - - async Task InternalReadEntryAsync(FBMContext context, string objectId, CancellationToken cancellation) - { - //enter lock async - using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation); - - UnsafeReadEntry(context, objectId); - } - - private async Task InternalAddOrUpdateAsync(FBMContext context, ChangeEvent change, CancellationToken cancellation) - { - //Wait for lock since we know it will yeild async - using (SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation)) - { - UnsafeAddOrUpdate(change.CurrentId, change.AlternateId, GetBodyData, context); - } - - //Add to event queue - EnqueEvent(change); - - //Set status code - context.CloseResponse(ResponseCodes.Okay); - } - - private void UnsafeAddOrUpdate<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state) - { - CacheEntry entry; - - //See if new/alt session id was specified - if (string.IsNullOrWhiteSpace(alternateId)) - { - //See if blob exists - if (!Cache!.TryGetValue(objectId, out entry)) - { - //Create the new cache entry since it does not exist - entry = CacheEntry.Create(bodyData(state), Heap); - - //Add to cache - Cache.Add(objectId, entry); - } - else - { - //Reset the buffer state - entry.UpdateData(bodyData(state)); - } - } - //Need to change the id of the record - else - { - //Try to change the blob key - if (!Cache!.TryChangeKey(objectId, alternateId, out entry)) - { - //Create the new cache entry since it does not exist - entry = CacheEntry.Create(bodyData(state), Heap); - - //Add to cache by its alternate id - Cache.Add(alternateId, entry); - } - else - { - //Reset the buffer state - entry.UpdateData(bodyData(state)); - } - } - - //Update modified time to current utc time - entry.SetTime(DateTime.UtcNow); - } - - private async Task InternalDeleteAsync(FBMContext context, ChangeEvent change, CancellationToken cancellation) - { - bool found = false; - - //enter the lock - using(SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation)) - { - //Sync - found = UnsafeDeleteEntry(change.CurrentId); - } - - //Notify change - EnqueEvent(change); - - //Set status code if found - context.CloseResponse(found ? ResponseCodes.Okay : ResponseCodes.NotFound); - } - - private bool UnsafeDeleteEntry(string id) => Cache!.Remove(id); - - - /// <summary> - /// Asynchronously adds or updates an object in the store and optionally update's its id - /// </summary> - /// <param name="objectId">The current (or old) id of the object</param> - /// <param name="alternateId">An optional id to update the blob to</param> - /// <param name="bodyData">A callback that returns the data for the blob</param> - /// <param name="state">The state parameter to pass to the data callback</param> - /// <param name="token">A token to cancel the async operation</param> - /// <returns>A value task that represents the async operation</returns> - public async ValueTask AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token = default) - { - //Test the lock before waiting async - if (!StoreLock.Wait(0)) - { - //Wait async to avoid task alloc - await StoreLock.WaitAsync(token); - } - try - { - UnsafeAddOrUpdate(objectId, alternateId, bodyData, state); - } - finally - { - StoreLock.Release(); - } - } - - /// <summary> - /// Asynchronously deletes a previously stored item - /// </summary> - /// <param name="id">The id of the object to delete</param> - /// <param name="token">A token to cancel the async lock await</param> - /// <returns>A task that completes when the item has been deleted</returns> - public async ValueTask<bool> DeleteItemAsync(string id, CancellationToken token = default) - { - //Test the lock before waiting async - if (!StoreLock.Wait(0)) - { - //Wait async to avoid task alloc - await StoreLock.WaitAsync(token); - } - try - { - return UnsafeDeleteEntry(id); - } - finally - { - StoreLock.Release(); - } - } - - - ///<inheritdoc/> - protected virtual void Dispose(bool disposing) - { - if (!disposedValue) - { - if (disposing) - { - Cache?.Clear(); - } - - StoreLock.Dispose(); - - disposedValue = true; - } - } - - ///<inheritdoc/> - public void Dispose() - { - // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method - Dispose(disposing: true); - GC.SuppressFinalize(this); - } - } -} diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/VNLib.Data.Caching.ObjectCache.csproj b/lib/VNLib.Data.Caching.ObjectCache/src/VNLib.Data.Caching.ObjectCache.csproj index 119622f..3d1af7f 100644 --- a/lib/VNLib.Data.Caching.ObjectCache/src/VNLib.Data.Caching.ObjectCache.csproj +++ b/lib/VNLib.Data.Caching.ObjectCache/src/VNLib.Data.Caching.ObjectCache.csproj @@ -2,18 +2,25 @@ <PropertyGroup> <TargetFramework>net6.0</TargetFramework> - - <Authors>Vaughn Nugent</Authors> - <Copyright>Copyright © 2023 Vaughn Nugent</Copyright> + <RootNamespace>VNLib.Data.Caching.ObjectCache</RootNamespace> + <AssemblyName>VNLib.Data.Caching.ObjectCache</AssemblyName> <Nullable>enable</Nullable> <GenerateDocumentationFile>True</GenerateDocumentationFile> - <PackageProjectUrl>https://www.vaughnnugent.com/resources</PackageProjectUrl> - <AssemblyVersion>1.0.0.1</AssemblyVersion> - <Version>1.0.1.1</Version> <AnalysisLevel>latest-all</AnalysisLevel> - <SignAssembly>True</SignAssembly> - <AssemblyOriginatorKeyFile>\\vaughnnugent.com\Internal\Folder Redirection\vman\Documents\Programming\Software\StrongNameingKey.snk</AssemblyOriginatorKeyFile> - <Description>Provides server-side object-cache related infrastructure</Description> + </PropertyGroup> + + <PropertyGroup> + <Authors>Vaughn Nugent</Authors> + <Company>Vaughn Nugent</Company> + <Product>VNLib.Data.Caching.ObjectCache</Product> + <PackageId>VNLib.Data.Caching.ObjectCache</PackageId> + <Description> + A library for a high-performance in-memory object-data caching, based on key-derrived cache buckets + for wait distribution. + </Description> + <Copyright>Copyright © 2023 Vaughn Nugent</Copyright> + <PackageProjectUrl>https://www.vaughnnugent.com/resources/software/modules/VNLib.Data.Caching</PackageProjectUrl> + <RepositoryUrl>https://github.com/VnUgE/VNLib.Data.Caching/tree/master/lib/VNLib.Data.Caching.ObjectCache</RepositoryUrl> </PropertyGroup> <ItemGroup> diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/AddOrUpdateBuffer.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/AddOrUpdateBuffer.cs new file mode 100644 index 0000000..a1fe2b5 --- /dev/null +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/AddOrUpdateBuffer.cs @@ -0,0 +1,97 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Plugins.Extensions.VNCache +* File: AddOrUpdateBuffer.cs +* +* AddOrUpdateBuffer.cs is part of VNLib.Plugins.Extensions.VNCache +* which is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Buffers; + +using VNLib.Utils; +using VNLib.Utils.Memory; +using VNLib.Utils.Extensions; +using VNLib.Data.Caching; + +namespace VNLib.Plugins.Extensions.VNCache +{ + /// <summary> + /// Implements a buffer writer that serves to serialize object data and + /// store the object data for use by the memory cache store, and the + /// remote cache store + /// </summary> + class AddOrUpdateBuffer : VnDisposeable, IBufferWriter<byte>, IObjectData + { + private int _count; + private readonly IUnmangedHeap _heap; + private MemoryHandle<byte>? _buffer; + + public AddOrUpdateBuffer(IUnmangedHeap heap) + { + _heap = heap; + } + + public void Advance(int count) + { + //Update count + _count += count; + } + + public Memory<byte> GetMemory(int sizeHint = 0) + { + throw new NotImplementedException(); + } + + public Span<byte> GetSpan(int sizeHint = 0) + { + //Round to nearest page for new size + nint newSize = MemoryUtil.NearestPage(sizeHint + _count); + + //Alloc buffer it not yet allocated + if (_buffer == null) + { + _buffer = _heap.Alloc<byte>(newSize); + } + else + { + //check for resize if allocated + _buffer.ResizeIfSmaller(newSize); + } + + return _buffer.AsSpan(_count); + } + + public void SetData(ReadOnlySpan<byte> data) + { + throw new NotSupportedException(); + } + + public ReadOnlySpan<byte> GetData() + { + //Get stored data from within handle + return _buffer!.AsSpan(0, _count); + } + + protected override void Free() + { + _buffer?.Dispose(); + } + } +}
\ No newline at end of file diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityCacheExtensions.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityCacheExtensions.cs new file mode 100644 index 0000000..79bb4fc --- /dev/null +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityCacheExtensions.cs @@ -0,0 +1,159 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Plugins.Extensions.VNCache +* File: EntityCacheExtensions.cs +* +* EntityCacheExtensions.cs is part of VNLib.Plugins.Extensions.VNCache +* which is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Threading; +using System.Threading.Tasks; +using System.Runtime.CompilerServices; + +using VNLib.Data.Caching; + +namespace VNLib.Plugins.Extensions.VNCache.DataModel +{ + /// <summary> + /// Provides cache extensions for entity caching + /// </summary> + public static class EntityCacheExtensions + { + /// <summary> + /// Gets a new <see cref="ScopedCache"/> that is backed by the current cache provider + /// and generates 1:1 keys from the <paramref name="cacheKeyGenerator"/> + /// </summary> + /// <param name="cache"></param> + /// <param name="cacheKeyGenerator">The instance that generates unique keys for a given entity id</param> + /// <returns>The new <see cref="ScopedCache"/> instance</returns> + public static ScopedCache GetScopedCache(this IGlobalCacheProvider cache, ICacheKeyGenerator cacheKeyGenerator) => new ScopedCacheImpl(cache, cacheKeyGenerator); + + /// <summary> + /// Deletes an <see cref="ICacheEntity"/> from the cache from its id + /// </summary> + /// <typeparam name="T"></typeparam> + /// <param name="cache"></param> + /// <param name="entity">The entity to delete from the store</param> + /// <param name="cancellation">A token to cancel the operation</param> + /// <returns>A task that completes when the delete operation has compelted</returns> + /// <exception cref="ArgumentNullException"></exception> + public static Task DeleteAsync<T>(this IGlobalCacheProvider cache, T entity, CancellationToken cancellation) where T: class, ICacheEntity + { + _ = entity ?? throw new ArgumentNullException(nameof(entity)); + _ = cache ?? throw new ArgumentNullException(nameof(entity)); + //Delete by its id + return cache.DeleteAsync(entity.Id, cancellation); + } + + /// <summary> + /// Asynchronously sets (or updates) a cached value in the backing cache store + /// </summary> + /// <typeparam name="T"></typeparam> + /// <param name="cache"></param> + /// <param name="cancellation">A token to cancel the async operation</param> + /// <param name="entity">The entity to set at the given key</param> + /// <returns>A task that completes when the add/update operation has compelted</returns> + /// <exception cref="ArgumentNullException"></exception> + public static Task AddOrUpdateAsync<T>(this IGlobalCacheProvider cache, T entity, CancellationToken cancellation) where T: class, ICacheEntity + { + _ = entity ?? throw new ArgumentNullException(nameof(entity)); + _ = cache ?? throw new ArgumentNullException(nameof(cache)); + + //Add/update with its id + return cache.AddOrUpdateAsync(entity.Id, null, entity, cancellation); + } + + + private sealed class ScopedCacheImpl: ScopedCache + { + private readonly IGlobalCacheProvider cache; + + public override bool IsConnected + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + get => cache.IsConnected; + } + + + protected override ICacheKeyGenerator KeyGen { get; } + + public ScopedCacheImpl(IGlobalCacheProvider cache, ICacheKeyGenerator keyGen) + { + this.cache = cache; + KeyGen = keyGen; + } + + public override Task AddOrUpdateAsync<T>(string key, string? newKey, T value, CancellationToken cancellation) + { + _ = key ?? throw new ArgumentNullException(nameof(key)); + + //Compute primary key from id + string primary = KeyGen.ComputedKey(key); + + //If newkey exists, compute the secondary key + string? secondary = newKey != null ? KeyGen.ComputedKey(newKey) : null; + + return cache.AddOrUpdateAsync(primary, secondary, value, cancellation); + } + + public override Task DeleteAsync(string key, CancellationToken cancellation) + { + _ = key ?? throw new ArgumentNullException(nameof(key)); + //Compute the key for the id + string scoped = KeyGen.ComputedKey(key); + return cache.DeleteAsync(scoped, cancellation); + } + + public override Task<T> GetAsync<T>(string key, CancellationToken cancellation) + { + _ = key ?? throw new ArgumentNullException(nameof(key)); + + //Compute the key for the id + string scoped = KeyGen.ComputedKey(key); + + return cache.GetAsync<T?>(scoped, cancellation); + } + + public override Task<T> GetAsync<T>(string key, ICacheObjectDeserialzer deserializer, CancellationToken cancellation) + { + _ = key ?? throw new ArgumentNullException(nameof(key)); + + //Compute the key for the id + string scoped = KeyGen.ComputedKey(key); + + return cache.GetAsync<T?>(scoped, deserializer, cancellation); + } + + public override Task AddOrUpdateAsync<T>(string key, string? newKey, T value, ICacheObjectSerialzer serialzer, CancellationToken cancellation) + { + _ = key ?? throw new ArgumentNullException(nameof(key)); + + //Compute primary key from id + string primary = KeyGen.ComputedKey(key); + + //If newkey exists, compute the secondary key + string? secondary = newKey != null ? KeyGen.ComputedKey(newKey) : null; + + return cache.AddOrUpdateAsync(primary, secondary, value, serialzer, cancellation); + } + } + } + +} diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/ICacheEntity.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/ICacheEntity.cs new file mode 100644 index 0000000..77f0667 --- /dev/null +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/ICacheEntity.cs @@ -0,0 +1,39 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Plugins.Extensions.VNCache +* File: ICacheEntity.cs +* +* ICacheEntity.cs is part of VNLib.Plugins.Extensions.VNCache +* which is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +namespace VNLib.Plugins.Extensions.VNCache.DataModel +{ + + /// <summary> + /// Represents a uniquely cachable item + /// </summary> + public interface ICacheEntity + { + /// <summary> + /// The unique ID of the item within the store + /// </summary> + string Id { get; } + } + +} diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/ICacheExpirationStrategy.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/ICacheExpirationStrategy.cs new file mode 100644 index 0000000..f9ff54c --- /dev/null +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/ICacheExpirationStrategy.cs @@ -0,0 +1,48 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Plugins.Extensions.VNCache +* File: ICacheExpirationStrategy.cs +* +* ICacheExpirationStrategy.cs is part of VNLib.Plugins.Extensions.VNCache +* which is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; + +namespace VNLib.Plugins.Extensions.VNCache.DataModel +{ + /// <summary> + /// An interface that provides an object caching expiration + /// instructions + /// </summary> + public interface ICacheExpirationStrategy + { + /// <summary> + /// The maxium age of a given entity + /// </summary> + TimeSpan CacheMaxAge { get; } + + /// <summary> + /// Invoked when a record is retrieved and determined to be expired + /// </summary> + /// <typeparam name="T"></typeparam> + /// <param name="expired"></param> + void OnExpired<T>(T expired) where T : IExpirableCacheEntity; + } + +} diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/ICacheKeyGenerator.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/ICacheKeyGenerator.cs new file mode 100644 index 0000000..2e558b0 --- /dev/null +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/ICacheKeyGenerator.cs @@ -0,0 +1,40 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Plugins.Extensions.VNCache +* File: ICacheKeyGenerator.cs +* +* ICacheKeyGenerator.cs is part of VNLib.Plugins.Extensions.VNCache +* which is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +namespace VNLib.Plugins.Extensions.VNCache.DataModel +{ + /// <summary> + /// An interface that provides 1:1 entity to cache key mapping + /// </summary> + public interface ICacheKeyGenerator + { + /// <summary> + /// Computes the unique key identifying the item within + /// the cache store, unique to the store. + /// </summary> + /// <param name="entityId">The id of the entity to get the key for</param> + /// <returns>The unique key identifying the item</returns> + string ComputedKey(string entityId); + } +} diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/IExpirableCacheEntity.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/IExpirableCacheEntity.cs new file mode 100644 index 0000000..a47d3ca --- /dev/null +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/IExpirableCacheEntity.cs @@ -0,0 +1,39 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Plugins.Extensions.VNCache +* File: IExpirableCacheEntity.cs +* +* IExpirableCacheEntity.cs is part of VNLib.Plugins.Extensions.VNCache +* which is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +namespace VNLib.Plugins.Extensions.VNCache.DataModel +{ + /// <summary> + /// A cache entity that has a controllable expiration + /// </summary> + public interface IExpirableCacheEntity : ICacheEntity + { + /// <summary> + /// A serializable value set by the cache subsystem to + /// handle stale cache entires + /// </summary> + long Expires { get; set; } + } + +} diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/ScopedCache.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/ScopedCache.cs new file mode 100644 index 0000000..da2f78a --- /dev/null +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/ScopedCache.cs @@ -0,0 +1,64 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Plugins.Extensions.VNCache +* File: ScopedCache.cs +* +* ScopedCache.cs is part of VNLib.Plugins.Extensions.VNCache +* which is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Data.Caching; + +namespace VNLib.Plugins.Extensions.VNCache.DataModel +{ + /// <summary> + /// A cache that stores objects with 1:1 keys unique to this instance. That is, a unique entity + /// that is stored in this cache instance may only be retrieved, deleted, or updated, by the + /// same instance. This is an abstract class. + /// </summary> + public abstract class ScopedCache : IGlobalCacheProvider + { + + /// <summary> + /// The <see cref="ICacheKeyGenerator"/> to provide unique + /// cache keys + /// </summary> + protected abstract ICacheKeyGenerator KeyGen { get; } + + ///<inheritdoc/> + public abstract bool IsConnected { get; } + + ///<inheritdoc/> + public abstract Task AddOrUpdateAsync<T>(string key, string? newKey, T value, CancellationToken cancellation); + + ///<inheritdoc/> + public abstract Task DeleteAsync(string key, CancellationToken cancellation); + + ///<inheritdoc/> + public abstract Task<T?> GetAsync<T>(string key, CancellationToken cancellation); + + ///<inheritdoc/> + public abstract Task<T?> GetAsync<T>(string key, ICacheObjectDeserialzer deserializer, CancellationToken cancellation); + + ///<inheritdoc/> + public abstract Task AddOrUpdateAsync<T>(string key, string? newKey, T value, ICacheObjectSerialzer serialzer, CancellationToken cancellation); + } +} diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/MemoryCache.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/MemoryCache.cs new file mode 100644 index 0000000..7b0fe72 --- /dev/null +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/MemoryCache.cs @@ -0,0 +1,168 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Plugins.Extensions.VNCache +* File: RemoteBackedMemoryCache.cs +* +* RemoteBackedMemoryCache.cs is part of VNLib.Plugins.Extensions.VNCache +* which is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Utils; +using VNLib.Utils.Memory; +using VNLib.Utils.Logging; +using VNLib.Utils.Memory.Diagnostics; +using VNLib.Data.Caching; +using VNLib.Data.Caching.ObjectCache; +using VNLib.Plugins.Extensions.Loading; + +namespace VNLib.Plugins.Extensions.VNCache +{ + [ConfigurationName(VNCacheExtensions.CACHE_CONFIG_KEY)] + internal sealed class MemoryCache : VnDisposeable, IGlobalCacheProvider + { + const int MB_DIVISOR = 1000 * 1024; + + const string DEBUG_TEMPLATE =@"Configuring Memory-Only Cache + | ----------------------------- + | Configuration: + | Table Size: {ts} + | Bucket Size: {bs} + | Max Objects: {obj} + | Memory Estimations: + | 4K blocks: {4k}Mb + | 8K blocks: {8k}Mb + | 16K blocks: {16K}Mb + | ----------------------------- +"; + + private readonly ICacheObjectSerialzer _serialzer; + private readonly ICacheObjectDeserialzer _deserialzer; + private readonly IBlobCacheTable _memCache; + private readonly IUnmangedHeap _bufferHeap; + + public MemoryCache(PluginBase pbase, IConfigScope config) + { + //Get nested memory cache config + MemoryCacheConfig memCacheConfig = config[VNCacheExtensions.MEMORY_CACHE_CONFIG_KEY].Deserialize<MemoryCacheConfig>()!; + + if (pbase.IsDebug()) + { + //Use the debug heap + IUnmangedHeap newHeap = MemoryUtil.InitializeNewHeapForProcess(); + + //Wrap in diag heap + _bufferHeap = new TrackedHeapWrapper(newHeap); + } + else + { + //Init new "private" heap to alloc buffer from + _bufferHeap = MemoryUtil.InitializeNewHeapForProcess(); + } + + //Setup cache table + _memCache = new BlobCacheTable(memCacheConfig.TableSize, memCacheConfig.BucketSize, _bufferHeap); + + /* + * Default to json serialization by using the default + * serializer and JSON options + */ + + JsonCacheObjectSerializer defaultSerializer = new(); + _serialzer = defaultSerializer; + _deserialzer = defaultSerializer; + + PrintDebug(pbase.Log, memCacheConfig); + } + + private static void PrintDebug(ILogProvider log, MemoryCacheConfig config) + { + long maxObjects = config.BucketSize * config.TableSize; + + long size4kMb = (maxObjects * 4096)/MB_DIVISOR; + long size8kMb = (maxObjects * 8128)/MB_DIVISOR; + long size16kMb = (maxObjects * 16384)/MB_DIVISOR; + + log.Debug(DEBUG_TEMPLATE, config.TableSize, config.BucketSize, maxObjects, size4kMb, size8kMb, size16kMb); + } + + ///<inheritdoc/> + public bool IsConnected { get; } = true; + + protected override void Free() + { + _memCache.Dispose(); + _bufferHeap.Dispose(); + } + + ///<inheritdoc/> + public Task AddOrUpdateAsync<T>(string key, string? newKey, T value, CancellationToken cancellation) + { + return AddOrUpdateAsync(key, newKey, value, _serialzer, cancellation); + } + + ///<inheritdoc/> + public async Task AddOrUpdateAsync<T>(string key, string? newKey, T value, ICacheObjectSerialzer serialzer, CancellationToken cancellation) + { + Check(); + + //Alloc serialzation buffer + using AddOrUpdateBuffer buffer = new (_bufferHeap); + + //Serialze the value + serialzer.Serialize(value, buffer); + + //Update object data + await _memCache.AddOrUpdateObjectAsync(key, newKey, static b => b.GetData(), buffer, default, cancellation); + } + + ///<inheritdoc/> + public Task DeleteAsync(string key, CancellationToken cancellation) + { + Check(); + return _memCache.DeleteObjectAsync(key, cancellation).AsTask(); + } + + ///<inheritdoc/> + public Task<T?> GetAsync<T>(string key, CancellationToken cancellation) => GetAsync<T>(key, _deserialzer, cancellation); + + ///<inheritdoc/> + public async Task<T?> GetAsync<T>(string key, ICacheObjectDeserialzer deserializer, CancellationToken cancellation) + { + Check(); + + IBlobCacheBucket bucket = _memCache.GetBucket(key); + + //Obtain cache handle + using (CacheBucketHandle handle = await bucket.WaitAsync(cancellation)) + { + //Try to read the value + if (handle.Cache.TryGetValue(key, out CacheEntry entry)) + { + return (T?)deserializer.Deserialze(typeof(T), entry.GetDataSegment()); + } + } + + return default; + } + } +}
\ No newline at end of file diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/MemoryCacheConfig.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/MemoryCacheConfig.cs new file mode 100644 index 0000000..bcd821b --- /dev/null +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/MemoryCacheConfig.cs @@ -0,0 +1,65 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Plugins.Extensions.VNCache +* File: MemoryCacheConfig.cs +* +* MemoryCacheConfig.cs is part of VNLib.Plugins.Extensions.VNCache +* which is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Text.Json.Serialization; + +namespace VNLib.Plugins.Extensions.VNCache +{ + internal sealed class MemoryCacheConfig : ICacheRefreshPolicy + { + [JsonPropertyName("buckets")] + public uint TableSize { get; set; } = 10; + + [JsonPropertyName("bucket_size")] + public uint BucketSize { get; set; } = 5000; + + [JsonPropertyName("max_object_size")] + public uint MaxBlobSize { get; set; } = 16 * 1024; + + [JsonIgnore] + public TimeSpan MaxCacheAge { get; set; } = TimeSpan.FromMinutes(1); + + [JsonPropertyName("max_age_sec")] + public uint MaxAgeSec + { + get => (uint)MaxCacheAge.TotalSeconds; + set => MaxCacheAge = TimeSpan.FromSeconds(value); + } + /* + * Default disable cache + */ + public TimeSpan RefreshInterval { get; set; } = TimeSpan.Zero; + + [JsonPropertyName("refresh_interval_sec")] + public uint RefreshSec + { + get => (uint)RefreshInterval.TotalSeconds; + set => RefreshInterval = TimeSpan.FromSeconds(value); + } + + [JsonPropertyName("write_through")] + public bool WriteThrough { get; set; } = true; + } +}
\ No newline at end of file diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/RemoteBackedMemoryCache.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/RemoteBackedMemoryCache.cs new file mode 100644 index 0000000..67fb550 --- /dev/null +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/RemoteBackedMemoryCache.cs @@ -0,0 +1,258 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Plugins.Extensions.VNCache +* File: RemoteBackedMemoryCache.cs +* +* RemoteBackedMemoryCache.cs is part of VNLib.Plugins.Extensions.VNCache +* which is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Buffers; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using System.Runtime.CompilerServices; + +using VNLib.Utils; +using VNLib.Utils.Memory; +using VNLib.Utils.Logging; +using VNLib.Utils.Extensions; +using VNLib.Data.Caching; +using VNLib.Data.Caching.ObjectCache; +using VNLib.Plugins.Extensions.Loading; +using VNLib.Plugins.Extensions.Loading.Events; + +namespace VNLib.Plugins.Extensions.VNCache +{ + + [ConfigurationName(VNCacheExtensions.CACHE_CONFIG_KEY)] + internal sealed class RemoteBackedMemoryCache : VnCacheClient, IIntervalScheduleable + { + private readonly MemoryCacheConfig _cacheConfig; + private readonly ICacheObjectSerialzer _serialzer; + private readonly ICacheObjectDeserialzer _deserialzer; + private readonly IBlobCacheTable _memCache; + + public RemoteBackedMemoryCache(PluginBase plugin, IConfigScope config) : base(plugin, config) + { + //Get nested memory cache config + MemoryCacheConfig memCacheConfig = config[VNCacheExtensions.MEMORY_CACHE_CONFIG_KEY].Deserialize<MemoryCacheConfig>()!; + + //Setup cache table + _memCache = new BlobCacheTable(memCacheConfig.TableSize, memCacheConfig.BucketSize, Client.Config.BufferHeap ?? MemoryUtil.Shared); + + _cacheConfig = memCacheConfig; + + /* + * Default to json serialization by using the default + * serializer and JSON options + */ + + JsonCacheObjectSerializer defaultSerializer = new(); + _serialzer = defaultSerializer; + _deserialzer = defaultSerializer; + + //Schedule cache purge + if (memCacheConfig.RefreshInterval > TimeSpan.Zero) + { + plugin.ScheduleInterval(this, memCacheConfig.RefreshInterval); + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void CheckConnected() + { + if (!IsConnected) + { + throw new InvalidOperationException("The client is not connected to the remote cache"); + } + } + + public override async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) + { + //Cleanup + try + { + await base.DoWorkAsync(pluginLog, exitToken); + } + finally + { + _memCache.Dispose(); + } + } + + ///<inheritdoc/> + public override Task AddOrUpdateAsync<T>(string key, string? newKey, T value, CancellationToken cancellation) + => AddOrUpdateAsync(key, newKey, value, _serialzer, cancellation); + + ///<inheritdoc/> + public override Task DeleteAsync(string key, CancellationToken cancellation) + { + CheckConnected(); + + //Delete the object from + Task local = _memCache.DeleteObjectAsync(key, cancellation).AsTask(); + Task remote = Client.DeleteObjectAsync(key, cancellation); + + //task when both complete + return Task.WhenAll(local, remote); + } + + ///<inheritdoc/> + public override Task<T> GetAsync<T>(string key, CancellationToken cancellation) => GetAsync<T>(key, _deserialzer, cancellation); + + ///<inheritdoc/> + public override async Task<T> GetAsync<T>(string key, ICacheObjectDeserialzer deserializer, CancellationToken cancellation) + { + CheckConnected(); + + Type objType = typeof(T); + + IBlobCacheBucket bucket = _memCache.GetBucket(key); + + //Obtain cache handle + using (CacheBucketHandle handle = await bucket.WaitAsync(cancellation)) + { + //Try to read the value + if (handle.Cache.TryGetValue(key, out CacheEntry entry)) + { + return (T?)deserializer.Deserialze(objType, entry.GetDataSegment()); + } + } + + //Alloc buffer from client heap + using ObjectGetBuffer getBuffer = new(Client.Config.BufferHeap); + + //Get the object from the server + await Client.GetObjectAsync(key, getBuffer, cancellation); + + //See if object data was set + if (getBuffer.GetData().IsEmpty) + { + return default; + } + + //Update local cache + await _memCache.AddOrUpdateObjectAsync(key, null, static b => b.GetData(), getBuffer, DateTime.UtcNow, CancellationToken.None); + + //Deserialze the entity + return (T)deserializer.Deserialze(objType, getBuffer.GetData()); + } + + ///<inheritdoc/> + public override async Task AddOrUpdateAsync<T>(string key, string? newKey, T value, ICacheObjectSerialzer serialzer, CancellationToken cancellation) + { + CheckConnected(); + + DateTime currentTime = DateTime.UtcNow; + + //Alloc serialzation buffer + using AddOrUpdateBuffer buffer = new (Client.Config.BufferHeap); + + //Serialze the value + serialzer.Serialize(value, buffer); + + try + { + //Update remote first, and if exceptions are raised, do not update local cache + await Client.AddOrUpdateObjectAsync(key, newKey, (IObjectData)buffer, cancellation); + + await _memCache.AddOrUpdateObjectAsync(key, newKey, static b => b.GetData(), buffer, currentTime, CancellationToken.None); + } + catch + { + //Remove local cache if exception occurs + await _memCache.DeleteObjectAsync(key, CancellationToken.None); + throw; + } + } + + async Task IIntervalScheduleable.OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken) + { + if(!IsConnected) + { + return; + } + + //Get buckets + IBlobCacheBucket[] buckets = _memCache.ToArray(); + + foreach (IBlobCacheBucket bucket in buckets) + { + //enter bucket lock + using CacheBucketHandle handle = await bucket.WaitAsync(cancellationToken); + + //Prune expired entires + PruneExpired(handle.Cache); + } + } + + private void PruneExpired(IBlobCache cache) + { + DateTime current = DateTime.UtcNow; + + //Enumerate all cache entires to determine if they have expired + string[] expired = (from ec in cache + where ec.Value.GetTime().Add(_cacheConfig.MaxCacheAge) < current + select ec.Key) + .ToArray(); + + //Remove expired entires + for(int i = 0; i < expired.Length; i++) + { + cache.Remove(expired[i]); + } + + Client.Config.DebugLog?.Debug("Cleaned {mc} expired memory cache elements", expired.Length); + } + + /* + * A buffer to store object data on a cache get + */ + private sealed class ObjectGetBuffer : VnDisposeable, IObjectData + { + private IMemoryHandle<byte>? _buffer; + private readonly IUnmangedHeap _heap; + + public ObjectGetBuffer(IUnmangedHeap heap) + { + _heap = heap; + } + + public ReadOnlySpan<byte> GetData() + { + return _buffer == null ? ReadOnlySpan<byte>.Empty : _buffer.Span; + } + + public void SetData(ReadOnlySpan<byte> data) + { + //Alloc a buffer from the supplied data + _buffer = data.IsEmpty ? null : _heap.AllocAndCopy(data); + } + + protected override void Free() + { + //Free buffer + _buffer?.Dispose(); + } + } + + } +}
\ No newline at end of file diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs index 75b9bd4..995b71a 100644 --- a/lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Plugins.Extensions.VNCache @@ -23,14 +23,13 @@ */ using System; -using System.Text.Json; -using System.Threading.Tasks; -using System.Collections.Generic; +using System.Runtime.CompilerServices; -using VNLib.Utils.Logging; +using VNLib.Hashing; +using VNLib.Utils.Memory; +using VNLib.Utils.Extensions; using VNLib.Data.Caching; -using VNLib.Data.Caching.Extensions; -using VNLib.Plugins.Extensions.Loading; +using VNLib.Plugins.Extensions.VNCache.DataModel; namespace VNLib.Plugins.Extensions.VNCache { @@ -40,72 +39,84 @@ namespace VNLib.Plugins.Extensions.VNCache /// </summary> public static class VNCacheExtensions { + internal const string CACHE_CONFIG_KEY = "vncache"; + internal const string MEMORY_CACHE_CONFIG_KEY = "memory_cache"; + internal const string MEMORY_CACHE_ONLY_KEY = "memory_only"; + + /// <summary> - /// Loads the shared cache provider for the current plugin + /// Gets a simple scoped cache based on an entity prefix. The prefix is appended + /// to the object id on each cache operation /// </summary> - /// <param name="pbase"></param> - /// <param name="localized">A localized log provider to write cache logging information to</param> - /// <returns>The shared <see cref="IGlobalCacheProvider"/> </returns> - /// <remarks> - /// The returned instance, background work, logging, and its lifetime - /// are managed by the current plugin. Beware when calling this method - /// network connections may be spawend and managed in the background by - /// this library. - /// </remarks> - public static VnCacheClient GetGlobalCache(this PluginBase pbase, ILogProvider? localized = null) - => LoadingExtensions.GetOrCreateSingleton<VnCacheClient>(pbase, localized == null ? LoadCacheClient : (pbase) => LoadCacheClient(pbase, localized)); - - private static VnCacheClient LoadCacheClient(PluginBase pbase) => LoadCacheClient(pbase, pbase.Log); - - private static VnCacheClient LoadCacheClient(PluginBase pbase, ILogProvider localized) + /// <param name="cache"></param> + /// <param name="prefix">The simple prefix string to append to object ids before computing hashes</param> + /// <param name="digest">The algorithm used to hash the combined object-ids</param> + /// <param name="encoding">The string encoding method used to encode the hash output</param> + /// <returns>The <see cref="ScopedCache"/> instance that will use the prefix to compute object ids</returns> + /// <exception cref="ArgumentNullException"></exception> + public static ScopedCache GetPrefixedCache(this IGlobalCacheProvider cache, string prefix, HashAlg digest = HashAlg.SHA1, HashEncodingMode encoding = HashEncodingMode.Base64) { - //Get config for client - IReadOnlyDictionary<string, JsonElement> config = pbase.GetConfigForType<VnCacheClient>(); - - //Init client - ILogProvider? debugLog = pbase.IsDebug() ? pbase.Log : null; - VnCacheClient client = new(debugLog); - - //Begin cache connections by scheduling a task on the plugin's scheduler - _ = pbase.ObserveTask(() => RunClientAsync(pbase, config, localized, client), 250); - - return client; + _ = cache ?? throw new ArgumentNullException(nameof(cache)); + _ = prefix ?? throw new ArgumentNullException(nameof(prefix)); + //Create simple cache key generator + SimpleCacheKeyImpl keyProv = new(prefix, digest, encoding); + //Create the scoped cache from the simple provider + return cache.GetScopedCache(keyProv); } - private static async Task RunClientAsync(PluginBase pbase, IReadOnlyDictionary<string, JsonElement> config, ILogProvider localized, VnCacheClient client) + private sealed class SimpleCacheKeyImpl : ICacheKeyGenerator { - ILogProvider Log = localized; + private readonly string Prefix; + private readonly HashAlg Digest; + private readonly HashEncodingMode Encoding; - try + public SimpleCacheKeyImpl(string prefix, HashAlg digest, HashEncodingMode encoding) { - //Try loading config - await client.LoadConfigAsync(pbase, config); + Prefix = prefix; + Digest = digest; + Encoding = encoding; + } - Log.Verbose("VNCache client configration loaded successfully"); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private int ComputeBufferSize(string id) => id.Length + Prefix.Length; - //Run and wait for exit - await client.RunAsync(Log, pbase.UnloadToken); - } - catch (OperationCanceledException) - { } - catch (KeyNotFoundException e) - { - Log.Error("Missing required configuration variable for VnCache client: {0}", e.Message); - } - catch (FBMServerNegiationException fne) - { - Log.Error("Failed to negotiate connection with cache server {reason}", fne.Message); - } - catch (Exception ex) - { - Log.Error(ex, "Unhandled exception occured in background cache client listening task"); - } - finally + + string ICacheKeyGenerator.ComputedKey(string entityId) { - client.Dispose(); - } + //Compute the required character buffer size + int bufferSize = ComputeBufferSize(entityId); + + if(bufferSize < 128) + { + //Stack alloc a buffer + Span<char> buffer = stackalloc char[bufferSize]; + + //Writer to accumulate data + ForwardOnlyWriter<char> writer = new(buffer); + + //Append prefix and entity id + writer.Append(Prefix); + writer.Append(entityId); - Log.Information("Cache client exited"); - } + //Compute the simple hash of the combined values + return ManagedHash.ComputeHash(writer.AsSpan(), Digest, Encoding); + } + else + { + //Alloc heap buffer for string concatination + using UnsafeMemoryHandle<char> buffer = MemoryUtil.UnsafeAlloc<char>(bufferSize, true); + + //Writer to accumulate data + ForwardOnlyWriter<char> writer = new(buffer); + + //Append prefix and entity id + writer.Append(Prefix); + writer.Append(entityId); + + //Compute the simple hash of the combined values + return ManagedHash.ComputeHash(writer.AsSpan(), Digest, Encoding); + } + } + } } } diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VNLib.Plugins.Extensions.VNCache.csproj b/lib/VNLib.Plugins.Extensions.VNCache/src/VNLib.Plugins.Extensions.VNCache.csproj index f8f3e29..1308c4b 100644 --- a/lib/VNLib.Plugins.Extensions.VNCache/src/VNLib.Plugins.Extensions.VNCache.csproj +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VNLib.Plugins.Extensions.VNCache.csproj @@ -4,9 +4,6 @@ <TargetFramework>net6.0</TargetFramework> <RootNamespace>VNLib.Plugins.Extensions.VNCache</RootNamespace> <AssemblyName>VNLib.Plugins.Extensions.VNCache</AssemblyName> - <Version>1.0.1.1</Version> - <SignAssembly>True</SignAssembly> - <AssemblyOriginatorKeyFile>\\vaughnnugent.com\Internal\Folder Redirection\vman\Documents\Programming\Software\StrongNameingKey.snk</AssemblyOriginatorKeyFile> <Nullable>enable</Nullable> <AnalysisLevel>latest-all</AnalysisLevel> <GenerateDocumentationFile>True</GenerateDocumentationFile> @@ -14,14 +11,27 @@ <PropertyGroup> <Authors>Vaughn Nugent</Authors> + <Company>Vaughn Nugent</Company> + <Product>VNLib.Plugins.Extensions.VNCache</Product> + <PackageId>VNLib.Plugins.Extensions.VNCache</PackageId> + <Description> + An Essentials framework extension library for integrating VNCache global caching provider into applications + for distributed data caching, and extensions for entity caching. + </Description> <Copyright>Copyright © 2023 Vaughn Nugent</Copyright> - <PackageProjectUrl>https://www.vaughnnugent.com/resources/software</PackageProjectUrl> + <PackageProjectUrl>https://www.vaughnnugent.com/resources/software/modules/VNLib.Data.Caching</PackageProjectUrl> + <RepositoryUrl>https://github.com/VnUgE/VNLib.Data.Caching/tree/master/lib/VNLib.Plugins.Extensions.VNCache</RepositoryUrl> </PropertyGroup> <ItemGroup> <ProjectReference Include="..\..\..\..\Extensions\lib\VNLib.Plugins.Extensions.Loading\src\VNLib.Plugins.Extensions.Loading.csproj" /> <ProjectReference Include="..\..\VNLib.Data.Caching.Extensions\src\VNLib.Data.Caching.Extensions.csproj" /> + <ProjectReference Include="..\..\VNLib.Data.Caching.ObjectCache\src\VNLib.Data.Caching.ObjectCache.csproj" /> <ProjectReference Include="..\..\VNLib.Data.Caching\src\VNLib.Data.Caching.csproj" /> </ItemGroup> + <ItemGroup> + <Folder Include="ClientCache\" /> + </ItemGroup> + </Project> diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs index a216f18..1e1b74c 100644 --- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Plugins.Extensions.VNCache @@ -24,7 +24,6 @@ using System; using System.Net.Http; -using System.Text.Json; using System.Threading; using System.Threading.Tasks; using System.Net.Sockets; @@ -32,71 +31,74 @@ using System.Net.WebSockets; using System.Collections.Generic; using System.Security.Cryptography; -using VNLib.Utils; using VNLib.Utils.Memory; using VNLib.Utils.Logging; using VNLib.Utils.Extensions; using VNLib.Hashing.IdentityUtility; using VNLib.Data.Caching; using VNLib.Data.Caching.Extensions; +using VNLib.Data.Caching.ObjectCache; using VNLib.Net.Messaging.FBM.Client; using VNLib.Plugins.Extensions.Loading; namespace VNLib.Plugins.Extensions.VNCache { - /// <summary> - /// A wrapper to simplify a shared global cache client - /// </summary> - [ConfigurationName("vncache")] - public sealed class VnCacheClient : VnDisposeable, IGlobalCacheProvider + public interface ICacheRefreshPolicy { - FBMClient? _client; + TimeSpan MaxCacheAge { get; } - private TimeSpan RetryInterval; + TimeSpan RefreshInterval { get; } + } - private readonly ILogProvider? DebugLog; - private readonly IUnmangedHeap? ClientHeap; + /// <summary> + /// A base class that manages + /// </summary> + [ConfigurationName(VNCacheExtensions.CACHE_CONFIG_KEY)] + internal class VnCacheClient : IGlobalCacheProvider, IAsyncBackgroundWork, IAsyncConfigurable + { + private readonly TimeSpan RetryInterval; /// <summary> - /// Initializes an emtpy client wrapper that still requires - /// configuration loading + /// The internal client /// </summary> - /// <param name="debugLog">An optional debugging log</param> - /// <param name="heap">An optional <see cref="IUnmangedHeap"/> for <see cref="FBMClient"/> buffers</param> - internal VnCacheClient(ILogProvider? debugLog, IUnmangedHeap? heap = null) - { - DebugLog = debugLog; - //Default to 10 seconds - RetryInterval = TimeSpan.FromSeconds(10); - - ClientHeap = heap; - } - - ///<inheritdoc/> - protected override void Free() - { - _client?.Dispose(); - _client = null; - } - + public FBMClient Client { get; } /// <summary> - /// Loads required configuration variables from the config store and - /// intializes the interal client + /// Gets a value that determines if the client is currently connected to a server /// </summary> - /// <param name="pbase"></param> - /// <param name="config">A dictionary of configuration varables</param> - /// <exception cref="KeyNotFoundException"></exception> - internal async Task LoadConfigAsync(PluginBase pbase, IReadOnlyDictionary<string, JsonElement> config) + public bool IsConnected { get; private set; } + + public VnCacheClient(PluginBase pbase, IConfigScope config) { + //Get required configuration variables int maxMessageSize = config["max_message_size"].GetInt32(); string? brokerAddress = config["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required configuration variable broker_address"); + RetryInterval = config["retry_interval_sec"].GetTimeSpan(TimeParseType.Seconds); + TimeSpan timeout = config["request_timeout_sec"].GetTimeSpan(TimeParseType.Seconds); + + Uri brokerUri = new(brokerAddress); + + //Setup debug log if the plugin is in debug mode + ILogProvider? debugLog = pbase.IsDebug() ? pbase.Log : null; + + //Init the client with default settings + FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(MemoryUtil.Shared, maxMessageSize, timeout, debugLog); + + Client = new(conf); + //Add the configuration to the client + Client.GetCacheConfiguration() + .WithBroker(brokerUri) + .WithTls(brokerUri.Scheme == Uri.UriSchemeHttps); + } + + public virtual async Task ConfigureServiceAsync(PluginBase plugin) + { //Get keys async - Task<ReadOnlyJsonWebKey?> clientPrivTask = pbase.TryGetSecretAsync("client_private_key").ToJsonWebKey(); - Task<ReadOnlyJsonWebKey?> brokerPubTask = pbase.TryGetSecretAsync("broker_public_key").ToJsonWebKey(); - Task<ReadOnlyJsonWebKey?> cachePubTask = pbase.TryGetSecretAsync("cache_public_key").ToJsonWebKey(); + Task<ReadOnlyJsonWebKey?> clientPrivTask = plugin.TryGetSecretAsync("client_private_key").ToJsonWebKey(); + Task<ReadOnlyJsonWebKey?> brokerPubTask = plugin.TryGetSecretAsync("broker_public_key").ToJsonWebKey(); + Task<ReadOnlyJsonWebKey?> cachePubTask = plugin.TryGetSecretAsync("cache_public_key").ToJsonWebKey(); //Wait for all tasks to complete _ = await Task.WhenAll(clientPrivTask, brokerPubTask, cachePubTask); @@ -105,132 +107,151 @@ namespace VNLib.Plugins.Extensions.VNCache ReadOnlyJsonWebKey brokerPub = await brokerPubTask ?? throw new KeyNotFoundException("Missing required secret broker_public_key"); ReadOnlyJsonWebKey cachePub = await cachePubTask ?? throw new KeyNotFoundException("Missing required secret cache_public_key"); - RetryInterval = config["retry_interval_sec"].GetTimeSpan(TimeParseType.Seconds); - - Uri brokerUri = new(brokerAddress); - - //Init the client with default settings - FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(ClientHeap ?? MemoryUtil.Shared, maxMessageSize, DebugLog); - - _client = new(conf); - - //Add the configuration to the client - _client.GetCacheConfiguration() - .WithBroker(brokerUri) + //Connection authentication methods + Client.GetCacheConfiguration() .WithVerificationKey(cachePub) .WithSigningCertificate(clientPriv) - .WithBrokerVerificationKey(brokerPub) - .WithTls(brokerUri.Scheme == Uri.UriSchemeHttps); + .WithBrokerVerificationKey(brokerPub); } - /// <summary> - /// Discovers nodes in the configured cluster and connects to a random node - /// </summary> - /// <param name="Log">A <see cref="ILogProvider"/> to write log events to</param> - /// <param name="cancellationToken">A token to cancel the operation</param> - /// <returns>A task that completes when the operation has been cancelled or an unrecoverable error occured</returns> - /// <exception cref="InvalidOperationException"></exception> - /// <exception cref="OperationCanceledException"></exception> - internal async Task RunAsync(ILogProvider Log, CancellationToken cancellationToken) + /* + * Background work method manages the remote cache connection + * to the cache cluster + */ + public virtual async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) { - _ = _client ?? throw new InvalidOperationException("Client configuration not loaded, cannot connect to cache servers"); - - while (true) - { - //Load the server list - ActiveServer[]? servers; + try + { while (true) { - try + //Load the server list + ActiveServer[]? servers; + while (true) { - Log.Debug("Discovering cluster nodes in broker"); - //Get server list - servers = await _client.DiscoverCacheNodesAsync(cancellationToken); - break; + try + { + pluginLog.Debug("Discovering cluster nodes in broker"); + //Get server list + servers = await Client.DiscoverCacheNodesAsync(exitToken); + break; + } + catch (HttpRequestException re) when (re.InnerException is SocketException) + { + pluginLog.Warn("Broker server is unreachable"); + } + catch (Exception ex) + { + pluginLog.Warn("Failed to get server list from broker, reason {r}", ex.Message); + } + + //Gen random ms delay + int randomMsDelay = RandomNumberGenerator.GetInt32(1000, 2000); + await Task.Delay(randomMsDelay, exitToken); } - catch (HttpRequestException re) when (re.InnerException is SocketException) - { - Log.Warn("Broker server is unreachable"); - } - catch (Exception ex) + + if (servers?.Length == 0) { - Log.Warn("Failed to get server list from broker, reason {r}", ex.Message); + pluginLog.Warn("No cluster nodes found, retrying"); + await Task.Delay(RetryInterval, exitToken); + continue; } - //Gen random ms delay - int randomMsDelay = RandomNumberGenerator.GetInt32(1000, 2000); - await Task.Delay(randomMsDelay, cancellationToken); - } - - if (servers?.Length == 0) - { - Log.Warn("No cluster nodes found, retrying"); - await Task.Delay(RetryInterval, cancellationToken); - continue; - } - - try - { - Log.Debug("Connecting to random cache server"); + try + { + pluginLog.Debug("Connecting to random cache server"); - //Connect to a random server - ActiveServer selected = await _client.ConnectToRandomCacheAsync(cancellationToken); - Log.Debug("Connected to cache server {s}", selected.ServerId); + //Connect to a random server + ActiveServer selected = await Client.ConnectToRandomCacheAsync(exitToken); + pluginLog.Debug("Connected to cache server {s}", selected.ServerId); - //Set connection status flag - IsConnected = true; + //Set connection status flag + IsConnected = true; - //Wait for disconnect - await _client.WaitForExitAsync(cancellationToken); + //Wait for disconnect + await Client.WaitForExitAsync(exitToken); - Log.Debug("Cache server disconnected"); - } - catch (WebSocketException wse) - { - Log.Warn("Failed to connect to cache server {reason}", wse.Message); - continue; - } - catch (HttpRequestException he) when (he.InnerException is SocketException) - { - Log.Debug("Failed to connect to random cache server server"); - //Continue next loop - continue; - } - finally - { - IsConnected = false; + pluginLog.Debug("Cache server disconnected"); + } + catch (WebSocketException wse) + { + pluginLog.Warn("Failed to connect to cache server {reason}", wse.Message); + continue; + } + catch (HttpRequestException he) when (he.InnerException is SocketException) + { + pluginLog.Debug("Failed to connect to random cache server server"); + //Continue next loop + continue; + } + finally + { + IsConnected = false; + } } } + catch (OperationCanceledException) + { + //Normal exit from listening loop + } + catch (KeyNotFoundException e) + { + pluginLog.Error("Missing required configuration variable for VnCache client: {0}", e.Message); + } + catch (FBMServerNegiationException fne) + { + pluginLog.Error("Failed to negotiate connection with cache server {reason}", fne.Message); + } + catch (Exception ex) + { + pluginLog.Error(ex, "Unhandled exception occured in background cache client listening task"); + } + finally + { + //Dispose the client on exit + Client.Dispose(); + } + pluginLog.Information("Cache client exited"); } ///<inheritdoc/> - public bool IsConnected { get; private set; } - - - ///<inheritdoc/> - public Task AddOrUpdateAsync<T>(string key, string? newKey, T value, CancellationToken cancellation) + public virtual Task AddOrUpdateAsync<T>(string key, string? newKey, T value, CancellationToken cancellation) { return !IsConnected ? throw new InvalidOperationException("The underlying client is not connected to a cache node") - : _client!.AddOrUpdateObjectAsync(key, newKey, value, cancellation); + : Client!.AddOrUpdateObjectAsync(key, newKey, value, cancellation); } ///<inheritdoc/> - public Task DeleteAsync(string key, CancellationToken cancellation) + public virtual Task DeleteAsync(string key, CancellationToken cancellation) { return !IsConnected ? throw new InvalidOperationException("The underlying client is not connected to a cache node") - : _client!.DeleteObjectAsync(key, cancellation); + : Client!.DeleteObjectAsync(key, cancellation); } - - + ///<inheritdoc/> - public Task<T?> GetAsync<T>(string key, CancellationToken cancellation) + public virtual Task<T?> GetAsync<T>(string key, CancellationToken cancellation) { return !IsConnected ? throw new InvalidOperationException("The underlying client is not connected to a cache node") - : _client!.GetObjectAsync<T>(key, cancellation); + : Client!.GetObjectAsync<T>(key, cancellation); + } + + ///<inheritdoc/> + public virtual Task<T?> GetAsync<T>(string key, ICacheObjectDeserialzer deserializer, CancellationToken cancellation) + { + return !IsConnected + ? throw new InvalidOperationException("The underlying client is not connected to a cache node") + : Client!.GetObjectAsync<T>(key, deserializer, cancellation); + } + + ///<inheritdoc/> + public virtual Task AddOrUpdateAsync<T>(string key, string? newKey, T value, ICacheObjectSerialzer serialzer, CancellationToken cancellation) + { + return !IsConnected + ? throw new InvalidOperationException("The underlying client is not connected to a cache node") + : Client!.AddOrUpdateObjectAsync(key, newKey, value, serialzer, cancellation); } } }
\ No newline at end of file diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnGlobalCache.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnGlobalCache.cs new file mode 100644 index 0000000..3cdebe3 --- /dev/null +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnGlobalCache.cs @@ -0,0 +1,102 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Plugins.Extensions.VNCache +* File: VnGlobalCache.cs +* +* VnGlobalCache.cs is part of VNLib.Plugins.Extensions.VNCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Data.Caching; +using VNLib.Plugins.Extensions.Loading; + +namespace VNLib.Plugins.Extensions.VNCache +{ + /// <summary> + /// A wrapper to simplify a shared global cache client + /// </summary> + [ConfigurationName(VNCacheExtensions.CACHE_CONFIG_KEY)] + public sealed class VnGlobalCache : IGlobalCacheProvider + { + private readonly IGlobalCacheProvider _client; + + /// <summary> + /// Initializes an emtpy client wrapper that still requires + /// configuration loading + /// </summary> + public VnGlobalCache(PluginBase pbase, IConfigScope config) + { + if (config.TryGetValue(VNCacheExtensions.MEMORY_CACHE_CONFIG_KEY, out _)) + { + //Check for memory only flag + if (config.TryGetValue(VNCacheExtensions.MEMORY_CACHE_ONLY_KEY, out JsonElement memOnly) && memOnly.GetBoolean()) + { + //Create a memory-only cache + _client = pbase.GetOrCreateSingleton<MemoryCache>(); + } + else + { + //Remote-backed memory cache + _client = pbase.GetOrCreateSingleton<RemoteBackedMemoryCache>(); + } + } + else + { + //Setup non-memory backed cache client + _client = pbase.GetOrCreateSingleton<VnCacheClient>(); + } + } + + ///<inheritdoc/> + public bool IsConnected => _client.IsConnected; + + ///<inheritdoc/> + public Task AddOrUpdateAsync<T>(string key, string? newKey, T value, CancellationToken cancellation) + { + return _client.AddOrUpdateAsync(key, newKey, value, cancellation); + } + + ///<inheritdoc/> + public Task AddOrUpdateAsync<T>(string key, string? newKey, T value, ICacheObjectSerialzer serialzer, CancellationToken cancellation) + { + return _client.AddOrUpdateAsync(key, newKey, value, serialzer, cancellation); + } + + ///<inheritdoc/> + public Task DeleteAsync(string key, CancellationToken cancellation) + { + return _client.DeleteAsync(key, cancellation); + } + + ///<inheritdoc/> + public Task<T?> GetAsync<T>(string key, CancellationToken cancellation) + { + return _client.GetAsync<T>(key, cancellation); + } + + ///<inheritdoc/> + public Task<T?> GetAsync<T>(string key, ICacheObjectDeserialzer deserializer, CancellationToken cancellation) + { + return _client.GetAsync<T>(key, deserializer, cancellation); + } + } +}
\ No newline at end of file diff --git a/plugins/CacheBroker/src/CacheBroker.csproj b/plugins/CacheBroker/src/CacheBroker.csproj index fa28dce..5086a4e 100644 --- a/plugins/CacheBroker/src/CacheBroker.csproj +++ b/plugins/CacheBroker/src/CacheBroker.csproj @@ -3,30 +3,25 @@ <PropertyGroup> <TargetFramework>net6.0</TargetFramework> <RootNamespace>VNLib.Plugins.Cache.Broker</RootNamespace> - <Version>1.0.1.2</Version> + <AssemblyName>CacheBroker</AssemblyName> <GenerateDocumentationFile>True</GenerateDocumentationFile> <AnalysisLevel>latest-all</AnalysisLevel> - <SignAssembly>True</SignAssembly> - <AssemblyOriginatorKeyFile>\\vaughnnugent.com\Internal\Folder Redirection\vman\Documents\Programming\Software\StrongNameingKey.snk</AssemblyOriginatorKeyFile> + <!--Enable dynamic loading--> + <EnableDynamicLoading>true</EnableDynamicLoading> </PropertyGroup> - <ItemGroup> - <PackageReference Include="ErrorProne.NET.CoreAnalyzers" Version="0.1.2"> - <PrivateAssets>all</PrivateAssets> - <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> - </PackageReference> - <PackageReference Include="ErrorProne.NET.Structs" Version="0.1.2"> - <PrivateAssets>all</PrivateAssets> - <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> - </PackageReference> - </ItemGroup> - <PropertyGroup> - <!--Enable dynamic loading--> - <EnableDynamicLoading>true</EnableDynamicLoading> <Authors>Vaughn Nugent</Authors> + <Company>Vaughn Nugent</Company> + <Product>VNLib.Plugins.Cache.Broker</Product> + <PackageId>VNLib.Plugins.Cache.Broker</PackageId> + <Description> + An Essentials framework plugin for implementing a centralized cache server discovery broker. This assembly may also be referrenced + as a library for providing the discovery endpoint. + </Description> <Copyright>Copyright © 2023 Vaughn Nugent</Copyright> - <PackageProjectUrl>https://www.vaughnnugent.com/resources/software</PackageProjectUrl> + <PackageProjectUrl>https://www.vaughnnugent.com/resources/software/modules/VNLib.Data.Caching</PackageProjectUrl> + <RepositoryUrl>https://github.com/VnUgE/VNLib.Data.Caching/tree/master/plugins/CacheBroker</RepositoryUrl> </PropertyGroup> <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'"> @@ -36,7 +31,18 @@ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'"> <Deterministic>False</Deterministic> </PropertyGroup> - + + <ItemGroup> + <PackageReference Include="ErrorProne.NET.CoreAnalyzers" Version="0.1.2"> + <PrivateAssets>all</PrivateAssets> + <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> + </PackageReference> + <PackageReference Include="ErrorProne.NET.Structs" Version="0.1.2"> + <PrivateAssets>all</PrivateAssets> + <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> + </PackageReference> + </ItemGroup> + <ItemGroup> <ProjectReference Include="..\..\..\..\..\core\lib\Net.Rest.Client\src\VNLib.Net.Rest.Client.csproj" /> <ProjectReference Include="..\..\..\..\..\core\lib\Plugins\src\VNLib.Plugins.csproj" /> diff --git a/plugins/CacheBroker/src/Endpoints/BrokerRegistrationEndpoint.cs b/plugins/CacheBroker/src/Endpoints/BrokerRegistrationEndpoint.cs index 8f983ac..61540b9 100644 --- a/plugins/CacheBroker/src/Endpoints/BrokerRegistrationEndpoint.cs +++ b/plugins/CacheBroker/src/Endpoints/BrokerRegistrationEndpoint.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: CacheBroker @@ -98,7 +98,7 @@ namespace VNLib.Plugins.Cache.Broker.Endpoints DisableVerifySessionCors = true, }; - public BrokerRegistrationEndpoint(PluginBase plugin, IReadOnlyDictionary<string, JsonElement> config) + public BrokerRegistrationEndpoint(PluginBase plugin, IConfigScope config) { string? path = config["path"].GetString(); @@ -412,10 +412,12 @@ namespace VNLib.Plugins.Cache.Broker.Endpoints } - void IDisposable.Dispose() + ///<inheritdoc/> + public void Dispose() { //Cleanup client pool when exiting ClientPool.Dispose(); + GC.SuppressFinalize(this); } } } diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs index e9584b6..f7adeb3 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs @@ -1,11 +1,11 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer -* File: ConnectEndpoint.cs +* File: CacheConfiguration.cs * -* ConnectEndpoint.cs is part of ObjectCacheServer which is part of the larger +* CacheConfiguration.cs is part of ObjectCacheServer which is part of the larger * VNLib collection of libraries and utilities. * * ObjectCacheServer is free software: you can redistribute it and/or modify @@ -49,6 +49,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server [JsonPropertyName("max_cache")] - public int MaxCacheEntries { get; set; } = 10000; + public uint MaxCacheEntries { get; set; } = 10000; + + [JsonPropertyName("buckets")] + public uint BucketCount { get; set; } = 10; } } diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs index 9a1ece0..1a7331d 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs @@ -45,7 +45,6 @@ using VNLib.Plugins.Essentials; using VNLib.Plugins.Extensions.Loading; using VNLib.Plugins.Essentials.Endpoints; using VNLib.Plugins.Essentials.Extensions; -using System.Text.Json.Serialization; namespace VNLib.Data.Caching.ObjectCache.Server { @@ -56,7 +55,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30); private readonly string AudienceLocalServerId; - private readonly ObjectCacheStore Store; + private readonly BlobCacheLIstener Store; private readonly PluginBase Pbase; private readonly ConcurrentDictionary<string, AsyncQueue<ChangeEvent>> StatefulEventQueue; @@ -81,7 +80,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server DisableCrossSiteDenied = true }; - public ConnectEndpoint(PluginBase plugin, IReadOnlyDictionary<string, JsonElement> config) + public ConnectEndpoint(PluginBase plugin, IConfigScope config) { string? path = config["path"].GetString(); @@ -106,7 +105,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server StatefulEventQueue = new(StringComparer.OrdinalIgnoreCase); //Init the cache store - Store = InitializeCache((ObjectCacheServerEntry)plugin, CacheConfig.MaxCacheEntries); + Store = InitializeCache((ObjectCacheServerEntry)plugin, CacheConfig.BucketCount, CacheConfig.MaxCacheEntries); /* * Generate a random guid for the current server when created so we @@ -118,7 +117,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server _ = plugin.ObserveWork(this, 100); } - private static ObjectCacheStore InitializeCache(ObjectCacheServerEntry plugin, int maxCache) + private static BlobCacheLIstener InitializeCache(ObjectCacheServerEntry plugin, uint buckets, uint maxCache) { if(maxCache < 2) { @@ -131,8 +130,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache"); } + plugin.Log.Verbose("Creating cache store with {bc} buckets, with {mc} items/bucket", buckets, maxCache); + //Endpoint only allows for a single reader - return new (maxCache, plugin.Log, plugin.CacheHeap, true); + return new (buckets, maxCache, plugin.Log, plugin.CacheHeap, true); } /// <summary> @@ -491,16 +492,16 @@ namespace VNLib.Data.Caching.ObjectCache.Server private sealed class CacheStore : ICacheStore { - private readonly ObjectCacheStore _cache; + private readonly BlobCacheLIstener _cache; - public CacheStore(ObjectCacheStore cache) + public CacheStore(BlobCacheLIstener cache) { _cache = cache; } ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token) { - return _cache.AddOrUpdateBlobAsync(objectId, alternateId, bodyData, state, token); + return _cache.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token); } void ICacheStore.Clear() @@ -510,7 +511,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token) { - return _cache.DeleteItemAsync(id, token); + return _cache.Cache.DeleteObjectAsync(id, token); } } } diff --git a/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs b/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs index 3776269..f911af9 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs @@ -1,11 +1,11 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer -* File: ConnectEndpoint.cs +* File: ICacheStore.cs * -* ConnectEndpoint.cs is part of ObjectCacheServer which is part of the larger +* ICacheStore.cs is part of ObjectCacheServer which is part of the larger * VNLib collection of libraries and utilities. * * ObjectCacheServer is free software: you can redistribute it and/or modify diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj index 38f5b97..3beb168 100644 --- a/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj +++ b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj @@ -1,23 +1,30 @@ <Project Sdk="Microsoft.NET.Sdk"> <PropertyGroup> - <TargetFramework>net6.0</TargetFramework> - <Nullable>enable</Nullable> + <TargetFramework>net6.0</TargetFramework> <RootNamespace>VNLib.Data.Caching.ObjectCache.Server</RootNamespace> - <Version>1.0.1.1</Version> - <SignAssembly>True</SignAssembly> - <AssemblyOriginatorKeyFile>\\vaughnnugent.com\Internal\Folder Redirection\vman\Documents\Programming\Software\StrongNameingKey.snk</AssemblyOriginatorKeyFile> - <EnableDynamicLoading>true</EnableDynamicLoading> + <AssemblyName>ObjectCacheServer</AssemblyName> + <Nullable>enable</Nullable> <AnalysisLevel>latest-all</AnalysisLevel> <ProduceReferenceAssembly>True</ProduceReferenceAssembly> <GenerateDocumentationFile>False</GenerateDocumentationFile> + <!-- Resolve nuget dll files and store them in the output dir --> + <EnableDynamicLoading>true</EnableDynamicLoading> </PropertyGroup> - - <!-- Resolve nuget dll files and store them in the output dir --> + <PropertyGroup> <Authors>Vaughn Nugent</Authors> + <Company>Vaughn Nugent</Company> + <Product>VNLib.Data.Caching.ObjectCache.Server</Product> + <PackageId>VNLib.Data.Caching.ObjectCache.Server</PackageId> + <Description> + An Essentials framework plugin for implementing a distributed in-memory data caching server disoverable in cache brokers. It provides + automatic live replication between cache server nodes. + </Description> <Copyright>Copyright © 2023 Vaughn Nugent</Copyright> - <PackageProjectUrl>https://www.vaughnnugent.com/resources/software</PackageProjectUrl> + <PackageProjectUrl>https://www.vaughnnugent.com/resources/software/modules/VNLib.Data.Caching</PackageProjectUrl> + <RepositoryUrl>https://github.com/VnUgE/VNLib.Data.Caching/tree/master/plugins/ObjectCacheServer</RepositoryUrl> </PropertyGroup> + <ItemGroup> <PackageReference Include="ErrorProne.NET.CoreAnalyzers" Version="0.1.2"> <PrivateAssets>all</PrivateAssets> diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs index d6dbd9b..0171064 100644 --- a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs +++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -111,10 +111,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server { try { - IReadOnlyDictionary<string, JsonElement> clusterConf = this.GetConfig("cluster"); + IConfigScope clusterConf = this.GetConfig("cluster"); - Uri brokerAddress = new(clusterConf["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'")); - + Uri brokerAddress = new(clusterConf["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'")); //Init connect endpoint ConnectEndpoint endpoint = this.Route<ConnectEndpoint>(); @@ -122,27 +121,30 @@ namespace VNLib.Data.Caching.ObjectCache.Server //Get the cache store from the connection endpoint ICacheStore store = endpoint.GetCacheStore(); + ulong maxByteSize = ((ulong)endpoint.CacheConfig.MaxCacheEntries * (ulong)endpoint.CacheConfig.BucketCount * (ulong)endpoint.CacheConfig.MaxMessageSize); + //Log max memory usage - Log.Debug("Maxium memory consumption {mx}Mb", ((ulong)endpoint.CacheConfig.MaxCacheEntries * (ulong)endpoint.CacheConfig.MaxMessageSize) / (ulong)(1024 * 1000)); + Log.Debug("Maxium memory consumption {mx}Mb", maxByteSize / (ulong)(1024 * 1000)); //Setup broker and regitration { - //Route the broker endpoint BrokerHeartBeat brokerEp = new(() => BrokerHeartBeatToken!, BrokerSyncHandle, brokerAddress, this); Route(brokerEp); //start registration - _ = this.ObserveTask(() => RegisterServerAsync(endpoint.Path), 200); + _ = this.ObserveWork(() => RegisterServerAsync(endpoint.Path), 200); } //Setup cluster worker { + TimeSpan timeout = TimeSpan.FromSeconds(10); + //Get pre-configured fbm client config for caching - ClientConfig = FBMDataCacheExtensions.GetDefaultConfig(CacheHeap, endpoint.CacheConfig.MaxMessageSize / 2, this.IsDebug() ? Log : null); + ClientConfig = FBMDataCacheExtensions.GetDefaultConfig(CacheHeap, endpoint.CacheConfig.MaxMessageSize / 2, timeout, this.IsDebug() ? Log : null); //Start Client runner - _ = this.ObserveTask(() => RunClientAsync(store, brokerAddress), 300); + _ = this.ObserveWork(() => RunClientAsync(store, brokerAddress), 300); } //Load a cache broker to the current server if the config is defined @@ -182,7 +184,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server try { //Get the broker config element - IReadOnlyDictionary<string, JsonElement> clusterConfig = this.GetConfig("cluster"); + IConfigScope clusterConfig = this.GetConfig("cluster"); //Server id is just dns name for now string serverId = Dns.GetHostName(); @@ -226,6 +228,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server { //Gen a random reg token before registering BrokerHeartBeatToken = RandomHash.GetRandomHex(32); + //Assign new hb token request.WithHeartbeatToken(BrokerHeartBeatToken); @@ -334,7 +337,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server try { //Get the broker config element - IReadOnlyDictionary<string, JsonElement> clusterConf = this.GetConfig("cluster"); + IConfigScope clusterConf = this.GetConfig("cluster"); int serverCheckMs = clusterConf["update_interval_sec"].GetInt32() * 1000; @@ -412,7 +415,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server ListeningServers.Add(server); //Run listener background task - _ = this.ObserveTask(() => RunSyncTaskAsync(server, cacheStore, nodeId)); + _ = this.ObserveWork(() => RunSyncTaskAsync(server, cacheStore, nodeId)); } } } @@ -431,6 +434,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server } catch (TaskCanceledException) { + //normal exit/unload } catch (Exception ex) { |