aboutsummaryrefslogtreecommitdiff
path: root/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs
diff options
context:
space:
mode:
Diffstat (limited to 'lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs')
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs463
1 files changed, 0 insertions, 463 deletions
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs b/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs
deleted file mode 100644
index af1e730..0000000
--- a/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs
+++ /dev/null
@@ -1,463 +0,0 @@
-/*
-* Copyright (c) 2022 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Data.Caching.ObjectCache
-* File: ObjectCacheStore.cs
-*
-* ObjectCacheStore.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.IO;
-using System.Threading;
-using System.Threading.Tasks;
-
-using VNLib.Utils.Async;
-using VNLib.Utils.Memory;
-using VNLib.Utils.Logging;
-using VNLib.Utils.Extensions;
-using VNLib.Net.Messaging.FBM.Server;
-using static VNLib.Data.Caching.Constants;
-
-
-#pragma warning disable CA1849 // Call async methods when in an async method
-
-namespace VNLib.Data.Caching.ObjectCache
-{
- public delegate ReadOnlySpan<byte> GetBodyDataCallback<T>(T state);
-
- /// <summary>
- /// A <see cref="FBMListener"/> implementation of a <see cref="CacheListener"/>
- /// </summary>
- public class ObjectCacheStore : FBMListenerBase, IDisposable
- {
- private readonly SemaphoreSlim StoreLock;
- private bool disposedValue;
-
- ///<inheritdoc/>
- protected override ILogProvider Log { get; }
-
- /// <summary>
- /// A queue that stores update and delete events
- /// </summary>
- public AsyncQueue<ChangeEvent> EventQueue { get; }
-
- /// <summary>
- /// The Cache store to access data blobs
- /// </summary>
- private readonly BlobCache Cache;
-
- private readonly IUnmangedHeap Heap;
-
-
- /// <summary>
- /// Initialzies a new <see cref="ObjectCacheStore"/>
- /// </summary>
- /// <param name="cacheMax"></param>
- /// <param name="log"></param>
- /// <param name="heap"></param>
- /// <param name="singleReader">A value that indicates if a single thread is processing events</param>
- public ObjectCacheStore(int cacheMax, ILogProvider log, IUnmangedHeap heap, bool singleReader)
- {
- Log = log;
- //We can use a single writer and single reader in this context
- EventQueue = new(true, singleReader);
- Cache = new(cacheMax);
- Heap = heap;
- InitListener(heap);
- StoreLock = new(1,1);
- }
-
- ///<inheritdoc/>
- protected override Task ProcessAsync(FBMContext context, object? userState, CancellationToken exitToken)
- {
- try
- {
- //Get the action header
- string action = context.Method();
-
- //Optional newid header
- string? alternateId = context.NewObjectId();
-
- switch (action)
- {
- case Actions.Get:
- {
- //Get the object-id header
- string objectId = context.ObjectId();
-
- //Try read sync
- if (StoreLock.Wait(0))
- {
- try
- {
- UnsafeReadEntry(context, objectId);
- }
- finally
- {
- StoreLock.Release();
- }
-
- return Task.CompletedTask;
- }
- else
- {
- //Read entry async
- return InternalReadEntryAsync(context, objectId, exitToken);
- }
- }
-
- case Actions.AddOrUpdate:
- {
- //Get the object-id header
- string objectId = context.ObjectId();
-
- //Create change event for the object
- ChangeEvent change = new(objectId, alternateId, false);
-
- //Attempt to aquire lock sync
- if (StoreLock.Wait(0))
- {
- //aquired sync
- try
- {
- //Update the item
- UnsafeAddOrUpdate(objectId, alternateId, GetBodyData, context);
- }
- finally
- {
- StoreLock.Release();
- }
-
- //Add to event queue
- EnqueEvent(change);
-
- //Set status code
- context.CloseResponse(ResponseCodes.Okay);
-
- return Task.CompletedTask;
- }
- else
- {
- //Lock will be awaited async and
- return InternalAddOrUpdateAsync(context, change, exitToken);
- }
- }
- case Actions.Delete:
- {
- //Get the object-id header
- string objectId = context.ObjectId();
-
- //Create change event
- ChangeEvent change = new(objectId, alternateId, true);
-
- //See if lock can be entered without waiting
- if (StoreLock.Wait(0))
- {
- bool found = false;
-
- try
- {
- //Sync
- found = UnsafeDeleteEntry(objectId);
- }
- finally
- {
- StoreLock.Release();
- }
-
- //Notify change
- EnqueEvent(change);
-
- //Set status code if found
- context.CloseResponse(found ? ResponseCodes.Okay : ResponseCodes.NotFound);
-
- return Task.CompletedTask;
- }
- else
- {
- //lock will yeild async
- return InternalDeleteAsync(context, change, exitToken);
- }
- }
- // event queue dequeue request
- case Actions.Dequeue:
- {
- static void SetResponse(ChangeEvent change, FBMContext context)
- {
- if (change.Deleted)
- {
- context.CloseResponse("deleted");
- context.Response.WriteHeader(ObjectId, change.CurrentId);
- }
- else
- {
- //Changed
- context.CloseResponse("modified");
- context.Response.WriteHeader(ObjectId, change.CurrentId);
-
- //Set old id if an old id is set
- if (change.CurrentId != null)
- {
- context.Response.WriteHeader(NewObjectId, change.AlternateId);
- }
- }
- }
-
- static async Task DequeAsync(AsyncQueue<ChangeEvent> queue, FBMContext context, CancellationToken exitToken)
- {
- //Wait for a new message to process
- ChangeEvent ev = await queue.DequeueAsync(exitToken);
-
- //Set the response
- SetResponse(ev, context);
- }
-
- //If no event bus is registered, then this is not a legal command
- if (userState is not AsyncQueue<ChangeEvent> eventBus)
- {
- context.CloseResponse(ResponseCodes.NotFound);
-
- return Task.CompletedTask;
- }
-
- //try to deq without awaiting
- if (eventBus.TryDequeue(out ChangeEvent? change))
- {
- SetResponse(change, context);
-
- return Task.CompletedTask;
- }
- else
- {
- //Process async
- return DequeAsync(eventBus, context, exitToken);
- }
- }
-
- }
-
- Log.Error("Unhandled cache event!");
- }
- catch (OperationCanceledException)
- {
- throw;
- }
- catch(Exception ex)
- {
- //Log error and set error status code
- Log.Error(ex);
- context.CloseResponse(ResponseCodes.Error);
- }
-
- return Task.CompletedTask;
- }
-
-
- private static ReadOnlySpan<byte> GetBodyData(FBMContext ctx) => ctx.Request.BodyData;
-
- private void EnqueEvent(ChangeEvent change)
- {
- if (!EventQueue.TryEnque(change))
- {
- Log.Warn("Change event {ev} was not enqued because the event queue is overflowing!", change.CurrentId);
- }
- }
-
- private void UnsafeReadEntry(FBMContext context, string objectId)
- {
- if (Cache!.TryGetValue(objectId, out CacheEntry data))
- {
- //Set the status code and write the buffered data to the response buffer
- context.CloseResponse(ResponseCodes.Okay);
-
- //Copy data to response buffer
- context.Response.WriteBody(data.GetDataSegment());
- }
- else
- {
- context.CloseResponse(ResponseCodes.NotFound);
- }
- }
-
- async Task InternalReadEntryAsync(FBMContext context, string objectId, CancellationToken cancellation)
- {
- //enter lock async
- using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation);
-
- UnsafeReadEntry(context, objectId);
- }
-
- private async Task InternalAddOrUpdateAsync(FBMContext context, ChangeEvent change, CancellationToken cancellation)
- {
- //Wait for lock since we know it will yeild async
- using (SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation))
- {
- UnsafeAddOrUpdate(change.CurrentId, change.AlternateId, GetBodyData, context);
- }
-
- //Add to event queue
- EnqueEvent(change);
-
- //Set status code
- context.CloseResponse(ResponseCodes.Okay);
- }
-
- private void UnsafeAddOrUpdate<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state)
- {
- CacheEntry entry;
-
- //See if new/alt session id was specified
- if (string.IsNullOrWhiteSpace(alternateId))
- {
- //See if blob exists
- if (!Cache!.TryGetValue(objectId, out entry))
- {
- //Create the new cache entry since it does not exist
- entry = CacheEntry.Create(bodyData(state), Heap);
-
- //Add to cache
- Cache.Add(objectId, entry);
- }
- else
- {
- //Reset the buffer state
- entry.UpdateData(bodyData(state));
- }
- }
- //Need to change the id of the record
- else
- {
- //Try to change the blob key
- if (!Cache!.TryChangeKey(objectId, alternateId, out entry))
- {
- //Create the new cache entry since it does not exist
- entry = CacheEntry.Create(bodyData(state), Heap);
-
- //Add to cache by its alternate id
- Cache.Add(alternateId, entry);
- }
- else
- {
- //Reset the buffer state
- entry.UpdateData(bodyData(state));
- }
- }
-
- //Update modified time to current utc time
- entry.SetTime(DateTime.UtcNow);
- }
-
- private async Task InternalDeleteAsync(FBMContext context, ChangeEvent change, CancellationToken cancellation)
- {
- bool found = false;
-
- //enter the lock
- using(SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation))
- {
- //Sync
- found = UnsafeDeleteEntry(change.CurrentId);
- }
-
- //Notify change
- EnqueEvent(change);
-
- //Set status code if found
- context.CloseResponse(found ? ResponseCodes.Okay : ResponseCodes.NotFound);
- }
-
- private bool UnsafeDeleteEntry(string id) => Cache!.Remove(id);
-
-
- /// <summary>
- /// Asynchronously adds or updates an object in the store and optionally update's its id
- /// </summary>
- /// <param name="objectId">The current (or old) id of the object</param>
- /// <param name="alternateId">An optional id to update the blob to</param>
- /// <param name="bodyData">A callback that returns the data for the blob</param>
- /// <param name="state">The state parameter to pass to the data callback</param>
- /// <param name="token">A token to cancel the async operation</param>
- /// <returns>A value task that represents the async operation</returns>
- public async ValueTask AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token = default)
- {
- //Test the lock before waiting async
- if (!StoreLock.Wait(0))
- {
- //Wait async to avoid task alloc
- await StoreLock.WaitAsync(token);
- }
- try
- {
- UnsafeAddOrUpdate(objectId, alternateId, bodyData, state);
- }
- finally
- {
- StoreLock.Release();
- }
- }
-
- /// <summary>
- /// Asynchronously deletes a previously stored item
- /// </summary>
- /// <param name="id">The id of the object to delete</param>
- /// <param name="token">A token to cancel the async lock await</param>
- /// <returns>A task that completes when the item has been deleted</returns>
- public async ValueTask<bool> DeleteItemAsync(string id, CancellationToken token = default)
- {
- //Test the lock before waiting async
- if (!StoreLock.Wait(0))
- {
- //Wait async to avoid task alloc
- await StoreLock.WaitAsync(token);
- }
- try
- {
- return UnsafeDeleteEntry(id);
- }
- finally
- {
- StoreLock.Release();
- }
- }
-
-
- ///<inheritdoc/>
- protected virtual void Dispose(bool disposing)
- {
- if (!disposedValue)
- {
- if (disposing)
- {
- Cache?.Clear();
- }
-
- StoreLock.Dispose();
-
- disposedValue = true;
- }
- }
-
- ///<inheritdoc/>
- public void Dispose()
- {
- // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
- Dispose(disposing: true);
- GC.SuppressFinalize(this);
- }
- }
-}