diff options
Diffstat (limited to 'lib/VNLib.Data.Caching.ObjectCache')
12 files changed, 1095 insertions, 496 deletions
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs index fbb2dcc..440981a 100644 --- a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs +++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2022 Vaughn Nugent +* Copyright (c) 2023 Vaughn Nugent * * Library: VNLib * Package: VNLib.Data.Caching.ObjectCache @@ -24,30 +24,38 @@ using System; using System.Collections.Generic; +using System.Diagnostics; +using VNLib.Utils.Memory; using VNLib.Utils.Memory.Caching; -namespace VNLib.Data.Caching +namespace VNLib.Data.Caching.ObjectCache { /// <summary> /// A general purpose binary data storage /// </summary> - public class BlobCache : LRUCache<string, CacheEntry> + public sealed class BlobCache : LRUCache<string, CacheEntry>, IBlobCache { + private bool disposedValue; + ///<inheritdoc/> public override bool IsReadOnly { get; } ///<inheritdoc/> protected override int MaxCapacity { get; } + + ///<inheritdoc/> + public IUnmangedHeap CacheHeap { get; } /// <summary> /// Initializes a new <see cref="BlobCache"/> store /// </summary> /// <param name="maxCapacity">The maximum number of items to keep in memory</param> + /// <param name="heap">The unmanaged heap used to allocate cache entry buffers from</param> /// <exception cref="ArgumentException"></exception> - public BlobCache(int maxCapacity) + public BlobCache(int maxCapacity, IUnmangedHeap heap) :base(StringComparer.Ordinal) { if(maxCapacity < 1) @@ -55,6 +63,8 @@ namespace VNLib.Data.Caching throw new ArgumentException("The maxium capacity of the store must be a positive integer larger than 0", nameof(maxCapacity)); } + CacheHeap = heap; + MaxCapacity = maxCapacity; //Update the lookup table size @@ -75,18 +85,11 @@ namespace VNLib.Data.Caching evicted.Value.Dispose(); } - /// <summary> - /// If the <see cref="CacheEntry"/> is found in the store, changes the key - /// that referrences the blob. - /// </summary> - /// <param name="currentKey">The key that currently referrences the blob in the store</param> - /// <param name="newKey">The new key that will referrence the blob</param> - /// <param name="blob">The <see cref="CacheEntry"/> if its found in the store</param> - /// <returns>True if the record was found and the key was changes</returns> - public bool TryChangeKey(string currentKey, string newKey, out CacheEntry blob) + ///<inheritdoc/> + public bool TryChangeKey(string objectId, string newId, out CacheEntry blob) { //Try to get the node at the current key - if (LookupTable.Remove(currentKey, out LinkedListNode<KeyValuePair<string, CacheEntry>> ? node)) + if (LookupTable.Remove(objectId, out LinkedListNode<KeyValuePair<string, CacheEntry>> ? node)) { //Remove the node from the ll List.Remove(node); @@ -95,13 +98,13 @@ namespace VNLib.Data.Caching blob = node.ValueRef.Value; //Update the - node.Value = new KeyValuePair<string, CacheEntry>(newKey, blob); + node.Value = new KeyValuePair<string, CacheEntry>(newId, blob); //Add to end of list List.AddLast(node); //Re-add to lookup table with new key - LookupTable.Add(newKey, node); + LookupTable.Add(newId, node); return true; } @@ -110,11 +113,7 @@ namespace VNLib.Data.Caching return false; } - /// <summary> - /// Removes the <see cref="CacheEntry"/> from the store, and frees its resources - /// </summary> - /// <param name="key">The key that referrences the <see cref="CacheEntry"/> in the store</param> - /// <returns>A value indicating if the blob was removed</returns> + ///<inheritdoc/> public override bool Remove(string key) { //Remove the item from the lookup table and if it exists, remove the node from the list @@ -129,6 +128,7 @@ namespace VNLib.Data.Caching //Remove the node from the list List.Remove(node); } + return true; } @@ -153,5 +153,44 @@ namespace VNLib.Data.Caching //empty all cache entires in the store base.Clear(); } + + ///<inheritdoc/> + public bool Remove(string objectId, out CacheEntry entry) + { + //Try to get the stored object + if(TryGetValue(objectId, out entry)) + { + //remove the entry and bypass the disposal + bool result = base.Remove(objectId); +#if DEBUG + Debug.Assert(result == true); +#endif + return true; + } + + entry = default; + return false; + } + + ///<inheritdoc/> + void Dispose(bool disposing) + { + if (!disposedValue) + { + if (disposing) + { + Clear(); + } + disposedValue = true; + } + } + + ///<inheritdoc/> + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); + } } } diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheBucket.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheBucket.cs new file mode 100644 index 0000000..f79db3f --- /dev/null +++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheBucket.cs @@ -0,0 +1,95 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.ObjectCache +* File: BlobCacheBucket.cs +* +* BlobCacheBucket.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Utils.Memory; + +namespace VNLib.Data.Caching.ObjectCache +{ + public sealed class BlobCacheBucket : IBlobCacheBucket + { + private readonly IBlobCache _cacheTable; + private readonly SemaphoreSlim _lock; + + /// <summary> + /// Initialzies a new <see cref="BlobCacheBucket"/> and its underlying + /// <see cref="IBlobCache"/> + /// </summary> + /// <param name="bucketCapacity"> + /// The maxium number of entries allowed in the LRU cache + /// before LRU overflow happens. + /// </param> + /// <param name="heap">The heap to allocate object cache buffers</param> + public BlobCacheBucket(int bucketCapacity, IUnmangedHeap heap) + { + _lock = new(1, 1); + _cacheTable = new BlobCache(bucketCapacity, heap); + } + + ///<inheritdoc/> + public void Dispose() + { + _cacheTable.Dispose(); + _lock.Dispose(); + } + + ///<inheritdoc/> + public async ValueTask<IBlobCache> ManualWaitAsync(CancellationToken cancellation) + { + //try to enter the lock synchronously + if (_lock.Wait(0, CancellationToken.None)) + { + return _cacheTable; + } + else + { + await _lock.WaitAsync(cancellation).ConfigureAwait(false); + return _cacheTable; + } + } + + ///<inheritdoc/> + public void Release() + { + _lock.Release(); + } + + ///<inheritdoc/> + public async ValueTask<CacheBucketHandle> WaitAsync(CancellationToken cancellation) + { + //try to enter the lock synchronously + if (_lock.Wait(0, CancellationToken.None)) + { + return new(this, _cacheTable); + } + else + { + await _lock.WaitAsync(cancellation).ConfigureAwait(false); + return new(this, _cacheTable); + } + } + } +} diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheExtensions.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheExtensions.cs new file mode 100644 index 0000000..4a8692d --- /dev/null +++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheExtensions.cs @@ -0,0 +1,197 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.ObjectCache +* File: BlobCacheExtensions.cs +* +* BlobCacheExtensions.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + + +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace VNLib.Data.Caching.ObjectCache +{ + /// <summary> + /// Provides blob cache extension methods + /// </summary> + public static class BlobCacheExtensions + { + internal static CacheEntry CreateEntry(this IBlobCache cache, string objectId, ReadOnlySpan<byte> initialData, DateTime time) + { + CacheEntry entry = CacheEntry.Create(initialData, cache.CacheHeap); + try + { + //try to add the entry, but if exists, let it throw + cache.Add(objectId, entry); + entry.SetTime(time); + return entry; + } + catch + { + entry.Dispose(); + throw; + } + } + + internal static CacheEntry AddOrUpdateEntry(this IBlobCache cache, string objectId, ReadOnlySpan<byte> data, DateTime time) + { + //See if blob exists + if (cache.TryGetValue(objectId, out CacheEntry entry)) + { + //Update the entry since it exists + entry.UpdateData(data); + + entry.SetTime(time); + } + else + { + //Create the new entry + entry = cache.CreateEntry(objectId, data, time); + } + + return entry; + } + + internal static CacheEntry TryChangeKey(this IBlobCache cache, string objectId, string alternateId, ReadOnlySpan<byte> data, DateTime time) + { + //Change the key of the blob item and update its data + if (cache.TryChangeKey(objectId, alternateId, out CacheEntry entry)) + { + //If date is 0 length do not overwrite the old entry if found + if (data.IsEmpty) + { + return entry; + } + + //Otherwise update the entry + entry.UpdateData(data); + entry.SetTime(time); + + return entry; + } + else + { + //entry does not exist at the old id, so we can create a new one at the alternate id + return cache.CreateEntry(objectId, data, time); + } + } + + + /// <summary> + /// Asynchronously adds or updates an object in the store and optionally update's it's id + /// </summary> + /// <param name="table"></param> + /// <param name="objectId">The current (or old) id of the object</param> + /// <param name="alternateId">An optional id to update the blob to</param> + /// <param name="bodyData">A callback that returns the data for the blob</param> + /// <param name="state">The state parameter to pass to the data callback</param> + /// <param name="time">The time to set on the cache record</param> + /// <param name="cancellation">A token to cancel the async operation</param> + /// <returns>A value task that represents the async operation</returns> + public static async ValueTask AddOrUpdateObjectAsync<T>( + this IBlobCacheTable table, + string objectId, + string? alternateId, + GetBodyDataCallback<T> bodyData, + T state, + DateTime time, + CancellationToken cancellation = default) + { + //See if an id change is required + if (string.IsNullOrWhiteSpace(alternateId)) + { + //safe to get the bucket for the primary id + IBlobCacheBucket bucket = table.GetBucket(objectId); + + //Wait for the bucket + using CacheBucketHandle handle = await bucket.WaitAsync(cancellation); + + //add/update for single entity + _ = handle.Cache.AddOrUpdateEntry(objectId, bodyData(state), time); + } + else + { + //Buckets for each id need to be obtained + IBlobCacheBucket primary = table.GetBucket(objectId); + IBlobCacheBucket alternate = table.GetBucket(alternateId); + + //Same bucket + if (ReferenceEquals(primary, alternate)) + { + //wait for lock on only one bucket otherwise dealock + using CacheBucketHandle handle = await primary.WaitAsync(cancellation); + + //Update the entry for the single bucket + _ = handle.Cache.TryChangeKey(objectId, alternateId, bodyData(state), time); + } + else + { + //Buckets are different must be awaited individually + using CacheBucketHandle primaryHandle = await primary.WaitAsync(cancellation); + using CacheBucketHandle alternateHandle = await alternate.WaitAsync(cancellation); + + //Get the entry from the primary hande + if (primaryHandle.Cache.Remove(objectId, out CacheEntry entry)) + { + try + { + //Update the handle data and reuse the entry + entry.UpdateData(bodyData(state)); + + //Add the updated entry to the alternate table + alternateHandle.Cache.Add(alternateId, entry); + } + catch + { + //Cleanup handle if error adding + entry.Dispose(); + throw; + } + } + else + { + //Old entry did not exist, we need to create a new entry for the alternate bucket + _ = alternateHandle.Cache.CreateEntry(alternateId, bodyData(state), time); + } + } + } + } + + /// <summary> + /// Asynchronously deletes a previously stored item + /// </summary> + /// <param name="table"></param> + /// <param name="objectId">The id of the object to delete</param> + /// <param name="cancellation">A token to cancel the async lock await</param> + /// <returns>A task that completes when the item has been deleted</returns> + public static async ValueTask<bool> DeleteObjectAsync(this IBlobCacheTable table, string objectId, CancellationToken cancellation = default) + { + //Try to get the bucket that the id should belong to + IBlobCacheBucket bucket = table.GetBucket(objectId); + + //Wait for lock on bucket async + using CacheBucketHandle handle = await bucket.WaitAsync(cancellation); + + //Remove the object from the blob store + return handle.Cache.Remove(objectId); + } + } +} diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs new file mode 100644 index 0000000..818dfcf --- /dev/null +++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs @@ -0,0 +1,285 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.ObjectCache +* File: BlobCacheLIstener.cs +* +* BlobCacheLIstener.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + + +/* + * The latest bucket based cache store relys on bucket level locking + * to distribute locking across threads and reduce contention. + * + * This design relys on holding the bucket lock for the entire duration + * of the CacheEntry manipulation, its id, movment, and reading/writing + * the entirie's contents. + * + * Some drawbacks are the basics with key-derrived bucket systems: + * bucket imbalance due to key distribtion. + * + * Design perfers average speed, but will need to be tested heavily per + * use-case. + */ + +using System; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Utils.Async; +using VNLib.Utils.Memory; +using VNLib.Utils.Logging; +using VNLib.Net.Messaging.FBM.Server; +using static VNLib.Data.Caching.Constants; + +namespace VNLib.Data.Caching.ObjectCache +{ + public delegate ReadOnlySpan<byte> GetBodyDataCallback<T>(T state); + + /// <summary> + /// A <see cref="FBMListener"/> implementation of a <see cref="CacheListener"/> + /// </summary> + public class BlobCacheLIstener : FBMListenerBase, IDisposable + { + private bool disposedValue; + + ///<inheritdoc/> + protected override ILogProvider Log { get; } + + /// <summary> + /// A queue that stores update and delete events + /// </summary> + public AsyncQueue<ChangeEvent> EventQueue { get; } + + /// <summary> + /// The Cache store to access data blobs + /// </summary> + public IBlobCacheTable Cache { get; } + + + /// <summary> + /// Initialzies a new <see cref="BlobCacheLIstener"/> + /// </summary> + /// <param name="cacheMax">The maxium number of items per bucket</param> + /// <param name="buckets">The number of cache store buckets</param> + /// <param name="log"></param> + /// <param name="heap">The heap to alloc FBM buffers and <see cref="CacheEntry"/> cache buffers from</param> + /// <param name="singleReader">A value that indicates if a single thread is processing events</param> + public BlobCacheLIstener(uint buckets, uint cacheMax, ILogProvider log, IUnmangedHeap heap, bool singleReader) + { + Log = log; + + //Writes may happen from multple threads with bucket design and no lock + EventQueue = new(false, singleReader); + + Cache = new BlobCacheTable(buckets, cacheMax, heap); + InitListener(heap); + } + + ///<inheritdoc/> + protected override async Task ProcessAsync(FBMContext context, object? userState, CancellationToken exitToken) + { + try + { + //Get the action header + string action = context.Method(); + + //Optional newid header + string? alternateId = context.NewObjectId(); + + switch (action) + { + case Actions.Get: + { + //Get the object-id header + string objectId = context.ObjectId(); + + //Process read + await ReadEntryAsync(context, objectId, exitToken); + return; + } + case Actions.AddOrUpdate: + { + //Get the object-id header + string objectId = context.ObjectId(); + + //Create change event for the object + ChangeEvent change = new(objectId, alternateId, false); + + await AddOrUpdateAsync(context, change, exitToken); + return; + } + case Actions.Delete: + { + //Get the object-id header + string objectId = context.ObjectId(); + + //Create change event + ChangeEvent change = new(objectId, alternateId, true); + + await DeleteEntryAsync(context, change, exitToken); + return; + } + // event queue dequeue request + case Actions.Dequeue: + { + static void SetResponse(ChangeEvent change, FBMContext context) + { + if (change.Deleted) + { + context.CloseResponse("deleted"); + context.Response.WriteHeader(ObjectId, change.CurrentId); + } + else + { + //Changed + context.CloseResponse("modified"); + context.Response.WriteHeader(ObjectId, change.CurrentId); + + //Set old id if an old id is set + if (change.CurrentId != null) + { + context.Response.WriteHeader(NewObjectId, change.AlternateId); + } + } + } + + static async Task DequeAsync(AsyncQueue<ChangeEvent> queue, FBMContext context, CancellationToken exitToken) + { + //Wait for a new message to process + ChangeEvent ev = await queue.DequeueAsync(exitToken); + + //Set the response + SetResponse(ev, context); + } + + //If no event bus is registered, then this is not a legal command + if (userState is not AsyncQueue<ChangeEvent> eventBus) + { + context.CloseResponse(ResponseCodes.NotFound); + + return; + } + + //try to deq without awaiting + if (eventBus.TryDequeue(out ChangeEvent? change)) + { + SetResponse(change, context); + } + else + { + //Process async + await DequeAsync(eventBus, context, exitToken); + } + + return; + } + + } + + Log.Error("Unhandled cache event for session {id}", context.Request.ConnectionId); + context.CloseResponse(ResponseCodes.Error); + } + catch (OperationCanceledException) + { + throw; + } + catch(Exception ex) + { + //Log error and set error status code + Log.Error(ex); + context.CloseResponse(ResponseCodes.Error); + } + } + + private async ValueTask ReadEntryAsync(FBMContext context, string objectId, CancellationToken cancellation) + { + //Try to get the bucket that the id should belong to + IBlobCacheBucket bucket = Cache.GetBucket(objectId); + + //Wait for lock on bucket async + using CacheBucketHandle handle = await bucket.WaitAsync(cancellation); + + if (handle.Cache.TryGetValue(objectId, out CacheEntry data)) + { + //Set the status code and write the buffered data to the response buffer + context.CloseResponse(ResponseCodes.Okay); + + //Copy data to response buffer + context.Response.WriteBody(data.GetDataSegment()); + } + else + { + context.CloseResponse(ResponseCodes.NotFound); + } + } + + private async ValueTask DeleteEntryAsync(FBMContext context, ChangeEvent change, CancellationToken cancellation) + { + //Remove the object from the blob store + bool found = await Cache.DeleteObjectAsync(change.CurrentId, cancellation); + + context.CloseResponse(found ? ResponseCodes.Okay : ResponseCodes.NotFound); + + //Enque change if item was successfully deleted + if (found) + { + EnqueEvent(change); + } + } + + private async ValueTask AddOrUpdateAsync(FBMContext context, ChangeEvent change, CancellationToken cancellation) + { + //Run add/update and get the valuetask + await Cache.AddOrUpdateObjectAsync(change.CurrentId, change.AlternateId, static r => r.BodyData, context.Request, default, cancellation); + + EnqueEvent(change); + + context.CloseResponse(ResponseCodes.Okay); + } + + private void EnqueEvent(ChangeEvent change) + { + if (!EventQueue.TryEnque(change)) + { + Log.Warn("Change event {ev} was not enqued because the event queue is overflowing!", change.CurrentId); + } + } + + + ///<inheritdoc/> + protected virtual void Dispose(bool disposing) + { + if (!disposedValue) + { + Cache.Dispose(); + + disposedValue = true; + } + } + + ///<inheritdoc/> + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + } +} diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheTable.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheTable.cs new file mode 100644 index 0000000..270cf1e --- /dev/null +++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheTable.cs @@ -0,0 +1,139 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.ObjectCache +* File: BlobCacheTable.cs +* +* BlobCacheTable.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Collections; +using System.Collections.Generic; + +using VNLib.Utils; +using VNLib.Utils.Memory; + +namespace VNLib.Data.Caching.ObjectCache +{ + /// <summary> + /// A concrete implementation of a <see cref="IBlobCacheTable"/> + /// </summary> + public sealed class BlobCacheTable : VnDisposeable, IBlobCacheTable + { + private readonly uint _tableSize; + private readonly IBlobCacheBucket[] _buckets; + + /// <summary> + /// Initializes a new <see cref="BlobCacheTable"/> + /// </summary> + /// <param name="bucketSize">The number of elements in each bucket</param> + /// <param name="tableSize">The number of buckets within the table</param> + /// <param name="heap">The heap used to allocate cache entry buffers from</param> + /// <exception cref="ArgumentNullException"></exception> + /// <exception cref="ArgumentException"></exception> + public BlobCacheTable(uint tableSize, uint bucketSize, IUnmangedHeap heap) + { + _ = heap ?? throw new ArgumentNullException(nameof(heap)); + + if(tableSize == 0) + { + throw new ArgumentException("Cache table must have atleast 1 bucket"); + } + + //Init bucket table + _tableSize = tableSize; + _buckets = new IBlobCacheBucket[tableSize]; + + //Init buckets + InitBuckets(tableSize, bucketSize, _buckets, heap); + } + + + private static void InitBuckets(uint size, uint bucketSize, IBlobCacheBucket[] table, IUnmangedHeap heap) + { + for(int i = 0; i < size; i++) + { + table[i] = new BlobCacheBucket((int)bucketSize, heap); + } + } + + /* + * A very simple algorithm that captures unique values + * from an object id and builds an unsigned 32bit integer + * used to determine the bucked index within the table. + * + * This method will alawys result in the same index for + * for a given object-id + */ + + private uint FastGetBucketIndexFromId(ReadOnlySpan<char> objectId) + { + if (objectId.Length < 4) + { + throw new ArgumentException("Object id must be larger than 3 characters"); + } + + Span<byte> buffer = stackalloc byte[4]; + + //cast the characters + buffer[0] = (byte)objectId[0]; + buffer[1] = (byte)objectId[objectId.Length / 2]; + buffer[2] = (byte)objectId[1]; + buffer[3] = (byte)objectId[^1]; + + //Read the buffer back to a uint and mod by the table size to get the bucket index + return BitConverter.ToUInt32(buffer) % _tableSize; + } + + + ///<inheritdoc/> + ///<exception cref="ObjectDisposedException"></exception> + public IBlobCacheBucket GetBucket(ReadOnlySpan<char> objectId) + { + Check(); + + //If tablesize is 1, skip lookup, otherwise perform bucket index lookup + uint index = _tableSize == 1 ? 0 : FastGetBucketIndexFromId(objectId); + + return _buckets[index]; + } + + ///<inheritdoc/> + protected sealed override void Free() + { + //Dispose buckets + Array.ForEach(_buckets, static b => b.Dispose()); + } + + ///<inheritdoc/> + public IEnumerator<IBlobCacheBucket> GetEnumerator() + { + Check(); + return _buckets.AsEnumerable().GetEnumerator(); + } + + ///<inheritdoc/> + IEnumerator IEnumerable.GetEnumerator() + { + Check(); + return _buckets.AsEnumerable().GetEnumerator(); + } + } +} diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/CacheBucketHandle.cs b/lib/VNLib.Data.Caching.ObjectCache/src/CacheBucketHandle.cs new file mode 100644 index 0000000..f9c1f17 --- /dev/null +++ b/lib/VNLib.Data.Caching.ObjectCache/src/CacheBucketHandle.cs @@ -0,0 +1,111 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.ObjectCache +* File: CacheBucketHandle.cs +* +* CacheBucketHandle.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Diagnostics.CodeAnalysis; + +namespace VNLib.Data.Caching.ObjectCache +{ + /// <summary> + /// Holds an exclusive lock on a <see cref="IBlobCacheBucket"/>, and exposes + /// acess to its internal <see cref="IBlobCache"/> + /// </summary> + public readonly struct CacheBucketHandle : IDisposable, IEquatable<CacheBucketHandle> + { + private readonly IBlobCacheBucket? _bucket; + + /// <summary> + /// The <see cref="IBlobCache"/> held by the current handle + /// </summary> + public readonly IBlobCache Cache { get; } + + /// <summary> + /// Initializes an empty blobcache handle + /// </summary> + public CacheBucketHandle() + { + _bucket = null; + Cache = null!; + } + + /// <summary> + /// Creates a new bucket lock handle to be released on dispose + /// </summary> + /// <param name="bucket">The bucket to release access to on dispose</param> + /// <param name="cache">The underlying <see cref="IBlobCache"/> provide exclusive access to</param> + public CacheBucketHandle(IBlobCacheBucket bucket, IBlobCache cache) + { + _bucket = bucket; + Cache = cache; + } + + /// <summary> + /// Releases the exlusive lock held on the bucket + /// </summary> + public void Dispose() + { + //Release the bucket when disposed + _bucket?.Release(); + } + + /// <summary> + /// Determines if the other handle instance is equal to the current. Handles are + /// equal iff the underlying bucket referrence is equal. + /// </summary> + /// <param name="other">The other handle to compare</param> + /// <returns>True if the handles hold a referrence to the same bucket</returns> + public bool Equals(CacheBucketHandle other) => _bucket?.Equals(other._bucket) ?? false; + /// <summary> + /// Determines if the other handle instance is equal to the current. Handles are + /// equal iff the underlying bucket referrence is equal. + /// </summary> + /// <param name="obj">The other handle to compare</param> + /// <returns>True if the handles hold a referrence to the same bucket</returns> + public override bool Equals([NotNullWhen(true)] object? obj) => obj is CacheBucketHandle other && Equals(other); + + /// <summary> + /// Gets the hashcode of the underlying bucket + /// </summary> + /// <returns></returns> + public override int GetHashCode() => _bucket?.GetHashCode() ?? -1; + + /// <summary> + /// Determines if the handles are equal by the <see cref="Equals(CacheBucketHandle)"/> + /// method. + /// </summary> + /// <param name="left"></param> + /// <param name="right"></param> + /// <returns>True if the internal bucket references are equal</returns> + public static bool operator ==(CacheBucketHandle left, CacheBucketHandle right) => left.Equals(right); + + /// <summary> + /// Determines if the handles are equal by the <see cref="Equals(CacheBucketHandle)"/> + /// method. + /// </summary> + /// <param name="left"></param> + /// <param name="right"></param> + /// <returns>True if the internal bucket references are NOT equal</returns> + public static bool operator !=(CacheBucketHandle left, CacheBucketHandle right) => !(left == right); + } +} diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs b/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs index 8644d1d..3d61790 100644 --- a/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs +++ b/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs @@ -32,7 +32,8 @@ using VNLib.Utils.Extensions; namespace VNLib.Data.Caching { /// <summary> - /// A structure that represents an item in cache + /// A structure that represents an item in cache. It contains the binary content + /// of a cache entry by its internal memory handle /// </summary> public readonly struct CacheEntry : IDisposable, IEquatable<CacheEntry> { @@ -89,7 +90,7 @@ namespace VNLib.Data.Caching ///<inheritdoc/> - public readonly void Dispose() => _handle.Dispose(); + public readonly void Dispose() => _handle?.Dispose(); [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -116,7 +117,7 @@ namespace VNLib.Data.Caching /// </summary> /// <returns>The last date stored</returns> /// <exception cref="ObjectDisposedException"></exception> - public readonly DateTime GetCreatedTime() + public readonly DateTime GetTime() { //Get the time segment and write the value in big endian ReadOnlySpan<byte> segment = GetTimeSegment(); diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/IBlobCache.cs b/lib/VNLib.Data.Caching.ObjectCache/src/IBlobCache.cs new file mode 100644 index 0000000..52d53ff --- /dev/null +++ b/lib/VNLib.Data.Caching.ObjectCache/src/IBlobCache.cs @@ -0,0 +1,87 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.ObjectCache +* File: IBlobCache.cs +* +* IBlobCache.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Collections.Generic; + +using VNLib.Utils.Memory; + +namespace VNLib.Data.Caching.ObjectCache +{ + /// <summary> + /// Represents a binary data cache store + /// </summary> + public interface IBlobCache : IEnumerable<KeyValuePair<string, CacheEntry>>, IDisposable + { + /// <summary> + /// The internal heap used to allocate <see cref="CacheEntry"/> buffers + /// </summary> + IUnmangedHeap CacheHeap { get; } + + /// <summary> + /// Attempts to retreive the entry at the given id. + /// </summary> + /// <param name="objectId">The id of the object to locate</param> + /// <param name="entry">The cache entry if found, default otherwise</param> + /// <returns>True if the entry was assigned</returns> + bool TryGetValue(string objectId, out CacheEntry entry); + + /// <summary> + /// Attempts to relocate the entry in the table by its new id. + /// </summary> + /// <param name="objectId">The original id of the entry to modify</param> + /// <param name="newId">The new id of the entry</param> + /// <param name="entry">The original entry if found, default otherwise</param> + /// <returns>True if the item was located and successfully updated, false if the operation failed</returns> + bool TryChangeKey(string objectId, string newId, out CacheEntry entry); + + /// <summary> + /// Adds the entry to the table by the id + /// </summary> + /// <param name="objectId"></param> + /// <param name="entry">The entry to store in the table</param> + void Add(string objectId, CacheEntry entry); + + /// <summary> + /// Attempts to remove the entry at the given id, and returns the + /// entry if located. + /// </summary> + /// <param name="objectId">The id of the entry to remove</param> + /// <param name="entry">The entry if found, default otherwise</param> + /// <returns>True if the entry existed in the store, false otherwise</returns> + /// <remarks> + /// NOTE: If the return value is true, the store no longer maintains the lifetime + /// of the returned <see cref="CacheEntry"/>. You must manually dispose the entry + /// to avoid memory leaks. + /// </remarks> + bool Remove(string objectId, out CacheEntry entry); + + /// <summary> + /// Attempts to remove the entry at the given id, and release its memory. + /// </summary> + /// <param name="objectId">The id of the entry to remove</param> + /// <returns>True if the entry was found and disposed</returns> + bool Remove(string objectId); + } +} diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/IBlobCacheBucket.cs b/lib/VNLib.Data.Caching.ObjectCache/src/IBlobCacheBucket.cs new file mode 100644 index 0000000..4876c5f --- /dev/null +++ b/lib/VNLib.Data.Caching.ObjectCache/src/IBlobCacheBucket.cs @@ -0,0 +1,58 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.ObjectCache +* File: ObjectCacheStore.cs +* +* ObjectCacheStore.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace VNLib.Data.Caching.ObjectCache +{ + /// <summary> + /// Represents a singe cache bucket that maintains cache concurrent <see cref="IBlobCache"/> + /// operations. + /// </summary> + public interface IBlobCacheBucket : IDisposable + { + /// <summary> + /// Gets a <see cref="CacheBucketHandle"/> that holds an exclusive lock + /// for the current bucekt and holds a referrence to the stored + /// <see cref="IBlobCache"/> + /// </summary> + /// <param name="cancellation">A token to cancel the wait operation</param> + /// <returns>A <see cref="CacheBucketHandle"/> that holds the <see cref="IBlobCache"/> referrence</returns> + ValueTask<CacheBucketHandle> WaitAsync(CancellationToken cancellation); + + /// <summary> + /// Allows for waiting for the cache directly, IE without receiving a lock handle + /// </summary> + /// <param name="cancellation"></param> + /// <returns>The underlying <see cref="IBlobCache"/> that now has exlcusive access</returns> + ValueTask<IBlobCache> ManualWaitAsync(CancellationToken cancellation); + + /// <summary> + /// Releases an exlcusive lock on the current bucket, DO NOT CALL BY USER CODE + /// </summary> + void Release(); + } +} diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/IBlobCacheTable.cs b/lib/VNLib.Data.Caching.ObjectCache/src/IBlobCacheTable.cs new file mode 100644 index 0000000..d84aecf --- /dev/null +++ b/lib/VNLib.Data.Caching.ObjectCache/src/IBlobCacheTable.cs @@ -0,0 +1,43 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.ObjectCache +* File: IBlobCacheTable.cs +* +* IBlobCacheTable.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Collections.Generic; + +namespace VNLib.Data.Caching.ObjectCache +{ + /// <summary> + /// A table that contains a collection of blob cache buckets + /// for improved cache concurrency + /// </summary> + public interface IBlobCacheTable : IEnumerable<IBlobCacheBucket>, IDisposable + { + /// <summary> + /// Gets a bucket that should contain the object by its id + /// </summary> + /// <param name="objectId">The id of the object to get the bucket for</param> + /// <returns>The bucket that should contain the object</returns> + IBlobCacheBucket GetBucket(ReadOnlySpan<char> objectId); + } +} diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs b/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs deleted file mode 100644 index af1e730..0000000 --- a/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs +++ /dev/null @@ -1,463 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Data.Caching.ObjectCache -* File: ObjectCacheStore.cs -* -* ObjectCacheStore.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.IO; -using System.Threading; -using System.Threading.Tasks; - -using VNLib.Utils.Async; -using VNLib.Utils.Memory; -using VNLib.Utils.Logging; -using VNLib.Utils.Extensions; -using VNLib.Net.Messaging.FBM.Server; -using static VNLib.Data.Caching.Constants; - - -#pragma warning disable CA1849 // Call async methods when in an async method - -namespace VNLib.Data.Caching.ObjectCache -{ - public delegate ReadOnlySpan<byte> GetBodyDataCallback<T>(T state); - - /// <summary> - /// A <see cref="FBMListener"/> implementation of a <see cref="CacheListener"/> - /// </summary> - public class ObjectCacheStore : FBMListenerBase, IDisposable - { - private readonly SemaphoreSlim StoreLock; - private bool disposedValue; - - ///<inheritdoc/> - protected override ILogProvider Log { get; } - - /// <summary> - /// A queue that stores update and delete events - /// </summary> - public AsyncQueue<ChangeEvent> EventQueue { get; } - - /// <summary> - /// The Cache store to access data blobs - /// </summary> - private readonly BlobCache Cache; - - private readonly IUnmangedHeap Heap; - - - /// <summary> - /// Initialzies a new <see cref="ObjectCacheStore"/> - /// </summary> - /// <param name="cacheMax"></param> - /// <param name="log"></param> - /// <param name="heap"></param> - /// <param name="singleReader">A value that indicates if a single thread is processing events</param> - public ObjectCacheStore(int cacheMax, ILogProvider log, IUnmangedHeap heap, bool singleReader) - { - Log = log; - //We can use a single writer and single reader in this context - EventQueue = new(true, singleReader); - Cache = new(cacheMax); - Heap = heap; - InitListener(heap); - StoreLock = new(1,1); - } - - ///<inheritdoc/> - protected override Task ProcessAsync(FBMContext context, object? userState, CancellationToken exitToken) - { - try - { - //Get the action header - string action = context.Method(); - - //Optional newid header - string? alternateId = context.NewObjectId(); - - switch (action) - { - case Actions.Get: - { - //Get the object-id header - string objectId = context.ObjectId(); - - //Try read sync - if (StoreLock.Wait(0)) - { - try - { - UnsafeReadEntry(context, objectId); - } - finally - { - StoreLock.Release(); - } - - return Task.CompletedTask; - } - else - { - //Read entry async - return InternalReadEntryAsync(context, objectId, exitToken); - } - } - - case Actions.AddOrUpdate: - { - //Get the object-id header - string objectId = context.ObjectId(); - - //Create change event for the object - ChangeEvent change = new(objectId, alternateId, false); - - //Attempt to aquire lock sync - if (StoreLock.Wait(0)) - { - //aquired sync - try - { - //Update the item - UnsafeAddOrUpdate(objectId, alternateId, GetBodyData, context); - } - finally - { - StoreLock.Release(); - } - - //Add to event queue - EnqueEvent(change); - - //Set status code - context.CloseResponse(ResponseCodes.Okay); - - return Task.CompletedTask; - } - else - { - //Lock will be awaited async and - return InternalAddOrUpdateAsync(context, change, exitToken); - } - } - case Actions.Delete: - { - //Get the object-id header - string objectId = context.ObjectId(); - - //Create change event - ChangeEvent change = new(objectId, alternateId, true); - - //See if lock can be entered without waiting - if (StoreLock.Wait(0)) - { - bool found = false; - - try - { - //Sync - found = UnsafeDeleteEntry(objectId); - } - finally - { - StoreLock.Release(); - } - - //Notify change - EnqueEvent(change); - - //Set status code if found - context.CloseResponse(found ? ResponseCodes.Okay : ResponseCodes.NotFound); - - return Task.CompletedTask; - } - else - { - //lock will yeild async - return InternalDeleteAsync(context, change, exitToken); - } - } - // event queue dequeue request - case Actions.Dequeue: - { - static void SetResponse(ChangeEvent change, FBMContext context) - { - if (change.Deleted) - { - context.CloseResponse("deleted"); - context.Response.WriteHeader(ObjectId, change.CurrentId); - } - else - { - //Changed - context.CloseResponse("modified"); - context.Response.WriteHeader(ObjectId, change.CurrentId); - - //Set old id if an old id is set - if (change.CurrentId != null) - { - context.Response.WriteHeader(NewObjectId, change.AlternateId); - } - } - } - - static async Task DequeAsync(AsyncQueue<ChangeEvent> queue, FBMContext context, CancellationToken exitToken) - { - //Wait for a new message to process - ChangeEvent ev = await queue.DequeueAsync(exitToken); - - //Set the response - SetResponse(ev, context); - } - - //If no event bus is registered, then this is not a legal command - if (userState is not AsyncQueue<ChangeEvent> eventBus) - { - context.CloseResponse(ResponseCodes.NotFound); - - return Task.CompletedTask; - } - - //try to deq without awaiting - if (eventBus.TryDequeue(out ChangeEvent? change)) - { - SetResponse(change, context); - - return Task.CompletedTask; - } - else - { - //Process async - return DequeAsync(eventBus, context, exitToken); - } - } - - } - - Log.Error("Unhandled cache event!"); - } - catch (OperationCanceledException) - { - throw; - } - catch(Exception ex) - { - //Log error and set error status code - Log.Error(ex); - context.CloseResponse(ResponseCodes.Error); - } - - return Task.CompletedTask; - } - - - private static ReadOnlySpan<byte> GetBodyData(FBMContext ctx) => ctx.Request.BodyData; - - private void EnqueEvent(ChangeEvent change) - { - if (!EventQueue.TryEnque(change)) - { - Log.Warn("Change event {ev} was not enqued because the event queue is overflowing!", change.CurrentId); - } - } - - private void UnsafeReadEntry(FBMContext context, string objectId) - { - if (Cache!.TryGetValue(objectId, out CacheEntry data)) - { - //Set the status code and write the buffered data to the response buffer - context.CloseResponse(ResponseCodes.Okay); - - //Copy data to response buffer - context.Response.WriteBody(data.GetDataSegment()); - } - else - { - context.CloseResponse(ResponseCodes.NotFound); - } - } - - async Task InternalReadEntryAsync(FBMContext context, string objectId, CancellationToken cancellation) - { - //enter lock async - using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation); - - UnsafeReadEntry(context, objectId); - } - - private async Task InternalAddOrUpdateAsync(FBMContext context, ChangeEvent change, CancellationToken cancellation) - { - //Wait for lock since we know it will yeild async - using (SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation)) - { - UnsafeAddOrUpdate(change.CurrentId, change.AlternateId, GetBodyData, context); - } - - //Add to event queue - EnqueEvent(change); - - //Set status code - context.CloseResponse(ResponseCodes.Okay); - } - - private void UnsafeAddOrUpdate<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state) - { - CacheEntry entry; - - //See if new/alt session id was specified - if (string.IsNullOrWhiteSpace(alternateId)) - { - //See if blob exists - if (!Cache!.TryGetValue(objectId, out entry)) - { - //Create the new cache entry since it does not exist - entry = CacheEntry.Create(bodyData(state), Heap); - - //Add to cache - Cache.Add(objectId, entry); - } - else - { - //Reset the buffer state - entry.UpdateData(bodyData(state)); - } - } - //Need to change the id of the record - else - { - //Try to change the blob key - if (!Cache!.TryChangeKey(objectId, alternateId, out entry)) - { - //Create the new cache entry since it does not exist - entry = CacheEntry.Create(bodyData(state), Heap); - - //Add to cache by its alternate id - Cache.Add(alternateId, entry); - } - else - { - //Reset the buffer state - entry.UpdateData(bodyData(state)); - } - } - - //Update modified time to current utc time - entry.SetTime(DateTime.UtcNow); - } - - private async Task InternalDeleteAsync(FBMContext context, ChangeEvent change, CancellationToken cancellation) - { - bool found = false; - - //enter the lock - using(SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation)) - { - //Sync - found = UnsafeDeleteEntry(change.CurrentId); - } - - //Notify change - EnqueEvent(change); - - //Set status code if found - context.CloseResponse(found ? ResponseCodes.Okay : ResponseCodes.NotFound); - } - - private bool UnsafeDeleteEntry(string id) => Cache!.Remove(id); - - - /// <summary> - /// Asynchronously adds or updates an object in the store and optionally update's its id - /// </summary> - /// <param name="objectId">The current (or old) id of the object</param> - /// <param name="alternateId">An optional id to update the blob to</param> - /// <param name="bodyData">A callback that returns the data for the blob</param> - /// <param name="state">The state parameter to pass to the data callback</param> - /// <param name="token">A token to cancel the async operation</param> - /// <returns>A value task that represents the async operation</returns> - public async ValueTask AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token = default) - { - //Test the lock before waiting async - if (!StoreLock.Wait(0)) - { - //Wait async to avoid task alloc - await StoreLock.WaitAsync(token); - } - try - { - UnsafeAddOrUpdate(objectId, alternateId, bodyData, state); - } - finally - { - StoreLock.Release(); - } - } - - /// <summary> - /// Asynchronously deletes a previously stored item - /// </summary> - /// <param name="id">The id of the object to delete</param> - /// <param name="token">A token to cancel the async lock await</param> - /// <returns>A task that completes when the item has been deleted</returns> - public async ValueTask<bool> DeleteItemAsync(string id, CancellationToken token = default) - { - //Test the lock before waiting async - if (!StoreLock.Wait(0)) - { - //Wait async to avoid task alloc - await StoreLock.WaitAsync(token); - } - try - { - return UnsafeDeleteEntry(id); - } - finally - { - StoreLock.Release(); - } - } - - - ///<inheritdoc/> - protected virtual void Dispose(bool disposing) - { - if (!disposedValue) - { - if (disposing) - { - Cache?.Clear(); - } - - StoreLock.Dispose(); - - disposedValue = true; - } - } - - ///<inheritdoc/> - public void Dispose() - { - // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method - Dispose(disposing: true); - GC.SuppressFinalize(this); - } - } -} diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/VNLib.Data.Caching.ObjectCache.csproj b/lib/VNLib.Data.Caching.ObjectCache/src/VNLib.Data.Caching.ObjectCache.csproj index 119622f..3d1af7f 100644 --- a/lib/VNLib.Data.Caching.ObjectCache/src/VNLib.Data.Caching.ObjectCache.csproj +++ b/lib/VNLib.Data.Caching.ObjectCache/src/VNLib.Data.Caching.ObjectCache.csproj @@ -2,18 +2,25 @@ <PropertyGroup> <TargetFramework>net6.0</TargetFramework> - - <Authors>Vaughn Nugent</Authors> - <Copyright>Copyright © 2023 Vaughn Nugent</Copyright> + <RootNamespace>VNLib.Data.Caching.ObjectCache</RootNamespace> + <AssemblyName>VNLib.Data.Caching.ObjectCache</AssemblyName> <Nullable>enable</Nullable> <GenerateDocumentationFile>True</GenerateDocumentationFile> - <PackageProjectUrl>https://www.vaughnnugent.com/resources</PackageProjectUrl> - <AssemblyVersion>1.0.0.1</AssemblyVersion> - <Version>1.0.1.1</Version> <AnalysisLevel>latest-all</AnalysisLevel> - <SignAssembly>True</SignAssembly> - <AssemblyOriginatorKeyFile>\\vaughnnugent.com\Internal\Folder Redirection\vman\Documents\Programming\Software\StrongNameingKey.snk</AssemblyOriginatorKeyFile> - <Description>Provides server-side object-cache related infrastructure</Description> + </PropertyGroup> + + <PropertyGroup> + <Authors>Vaughn Nugent</Authors> + <Company>Vaughn Nugent</Company> + <Product>VNLib.Data.Caching.ObjectCache</Product> + <PackageId>VNLib.Data.Caching.ObjectCache</PackageId> + <Description> + A library for a high-performance in-memory object-data caching, based on key-derrived cache buckets + for wait distribution. + </Description> + <Copyright>Copyright © 2023 Vaughn Nugent</Copyright> + <PackageProjectUrl>https://www.vaughnnugent.com/resources/software/modules/VNLib.Data.Caching</PackageProjectUrl> + <RepositoryUrl>https://github.com/VnUgE/VNLib.Data.Caching/tree/master/lib/VNLib.Data.Caching.ObjectCache</RepositoryUrl> </PropertyGroup> <ItemGroup> |