aboutsummaryrefslogtreecommitdiff
path: root/lib/VNLib.Data.Caching.ObjectCache/src
diff options
context:
space:
mode:
Diffstat (limited to 'lib/VNLib.Data.Caching.ObjectCache/src')
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheExtensions.cs39
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs25
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs9
3 files changed, 46 insertions, 27 deletions
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheExtensions.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheExtensions.cs
index a114236..ded89d2 100644
--- a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheExtensions.cs
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheExtensions.cs
@@ -129,7 +129,8 @@ namespace VNLib.Data.Caching.ObjectCache
/// <summary>
- /// Asynchronously adds or updates an object in the store and optionally update's it's id
+ /// Asynchronously adds or updates an object in the store and optionally update's it's id.
+ /// If the alternate key already exists, it's data is overwritten.
/// </summary>
/// <param name="table"></param>
/// <param name="objectId">The current (or old) id of the object</param>
@@ -143,7 +144,7 @@ namespace VNLib.Data.Caching.ObjectCache
this IBlobCacheTable table,
string objectId,
string? alternateId,
- GetBodyDataCallback<T> bodyData,
+ ObjectDataReader<T> bodyData,
T state,
DateTime time,
CancellationToken cancellation = default)
@@ -202,23 +203,43 @@ namespace VNLib.Data.Caching.ObjectCache
{
try
{
- //Update the handle data and reuse the entry
- entry.UpdateData(bodyData(state));
+ //Try to see if the alternate key already exists
+ if (alternateHandle.Cache.TryGetValue(alternateId, out CacheEntry existing))
+ {
+ existing.UpdateData(bodyData(state));
- //Add the updated entry to the alternate table
- alternateHandle.Cache.Add(alternateId, entry);
+ //dispose the old entry since we don't need it
+ entry.Dispose();
+ }
+ else
+ {
+ //Update the entry buffer and reuse the entry
+ entry.UpdateData(bodyData(state));
+
+ //Add the updated entry to the alternate table
+ alternateHandle.Cache.Add(alternateId, entry);
+ }
}
catch
{
- //Cleanup handle if error adding
+ //Cleanup removed entry if error adding
entry.Dispose();
throw;
}
}
else
{
- //Old entry did not exist, we need to create a new entry for the alternate bucket
- _ = alternateHandle.Cache.CreateEntry(alternateId, bodyData(state), time);
+ //Try to see if the alternate key already exists in the target store
+ if (alternateHandle.Cache.TryGetValue(alternateId, out CacheEntry existing))
+ {
+ //overwrite the existing entry data
+ existing.UpdateData(bodyData(state));
+ }
+ else
+ {
+ //Old entry did not exist, we need to create a new entry for the alternate bucket
+ _ = alternateHandle.Cache.CreateEntry(alternateId, bodyData(state), time);
+ }
}
}
}
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs
index d69c6bb..5139746 100644
--- a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs
@@ -42,57 +42,54 @@ using System;
using System.Threading;
using System.Threading.Tasks;
-using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
+using VNLib.Net.Messaging.FBM;
using VNLib.Net.Messaging.FBM.Server;
using static VNLib.Data.Caching.Constants;
namespace VNLib.Data.Caching.ObjectCache
{
- public delegate ReadOnlySpan<byte> GetBodyDataCallback<T>(T state);
/// <summary>
/// An <see cref="FBMListener"/> for key-value object data caching servers.
/// </summary>
- public class BlobCacheListener : FBMListenerBase, IDisposable
+ public class BlobCacheListener<T> : FBMListenerBase<T>, IDisposable
{
private bool disposedValue;
///<inheritdoc/>
protected override ILogProvider Log { get; }
+ ///<inheritdoc/>
+ protected override FBMListener Listener { get; }
/// <summary>
/// A queue that stores update and delete events
/// </summary>
- public ICacheListenerEventQueue EventQueue { get; }
+ public ICacheListenerEventQueue<T> EventQueue { get; }
/// <summary>
/// The Cache store to access data blobs
/// </summary>
public IBlobCacheTable Cache { get; }
-
/// <summary>
- /// Initialzies a new <see cref="BlobCacheListener"/>
+ /// Initialzies a new <see cref="BlobCacheListener{T}"/>
/// </summary>
/// <param name="cache">The cache table to work from</param>
/// <param name="queue">The event queue to publish changes to</param>
/// <param name="log">Writes error and debug logging information</param>
- /// <param name="heap">The heap to alloc FBM buffers and <see cref="CacheEntry"/> cache buffers from</param>
+ /// <param name="memoryManager">The heap to alloc FBM buffers and <see cref="CacheEntry"/> cache buffers from</param>
/// <exception cref="ArgumentNullException"></exception>
- public BlobCacheListener(IBlobCacheTable cache, ICacheListenerEventQueue queue, ILogProvider log, IUnmangedHeap heap)
+ public BlobCacheListener(IBlobCacheTable cache, ICacheListenerEventQueue<T> queue, ILogProvider log, IFBMMemoryManager memoryManager)
{
- Log = log;
-
+ Log = log;
Cache = cache ?? throw new ArgumentNullException(nameof(cache));
-
EventQueue = queue ?? throw new ArgumentNullException(nameof(queue));
-
- InitListener(heap);
+ Listener = new(memoryManager);
}
///<inheritdoc/>
- protected override async Task ProcessAsync(FBMContext context, object? userState, CancellationToken exitToken)
+ protected override async Task ProcessAsync(FBMContext context, T? userState, CancellationToken exitToken)
{
try
{
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs b/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs
index 439de94..9f4146d 100644
--- a/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs
@@ -32,14 +32,15 @@ namespace VNLib.Data.Caching.ObjectCache
/// <summary>
/// Represents a single client's event queue
/// </summary>
- public interface ICacheListenerEventQueue
+ /// <typeparam name="T">The user state parameter type</typeparam>
+ public interface ICacheListenerEventQueue<T>
{
/// <summary>
/// Determines if the queue is enabled for the given user state
/// </summary>
/// <param name="userState">The unique state of the connection</param>
/// <returns>True if event queuing is enabled</returns>
- bool IsEnabled([NotNullWhen(true)] object? userState);
+ bool IsEnabled([NotNullWhen(true)] T? userState);
/// <summary>
/// Attempts to dequeue a single event from the queue without blocking
@@ -47,7 +48,7 @@ namespace VNLib.Data.Caching.ObjectCache
/// <param name="userState">A user state object to associate with the wait operation</param>
/// <param name="changeEvent">The dequeued event if successfully dequeued</param>
/// <returns>True if an event was waiting and could be dequeued, false otherwise</returns>
- bool TryDequeue(object userState, out ChangeEvent changeEvent);
+ bool TryDequeue(T userState, out ChangeEvent changeEvent);
/// <summary>
/// Waits asynchronously for an event to be dequeued
@@ -55,7 +56,7 @@ namespace VNLib.Data.Caching.ObjectCache
/// <param name="userState">A user state object to associate with the wait operation</param>
/// <param name="cancellation">A token to cancel the wait operation</param>
/// <returns>The <see cref="ChangeEvent"/> that as a result of the dequeue operation</returns>
- ValueTask<ChangeEvent> DequeueAsync(object userState, CancellationToken cancellation);
+ ValueTask<ChangeEvent> DequeueAsync(T userState, CancellationToken cancellation);
/// <summary>
/// Publishes an event to the queue