diff options
Diffstat (limited to 'VNLib.Data.Caching.ObjectCache')
3 files changed, 303 insertions, 0 deletions
diff --git a/VNLib.Data.Caching.ObjectCache/ChangeEvent.cs b/VNLib.Data.Caching.ObjectCache/ChangeEvent.cs new file mode 100644 index 0000000..b61b4c2 --- /dev/null +++ b/VNLib.Data.Caching.ObjectCache/ChangeEvent.cs @@ -0,0 +1,19 @@ + +namespace VNLib.Data.Caching.ObjectCache +{ + /// <summary> + /// An event object that is passed when change events occur + /// </summary> + public class ChangeEvent + { + public readonly string CurrentId; + public readonly string? AlternateId; + public readonly bool Deleted; + internal ChangeEvent(string id, string? alternate, bool deleted) + { + CurrentId = id; + AlternateId = alternate; + Deleted = deleted; + } + } +} diff --git a/VNLib.Data.Caching.ObjectCache/ObjectCacheStore.cs b/VNLib.Data.Caching.ObjectCache/ObjectCacheStore.cs new file mode 100644 index 0000000..3fd4999 --- /dev/null +++ b/VNLib.Data.Caching.ObjectCache/ObjectCacheStore.cs @@ -0,0 +1,237 @@ +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Utils.IO; +using VNLib.Utils.Async; +using VNLib.Utils.Memory; +using VNLib.Utils.Logging; +using VNLib.Utils.Extensions; +using VNLib.Net.Messaging.FBM.Server; +using static VNLib.Data.Caching.Constants; + +namespace VNLib.Data.Caching.ObjectCache +{ + public delegate ReadOnlySpan<byte> GetBodyDataCallback<T>(T state); + + /// <summary> + /// A <see cref="FBMListener"/> implementation of a <see cref="CacheListener"/> + /// </summary> + public class ObjectCacheStore : CacheListener, IDisposable + { + private readonly SemaphoreSlim StoreLock; + private bool disposedValue; + + ///<inheritdoc/> + protected override ILogProvider Log { get; } + + /// <summary> + /// A queue that stores update and delete events + /// </summary> + public AsyncQueue<ChangeEvent> EventQueue { get; } + + /// <summary> + /// Initialzies a new <see cref="ObjectCacheStore"/> + /// </summary> + /// <param name="dir">The <see cref="DirectoryInfo"/> to store blob files to</param> + /// <param name="cacheMax"></param> + /// <param name="log"></param> + /// <param name="heap"></param> + /// <param name="singleReader">A value that indicates if a single thread is processing events</param> + public ObjectCacheStore(DirectoryInfo dir, int cacheMax, ILogProvider log, IUnmangedHeap heap, bool singleReader) + { + Log = log; + //We can use a single writer and single reader in this context + EventQueue = new(true, singleReader); + InitCache(dir, cacheMax, heap); + InitListener(heap); + StoreLock = new(1,1); + } + + ///<inheritdoc/> + protected override async Task ProcessAsync(FBMContext context, object? userState, CancellationToken cancellationToken) + { + try + { + //Get the action header + string action = context.Method(); + //Optional newid header + string? alternateId = context.NewObjectId(); + + switch (action) + { + case Actions.Get: + { + //Get the object-id header + string objectId = context.ObjectId(); + //Take lock on store + using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellationToken: cancellationToken); + if (Cache!.TryGetValue(objectId, out MemoryHandle<byte>? data)) + { + //Set the status code and write the buffered data to the response buffer + context.CloseResponse(ResponseCodes.Okay); + //Copy data to response buffer + context.Response.WriteBody(data.Span); + } + else + { + context.CloseResponse(ResponseCodes.NotFound); + } + } + break; + case Actions.AddOrUpdate: + { + //Get the object-id header + string objectId = context.ObjectId(); + //Add/update a blob async + await AddOrUpdateBlobAsync(objectId, alternateId, static context => context.Request.BodyData, context); + //Notify update the event bus + await EventQueue.EnqueueAsync(new(objectId, alternateId, false), cancellationToken); + //Set status code + context.CloseResponse(ResponseCodes.Okay); + } + break; + case Actions.Delete: + { + //Get the object-id header + string objectId = context.ObjectId(); + + if (await DeleteItemAsync(objectId)) + { + //Notify deleted + await EventQueue.EnqueueAsync(new(objectId, null, true), cancellationToken); + //Set status header + context.CloseResponse(ResponseCodes.Okay); + } + else + { + //Set status header + context.CloseResponse(ResponseCodes.NotFound); + } + } + break; + // event queue dequeue request + case Actions.Dequeue: + { + //If no event bus is registered, then this is not a legal command + if (userState is not AsyncQueue<ChangeEvent> eventBus) + { + context.CloseResponse(ResponseCodes.NotFound); + break; + } + //Wait for a new message to process + ChangeEvent ev = await eventBus.DequeueAsync(cancellationToken); + if (ev.Deleted) + { + context.CloseResponse("deleted"); + context.Response.WriteHeader(ObjectId, ev.CurrentId); + } + else + { + //Changed + context.CloseResponse("modified"); + context.Response.WriteHeader(ObjectId, ev.CurrentId); + //Set old id if an old id is set + if (ev.CurrentId != null) + { + context.Response.WriteHeader(NewObjectId, ev.AlternateId); + } + } + } + break; + } + } + catch (OperationCanceledException) + { + throw; + } + catch(Exception ex) + { + //Log error and set error status code + Log.Error(ex); + context.CloseResponse(ResponseCodes.Error); + } + } + + /// <summary> + /// Asynchronously deletes a previously stored item + /// </summary> + /// <param name="id">The id of the object to delete</param> + /// <returns>A task that completes when the item has been deleted</returns> + public async Task<bool> DeleteItemAsync(string id) + { + using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(); + return Cache!.Remove(id); + } + + /// <summary> + /// Asynchronously adds or updates an object in the store and optionally update's its id + /// </summary> + /// <param name="objectId">The current (or old) id of the object</param> + /// <param name="alternateId">An optional id to update the blob to</param> + /// <param name="bodyData">A callback that returns the data for the blob</param> + /// <param name="state">The state parameter to pass to the data callback</param> + /// <returns></returns> + public async Task AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state) + { + MemoryHandle<byte>? blob; + //See if new/alt session id was specified + if (string.IsNullOrWhiteSpace(alternateId)) + { + //Take lock on store + using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(); + //See if blob exists + if (!Cache!.TryGetValue(objectId, out blob)) + { + //If not, create new blob and add to store + blob = Heap.AllocAndCopy(bodyData(state)); + Cache.Add(objectId, blob); + } + else + { + //Reset the buffer state + blob.WriteAndResize(bodyData(state)); + } + } + //Need to change the id of the record + else + { + //Take lock on store + using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(); + //Try to change the blob key + if (!Cache!.TryChangeKey(objectId, alternateId, out blob)) + { + //Blob not found, create new blob + blob = Heap.AllocAndCopy(bodyData(state)); + Cache.Add(alternateId, blob); + } + else + { + //Reset the buffer state + blob.WriteAndResize(bodyData(state)); + } + } + } + + ///<inheritdoc/> + protected virtual void Dispose(bool disposing) + { + if (!disposedValue) + { + if (disposing) + { + Cache?.Clear(); + } + disposedValue = true; + } + } + ///<inheritdoc/> + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + } +} diff --git a/VNLib.Data.Caching.ObjectCache/VNLib.Data.Caching.ObjectCache.csproj b/VNLib.Data.Caching.ObjectCache/VNLib.Data.Caching.ObjectCache.csproj new file mode 100644 index 0000000..389cabd --- /dev/null +++ b/VNLib.Data.Caching.ObjectCache/VNLib.Data.Caching.ObjectCache.csproj @@ -0,0 +1,47 @@ +<Project Sdk="Microsoft.NET.Sdk"> + + <PropertyGroup> + <TargetFramework>net6.0</TargetFramework> + <Platforms>AnyCPU;x64</Platforms> + <Authors>Vaughn Nugent</Authors> + <Copyright>Copyright © 2022 Vaughn Nugent</Copyright> + <Nullable>enable</Nullable> + <GenerateDocumentationFile>True</GenerateDocumentationFile> + <PackageProjectUrl>www.vaughnnugent.com/resources</PackageProjectUrl> + <AssemblyVersion>1.0.0.1</AssemblyVersion> + </PropertyGroup> + + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'"> + <DocumentationFile></DocumentationFile> + <CheckForOverflowUnderflow>False</CheckForOverflowUnderflow> + </PropertyGroup> + + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'"> + <CheckForOverflowUnderflow>False</CheckForOverflowUnderflow> + </PropertyGroup> + + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'"> + <CheckForOverflowUnderflow>False</CheckForOverflowUnderflow> + </PropertyGroup> + + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'"> + <CheckForOverflowUnderflow>False</CheckForOverflowUnderflow> + </PropertyGroup> + + <ItemGroup> + <PackageReference Include="ErrorProne.NET.CoreAnalyzers" Version="0.1.2"> + <PrivateAssets>all</PrivateAssets> + <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> + </PackageReference> + <PackageReference Include="ErrorProne.NET.Structs" Version="0.1.2"> + <PrivateAssets>all</PrivateAssets> + <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> + </PackageReference> + </ItemGroup> + + <ItemGroup> + <ProjectReference Include="..\..\..\VNLib\Utils\src\VNLib.Utils.csproj" /> + <ProjectReference Include="..\VNLib.Data.Caching\src\VNLib.Data.Caching.csproj" /> + </ItemGroup> + +</Project> |