diff options
Diffstat (limited to 'lib/VNLib.Data.Caching.ObjectCache/src')
3 files changed, 46 insertions, 27 deletions
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheExtensions.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheExtensions.cs index a114236..ded89d2 100644 --- a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheExtensions.cs +++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheExtensions.cs @@ -129,7 +129,8 @@ namespace VNLib.Data.Caching.ObjectCache /// <summary> - /// Asynchronously adds or updates an object in the store and optionally update's it's id + /// Asynchronously adds or updates an object in the store and optionally update's it's id. + /// If the alternate key already exists, it's data is overwritten. /// </summary> /// <param name="table"></param> /// <param name="objectId">The current (or old) id of the object</param> @@ -143,7 +144,7 @@ namespace VNLib.Data.Caching.ObjectCache this IBlobCacheTable table, string objectId, string? alternateId, - GetBodyDataCallback<T> bodyData, + ObjectDataReader<T> bodyData, T state, DateTime time, CancellationToken cancellation = default) @@ -202,23 +203,43 @@ namespace VNLib.Data.Caching.ObjectCache { try { - //Update the handle data and reuse the entry - entry.UpdateData(bodyData(state)); + //Try to see if the alternate key already exists + if (alternateHandle.Cache.TryGetValue(alternateId, out CacheEntry existing)) + { + existing.UpdateData(bodyData(state)); - //Add the updated entry to the alternate table - alternateHandle.Cache.Add(alternateId, entry); + //dispose the old entry since we don't need it + entry.Dispose(); + } + else + { + //Update the entry buffer and reuse the entry + entry.UpdateData(bodyData(state)); + + //Add the updated entry to the alternate table + alternateHandle.Cache.Add(alternateId, entry); + } } catch { - //Cleanup handle if error adding + //Cleanup removed entry if error adding entry.Dispose(); throw; } } else { - //Old entry did not exist, we need to create a new entry for the alternate bucket - _ = alternateHandle.Cache.CreateEntry(alternateId, bodyData(state), time); + //Try to see if the alternate key already exists in the target store + if (alternateHandle.Cache.TryGetValue(alternateId, out CacheEntry existing)) + { + //overwrite the existing entry data + existing.UpdateData(bodyData(state)); + } + else + { + //Old entry did not exist, we need to create a new entry for the alternate bucket + _ = alternateHandle.Cache.CreateEntry(alternateId, bodyData(state), time); + } } } } diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs index d69c6bb..5139746 100644 --- a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs +++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs @@ -42,57 +42,54 @@ using System; using System.Threading; using System.Threading.Tasks; -using VNLib.Utils.Memory; using VNLib.Utils.Logging; +using VNLib.Net.Messaging.FBM; using VNLib.Net.Messaging.FBM.Server; using static VNLib.Data.Caching.Constants; namespace VNLib.Data.Caching.ObjectCache { - public delegate ReadOnlySpan<byte> GetBodyDataCallback<T>(T state); /// <summary> /// An <see cref="FBMListener"/> for key-value object data caching servers. /// </summary> - public class BlobCacheListener : FBMListenerBase, IDisposable + public class BlobCacheListener<T> : FBMListenerBase<T>, IDisposable { private bool disposedValue; ///<inheritdoc/> protected override ILogProvider Log { get; } + ///<inheritdoc/> + protected override FBMListener Listener { get; } /// <summary> /// A queue that stores update and delete events /// </summary> - public ICacheListenerEventQueue EventQueue { get; } + public ICacheListenerEventQueue<T> EventQueue { get; } /// <summary> /// The Cache store to access data blobs /// </summary> public IBlobCacheTable Cache { get; } - /// <summary> - /// Initialzies a new <see cref="BlobCacheListener"/> + /// Initialzies a new <see cref="BlobCacheListener{T}"/> /// </summary> /// <param name="cache">The cache table to work from</param> /// <param name="queue">The event queue to publish changes to</param> /// <param name="log">Writes error and debug logging information</param> - /// <param name="heap">The heap to alloc FBM buffers and <see cref="CacheEntry"/> cache buffers from</param> + /// <param name="memoryManager">The heap to alloc FBM buffers and <see cref="CacheEntry"/> cache buffers from</param> /// <exception cref="ArgumentNullException"></exception> - public BlobCacheListener(IBlobCacheTable cache, ICacheListenerEventQueue queue, ILogProvider log, IUnmangedHeap heap) + public BlobCacheListener(IBlobCacheTable cache, ICacheListenerEventQueue<T> queue, ILogProvider log, IFBMMemoryManager memoryManager) { - Log = log; - + Log = log; Cache = cache ?? throw new ArgumentNullException(nameof(cache)); - EventQueue = queue ?? throw new ArgumentNullException(nameof(queue)); - - InitListener(heap); + Listener = new(memoryManager); } ///<inheritdoc/> - protected override async Task ProcessAsync(FBMContext context, object? userState, CancellationToken exitToken) + protected override async Task ProcessAsync(FBMContext context, T? userState, CancellationToken exitToken) { try { diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs b/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs index 439de94..9f4146d 100644 --- a/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs +++ b/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs @@ -32,14 +32,15 @@ namespace VNLib.Data.Caching.ObjectCache /// <summary> /// Represents a single client's event queue /// </summary> - public interface ICacheListenerEventQueue + /// <typeparam name="T">The user state parameter type</typeparam> + public interface ICacheListenerEventQueue<T> { /// <summary> /// Determines if the queue is enabled for the given user state /// </summary> /// <param name="userState">The unique state of the connection</param> /// <returns>True if event queuing is enabled</returns> - bool IsEnabled([NotNullWhen(true)] object? userState); + bool IsEnabled([NotNullWhen(true)] T? userState); /// <summary> /// Attempts to dequeue a single event from the queue without blocking @@ -47,7 +48,7 @@ namespace VNLib.Data.Caching.ObjectCache /// <param name="userState">A user state object to associate with the wait operation</param> /// <param name="changeEvent">The dequeued event if successfully dequeued</param> /// <returns>True if an event was waiting and could be dequeued, false otherwise</returns> - bool TryDequeue(object userState, out ChangeEvent changeEvent); + bool TryDequeue(T userState, out ChangeEvent changeEvent); /// <summary> /// Waits asynchronously for an event to be dequeued @@ -55,7 +56,7 @@ namespace VNLib.Data.Caching.ObjectCache /// <param name="userState">A user state object to associate with the wait operation</param> /// <param name="cancellation">A token to cancel the wait operation</param> /// <returns>The <see cref="ChangeEvent"/> that as a result of the dequeue operation</returns> - ValueTask<ChangeEvent> DequeueAsync(object userState, CancellationToken cancellation); + ValueTask<ChangeEvent> DequeueAsync(T userState, CancellationToken cancellation); /// <summary> /// Publishes an event to the queue |