/* * Copyright (c) 2022 Vaughn Nugent * * Library: VNLib * Package: VNLib.Data.Caching.ObjectCache * File: ObjectCacheStore.cs * * ObjectCacheStore.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger * VNLib collection of libraries and utilities. * * VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. * * VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see https://www.gnu.org/licenses/. */ using System; using System.IO; using System.Threading; using System.Threading.Tasks; using VNLib.Utils.Async; using VNLib.Utils.Memory; using VNLib.Utils.Logging; using VNLib.Utils.Extensions; using VNLib.Net.Messaging.FBM.Server; using static VNLib.Data.Caching.Constants; #pragma warning disable CA1849 // Call async methods when in an async method namespace VNLib.Data.Caching.ObjectCache { public delegate ReadOnlySpan GetBodyDataCallback(T state); /// /// A implementation of a /// public class ObjectCacheStore : FBMListenerBase, IDisposable { private readonly SemaphoreSlim StoreLock; private bool disposedValue; /// protected override ILogProvider Log { get; } /// /// A queue that stores update and delete events /// public AsyncQueue EventQueue { get; } /// /// The Cache store to access data blobs /// private readonly BlobCache Cache; private readonly IUnmangedHeap Heap; /// /// Initialzies a new /// /// /// /// /// A value that indicates if a single thread is processing events public ObjectCacheStore(int cacheMax, ILogProvider log, IUnmangedHeap heap, bool singleReader) { Log = log; //We can use a single writer and single reader in this context EventQueue = new(true, singleReader); Cache = new(cacheMax); Heap = heap; InitListener(heap); StoreLock = new(1,1); } /// protected override Task ProcessAsync(FBMContext context, object? userState, CancellationToken exitToken) { try { //Get the action header string action = context.Method(); //Optional newid header string? alternateId = context.NewObjectId(); switch (action) { case Actions.Get: { //Get the object-id header string objectId = context.ObjectId(); //Try read sync if (StoreLock.Wait(0)) { try { UnsafeReadEntry(context, objectId); } finally { StoreLock.Release(); } return Task.CompletedTask; } else { //Read entry async return InternalReadEntryAsync(context, objectId, exitToken); } } case Actions.AddOrUpdate: { //Get the object-id header string objectId = context.ObjectId(); //Create change event for the object ChangeEvent change = new(objectId, alternateId, false); //Attempt to aquire lock sync if (StoreLock.Wait(0)) { //aquired sync try { //Update the item UnsafeAddOrUpdate(objectId, alternateId, GetBodyData, context); } finally { StoreLock.Release(); } //Add to event queue EnqueEvent(change); //Set status code context.CloseResponse(ResponseCodes.Okay); return Task.CompletedTask; } else { //Lock will be awaited async and return InternalAddOrUpdateAsync(context, change, exitToken); } } case Actions.Delete: { //Get the object-id header string objectId = context.ObjectId(); //Create change event ChangeEvent change = new(objectId, alternateId, true); //See if lock can be entered without waiting if (StoreLock.Wait(0)) { bool found = false; try { //Sync found = UnsafeDeleteEntry(objectId); } finally { StoreLock.Release(); } //Notify change EnqueEvent(change); //Set status code if found context.CloseResponse(found ? ResponseCodes.Okay : ResponseCodes.NotFound); return Task.CompletedTask; } else { //lock will yeild async return InternalDeleteAsync(context, change, exitToken); } } // event queue dequeue request case Actions.Dequeue: { static void SetResponse(ChangeEvent change, FBMContext context) { if (change.Deleted) { context.CloseResponse("deleted"); context.Response.WriteHeader(ObjectId, change.CurrentId); } else { //Changed context.CloseResponse("modified"); context.Response.WriteHeader(ObjectId, change.CurrentId); //Set old id if an old id is set if (change.CurrentId != null) { context.Response.WriteHeader(NewObjectId, change.AlternateId); } } } static async Task DequeAsync(AsyncQueue queue, FBMContext context, CancellationToken exitToken) { //Wait for a new message to process ChangeEvent ev = await queue.DequeueAsync(exitToken); //Set the response SetResponse(ev, context); } //If no event bus is registered, then this is not a legal command if (userState is not AsyncQueue eventBus) { context.CloseResponse(ResponseCodes.NotFound); return Task.CompletedTask; } //try to deq without awaiting if (eventBus.TryDequeue(out ChangeEvent? change)) { SetResponse(change, context); return Task.CompletedTask; } else { //Process async return DequeAsync(eventBus, context, exitToken); } } } Log.Error("Unhandled cache event!"); } catch (OperationCanceledException) { throw; } catch(Exception ex) { //Log error and set error status code Log.Error(ex); context.CloseResponse(ResponseCodes.Error); } return Task.CompletedTask; } private static ReadOnlySpan GetBodyData(FBMContext ctx) => ctx.Request.BodyData; private void EnqueEvent(ChangeEvent change) { if (!EventQueue.TryEnque(change)) { Log.Warn("Change event {ev} was not enqued because the event queue is overflowing!", change.CurrentId); } } private void UnsafeReadEntry(FBMContext context, string objectId) { if (Cache!.TryGetValue(objectId, out CacheEntry data)) { //Set the status code and write the buffered data to the response buffer context.CloseResponse(ResponseCodes.Okay); //Copy data to response buffer context.Response.WriteBody(data.GetDataSegment()); } else { context.CloseResponse(ResponseCodes.NotFound); } } async Task InternalReadEntryAsync(FBMContext context, string objectId, CancellationToken cancellation) { //enter lock async using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation); UnsafeReadEntry(context, objectId); } private async Task InternalAddOrUpdateAsync(FBMContext context, ChangeEvent change, CancellationToken cancellation) { //Wait for lock since we know it will yeild async using (SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation)) { UnsafeAddOrUpdate(change.CurrentId, change.AlternateId, GetBodyData, context); } //Add to event queue EnqueEvent(change); //Set status code context.CloseResponse(ResponseCodes.Okay); } private void UnsafeAddOrUpdate(string objectId, string? alternateId, GetBodyDataCallback bodyData, T state) { CacheEntry entry; //See if new/alt session id was specified if (string.IsNullOrWhiteSpace(alternateId)) { //See if blob exists if (!Cache!.TryGetValue(objectId, out entry)) { //Create the new cache entry since it does not exist entry = CacheEntry.Create(bodyData(state), Heap); //Add to cache Cache.Add(objectId, entry); } else { //Reset the buffer state entry.UpdateData(bodyData(state)); } } //Need to change the id of the record else { //Try to change the blob key if (!Cache!.TryChangeKey(objectId, alternateId, out entry)) { //Create the new cache entry since it does not exist entry = CacheEntry.Create(bodyData(state), Heap); //Add to cache by its alternate id Cache.Add(alternateId, entry); } else { //Reset the buffer state entry.UpdateData(bodyData(state)); } } //Update modified time to current utc time entry.SetTime(DateTime.UtcNow); } private async Task InternalDeleteAsync(FBMContext context, ChangeEvent change, CancellationToken cancellation) { bool found = false; //enter the lock using(SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation)) { //Sync found = UnsafeDeleteEntry(change.CurrentId); } //Notify change EnqueEvent(change); //Set status code if found context.CloseResponse(found ? ResponseCodes.Okay : ResponseCodes.NotFound); } private bool UnsafeDeleteEntry(string id) => Cache!.Remove(id); /// /// Asynchronously adds or updates an object in the store and optionally update's its id /// /// The current (or old) id of the object /// An optional id to update the blob to /// A callback that returns the data for the blob /// The state parameter to pass to the data callback /// A token to cancel the async operation /// A value task that represents the async operation public async ValueTask AddOrUpdateBlobAsync(string objectId, string? alternateId, GetBodyDataCallback bodyData, T state, CancellationToken token = default) { //Test the lock before waiting async if (!StoreLock.Wait(0)) { //Wait async to avoid task alloc await StoreLock.WaitAsync(token); } try { UnsafeAddOrUpdate(objectId, alternateId, bodyData, state); } finally { StoreLock.Release(); } } /// /// Asynchronously deletes a previously stored item /// /// The id of the object to delete /// A token to cancel the async lock await /// A task that completes when the item has been deleted public async ValueTask DeleteItemAsync(string id, CancellationToken token = default) { //Test the lock before waiting async if (!StoreLock.Wait(0)) { //Wait async to avoid task alloc await StoreLock.WaitAsync(token); } try { return UnsafeDeleteEntry(id); } finally { StoreLock.Release(); } } /// protected virtual void Dispose(bool disposing) { if (!disposedValue) { if (disposing) { Cache?.Clear(); } StoreLock.Dispose(); disposedValue = true; } } /// public void Dispose() { // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method Dispose(disposing: true); GC.SuppressFinalize(this); } } }