diff options
Diffstat (limited to 'lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs')
-rw-r--r-- | lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs | 357 |
1 files changed, 279 insertions, 78 deletions
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs b/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs index 514d00b..05a4f02 100644 --- a/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs +++ b/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs @@ -34,6 +34,9 @@ using VNLib.Utils.Extensions; using VNLib.Net.Messaging.FBM.Server; using static VNLib.Data.Caching.Constants; + +#pragma warning disable CA1849 // Call async methods when in an async method + namespace VNLib.Data.Caching.ObjectCache { public delegate ReadOnlySpan<byte> GetBodyDataCallback<T>(T state); @@ -41,7 +44,7 @@ namespace VNLib.Data.Caching.ObjectCache /// <summary> /// A <see cref="FBMListener"/> implementation of a <see cref="CacheListener"/> /// </summary> - public class ObjectCacheStore : CacheListener, IDisposable + public class ObjectCacheStore : FBMListenerBase, IDisposable { private readonly SemaphoreSlim StoreLock; private bool disposedValue; @@ -55,6 +58,14 @@ namespace VNLib.Data.Caching.ObjectCache public AsyncQueue<ChangeEvent> EventQueue { get; } /// <summary> + /// The Cache store to access data blobs + /// </summary> + private readonly BlobCache Cache; + + private readonly IUnmangedHeap Heap; + + + /// <summary> /// Initialzies a new <see cref="ObjectCacheStore"/> /// </summary> /// <param name="dir">The <see cref="DirectoryInfo"/> to store blob files to</param> @@ -62,23 +73,25 @@ namespace VNLib.Data.Caching.ObjectCache /// <param name="log"></param> /// <param name="heap"></param> /// <param name="singleReader">A value that indicates if a single thread is processing events</param> - public ObjectCacheStore(DirectoryInfo dir, int cacheMax, ILogProvider log, IUnmangedHeap heap, bool singleReader) + public ObjectCacheStore(int cacheMax, ILogProvider log, IUnmangedHeap heap, bool singleReader) { Log = log; //We can use a single writer and single reader in this context EventQueue = new(true, singleReader); - InitCache(dir, cacheMax, heap); + Cache = new(cacheMax); + Heap = heap; InitListener(heap); StoreLock = new(1,1); } ///<inheritdoc/> - protected override async Task ProcessAsync(FBMContext context, object? userState, CancellationToken cancellationToken) + protected override Task ProcessAsync(FBMContext context, object? userState, CancellationToken exitToken) { try { //Get the action header string action = context.Method(); + //Optional newid header string? alternateId = context.NewObjectId(); @@ -88,82 +101,159 @@ namespace VNLib.Data.Caching.ObjectCache { //Get the object-id header string objectId = context.ObjectId(); - //Take lock on store - using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellationToken: cancellationToken); - if (Cache!.TryGetValue(objectId, out MemoryHandle<byte>? data)) + + //Try read sync + if (StoreLock.Wait(0)) { - //Set the status code and write the buffered data to the response buffer - context.CloseResponse(ResponseCodes.Okay); - //Copy data to response buffer - context.Response.WriteBody(data.Span); + try + { + UnsafeReadEntry(context, objectId); + } + finally + { + StoreLock.Release(); + } + + return Task.CompletedTask; } else { - context.CloseResponse(ResponseCodes.NotFound); + //Read entry async + return InternalReadEntryAsync(context, objectId, exitToken); } } - break; + case Actions.AddOrUpdate: { //Get the object-id header string objectId = context.ObjectId(); - //Add/update a blob async - await AddOrUpdateBlobAsync(objectId, alternateId, static context => context.Request.BodyData, context); - //Notify update the event bus - await EventQueue.EnqueueAsync(new(objectId, alternateId, false), cancellationToken); - //Set status code - context.CloseResponse(ResponseCodes.Okay); + + //Create change event for the object + ChangeEvent change = new(objectId, alternateId, false); + + //Attempt to aquire lock sync + if (StoreLock.Wait(0)) + { + //aquired sync + try + { + //Update the item + UnsafeAddOrUpdate(objectId, alternateId, GetBodyData, context); + } + finally + { + StoreLock.Release(); + } + + //Add to event queue + EnqueEvent(change); + + //Set status code + context.CloseResponse(ResponseCodes.Okay); + + return Task.CompletedTask; + } + else + { + //Lock will be awaited async and + return InternalAddOrUpdateAsync(context, change, exitToken); + } } - break; case Actions.Delete: { //Get the object-id header string objectId = context.ObjectId(); - - if (await DeleteItemAsync(objectId)) + + //Create change event + ChangeEvent change = new(objectId, alternateId, true); + + //See if lock can be entered without waiting + if (StoreLock.Wait(0)) { - //Notify deleted - await EventQueue.EnqueueAsync(new(objectId, null, true), cancellationToken); - //Set status header - context.CloseResponse(ResponseCodes.Okay); + bool found = false; + + try + { + //Sync + found = UnsafeDeleteEntry(objectId); + } + finally + { + StoreLock.Release(); + } + + //Notify change + EnqueEvent(change); + + //Set status code if found + context.CloseResponse(found ? ResponseCodes.Okay : ResponseCodes.NotFound); + + return Task.CompletedTask; } else { - //Set status header - context.CloseResponse(ResponseCodes.NotFound); + //lock will yeild async + return InternalDeleteAsync(context, change, exitToken); } } - break; // event queue dequeue request case Actions.Dequeue: { + static void SetResponse(ChangeEvent change, FBMContext context) + { + if (change.Deleted) + { + context.CloseResponse("deleted"); + context.Response.WriteHeader(ObjectId, change.CurrentId); + } + else + { + //Changed + context.CloseResponse("modified"); + context.Response.WriteHeader(ObjectId, change.CurrentId); + + //Set old id if an old id is set + if (change.CurrentId != null) + { + context.Response.WriteHeader(NewObjectId, change.AlternateId); + } + } + } + + static async Task DequeAsync(AsyncQueue<ChangeEvent> queue, FBMContext context, CancellationToken exitToken) + { + //Wait for a new message to process + ChangeEvent ev = await queue.DequeueAsync(exitToken); + + //Set the response + SetResponse(ev, context); + } + //If no event bus is registered, then this is not a legal command if (userState is not AsyncQueue<ChangeEvent> eventBus) { context.CloseResponse(ResponseCodes.NotFound); - break; + + return Task.CompletedTask; } - //Wait for a new message to process - ChangeEvent ev = await eventBus.DequeueAsync(cancellationToken); - if (ev.Deleted) + + //try to deq without awaiting + if (eventBus.TryDequeue(out ChangeEvent? change)) { - context.CloseResponse("deleted"); - context.Response.WriteHeader(ObjectId, ev.CurrentId); + SetResponse(change, context); + + return Task.CompletedTask; } else { - //Changed - context.CloseResponse("modified"); - context.Response.WriteHeader(ObjectId, ev.CurrentId); - //Set old id if an old id is set - if (ev.CurrentId != null) - { - context.Response.WriteHeader(NewObjectId, ev.AlternateId); - } + //Process async + return DequeAsync(eventBus, context, exitToken); } } - break; + } + + Log.Error("Unhandled cache event!"); } catch (OperationCanceledException) { @@ -175,68 +265,178 @@ namespace VNLib.Data.Caching.ObjectCache Log.Error(ex); context.CloseResponse(ResponseCodes.Error); } + + return Task.CompletedTask; } + + + private static ReadOnlySpan<byte> GetBodyData(FBMContext ctx) => ctx.Request.BodyData; - /// <summary> - /// Asynchronously deletes a previously stored item - /// </summary> - /// <param name="id">The id of the object to delete</param> - /// <returns>A task that completes when the item has been deleted</returns> - public async Task<bool> DeleteItemAsync(string id) + private void EnqueEvent(ChangeEvent change) { - using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(); - return Cache!.Remove(id); + if (!EventQueue.TryEnque(change)) + { + Log.Warn("Change event {ev} was not enqued because the event queue is overflowing!", change.CurrentId); + } } - - /// <summary> - /// Asynchronously adds or updates an object in the store and optionally update's its id - /// </summary> - /// <param name="objectId">The current (or old) id of the object</param> - /// <param name="alternateId">An optional id to update the blob to</param> - /// <param name="bodyData">A callback that returns the data for the blob</param> - /// <param name="state">The state parameter to pass to the data callback</param> - /// <returns></returns> - public async Task AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state) + + private void UnsafeReadEntry(FBMContext context, string objectId) { - MemoryHandle<byte>? blob; + if (Cache!.TryGetValue(objectId, out CacheEntry data)) + { + //Set the status code and write the buffered data to the response buffer + context.CloseResponse(ResponseCodes.Okay); + + //Copy data to response buffer + context.Response.WriteBody(data.GetDataSegment()); + } + else + { + context.CloseResponse(ResponseCodes.NotFound); + } + } + + async Task InternalReadEntryAsync(FBMContext context, string objectId, CancellationToken cancellation) + { + //enter lock async + using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation); + + UnsafeReadEntry(context, objectId); + } + + private async Task InternalAddOrUpdateAsync(FBMContext context, ChangeEvent change, CancellationToken cancellation) + { + //Wait for lock since we know it will yeild async + using (SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation)) + { + UnsafeAddOrUpdate(change.CurrentId, change.AlternateId, GetBodyData, context); + } + + //Add to event queue + EnqueEvent(change); + + //Set status code + context.CloseResponse(ResponseCodes.Okay); + } + + private void UnsafeAddOrUpdate<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state) + { + CacheEntry entry; + //See if new/alt session id was specified if (string.IsNullOrWhiteSpace(alternateId)) { - //Take lock on store - using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(); //See if blob exists - if (!Cache!.TryGetValue(objectId, out blob)) + if (!Cache!.TryGetValue(objectId, out entry)) { - //If not, create new blob and add to store - blob = Heap.AllocAndCopy(bodyData(state)); - Cache.Add(objectId, blob); + //Create the new cache entry since it does not exist + entry = CacheEntry.Create(bodyData(state), Heap); + + //Add to cache + Cache.Add(objectId, entry); } else { //Reset the buffer state - blob.WriteAndResize(bodyData(state)); + entry.UpdateData(bodyData(state)); } } //Need to change the id of the record else { - //Take lock on store - using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(); //Try to change the blob key - if (!Cache!.TryChangeKey(objectId, alternateId, out blob)) + if (!Cache!.TryChangeKey(objectId, alternateId, out entry)) { - //Blob not found, create new blob - blob = Heap.AllocAndCopy(bodyData(state)); - Cache.Add(alternateId, blob); + //Create the new cache entry since it does not exist + entry = CacheEntry.Create(bodyData(state), Heap); + + //Add to cache by its alternate id + Cache.Add(alternateId, entry); } else { //Reset the buffer state - blob.WriteAndResize(bodyData(state)); + entry.UpdateData(bodyData(state)); } } + + //Update modified time to current utc time + entry.SetTime(DateTime.UtcNow); + } + + private async Task InternalDeleteAsync(FBMContext context, ChangeEvent change, CancellationToken cancellation) + { + bool found = false; + + //enter the lock + using(SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation)) + { + //Sync + found = UnsafeDeleteEntry(change.CurrentId); + } + + //Notify change + EnqueEvent(change); + + //Set status code if found + context.CloseResponse(found ? ResponseCodes.Okay : ResponseCodes.NotFound); } - + + private bool UnsafeDeleteEntry(string id) => Cache!.Remove(id); + + + /// <summary> + /// Asynchronously adds or updates an object in the store and optionally update's its id + /// </summary> + /// <param name="objectId">The current (or old) id of the object</param> + /// <param name="alternateId">An optional id to update the blob to</param> + /// <param name="bodyData">A callback that returns the data for the blob</param> + /// <param name="state">The state parameter to pass to the data callback</param> + /// <param name="token">A token to cancel the async operation</param> + /// <returns>A value task that represents the async operation</returns> + public async ValueTask AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token = default) + { + //Test the lock before waiting async + if (!StoreLock.Wait(0)) + { + //Wait async to avoid task alloc + await StoreLock.WaitAsync(token); + } + try + { + UnsafeAddOrUpdate(objectId, alternateId, bodyData, state); + } + finally + { + StoreLock.Release(); + } + } + + /// <summary> + /// Asynchronously deletes a previously stored item + /// </summary> + /// <param name="id">The id of the object to delete</param> + /// <param name="token">A token to cancel the async lock await</param> + /// <returns>A task that completes when the item has been deleted</returns> + public async ValueTask<bool> DeleteItemAsync(string id, CancellationToken token = default) + { + //Test the lock before waiting async + if (!StoreLock.Wait(0)) + { + //Wait async to avoid task alloc + await StoreLock.WaitAsync(token); + } + try + { + return UnsafeDeleteEntry(id); + } + finally + { + StoreLock.Release(); + } + } + + ///<inheritdoc/> protected virtual void Dispose(bool disposing) { @@ -252,6 +452,7 @@ namespace VNLib.Data.Caching.ObjectCache disposedValue = true; } } + ///<inheritdoc/> public void Dispose() { |