diff options
Diffstat (limited to 'lib/VNLib.Data.Caching.ObjectCache/src')
-rw-r--r-- | lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs | 38 | ||||
-rw-r--r-- | lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs | 65 |
2 files changed, 78 insertions, 25 deletions
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs index f69c2a4..0fe0663 100644 --- a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs +++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs @@ -42,7 +42,6 @@ using System; using System.Threading; using System.Threading.Tasks; -using VNLib.Utils.Async; using VNLib.Utils.Memory; using VNLib.Utils.Logging; using VNLib.Net.Messaging.FBM.Server; @@ -65,7 +64,7 @@ namespace VNLib.Data.Caching.ObjectCache /// <summary> /// A queue that stores update and delete events /// </summary> - public AsyncQueue<ChangeEvent> EventQueue { get; } + public ICacheListenerEventQueue EventQueue { get; } /// <summary> /// The Cache store to access data blobs @@ -77,18 +76,17 @@ namespace VNLib.Data.Caching.ObjectCache /// Initialzies a new <see cref="BlobCacheListener"/> /// </summary> /// <param name="cache">The cache table to work from</param> + /// <param name="queue">The event queue to publish changes to</param> /// <param name="log">Writes error and debug logging information</param> /// <param name="heap">The heap to alloc FBM buffers and <see cref="CacheEntry"/> cache buffers from</param> - /// <param name="singleReader">A value that indicates if a single thread is processing events</param> /// <exception cref="ArgumentNullException"></exception> - public BlobCacheListener(IBlobCacheTable cache, ILogProvider log, IUnmangedHeap heap, bool singleReader) + public BlobCacheListener(IBlobCacheTable cache, ICacheListenerEventQueue queue, ILogProvider log, IUnmangedHeap heap) { Log = log; Cache = cache ?? throw new ArgumentNullException(nameof(cache)); - //Writes may happen from multple threads with bucket design and no lock - EventQueue = new(false, singleReader); + EventQueue = queue ?? throw new ArgumentNullException(nameof(queue)); InitListener(heap); } @@ -161,32 +159,25 @@ namespace VNLib.Data.Caching.ObjectCache } } - static async Task DequeAsync(AsyncQueue<ChangeEvent> queue, FBMContext context, CancellationToken exitToken) - { - //Wait for a new message to process - ChangeEvent ev = await queue.DequeueAsync(exitToken); - - //Set the response - SetResponse(ev, context); - } - - //If no event bus is registered, then this is not a legal command - if (userState is not AsyncQueue<ChangeEvent> eventBus) + //Determine if the queue is enabled for the user + if(!EventQueue.IsEnabled(userState!)) { context.CloseResponse(ResponseCodes.NotFound); - return; } //try to deq without awaiting - if (eventBus.TryDequeue(out ChangeEvent? change)) + if (EventQueue.TryDequeue(userState!, out ChangeEvent? change)) { SetResponse(change, context); } else { - //Process async - await DequeAsync(eventBus, context, exitToken); + //Wait for a new message to process + ChangeEvent ev = await EventQueue.DequeueAsync(userState!, exitToken); + + //Set the response + SetResponse(ev, context); } return; @@ -257,10 +248,7 @@ namespace VNLib.Data.Caching.ObjectCache private void EnqueEvent(ChangeEvent change) { - if (!EventQueue.TryEnque(change)) - { - Log.Warn("Change event {ev} was not enqued because the event queue is overflowing!", change.CurrentId); - } + EventQueue.PublishEvent(change); } diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs b/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs new file mode 100644 index 0000000..06be4fa --- /dev/null +++ b/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs @@ -0,0 +1,65 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.ObjectCache +* File: ICacheListenerEventQueue.cs +* +* ICacheListenerEventQueue.cs is part of VNLib.Data.Caching.ObjectCache which +* is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + + +using System.Threading; +using System.Threading.Tasks; + +namespace VNLib.Data.Caching.ObjectCache +{ + /// <summary> + /// Represents a single client's event queue + /// </summary> + public interface ICacheListenerEventQueue + { + /// <summary> + /// Determines if the queue is enabled for the given user state + /// </summary> + /// <param name="userState">The unique state of the connection</param> + /// <returns>True if event queuing is enabled</returns> + bool IsEnabled(object userState); + + /// <summary> + /// Attempts to dequeue a single event from the queue without blocking + /// </summary> + /// <param name="userState">A user state object to associate with the wait operation</param> + /// <param name="changeEvent">The dequeued event if successfully dequeued</param> + /// <returns>True if an event was waiting and could be dequeued, false otherwise</returns> + bool TryDequeue(object userState, out ChangeEvent changeEvent); + + /// <summary> + /// Waits asynchronously for an event to be dequeued + /// </summary> + /// <param name="userState">A user state object to associate with the wait operation</param> + /// <param name="cancellation">A token to cancel the wait operation</param> + /// <returns>The <see cref="ChangeEvent"/> that as a result of the dequeue operation</returns> + ValueTask<ChangeEvent> DequeueAsync(object userState, CancellationToken cancellation); + + /// <summary> + /// Publishes an event to the queue + /// </summary> + /// <param name="changeEvent">The change event to publish</param> + void PublishEvent(ChangeEvent changeEvent); + } +} |