aboutsummaryrefslogtreecommitdiff
path: root/lib/VNLib.Data.Caching.ObjectCache
diff options
context:
space:
mode:
Diffstat (limited to 'lib/VNLib.Data.Caching.ObjectCache')
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs81
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheBucket.cs95
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheExtensions.cs197
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs285
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheTable.cs139
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/CacheBucketHandle.cs111
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs7
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/IBlobCache.cs87
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/IBlobCacheBucket.cs58
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/IBlobCacheTable.cs43
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs463
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/VNLib.Data.Caching.ObjectCache.csproj25
12 files changed, 1095 insertions, 496 deletions
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs
index fbb2dcc..440981a 100644
--- a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Data.Caching.ObjectCache
@@ -24,30 +24,38 @@
using System;
using System.Collections.Generic;
+using System.Diagnostics;
+using VNLib.Utils.Memory;
using VNLib.Utils.Memory.Caching;
-namespace VNLib.Data.Caching
+namespace VNLib.Data.Caching.ObjectCache
{
/// <summary>
/// A general purpose binary data storage
/// </summary>
- public class BlobCache : LRUCache<string, CacheEntry>
+ public sealed class BlobCache : LRUCache<string, CacheEntry>, IBlobCache
{
+ private bool disposedValue;
+
///<inheritdoc/>
public override bool IsReadOnly { get; }
///<inheritdoc/>
protected override int MaxCapacity { get; }
+
+ ///<inheritdoc/>
+ public IUnmangedHeap CacheHeap { get; }
/// <summary>
/// Initializes a new <see cref="BlobCache"/> store
/// </summary>
/// <param name="maxCapacity">The maximum number of items to keep in memory</param>
+ /// <param name="heap">The unmanaged heap used to allocate cache entry buffers from</param>
/// <exception cref="ArgumentException"></exception>
- public BlobCache(int maxCapacity)
+ public BlobCache(int maxCapacity, IUnmangedHeap heap)
:base(StringComparer.Ordinal)
{
if(maxCapacity < 1)
@@ -55,6 +63,8 @@ namespace VNLib.Data.Caching
throw new ArgumentException("The maxium capacity of the store must be a positive integer larger than 0", nameof(maxCapacity));
}
+ CacheHeap = heap;
+
MaxCapacity = maxCapacity;
//Update the lookup table size
@@ -75,18 +85,11 @@ namespace VNLib.Data.Caching
evicted.Value.Dispose();
}
- /// <summary>
- /// If the <see cref="CacheEntry"/> is found in the store, changes the key
- /// that referrences the blob.
- /// </summary>
- /// <param name="currentKey">The key that currently referrences the blob in the store</param>
- /// <param name="newKey">The new key that will referrence the blob</param>
- /// <param name="blob">The <see cref="CacheEntry"/> if its found in the store</param>
- /// <returns>True if the record was found and the key was changes</returns>
- public bool TryChangeKey(string currentKey, string newKey, out CacheEntry blob)
+ ///<inheritdoc/>
+ public bool TryChangeKey(string objectId, string newId, out CacheEntry blob)
{
//Try to get the node at the current key
- if (LookupTable.Remove(currentKey, out LinkedListNode<KeyValuePair<string, CacheEntry>> ? node))
+ if (LookupTable.Remove(objectId, out LinkedListNode<KeyValuePair<string, CacheEntry>> ? node))
{
//Remove the node from the ll
List.Remove(node);
@@ -95,13 +98,13 @@ namespace VNLib.Data.Caching
blob = node.ValueRef.Value;
//Update the
- node.Value = new KeyValuePair<string, CacheEntry>(newKey, blob);
+ node.Value = new KeyValuePair<string, CacheEntry>(newId, blob);
//Add to end of list
List.AddLast(node);
//Re-add to lookup table with new key
- LookupTable.Add(newKey, node);
+ LookupTable.Add(newId, node);
return true;
}
@@ -110,11 +113,7 @@ namespace VNLib.Data.Caching
return false;
}
- /// <summary>
- /// Removes the <see cref="CacheEntry"/> from the store, and frees its resources
- /// </summary>
- /// <param name="key">The key that referrences the <see cref="CacheEntry"/> in the store</param>
- /// <returns>A value indicating if the blob was removed</returns>
+ ///<inheritdoc/>
public override bool Remove(string key)
{
//Remove the item from the lookup table and if it exists, remove the node from the list
@@ -129,6 +128,7 @@ namespace VNLib.Data.Caching
//Remove the node from the list
List.Remove(node);
}
+
return true;
}
@@ -153,5 +153,44 @@ namespace VNLib.Data.Caching
//empty all cache entires in the store
base.Clear();
}
+
+ ///<inheritdoc/>
+ public bool Remove(string objectId, out CacheEntry entry)
+ {
+ //Try to get the stored object
+ if(TryGetValue(objectId, out entry))
+ {
+ //remove the entry and bypass the disposal
+ bool result = base.Remove(objectId);
+#if DEBUG
+ Debug.Assert(result == true);
+#endif
+ return true;
+ }
+
+ entry = default;
+ return false;
+ }
+
+ ///<inheritdoc/>
+ void Dispose(bool disposing)
+ {
+ if (!disposedValue)
+ {
+ if (disposing)
+ {
+ Clear();
+ }
+ disposedValue = true;
+ }
+ }
+
+ ///<inheritdoc/>
+ public void Dispose()
+ {
+ // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
+ Dispose(disposing: true);
+ GC.SuppressFinalize(this);
+ }
}
}
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheBucket.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheBucket.cs
new file mode 100644
index 0000000..f79db3f
--- /dev/null
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheBucket.cs
@@ -0,0 +1,95 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.ObjectCache
+* File: BlobCacheBucket.cs
+*
+* BlobCacheBucket.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Threading;
+using System.Threading.Tasks;
+
+using VNLib.Utils.Memory;
+
+namespace VNLib.Data.Caching.ObjectCache
+{
+ public sealed class BlobCacheBucket : IBlobCacheBucket
+ {
+ private readonly IBlobCache _cacheTable;
+ private readonly SemaphoreSlim _lock;
+
+ /// <summary>
+ /// Initialzies a new <see cref="BlobCacheBucket"/> and its underlying
+ /// <see cref="IBlobCache"/>
+ /// </summary>
+ /// <param name="bucketCapacity">
+ /// The maxium number of entries allowed in the LRU cache
+ /// before LRU overflow happens.
+ /// </param>
+ /// <param name="heap">The heap to allocate object cache buffers</param>
+ public BlobCacheBucket(int bucketCapacity, IUnmangedHeap heap)
+ {
+ _lock = new(1, 1);
+ _cacheTable = new BlobCache(bucketCapacity, heap);
+ }
+
+ ///<inheritdoc/>
+ public void Dispose()
+ {
+ _cacheTable.Dispose();
+ _lock.Dispose();
+ }
+
+ ///<inheritdoc/>
+ public async ValueTask<IBlobCache> ManualWaitAsync(CancellationToken cancellation)
+ {
+ //try to enter the lock synchronously
+ if (_lock.Wait(0, CancellationToken.None))
+ {
+ return _cacheTable;
+ }
+ else
+ {
+ await _lock.WaitAsync(cancellation).ConfigureAwait(false);
+ return _cacheTable;
+ }
+ }
+
+ ///<inheritdoc/>
+ public void Release()
+ {
+ _lock.Release();
+ }
+
+ ///<inheritdoc/>
+ public async ValueTask<CacheBucketHandle> WaitAsync(CancellationToken cancellation)
+ {
+ //try to enter the lock synchronously
+ if (_lock.Wait(0, CancellationToken.None))
+ {
+ return new(this, _cacheTable);
+ }
+ else
+ {
+ await _lock.WaitAsync(cancellation).ConfigureAwait(false);
+ return new(this, _cacheTable);
+ }
+ }
+ }
+}
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheExtensions.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheExtensions.cs
new file mode 100644
index 0000000..4a8692d
--- /dev/null
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheExtensions.cs
@@ -0,0 +1,197 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.ObjectCache
+* File: BlobCacheExtensions.cs
+*
+* BlobCacheExtensions.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace VNLib.Data.Caching.ObjectCache
+{
+ /// <summary>
+ /// Provides blob cache extension methods
+ /// </summary>
+ public static class BlobCacheExtensions
+ {
+ internal static CacheEntry CreateEntry(this IBlobCache cache, string objectId, ReadOnlySpan<byte> initialData, DateTime time)
+ {
+ CacheEntry entry = CacheEntry.Create(initialData, cache.CacheHeap);
+ try
+ {
+ //try to add the entry, but if exists, let it throw
+ cache.Add(objectId, entry);
+ entry.SetTime(time);
+ return entry;
+ }
+ catch
+ {
+ entry.Dispose();
+ throw;
+ }
+ }
+
+ internal static CacheEntry AddOrUpdateEntry(this IBlobCache cache, string objectId, ReadOnlySpan<byte> data, DateTime time)
+ {
+ //See if blob exists
+ if (cache.TryGetValue(objectId, out CacheEntry entry))
+ {
+ //Update the entry since it exists
+ entry.UpdateData(data);
+
+ entry.SetTime(time);
+ }
+ else
+ {
+ //Create the new entry
+ entry = cache.CreateEntry(objectId, data, time);
+ }
+
+ return entry;
+ }
+
+ internal static CacheEntry TryChangeKey(this IBlobCache cache, string objectId, string alternateId, ReadOnlySpan<byte> data, DateTime time)
+ {
+ //Change the key of the blob item and update its data
+ if (cache.TryChangeKey(objectId, alternateId, out CacheEntry entry))
+ {
+ //If date is 0 length do not overwrite the old entry if found
+ if (data.IsEmpty)
+ {
+ return entry;
+ }
+
+ //Otherwise update the entry
+ entry.UpdateData(data);
+ entry.SetTime(time);
+
+ return entry;
+ }
+ else
+ {
+ //entry does not exist at the old id, so we can create a new one at the alternate id
+ return cache.CreateEntry(objectId, data, time);
+ }
+ }
+
+
+ /// <summary>
+ /// Asynchronously adds or updates an object in the store and optionally update's it's id
+ /// </summary>
+ /// <param name="table"></param>
+ /// <param name="objectId">The current (or old) id of the object</param>
+ /// <param name="alternateId">An optional id to update the blob to</param>
+ /// <param name="bodyData">A callback that returns the data for the blob</param>
+ /// <param name="state">The state parameter to pass to the data callback</param>
+ /// <param name="time">The time to set on the cache record</param>
+ /// <param name="cancellation">A token to cancel the async operation</param>
+ /// <returns>A value task that represents the async operation</returns>
+ public static async ValueTask AddOrUpdateObjectAsync<T>(
+ this IBlobCacheTable table,
+ string objectId,
+ string? alternateId,
+ GetBodyDataCallback<T> bodyData,
+ T state,
+ DateTime time,
+ CancellationToken cancellation = default)
+ {
+ //See if an id change is required
+ if (string.IsNullOrWhiteSpace(alternateId))
+ {
+ //safe to get the bucket for the primary id
+ IBlobCacheBucket bucket = table.GetBucket(objectId);
+
+ //Wait for the bucket
+ using CacheBucketHandle handle = await bucket.WaitAsync(cancellation);
+
+ //add/update for single entity
+ _ = handle.Cache.AddOrUpdateEntry(objectId, bodyData(state), time);
+ }
+ else
+ {
+ //Buckets for each id need to be obtained
+ IBlobCacheBucket primary = table.GetBucket(objectId);
+ IBlobCacheBucket alternate = table.GetBucket(alternateId);
+
+ //Same bucket
+ if (ReferenceEquals(primary, alternate))
+ {
+ //wait for lock on only one bucket otherwise dealock
+ using CacheBucketHandle handle = await primary.WaitAsync(cancellation);
+
+ //Update the entry for the single bucket
+ _ = handle.Cache.TryChangeKey(objectId, alternateId, bodyData(state), time);
+ }
+ else
+ {
+ //Buckets are different must be awaited individually
+ using CacheBucketHandle primaryHandle = await primary.WaitAsync(cancellation);
+ using CacheBucketHandle alternateHandle = await alternate.WaitAsync(cancellation);
+
+ //Get the entry from the primary hande
+ if (primaryHandle.Cache.Remove(objectId, out CacheEntry entry))
+ {
+ try
+ {
+ //Update the handle data and reuse the entry
+ entry.UpdateData(bodyData(state));
+
+ //Add the updated entry to the alternate table
+ alternateHandle.Cache.Add(alternateId, entry);
+ }
+ catch
+ {
+ //Cleanup handle if error adding
+ entry.Dispose();
+ throw;
+ }
+ }
+ else
+ {
+ //Old entry did not exist, we need to create a new entry for the alternate bucket
+ _ = alternateHandle.Cache.CreateEntry(alternateId, bodyData(state), time);
+ }
+ }
+ }
+ }
+
+ /// <summary>
+ /// Asynchronously deletes a previously stored item
+ /// </summary>
+ /// <param name="table"></param>
+ /// <param name="objectId">The id of the object to delete</param>
+ /// <param name="cancellation">A token to cancel the async lock await</param>
+ /// <returns>A task that completes when the item has been deleted</returns>
+ public static async ValueTask<bool> DeleteObjectAsync(this IBlobCacheTable table, string objectId, CancellationToken cancellation = default)
+ {
+ //Try to get the bucket that the id should belong to
+ IBlobCacheBucket bucket = table.GetBucket(objectId);
+
+ //Wait for lock on bucket async
+ using CacheBucketHandle handle = await bucket.WaitAsync(cancellation);
+
+ //Remove the object from the blob store
+ return handle.Cache.Remove(objectId);
+ }
+ }
+}
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs
new file mode 100644
index 0000000..818dfcf
--- /dev/null
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs
@@ -0,0 +1,285 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.ObjectCache
+* File: BlobCacheLIstener.cs
+*
+* BlobCacheLIstener.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+
+/*
+ * The latest bucket based cache store relys on bucket level locking
+ * to distribute locking across threads and reduce contention.
+ *
+ * This design relys on holding the bucket lock for the entire duration
+ * of the CacheEntry manipulation, its id, movment, and reading/writing
+ * the entirie's contents.
+ *
+ * Some drawbacks are the basics with key-derrived bucket systems:
+ * bucket imbalance due to key distribtion.
+ *
+ * Design perfers average speed, but will need to be tested heavily per
+ * use-case.
+ */
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+using VNLib.Utils.Async;
+using VNLib.Utils.Memory;
+using VNLib.Utils.Logging;
+using VNLib.Net.Messaging.FBM.Server;
+using static VNLib.Data.Caching.Constants;
+
+namespace VNLib.Data.Caching.ObjectCache
+{
+ public delegate ReadOnlySpan<byte> GetBodyDataCallback<T>(T state);
+
+ /// <summary>
+ /// A <see cref="FBMListener"/> implementation of a <see cref="CacheListener"/>
+ /// </summary>
+ public class BlobCacheLIstener : FBMListenerBase, IDisposable
+ {
+ private bool disposedValue;
+
+ ///<inheritdoc/>
+ protected override ILogProvider Log { get; }
+
+ /// <summary>
+ /// A queue that stores update and delete events
+ /// </summary>
+ public AsyncQueue<ChangeEvent> EventQueue { get; }
+
+ /// <summary>
+ /// The Cache store to access data blobs
+ /// </summary>
+ public IBlobCacheTable Cache { get; }
+
+
+ /// <summary>
+ /// Initialzies a new <see cref="BlobCacheLIstener"/>
+ /// </summary>
+ /// <param name="cacheMax">The maxium number of items per bucket</param>
+ /// <param name="buckets">The number of cache store buckets</param>
+ /// <param name="log"></param>
+ /// <param name="heap">The heap to alloc FBM buffers and <see cref="CacheEntry"/> cache buffers from</param>
+ /// <param name="singleReader">A value that indicates if a single thread is processing events</param>
+ public BlobCacheLIstener(uint buckets, uint cacheMax, ILogProvider log, IUnmangedHeap heap, bool singleReader)
+ {
+ Log = log;
+
+ //Writes may happen from multple threads with bucket design and no lock
+ EventQueue = new(false, singleReader);
+
+ Cache = new BlobCacheTable(buckets, cacheMax, heap);
+ InitListener(heap);
+ }
+
+ ///<inheritdoc/>
+ protected override async Task ProcessAsync(FBMContext context, object? userState, CancellationToken exitToken)
+ {
+ try
+ {
+ //Get the action header
+ string action = context.Method();
+
+ //Optional newid header
+ string? alternateId = context.NewObjectId();
+
+ switch (action)
+ {
+ case Actions.Get:
+ {
+ //Get the object-id header
+ string objectId = context.ObjectId();
+
+ //Process read
+ await ReadEntryAsync(context, objectId, exitToken);
+ return;
+ }
+ case Actions.AddOrUpdate:
+ {
+ //Get the object-id header
+ string objectId = context.ObjectId();
+
+ //Create change event for the object
+ ChangeEvent change = new(objectId, alternateId, false);
+
+ await AddOrUpdateAsync(context, change, exitToken);
+ return;
+ }
+ case Actions.Delete:
+ {
+ //Get the object-id header
+ string objectId = context.ObjectId();
+
+ //Create change event
+ ChangeEvent change = new(objectId, alternateId, true);
+
+ await DeleteEntryAsync(context, change, exitToken);
+ return;
+ }
+ // event queue dequeue request
+ case Actions.Dequeue:
+ {
+ static void SetResponse(ChangeEvent change, FBMContext context)
+ {
+ if (change.Deleted)
+ {
+ context.CloseResponse("deleted");
+ context.Response.WriteHeader(ObjectId, change.CurrentId);
+ }
+ else
+ {
+ //Changed
+ context.CloseResponse("modified");
+ context.Response.WriteHeader(ObjectId, change.CurrentId);
+
+ //Set old id if an old id is set
+ if (change.CurrentId != null)
+ {
+ context.Response.WriteHeader(NewObjectId, change.AlternateId);
+ }
+ }
+ }
+
+ static async Task DequeAsync(AsyncQueue<ChangeEvent> queue, FBMContext context, CancellationToken exitToken)
+ {
+ //Wait for a new message to process
+ ChangeEvent ev = await queue.DequeueAsync(exitToken);
+
+ //Set the response
+ SetResponse(ev, context);
+ }
+
+ //If no event bus is registered, then this is not a legal command
+ if (userState is not AsyncQueue<ChangeEvent> eventBus)
+ {
+ context.CloseResponse(ResponseCodes.NotFound);
+
+ return;
+ }
+
+ //try to deq without awaiting
+ if (eventBus.TryDequeue(out ChangeEvent? change))
+ {
+ SetResponse(change, context);
+ }
+ else
+ {
+ //Process async
+ await DequeAsync(eventBus, context, exitToken);
+ }
+
+ return;
+ }
+
+ }
+
+ Log.Error("Unhandled cache event for session {id}", context.Request.ConnectionId);
+ context.CloseResponse(ResponseCodes.Error);
+ }
+ catch (OperationCanceledException)
+ {
+ throw;
+ }
+ catch(Exception ex)
+ {
+ //Log error and set error status code
+ Log.Error(ex);
+ context.CloseResponse(ResponseCodes.Error);
+ }
+ }
+
+ private async ValueTask ReadEntryAsync(FBMContext context, string objectId, CancellationToken cancellation)
+ {
+ //Try to get the bucket that the id should belong to
+ IBlobCacheBucket bucket = Cache.GetBucket(objectId);
+
+ //Wait for lock on bucket async
+ using CacheBucketHandle handle = await bucket.WaitAsync(cancellation);
+
+ if (handle.Cache.TryGetValue(objectId, out CacheEntry data))
+ {
+ //Set the status code and write the buffered data to the response buffer
+ context.CloseResponse(ResponseCodes.Okay);
+
+ //Copy data to response buffer
+ context.Response.WriteBody(data.GetDataSegment());
+ }
+ else
+ {
+ context.CloseResponse(ResponseCodes.NotFound);
+ }
+ }
+
+ private async ValueTask DeleteEntryAsync(FBMContext context, ChangeEvent change, CancellationToken cancellation)
+ {
+ //Remove the object from the blob store
+ bool found = await Cache.DeleteObjectAsync(change.CurrentId, cancellation);
+
+ context.CloseResponse(found ? ResponseCodes.Okay : ResponseCodes.NotFound);
+
+ //Enque change if item was successfully deleted
+ if (found)
+ {
+ EnqueEvent(change);
+ }
+ }
+
+ private async ValueTask AddOrUpdateAsync(FBMContext context, ChangeEvent change, CancellationToken cancellation)
+ {
+ //Run add/update and get the valuetask
+ await Cache.AddOrUpdateObjectAsync(change.CurrentId, change.AlternateId, static r => r.BodyData, context.Request, default, cancellation);
+
+ EnqueEvent(change);
+
+ context.CloseResponse(ResponseCodes.Okay);
+ }
+
+ private void EnqueEvent(ChangeEvent change)
+ {
+ if (!EventQueue.TryEnque(change))
+ {
+ Log.Warn("Change event {ev} was not enqued because the event queue is overflowing!", change.CurrentId);
+ }
+ }
+
+
+ ///<inheritdoc/>
+ protected virtual void Dispose(bool disposing)
+ {
+ if (!disposedValue)
+ {
+ Cache.Dispose();
+
+ disposedValue = true;
+ }
+ }
+
+ ///<inheritdoc/>
+ public void Dispose()
+ {
+ // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
+ Dispose(disposing: true);
+ GC.SuppressFinalize(this);
+ }
+ }
+}
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheTable.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheTable.cs
new file mode 100644
index 0000000..270cf1e
--- /dev/null
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheTable.cs
@@ -0,0 +1,139 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.ObjectCache
+* File: BlobCacheTable.cs
+*
+* BlobCacheTable.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Linq;
+using System.Collections;
+using System.Collections.Generic;
+
+using VNLib.Utils;
+using VNLib.Utils.Memory;
+
+namespace VNLib.Data.Caching.ObjectCache
+{
+ /// <summary>
+ /// A concrete implementation of a <see cref="IBlobCacheTable"/>
+ /// </summary>
+ public sealed class BlobCacheTable : VnDisposeable, IBlobCacheTable
+ {
+ private readonly uint _tableSize;
+ private readonly IBlobCacheBucket[] _buckets;
+
+ /// <summary>
+ /// Initializes a new <see cref="BlobCacheTable"/>
+ /// </summary>
+ /// <param name="bucketSize">The number of elements in each bucket</param>
+ /// <param name="tableSize">The number of buckets within the table</param>
+ /// <param name="heap">The heap used to allocate cache entry buffers from</param>
+ /// <exception cref="ArgumentNullException"></exception>
+ /// <exception cref="ArgumentException"></exception>
+ public BlobCacheTable(uint tableSize, uint bucketSize, IUnmangedHeap heap)
+ {
+ _ = heap ?? throw new ArgumentNullException(nameof(heap));
+
+ if(tableSize == 0)
+ {
+ throw new ArgumentException("Cache table must have atleast 1 bucket");
+ }
+
+ //Init bucket table
+ _tableSize = tableSize;
+ _buckets = new IBlobCacheBucket[tableSize];
+
+ //Init buckets
+ InitBuckets(tableSize, bucketSize, _buckets, heap);
+ }
+
+
+ private static void InitBuckets(uint size, uint bucketSize, IBlobCacheBucket[] table, IUnmangedHeap heap)
+ {
+ for(int i = 0; i < size; i++)
+ {
+ table[i] = new BlobCacheBucket((int)bucketSize, heap);
+ }
+ }
+
+ /*
+ * A very simple algorithm that captures unique values
+ * from an object id and builds an unsigned 32bit integer
+ * used to determine the bucked index within the table.
+ *
+ * This method will alawys result in the same index for
+ * for a given object-id
+ */
+
+ private uint FastGetBucketIndexFromId(ReadOnlySpan<char> objectId)
+ {
+ if (objectId.Length < 4)
+ {
+ throw new ArgumentException("Object id must be larger than 3 characters");
+ }
+
+ Span<byte> buffer = stackalloc byte[4];
+
+ //cast the characters
+ buffer[0] = (byte)objectId[0];
+ buffer[1] = (byte)objectId[objectId.Length / 2];
+ buffer[2] = (byte)objectId[1];
+ buffer[3] = (byte)objectId[^1];
+
+ //Read the buffer back to a uint and mod by the table size to get the bucket index
+ return BitConverter.ToUInt32(buffer) % _tableSize;
+ }
+
+
+ ///<inheritdoc/>
+ ///<exception cref="ObjectDisposedException"></exception>
+ public IBlobCacheBucket GetBucket(ReadOnlySpan<char> objectId)
+ {
+ Check();
+
+ //If tablesize is 1, skip lookup, otherwise perform bucket index lookup
+ uint index = _tableSize == 1 ? 0 : FastGetBucketIndexFromId(objectId);
+
+ return _buckets[index];
+ }
+
+ ///<inheritdoc/>
+ protected sealed override void Free()
+ {
+ //Dispose buckets
+ Array.ForEach(_buckets, static b => b.Dispose());
+ }
+
+ ///<inheritdoc/>
+ public IEnumerator<IBlobCacheBucket> GetEnumerator()
+ {
+ Check();
+ return _buckets.AsEnumerable().GetEnumerator();
+ }
+
+ ///<inheritdoc/>
+ IEnumerator IEnumerable.GetEnumerator()
+ {
+ Check();
+ return _buckets.AsEnumerable().GetEnumerator();
+ }
+ }
+}
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/CacheBucketHandle.cs b/lib/VNLib.Data.Caching.ObjectCache/src/CacheBucketHandle.cs
new file mode 100644
index 0000000..f9c1f17
--- /dev/null
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/CacheBucketHandle.cs
@@ -0,0 +1,111 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.ObjectCache
+* File: CacheBucketHandle.cs
+*
+* CacheBucketHandle.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Diagnostics.CodeAnalysis;
+
+namespace VNLib.Data.Caching.ObjectCache
+{
+ /// <summary>
+ /// Holds an exclusive lock on a <see cref="IBlobCacheBucket"/>, and exposes
+ /// acess to its internal <see cref="IBlobCache"/>
+ /// </summary>
+ public readonly struct CacheBucketHandle : IDisposable, IEquatable<CacheBucketHandle>
+ {
+ private readonly IBlobCacheBucket? _bucket;
+
+ /// <summary>
+ /// The <see cref="IBlobCache"/> held by the current handle
+ /// </summary>
+ public readonly IBlobCache Cache { get; }
+
+ /// <summary>
+ /// Initializes an empty blobcache handle
+ /// </summary>
+ public CacheBucketHandle()
+ {
+ _bucket = null;
+ Cache = null!;
+ }
+
+ /// <summary>
+ /// Creates a new bucket lock handle to be released on dispose
+ /// </summary>
+ /// <param name="bucket">The bucket to release access to on dispose</param>
+ /// <param name="cache">The underlying <see cref="IBlobCache"/> provide exclusive access to</param>
+ public CacheBucketHandle(IBlobCacheBucket bucket, IBlobCache cache)
+ {
+ _bucket = bucket;
+ Cache = cache;
+ }
+
+ /// <summary>
+ /// Releases the exlusive lock held on the bucket
+ /// </summary>
+ public void Dispose()
+ {
+ //Release the bucket when disposed
+ _bucket?.Release();
+ }
+
+ /// <summary>
+ /// Determines if the other handle instance is equal to the current. Handles are
+ /// equal iff the underlying bucket referrence is equal.
+ /// </summary>
+ /// <param name="other">The other handle to compare</param>
+ /// <returns>True if the handles hold a referrence to the same bucket</returns>
+ public bool Equals(CacheBucketHandle other) => _bucket?.Equals(other._bucket) ?? false;
+ /// <summary>
+ /// Determines if the other handle instance is equal to the current. Handles are
+ /// equal iff the underlying bucket referrence is equal.
+ /// </summary>
+ /// <param name="obj">The other handle to compare</param>
+ /// <returns>True if the handles hold a referrence to the same bucket</returns>
+ public override bool Equals([NotNullWhen(true)] object? obj) => obj is CacheBucketHandle other && Equals(other);
+
+ /// <summary>
+ /// Gets the hashcode of the underlying bucket
+ /// </summary>
+ /// <returns></returns>
+ public override int GetHashCode() => _bucket?.GetHashCode() ?? -1;
+
+ /// <summary>
+ /// Determines if the handles are equal by the <see cref="Equals(CacheBucketHandle)"/>
+ /// method.
+ /// </summary>
+ /// <param name="left"></param>
+ /// <param name="right"></param>
+ /// <returns>True if the internal bucket references are equal</returns>
+ public static bool operator ==(CacheBucketHandle left, CacheBucketHandle right) => left.Equals(right);
+
+ /// <summary>
+ /// Determines if the handles are equal by the <see cref="Equals(CacheBucketHandle)"/>
+ /// method.
+ /// </summary>
+ /// <param name="left"></param>
+ /// <param name="right"></param>
+ /// <returns>True if the internal bucket references are NOT equal</returns>
+ public static bool operator !=(CacheBucketHandle left, CacheBucketHandle right) => !(left == right);
+ }
+}
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs b/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs
index 8644d1d..3d61790 100644
--- a/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs
@@ -32,7 +32,8 @@ using VNLib.Utils.Extensions;
namespace VNLib.Data.Caching
{
/// <summary>
- /// A structure that represents an item in cache
+ /// A structure that represents an item in cache. It contains the binary content
+ /// of a cache entry by its internal memory handle
/// </summary>
public readonly struct CacheEntry : IDisposable, IEquatable<CacheEntry>
{
@@ -89,7 +90,7 @@ namespace VNLib.Data.Caching
///<inheritdoc/>
- public readonly void Dispose() => _handle.Dispose();
+ public readonly void Dispose() => _handle?.Dispose();
[MethodImpl(MethodImplOptions.AggressiveInlining)]
@@ -116,7 +117,7 @@ namespace VNLib.Data.Caching
/// </summary>
/// <returns>The last date stored</returns>
/// <exception cref="ObjectDisposedException"></exception>
- public readonly DateTime GetCreatedTime()
+ public readonly DateTime GetTime()
{
//Get the time segment and write the value in big endian
ReadOnlySpan<byte> segment = GetTimeSegment();
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/IBlobCache.cs b/lib/VNLib.Data.Caching.ObjectCache/src/IBlobCache.cs
new file mode 100644
index 0000000..52d53ff
--- /dev/null
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/IBlobCache.cs
@@ -0,0 +1,87 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.ObjectCache
+* File: IBlobCache.cs
+*
+* IBlobCache.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Collections.Generic;
+
+using VNLib.Utils.Memory;
+
+namespace VNLib.Data.Caching.ObjectCache
+{
+ /// <summary>
+ /// Represents a binary data cache store
+ /// </summary>
+ public interface IBlobCache : IEnumerable<KeyValuePair<string, CacheEntry>>, IDisposable
+ {
+ /// <summary>
+ /// The internal heap used to allocate <see cref="CacheEntry"/> buffers
+ /// </summary>
+ IUnmangedHeap CacheHeap { get; }
+
+ /// <summary>
+ /// Attempts to retreive the entry at the given id.
+ /// </summary>
+ /// <param name="objectId">The id of the object to locate</param>
+ /// <param name="entry">The cache entry if found, default otherwise</param>
+ /// <returns>True if the entry was assigned</returns>
+ bool TryGetValue(string objectId, out CacheEntry entry);
+
+ /// <summary>
+ /// Attempts to relocate the entry in the table by its new id.
+ /// </summary>
+ /// <param name="objectId">The original id of the entry to modify</param>
+ /// <param name="newId">The new id of the entry</param>
+ /// <param name="entry">The original entry if found, default otherwise</param>
+ /// <returns>True if the item was located and successfully updated, false if the operation failed</returns>
+ bool TryChangeKey(string objectId, string newId, out CacheEntry entry);
+
+ /// <summary>
+ /// Adds the entry to the table by the id
+ /// </summary>
+ /// <param name="objectId"></param>
+ /// <param name="entry">The entry to store in the table</param>
+ void Add(string objectId, CacheEntry entry);
+
+ /// <summary>
+ /// Attempts to remove the entry at the given id, and returns the
+ /// entry if located.
+ /// </summary>
+ /// <param name="objectId">The id of the entry to remove</param>
+ /// <param name="entry">The entry if found, default otherwise</param>
+ /// <returns>True if the entry existed in the store, false otherwise</returns>
+ /// <remarks>
+ /// NOTE: If the return value is true, the store no longer maintains the lifetime
+ /// of the returned <see cref="CacheEntry"/>. You must manually dispose the entry
+ /// to avoid memory leaks.
+ /// </remarks>
+ bool Remove(string objectId, out CacheEntry entry);
+
+ /// <summary>
+ /// Attempts to remove the entry at the given id, and release its memory.
+ /// </summary>
+ /// <param name="objectId">The id of the entry to remove</param>
+ /// <returns>True if the entry was found and disposed</returns>
+ bool Remove(string objectId);
+ }
+}
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/IBlobCacheBucket.cs b/lib/VNLib.Data.Caching.ObjectCache/src/IBlobCacheBucket.cs
new file mode 100644
index 0000000..4876c5f
--- /dev/null
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/IBlobCacheBucket.cs
@@ -0,0 +1,58 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.ObjectCache
+* File: ObjectCacheStore.cs
+*
+* ObjectCacheStore.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace VNLib.Data.Caching.ObjectCache
+{
+ /// <summary>
+ /// Represents a singe cache bucket that maintains cache concurrent <see cref="IBlobCache"/>
+ /// operations.
+ /// </summary>
+ public interface IBlobCacheBucket : IDisposable
+ {
+ /// <summary>
+ /// Gets a <see cref="CacheBucketHandle"/> that holds an exclusive lock
+ /// for the current bucekt and holds a referrence to the stored
+ /// <see cref="IBlobCache"/>
+ /// </summary>
+ /// <param name="cancellation">A token to cancel the wait operation</param>
+ /// <returns>A <see cref="CacheBucketHandle"/> that holds the <see cref="IBlobCache"/> referrence</returns>
+ ValueTask<CacheBucketHandle> WaitAsync(CancellationToken cancellation);
+
+ /// <summary>
+ /// Allows for waiting for the cache directly, IE without receiving a lock handle
+ /// </summary>
+ /// <param name="cancellation"></param>
+ /// <returns>The underlying <see cref="IBlobCache"/> that now has exlcusive access</returns>
+ ValueTask<IBlobCache> ManualWaitAsync(CancellationToken cancellation);
+
+ /// <summary>
+ /// Releases an exlcusive lock on the current bucket, DO NOT CALL BY USER CODE
+ /// </summary>
+ void Release();
+ }
+}
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/IBlobCacheTable.cs b/lib/VNLib.Data.Caching.ObjectCache/src/IBlobCacheTable.cs
new file mode 100644
index 0000000..d84aecf
--- /dev/null
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/IBlobCacheTable.cs
@@ -0,0 +1,43 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.ObjectCache
+* File: IBlobCacheTable.cs
+*
+* IBlobCacheTable.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Collections.Generic;
+
+namespace VNLib.Data.Caching.ObjectCache
+{
+ /// <summary>
+ /// A table that contains a collection of blob cache buckets
+ /// for improved cache concurrency
+ /// </summary>
+ public interface IBlobCacheTable : IEnumerable<IBlobCacheBucket>, IDisposable
+ {
+ /// <summary>
+ /// Gets a bucket that should contain the object by its id
+ /// </summary>
+ /// <param name="objectId">The id of the object to get the bucket for</param>
+ /// <returns>The bucket that should contain the object</returns>
+ IBlobCacheBucket GetBucket(ReadOnlySpan<char> objectId);
+ }
+}
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs b/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs
deleted file mode 100644
index af1e730..0000000
--- a/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs
+++ /dev/null
@@ -1,463 +0,0 @@
-/*
-* Copyright (c) 2022 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Data.Caching.ObjectCache
-* File: ObjectCacheStore.cs
-*
-* ObjectCacheStore.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.IO;
-using System.Threading;
-using System.Threading.Tasks;
-
-using VNLib.Utils.Async;
-using VNLib.Utils.Memory;
-using VNLib.Utils.Logging;
-using VNLib.Utils.Extensions;
-using VNLib.Net.Messaging.FBM.Server;
-using static VNLib.Data.Caching.Constants;
-
-
-#pragma warning disable CA1849 // Call async methods when in an async method
-
-namespace VNLib.Data.Caching.ObjectCache
-{
- public delegate ReadOnlySpan<byte> GetBodyDataCallback<T>(T state);
-
- /// <summary>
- /// A <see cref="FBMListener"/> implementation of a <see cref="CacheListener"/>
- /// </summary>
- public class ObjectCacheStore : FBMListenerBase, IDisposable
- {
- private readonly SemaphoreSlim StoreLock;
- private bool disposedValue;
-
- ///<inheritdoc/>
- protected override ILogProvider Log { get; }
-
- /// <summary>
- /// A queue that stores update and delete events
- /// </summary>
- public AsyncQueue<ChangeEvent> EventQueue { get; }
-
- /// <summary>
- /// The Cache store to access data blobs
- /// </summary>
- private readonly BlobCache Cache;
-
- private readonly IUnmangedHeap Heap;
-
-
- /// <summary>
- /// Initialzies a new <see cref="ObjectCacheStore"/>
- /// </summary>
- /// <param name="cacheMax"></param>
- /// <param name="log"></param>
- /// <param name="heap"></param>
- /// <param name="singleReader">A value that indicates if a single thread is processing events</param>
- public ObjectCacheStore(int cacheMax, ILogProvider log, IUnmangedHeap heap, bool singleReader)
- {
- Log = log;
- //We can use a single writer and single reader in this context
- EventQueue = new(true, singleReader);
- Cache = new(cacheMax);
- Heap = heap;
- InitListener(heap);
- StoreLock = new(1,1);
- }
-
- ///<inheritdoc/>
- protected override Task ProcessAsync(FBMContext context, object? userState, CancellationToken exitToken)
- {
- try
- {
- //Get the action header
- string action = context.Method();
-
- //Optional newid header
- string? alternateId = context.NewObjectId();
-
- switch (action)
- {
- case Actions.Get:
- {
- //Get the object-id header
- string objectId = context.ObjectId();
-
- //Try read sync
- if (StoreLock.Wait(0))
- {
- try
- {
- UnsafeReadEntry(context, objectId);
- }
- finally
- {
- StoreLock.Release();
- }
-
- return Task.CompletedTask;
- }
- else
- {
- //Read entry async
- return InternalReadEntryAsync(context, objectId, exitToken);
- }
- }
-
- case Actions.AddOrUpdate:
- {
- //Get the object-id header
- string objectId = context.ObjectId();
-
- //Create change event for the object
- ChangeEvent change = new(objectId, alternateId, false);
-
- //Attempt to aquire lock sync
- if (StoreLock.Wait(0))
- {
- //aquired sync
- try
- {
- //Update the item
- UnsafeAddOrUpdate(objectId, alternateId, GetBodyData, context);
- }
- finally
- {
- StoreLock.Release();
- }
-
- //Add to event queue
- EnqueEvent(change);
-
- //Set status code
- context.CloseResponse(ResponseCodes.Okay);
-
- return Task.CompletedTask;
- }
- else
- {
- //Lock will be awaited async and
- return InternalAddOrUpdateAsync(context, change, exitToken);
- }
- }
- case Actions.Delete:
- {
- //Get the object-id header
- string objectId = context.ObjectId();
-
- //Create change event
- ChangeEvent change = new(objectId, alternateId, true);
-
- //See if lock can be entered without waiting
- if (StoreLock.Wait(0))
- {
- bool found = false;
-
- try
- {
- //Sync
- found = UnsafeDeleteEntry(objectId);
- }
- finally
- {
- StoreLock.Release();
- }
-
- //Notify change
- EnqueEvent(change);
-
- //Set status code if found
- context.CloseResponse(found ? ResponseCodes.Okay : ResponseCodes.NotFound);
-
- return Task.CompletedTask;
- }
- else
- {
- //lock will yeild async
- return InternalDeleteAsync(context, change, exitToken);
- }
- }
- // event queue dequeue request
- case Actions.Dequeue:
- {
- static void SetResponse(ChangeEvent change, FBMContext context)
- {
- if (change.Deleted)
- {
- context.CloseResponse("deleted");
- context.Response.WriteHeader(ObjectId, change.CurrentId);
- }
- else
- {
- //Changed
- context.CloseResponse("modified");
- context.Response.WriteHeader(ObjectId, change.CurrentId);
-
- //Set old id if an old id is set
- if (change.CurrentId != null)
- {
- context.Response.WriteHeader(NewObjectId, change.AlternateId);
- }
- }
- }
-
- static async Task DequeAsync(AsyncQueue<ChangeEvent> queue, FBMContext context, CancellationToken exitToken)
- {
- //Wait for a new message to process
- ChangeEvent ev = await queue.DequeueAsync(exitToken);
-
- //Set the response
- SetResponse(ev, context);
- }
-
- //If no event bus is registered, then this is not a legal command
- if (userState is not AsyncQueue<ChangeEvent> eventBus)
- {
- context.CloseResponse(ResponseCodes.NotFound);
-
- return Task.CompletedTask;
- }
-
- //try to deq without awaiting
- if (eventBus.TryDequeue(out ChangeEvent? change))
- {
- SetResponse(change, context);
-
- return Task.CompletedTask;
- }
- else
- {
- //Process async
- return DequeAsync(eventBus, context, exitToken);
- }
- }
-
- }
-
- Log.Error("Unhandled cache event!");
- }
- catch (OperationCanceledException)
- {
- throw;
- }
- catch(Exception ex)
- {
- //Log error and set error status code
- Log.Error(ex);
- context.CloseResponse(ResponseCodes.Error);
- }
-
- return Task.CompletedTask;
- }
-
-
- private static ReadOnlySpan<byte> GetBodyData(FBMContext ctx) => ctx.Request.BodyData;
-
- private void EnqueEvent(ChangeEvent change)
- {
- if (!EventQueue.TryEnque(change))
- {
- Log.Warn("Change event {ev} was not enqued because the event queue is overflowing!", change.CurrentId);
- }
- }
-
- private void UnsafeReadEntry(FBMContext context, string objectId)
- {
- if (Cache!.TryGetValue(objectId, out CacheEntry data))
- {
- //Set the status code and write the buffered data to the response buffer
- context.CloseResponse(ResponseCodes.Okay);
-
- //Copy data to response buffer
- context.Response.WriteBody(data.GetDataSegment());
- }
- else
- {
- context.CloseResponse(ResponseCodes.NotFound);
- }
- }
-
- async Task InternalReadEntryAsync(FBMContext context, string objectId, CancellationToken cancellation)
- {
- //enter lock async
- using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation);
-
- UnsafeReadEntry(context, objectId);
- }
-
- private async Task InternalAddOrUpdateAsync(FBMContext context, ChangeEvent change, CancellationToken cancellation)
- {
- //Wait for lock since we know it will yeild async
- using (SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation))
- {
- UnsafeAddOrUpdate(change.CurrentId, change.AlternateId, GetBodyData, context);
- }
-
- //Add to event queue
- EnqueEvent(change);
-
- //Set status code
- context.CloseResponse(ResponseCodes.Okay);
- }
-
- private void UnsafeAddOrUpdate<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state)
- {
- CacheEntry entry;
-
- //See if new/alt session id was specified
- if (string.IsNullOrWhiteSpace(alternateId))
- {
- //See if blob exists
- if (!Cache!.TryGetValue(objectId, out entry))
- {
- //Create the new cache entry since it does not exist
- entry = CacheEntry.Create(bodyData(state), Heap);
-
- //Add to cache
- Cache.Add(objectId, entry);
- }
- else
- {
- //Reset the buffer state
- entry.UpdateData(bodyData(state));
- }
- }
- //Need to change the id of the record
- else
- {
- //Try to change the blob key
- if (!Cache!.TryChangeKey(objectId, alternateId, out entry))
- {
- //Create the new cache entry since it does not exist
- entry = CacheEntry.Create(bodyData(state), Heap);
-
- //Add to cache by its alternate id
- Cache.Add(alternateId, entry);
- }
- else
- {
- //Reset the buffer state
- entry.UpdateData(bodyData(state));
- }
- }
-
- //Update modified time to current utc time
- entry.SetTime(DateTime.UtcNow);
- }
-
- private async Task InternalDeleteAsync(FBMContext context, ChangeEvent change, CancellationToken cancellation)
- {
- bool found = false;
-
- //enter the lock
- using(SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation))
- {
- //Sync
- found = UnsafeDeleteEntry(change.CurrentId);
- }
-
- //Notify change
- EnqueEvent(change);
-
- //Set status code if found
- context.CloseResponse(found ? ResponseCodes.Okay : ResponseCodes.NotFound);
- }
-
- private bool UnsafeDeleteEntry(string id) => Cache!.Remove(id);
-
-
- /// <summary>
- /// Asynchronously adds or updates an object in the store and optionally update's its id
- /// </summary>
- /// <param name="objectId">The current (or old) id of the object</param>
- /// <param name="alternateId">An optional id to update the blob to</param>
- /// <param name="bodyData">A callback that returns the data for the blob</param>
- /// <param name="state">The state parameter to pass to the data callback</param>
- /// <param name="token">A token to cancel the async operation</param>
- /// <returns>A value task that represents the async operation</returns>
- public async ValueTask AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token = default)
- {
- //Test the lock before waiting async
- if (!StoreLock.Wait(0))
- {
- //Wait async to avoid task alloc
- await StoreLock.WaitAsync(token);
- }
- try
- {
- UnsafeAddOrUpdate(objectId, alternateId, bodyData, state);
- }
- finally
- {
- StoreLock.Release();
- }
- }
-
- /// <summary>
- /// Asynchronously deletes a previously stored item
- /// </summary>
- /// <param name="id">The id of the object to delete</param>
- /// <param name="token">A token to cancel the async lock await</param>
- /// <returns>A task that completes when the item has been deleted</returns>
- public async ValueTask<bool> DeleteItemAsync(string id, CancellationToken token = default)
- {
- //Test the lock before waiting async
- if (!StoreLock.Wait(0))
- {
- //Wait async to avoid task alloc
- await StoreLock.WaitAsync(token);
- }
- try
- {
- return UnsafeDeleteEntry(id);
- }
- finally
- {
- StoreLock.Release();
- }
- }
-
-
- ///<inheritdoc/>
- protected virtual void Dispose(bool disposing)
- {
- if (!disposedValue)
- {
- if (disposing)
- {
- Cache?.Clear();
- }
-
- StoreLock.Dispose();
-
- disposedValue = true;
- }
- }
-
- ///<inheritdoc/>
- public void Dispose()
- {
- // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
- Dispose(disposing: true);
- GC.SuppressFinalize(this);
- }
- }
-}
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/VNLib.Data.Caching.ObjectCache.csproj b/lib/VNLib.Data.Caching.ObjectCache/src/VNLib.Data.Caching.ObjectCache.csproj
index 119622f..3d1af7f 100644
--- a/lib/VNLib.Data.Caching.ObjectCache/src/VNLib.Data.Caching.ObjectCache.csproj
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/VNLib.Data.Caching.ObjectCache.csproj
@@ -2,18 +2,25 @@
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
-
- <Authors>Vaughn Nugent</Authors>
- <Copyright>Copyright © 2023 Vaughn Nugent</Copyright>
+ <RootNamespace>VNLib.Data.Caching.ObjectCache</RootNamespace>
+ <AssemblyName>VNLib.Data.Caching.ObjectCache</AssemblyName>
<Nullable>enable</Nullable>
<GenerateDocumentationFile>True</GenerateDocumentationFile>
- <PackageProjectUrl>https://www.vaughnnugent.com/resources</PackageProjectUrl>
- <AssemblyVersion>1.0.0.1</AssemblyVersion>
- <Version>1.0.1.1</Version>
<AnalysisLevel>latest-all</AnalysisLevel>
- <SignAssembly>True</SignAssembly>
- <AssemblyOriginatorKeyFile>\\vaughnnugent.com\Internal\Folder Redirection\vman\Documents\Programming\Software\StrongNameingKey.snk</AssemblyOriginatorKeyFile>
- <Description>Provides server-side object-cache related infrastructure</Description>
+ </PropertyGroup>
+
+ <PropertyGroup>
+ <Authors>Vaughn Nugent</Authors>
+ <Company>Vaughn Nugent</Company>
+ <Product>VNLib.Data.Caching.ObjectCache</Product>
+ <PackageId>VNLib.Data.Caching.ObjectCache</PackageId>
+ <Description>
+ A library for a high-performance in-memory object-data caching, based on key-derrived cache buckets
+ for wait distribution.
+ </Description>
+ <Copyright>Copyright © 2023 Vaughn Nugent</Copyright>
+ <PackageProjectUrl>https://www.vaughnnugent.com/resources/software/modules/VNLib.Data.Caching</PackageProjectUrl>
+ <RepositoryUrl>https://github.com/VnUgE/VNLib.Data.Caching/tree/master/lib/VNLib.Data.Caching.ObjectCache</RepositoryUrl>
</PropertyGroup>
<ItemGroup>