aboutsummaryrefslogtreecommitdiff
path: root/lib/VNLib.Data.Caching.ObjectCache
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-06-22 21:16:28 -0400
committerLibravatar vnugent <public@vaughnnugent.com>2023-06-22 21:16:28 -0400
commit1a8ab1457244d15b19ddcc94958f645f5ec2abc7 (patch)
tree3994806e0737cf6f519a72cca8836c6e81eac7e2 /lib/VNLib.Data.Caching.ObjectCache
parentdc0fc53fd3c3f6c32c8b0d063922c7018fa2c48f (diff)
Save checkpoint
Diffstat (limited to 'lib/VNLib.Data.Caching.ObjectCache')
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs38
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs65
2 files changed, 78 insertions, 25 deletions
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs
index f69c2a4..0fe0663 100644
--- a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs
@@ -42,7 +42,6 @@ using System;
using System.Threading;
using System.Threading.Tasks;
-using VNLib.Utils.Async;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
using VNLib.Net.Messaging.FBM.Server;
@@ -65,7 +64,7 @@ namespace VNLib.Data.Caching.ObjectCache
/// <summary>
/// A queue that stores update and delete events
/// </summary>
- public AsyncQueue<ChangeEvent> EventQueue { get; }
+ public ICacheListenerEventQueue EventQueue { get; }
/// <summary>
/// The Cache store to access data blobs
@@ -77,18 +76,17 @@ namespace VNLib.Data.Caching.ObjectCache
/// Initialzies a new <see cref="BlobCacheListener"/>
/// </summary>
/// <param name="cache">The cache table to work from</param>
+ /// <param name="queue">The event queue to publish changes to</param>
/// <param name="log">Writes error and debug logging information</param>
/// <param name="heap">The heap to alloc FBM buffers and <see cref="CacheEntry"/> cache buffers from</param>
- /// <param name="singleReader">A value that indicates if a single thread is processing events</param>
/// <exception cref="ArgumentNullException"></exception>
- public BlobCacheListener(IBlobCacheTable cache, ILogProvider log, IUnmangedHeap heap, bool singleReader)
+ public BlobCacheListener(IBlobCacheTable cache, ICacheListenerEventQueue queue, ILogProvider log, IUnmangedHeap heap)
{
Log = log;
Cache = cache ?? throw new ArgumentNullException(nameof(cache));
- //Writes may happen from multple threads with bucket design and no lock
- EventQueue = new(false, singleReader);
+ EventQueue = queue ?? throw new ArgumentNullException(nameof(queue));
InitListener(heap);
}
@@ -161,32 +159,25 @@ namespace VNLib.Data.Caching.ObjectCache
}
}
- static async Task DequeAsync(AsyncQueue<ChangeEvent> queue, FBMContext context, CancellationToken exitToken)
- {
- //Wait for a new message to process
- ChangeEvent ev = await queue.DequeueAsync(exitToken);
-
- //Set the response
- SetResponse(ev, context);
- }
-
- //If no event bus is registered, then this is not a legal command
- if (userState is not AsyncQueue<ChangeEvent> eventBus)
+ //Determine if the queue is enabled for the user
+ if(!EventQueue.IsEnabled(userState!))
{
context.CloseResponse(ResponseCodes.NotFound);
-
return;
}
//try to deq without awaiting
- if (eventBus.TryDequeue(out ChangeEvent? change))
+ if (EventQueue.TryDequeue(userState!, out ChangeEvent? change))
{
SetResponse(change, context);
}
else
{
- //Process async
- await DequeAsync(eventBus, context, exitToken);
+ //Wait for a new message to process
+ ChangeEvent ev = await EventQueue.DequeueAsync(userState!, exitToken);
+
+ //Set the response
+ SetResponse(ev, context);
}
return;
@@ -257,10 +248,7 @@ namespace VNLib.Data.Caching.ObjectCache
private void EnqueEvent(ChangeEvent change)
{
- if (!EventQueue.TryEnque(change))
- {
- Log.Warn("Change event {ev} was not enqued because the event queue is overflowing!", change.CurrentId);
- }
+ EventQueue.PublishEvent(change);
}
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs b/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs
new file mode 100644
index 0000000..06be4fa
--- /dev/null
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs
@@ -0,0 +1,65 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.ObjectCache
+* File: ICacheListenerEventQueue.cs
+*
+* ICacheListenerEventQueue.cs is part of VNLib.Data.Caching.ObjectCache which
+* is part of the larger VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace VNLib.Data.Caching.ObjectCache
+{
+ /// <summary>
+ /// Represents a single client's event queue
+ /// </summary>
+ public interface ICacheListenerEventQueue
+ {
+ /// <summary>
+ /// Determines if the queue is enabled for the given user state
+ /// </summary>
+ /// <param name="userState">The unique state of the connection</param>
+ /// <returns>True if event queuing is enabled</returns>
+ bool IsEnabled(object userState);
+
+ /// <summary>
+ /// Attempts to dequeue a single event from the queue without blocking
+ /// </summary>
+ /// <param name="userState">A user state object to associate with the wait operation</param>
+ /// <param name="changeEvent">The dequeued event if successfully dequeued</param>
+ /// <returns>True if an event was waiting and could be dequeued, false otherwise</returns>
+ bool TryDequeue(object userState, out ChangeEvent changeEvent);
+
+ /// <summary>
+ /// Waits asynchronously for an event to be dequeued
+ /// </summary>
+ /// <param name="userState">A user state object to associate with the wait operation</param>
+ /// <param name="cancellation">A token to cancel the wait operation</param>
+ /// <returns>The <see cref="ChangeEvent"/> that as a result of the dequeue operation</returns>
+ ValueTask<ChangeEvent> DequeueAsync(object userState, CancellationToken cancellation);
+
+ /// <summary>
+ /// Publishes an event to the queue
+ /// </summary>
+ /// <param name="changeEvent">The change event to publish</param>
+ void PublishEvent(ChangeEvent changeEvent);
+ }
+}