aboutsummaryrefslogtreecommitdiff
path: root/VNLib.Data.Caching.ObjectCache
diff options
context:
space:
mode:
Diffstat (limited to 'VNLib.Data.Caching.ObjectCache')
-rw-r--r--VNLib.Data.Caching.ObjectCache/ChangeEvent.cs19
-rw-r--r--VNLib.Data.Caching.ObjectCache/ObjectCacheStore.cs237
-rw-r--r--VNLib.Data.Caching.ObjectCache/VNLib.Data.Caching.ObjectCache.csproj47
3 files changed, 303 insertions, 0 deletions
diff --git a/VNLib.Data.Caching.ObjectCache/ChangeEvent.cs b/VNLib.Data.Caching.ObjectCache/ChangeEvent.cs
new file mode 100644
index 0000000..b61b4c2
--- /dev/null
+++ b/VNLib.Data.Caching.ObjectCache/ChangeEvent.cs
@@ -0,0 +1,19 @@
+
+namespace VNLib.Data.Caching.ObjectCache
+{
+ /// <summary>
+ /// An event object that is passed when change events occur
+ /// </summary>
+ public class ChangeEvent
+ {
+ public readonly string CurrentId;
+ public readonly string? AlternateId;
+ public readonly bool Deleted;
+ internal ChangeEvent(string id, string? alternate, bool deleted)
+ {
+ CurrentId = id;
+ AlternateId = alternate;
+ Deleted = deleted;
+ }
+ }
+}
diff --git a/VNLib.Data.Caching.ObjectCache/ObjectCacheStore.cs b/VNLib.Data.Caching.ObjectCache/ObjectCacheStore.cs
new file mode 100644
index 0000000..3fd4999
--- /dev/null
+++ b/VNLib.Data.Caching.ObjectCache/ObjectCacheStore.cs
@@ -0,0 +1,237 @@
+using System;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+
+using VNLib.Utils.IO;
+using VNLib.Utils.Async;
+using VNLib.Utils.Memory;
+using VNLib.Utils.Logging;
+using VNLib.Utils.Extensions;
+using VNLib.Net.Messaging.FBM.Server;
+using static VNLib.Data.Caching.Constants;
+
+namespace VNLib.Data.Caching.ObjectCache
+{
+ public delegate ReadOnlySpan<byte> GetBodyDataCallback<T>(T state);
+
+ /// <summary>
+ /// A <see cref="FBMListener"/> implementation of a <see cref="CacheListener"/>
+ /// </summary>
+ public class ObjectCacheStore : CacheListener, IDisposable
+ {
+ private readonly SemaphoreSlim StoreLock;
+ private bool disposedValue;
+
+ ///<inheritdoc/>
+ protected override ILogProvider Log { get; }
+
+ /// <summary>
+ /// A queue that stores update and delete events
+ /// </summary>
+ public AsyncQueue<ChangeEvent> EventQueue { get; }
+
+ /// <summary>
+ /// Initialzies a new <see cref="ObjectCacheStore"/>
+ /// </summary>
+ /// <param name="dir">The <see cref="DirectoryInfo"/> to store blob files to</param>
+ /// <param name="cacheMax"></param>
+ /// <param name="log"></param>
+ /// <param name="heap"></param>
+ /// <param name="singleReader">A value that indicates if a single thread is processing events</param>
+ public ObjectCacheStore(DirectoryInfo dir, int cacheMax, ILogProvider log, IUnmangedHeap heap, bool singleReader)
+ {
+ Log = log;
+ //We can use a single writer and single reader in this context
+ EventQueue = new(true, singleReader);
+ InitCache(dir, cacheMax, heap);
+ InitListener(heap);
+ StoreLock = new(1,1);
+ }
+
+ ///<inheritdoc/>
+ protected override async Task ProcessAsync(FBMContext context, object? userState, CancellationToken cancellationToken)
+ {
+ try
+ {
+ //Get the action header
+ string action = context.Method();
+ //Optional newid header
+ string? alternateId = context.NewObjectId();
+
+ switch (action)
+ {
+ case Actions.Get:
+ {
+ //Get the object-id header
+ string objectId = context.ObjectId();
+ //Take lock on store
+ using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellationToken: cancellationToken);
+ if (Cache!.TryGetValue(objectId, out MemoryHandle<byte>? data))
+ {
+ //Set the status code and write the buffered data to the response buffer
+ context.CloseResponse(ResponseCodes.Okay);
+ //Copy data to response buffer
+ context.Response.WriteBody(data.Span);
+ }
+ else
+ {
+ context.CloseResponse(ResponseCodes.NotFound);
+ }
+ }
+ break;
+ case Actions.AddOrUpdate:
+ {
+ //Get the object-id header
+ string objectId = context.ObjectId();
+ //Add/update a blob async
+ await AddOrUpdateBlobAsync(objectId, alternateId, static context => context.Request.BodyData, context);
+ //Notify update the event bus
+ await EventQueue.EnqueueAsync(new(objectId, alternateId, false), cancellationToken);
+ //Set status code
+ context.CloseResponse(ResponseCodes.Okay);
+ }
+ break;
+ case Actions.Delete:
+ {
+ //Get the object-id header
+ string objectId = context.ObjectId();
+
+ if (await DeleteItemAsync(objectId))
+ {
+ //Notify deleted
+ await EventQueue.EnqueueAsync(new(objectId, null, true), cancellationToken);
+ //Set status header
+ context.CloseResponse(ResponseCodes.Okay);
+ }
+ else
+ {
+ //Set status header
+ context.CloseResponse(ResponseCodes.NotFound);
+ }
+ }
+ break;
+ // event queue dequeue request
+ case Actions.Dequeue:
+ {
+ //If no event bus is registered, then this is not a legal command
+ if (userState is not AsyncQueue<ChangeEvent> eventBus)
+ {
+ context.CloseResponse(ResponseCodes.NotFound);
+ break;
+ }
+ //Wait for a new message to process
+ ChangeEvent ev = await eventBus.DequeueAsync(cancellationToken);
+ if (ev.Deleted)
+ {
+ context.CloseResponse("deleted");
+ context.Response.WriteHeader(ObjectId, ev.CurrentId);
+ }
+ else
+ {
+ //Changed
+ context.CloseResponse("modified");
+ context.Response.WriteHeader(ObjectId, ev.CurrentId);
+ //Set old id if an old id is set
+ if (ev.CurrentId != null)
+ {
+ context.Response.WriteHeader(NewObjectId, ev.AlternateId);
+ }
+ }
+ }
+ break;
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ throw;
+ }
+ catch(Exception ex)
+ {
+ //Log error and set error status code
+ Log.Error(ex);
+ context.CloseResponse(ResponseCodes.Error);
+ }
+ }
+
+ /// <summary>
+ /// Asynchronously deletes a previously stored item
+ /// </summary>
+ /// <param name="id">The id of the object to delete</param>
+ /// <returns>A task that completes when the item has been deleted</returns>
+ public async Task<bool> DeleteItemAsync(string id)
+ {
+ using SemSlimReleaser rel = await StoreLock.GetReleaserAsync();
+ return Cache!.Remove(id);
+ }
+
+ /// <summary>
+ /// Asynchronously adds or updates an object in the store and optionally update's its id
+ /// </summary>
+ /// <param name="objectId">The current (or old) id of the object</param>
+ /// <param name="alternateId">An optional id to update the blob to</param>
+ /// <param name="bodyData">A callback that returns the data for the blob</param>
+ /// <param name="state">The state parameter to pass to the data callback</param>
+ /// <returns></returns>
+ public async Task AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state)
+ {
+ MemoryHandle<byte>? blob;
+ //See if new/alt session id was specified
+ if (string.IsNullOrWhiteSpace(alternateId))
+ {
+ //Take lock on store
+ using SemSlimReleaser rel = await StoreLock.GetReleaserAsync();
+ //See if blob exists
+ if (!Cache!.TryGetValue(objectId, out blob))
+ {
+ //If not, create new blob and add to store
+ blob = Heap.AllocAndCopy(bodyData(state));
+ Cache.Add(objectId, blob);
+ }
+ else
+ {
+ //Reset the buffer state
+ blob.WriteAndResize(bodyData(state));
+ }
+ }
+ //Need to change the id of the record
+ else
+ {
+ //Take lock on store
+ using SemSlimReleaser rel = await StoreLock.GetReleaserAsync();
+ //Try to change the blob key
+ if (!Cache!.TryChangeKey(objectId, alternateId, out blob))
+ {
+ //Blob not found, create new blob
+ blob = Heap.AllocAndCopy(bodyData(state));
+ Cache.Add(alternateId, blob);
+ }
+ else
+ {
+ //Reset the buffer state
+ blob.WriteAndResize(bodyData(state));
+ }
+ }
+ }
+
+ ///<inheritdoc/>
+ protected virtual void Dispose(bool disposing)
+ {
+ if (!disposedValue)
+ {
+ if (disposing)
+ {
+ Cache?.Clear();
+ }
+ disposedValue = true;
+ }
+ }
+ ///<inheritdoc/>
+ public void Dispose()
+ {
+ // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
+ Dispose(disposing: true);
+ GC.SuppressFinalize(this);
+ }
+ }
+}
diff --git a/VNLib.Data.Caching.ObjectCache/VNLib.Data.Caching.ObjectCache.csproj b/VNLib.Data.Caching.ObjectCache/VNLib.Data.Caching.ObjectCache.csproj
new file mode 100644
index 0000000..389cabd
--- /dev/null
+++ b/VNLib.Data.Caching.ObjectCache/VNLib.Data.Caching.ObjectCache.csproj
@@ -0,0 +1,47 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+ <PropertyGroup>
+ <TargetFramework>net6.0</TargetFramework>
+ <Platforms>AnyCPU;x64</Platforms>
+ <Authors>Vaughn Nugent</Authors>
+ <Copyright>Copyright © 2022 Vaughn Nugent</Copyright>
+ <Nullable>enable</Nullable>
+ <GenerateDocumentationFile>True</GenerateDocumentationFile>
+ <PackageProjectUrl>www.vaughnnugent.com/resources</PackageProjectUrl>
+ <AssemblyVersion>1.0.0.1</AssemblyVersion>
+ </PropertyGroup>
+
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
+ <DocumentationFile></DocumentationFile>
+ <CheckForOverflowUnderflow>False</CheckForOverflowUnderflow>
+ </PropertyGroup>
+
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
+ <CheckForOverflowUnderflow>False</CheckForOverflowUnderflow>
+ </PropertyGroup>
+
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
+ <CheckForOverflowUnderflow>False</CheckForOverflowUnderflow>
+ </PropertyGroup>
+
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
+ <CheckForOverflowUnderflow>False</CheckForOverflowUnderflow>
+ </PropertyGroup>
+
+ <ItemGroup>
+ <PackageReference Include="ErrorProne.NET.CoreAnalyzers" Version="0.1.2">
+ <PrivateAssets>all</PrivateAssets>
+ <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
+ </PackageReference>
+ <PackageReference Include="ErrorProne.NET.Structs" Version="0.1.2">
+ <PrivateAssets>all</PrivateAssets>
+ <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
+ </PackageReference>
+ </ItemGroup>
+
+ <ItemGroup>
+ <ProjectReference Include="..\..\..\VNLib\Utils\src\VNLib.Utils.csproj" />
+ <ProjectReference Include="..\VNLib.Data.Caching\src\VNLib.Data.Caching.csproj" />
+ </ItemGroup>
+
+</Project>