diff options
author | vnugent <public@vaughnnugent.com> | 2023-01-12 17:47:40 -0500 |
---|---|---|
committer | vnugent <public@vaughnnugent.com> | 2023-01-12 17:47:40 -0500 |
commit | b75668b164d398b99ee942beced06aa27ef65a50 (patch) | |
tree | c1faf6df3caa78083dcc38eb1a7247e456bbe754 /VNLib.Data.Caching.ObjectCache/ObjectCacheStore.cs | |
parent | cea64e619e714f6dbe51d37ca8329b58d8c271fb (diff) |
Large project reorder and consolidation
Diffstat (limited to 'VNLib.Data.Caching.ObjectCache/ObjectCacheStore.cs')
-rw-r--r-- | VNLib.Data.Caching.ObjectCache/ObjectCacheStore.cs | 263 |
1 files changed, 0 insertions, 263 deletions
diff --git a/VNLib.Data.Caching.ObjectCache/ObjectCacheStore.cs b/VNLib.Data.Caching.ObjectCache/ObjectCacheStore.cs deleted file mode 100644 index 514d00b..0000000 --- a/VNLib.Data.Caching.ObjectCache/ObjectCacheStore.cs +++ /dev/null @@ -1,263 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Data.Caching.ObjectCache -* File: ObjectCacheStore.cs -* -* ObjectCacheStore.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.IO; -using System.Threading; -using System.Threading.Tasks; - -using VNLib.Utils.Async; -using VNLib.Utils.Memory; -using VNLib.Utils.Logging; -using VNLib.Utils.Extensions; -using VNLib.Net.Messaging.FBM.Server; -using static VNLib.Data.Caching.Constants; - -namespace VNLib.Data.Caching.ObjectCache -{ - public delegate ReadOnlySpan<byte> GetBodyDataCallback<T>(T state); - - /// <summary> - /// A <see cref="FBMListener"/> implementation of a <see cref="CacheListener"/> - /// </summary> - public class ObjectCacheStore : CacheListener, IDisposable - { - private readonly SemaphoreSlim StoreLock; - private bool disposedValue; - - ///<inheritdoc/> - protected override ILogProvider Log { get; } - - /// <summary> - /// A queue that stores update and delete events - /// </summary> - public AsyncQueue<ChangeEvent> EventQueue { get; } - - /// <summary> - /// Initialzies a new <see cref="ObjectCacheStore"/> - /// </summary> - /// <param name="dir">The <see cref="DirectoryInfo"/> to store blob files to</param> - /// <param name="cacheMax"></param> - /// <param name="log"></param> - /// <param name="heap"></param> - /// <param name="singleReader">A value that indicates if a single thread is processing events</param> - public ObjectCacheStore(DirectoryInfo dir, int cacheMax, ILogProvider log, IUnmangedHeap heap, bool singleReader) - { - Log = log; - //We can use a single writer and single reader in this context - EventQueue = new(true, singleReader); - InitCache(dir, cacheMax, heap); - InitListener(heap); - StoreLock = new(1,1); - } - - ///<inheritdoc/> - protected override async Task ProcessAsync(FBMContext context, object? userState, CancellationToken cancellationToken) - { - try - { - //Get the action header - string action = context.Method(); - //Optional newid header - string? alternateId = context.NewObjectId(); - - switch (action) - { - case Actions.Get: - { - //Get the object-id header - string objectId = context.ObjectId(); - //Take lock on store - using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellationToken: cancellationToken); - if (Cache!.TryGetValue(objectId, out MemoryHandle<byte>? data)) - { - //Set the status code and write the buffered data to the response buffer - context.CloseResponse(ResponseCodes.Okay); - //Copy data to response buffer - context.Response.WriteBody(data.Span); - } - else - { - context.CloseResponse(ResponseCodes.NotFound); - } - } - break; - case Actions.AddOrUpdate: - { - //Get the object-id header - string objectId = context.ObjectId(); - //Add/update a blob async - await AddOrUpdateBlobAsync(objectId, alternateId, static context => context.Request.BodyData, context); - //Notify update the event bus - await EventQueue.EnqueueAsync(new(objectId, alternateId, false), cancellationToken); - //Set status code - context.CloseResponse(ResponseCodes.Okay); - } - break; - case Actions.Delete: - { - //Get the object-id header - string objectId = context.ObjectId(); - - if (await DeleteItemAsync(objectId)) - { - //Notify deleted - await EventQueue.EnqueueAsync(new(objectId, null, true), cancellationToken); - //Set status header - context.CloseResponse(ResponseCodes.Okay); - } - else - { - //Set status header - context.CloseResponse(ResponseCodes.NotFound); - } - } - break; - // event queue dequeue request - case Actions.Dequeue: - { - //If no event bus is registered, then this is not a legal command - if (userState is not AsyncQueue<ChangeEvent> eventBus) - { - context.CloseResponse(ResponseCodes.NotFound); - break; - } - //Wait for a new message to process - ChangeEvent ev = await eventBus.DequeueAsync(cancellationToken); - if (ev.Deleted) - { - context.CloseResponse("deleted"); - context.Response.WriteHeader(ObjectId, ev.CurrentId); - } - else - { - //Changed - context.CloseResponse("modified"); - context.Response.WriteHeader(ObjectId, ev.CurrentId); - //Set old id if an old id is set - if (ev.CurrentId != null) - { - context.Response.WriteHeader(NewObjectId, ev.AlternateId); - } - } - } - break; - } - } - catch (OperationCanceledException) - { - throw; - } - catch(Exception ex) - { - //Log error and set error status code - Log.Error(ex); - context.CloseResponse(ResponseCodes.Error); - } - } - - /// <summary> - /// Asynchronously deletes a previously stored item - /// </summary> - /// <param name="id">The id of the object to delete</param> - /// <returns>A task that completes when the item has been deleted</returns> - public async Task<bool> DeleteItemAsync(string id) - { - using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(); - return Cache!.Remove(id); - } - - /// <summary> - /// Asynchronously adds or updates an object in the store and optionally update's its id - /// </summary> - /// <param name="objectId">The current (or old) id of the object</param> - /// <param name="alternateId">An optional id to update the blob to</param> - /// <param name="bodyData">A callback that returns the data for the blob</param> - /// <param name="state">The state parameter to pass to the data callback</param> - /// <returns></returns> - public async Task AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state) - { - MemoryHandle<byte>? blob; - //See if new/alt session id was specified - if (string.IsNullOrWhiteSpace(alternateId)) - { - //Take lock on store - using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(); - //See if blob exists - if (!Cache!.TryGetValue(objectId, out blob)) - { - //If not, create new blob and add to store - blob = Heap.AllocAndCopy(bodyData(state)); - Cache.Add(objectId, blob); - } - else - { - //Reset the buffer state - blob.WriteAndResize(bodyData(state)); - } - } - //Need to change the id of the record - else - { - //Take lock on store - using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(); - //Try to change the blob key - if (!Cache!.TryChangeKey(objectId, alternateId, out blob)) - { - //Blob not found, create new blob - blob = Heap.AllocAndCopy(bodyData(state)); - Cache.Add(alternateId, blob); - } - else - { - //Reset the buffer state - blob.WriteAndResize(bodyData(state)); - } - } - } - - ///<inheritdoc/> - protected virtual void Dispose(bool disposing) - { - if (!disposedValue) - { - if (disposing) - { - Cache?.Clear(); - } - - StoreLock.Dispose(); - - disposedValue = true; - } - } - ///<inheritdoc/> - public void Dispose() - { - // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method - Dispose(disposing: true); - GC.SuppressFinalize(this); - } - } -} |