From 1a8ab1457244d15b19ddcc94958f645f5ec2abc7 Mon Sep 17 00:00:00 2001 From: vnugent Date: Thu, 22 Jun 2023 21:16:28 -0400 Subject: Save checkpoint --- .../src/BlobCacheLIstener.cs | 38 +++++-------- .../src/ICacheListenerEventQueue.cs | 65 ++++++++++++++++++++++ 2 files changed, 78 insertions(+), 25 deletions(-) create mode 100644 lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs (limited to 'lib/VNLib.Data.Caching.ObjectCache/src') diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs index f69c2a4..0fe0663 100644 --- a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs +++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs @@ -42,7 +42,6 @@ using System; using System.Threading; using System.Threading.Tasks; -using VNLib.Utils.Async; using VNLib.Utils.Memory; using VNLib.Utils.Logging; using VNLib.Net.Messaging.FBM.Server; @@ -65,7 +64,7 @@ namespace VNLib.Data.Caching.ObjectCache /// /// A queue that stores update and delete events /// - public AsyncQueue EventQueue { get; } + public ICacheListenerEventQueue EventQueue { get; } /// /// The Cache store to access data blobs @@ -77,18 +76,17 @@ namespace VNLib.Data.Caching.ObjectCache /// Initialzies a new /// /// The cache table to work from + /// The event queue to publish changes to /// Writes error and debug logging information /// The heap to alloc FBM buffers and cache buffers from - /// A value that indicates if a single thread is processing events /// - public BlobCacheListener(IBlobCacheTable cache, ILogProvider log, IUnmangedHeap heap, bool singleReader) + public BlobCacheListener(IBlobCacheTable cache, ICacheListenerEventQueue queue, ILogProvider log, IUnmangedHeap heap) { Log = log; Cache = cache ?? throw new ArgumentNullException(nameof(cache)); - //Writes may happen from multple threads with bucket design and no lock - EventQueue = new(false, singleReader); + EventQueue = queue ?? throw new ArgumentNullException(nameof(queue)); InitListener(heap); } @@ -161,32 +159,25 @@ namespace VNLib.Data.Caching.ObjectCache } } - static async Task DequeAsync(AsyncQueue queue, FBMContext context, CancellationToken exitToken) - { - //Wait for a new message to process - ChangeEvent ev = await queue.DequeueAsync(exitToken); - - //Set the response - SetResponse(ev, context); - } - - //If no event bus is registered, then this is not a legal command - if (userState is not AsyncQueue eventBus) + //Determine if the queue is enabled for the user + if(!EventQueue.IsEnabled(userState!)) { context.CloseResponse(ResponseCodes.NotFound); - return; } //try to deq without awaiting - if (eventBus.TryDequeue(out ChangeEvent? change)) + if (EventQueue.TryDequeue(userState!, out ChangeEvent? change)) { SetResponse(change, context); } else { - //Process async - await DequeAsync(eventBus, context, exitToken); + //Wait for a new message to process + ChangeEvent ev = await EventQueue.DequeueAsync(userState!, exitToken); + + //Set the response + SetResponse(ev, context); } return; @@ -257,10 +248,7 @@ namespace VNLib.Data.Caching.ObjectCache private void EnqueEvent(ChangeEvent change) { - if (!EventQueue.TryEnque(change)) - { - Log.Warn("Change event {ev} was not enqued because the event queue is overflowing!", change.CurrentId); - } + EventQueue.PublishEvent(change); } diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs b/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs new file mode 100644 index 0000000..06be4fa --- /dev/null +++ b/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs @@ -0,0 +1,65 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.ObjectCache +* File: ICacheListenerEventQueue.cs +* +* ICacheListenerEventQueue.cs is part of VNLib.Data.Caching.ObjectCache which +* is part of the larger VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + + +using System.Threading; +using System.Threading.Tasks; + +namespace VNLib.Data.Caching.ObjectCache +{ + /// + /// Represents a single client's event queue + /// + public interface ICacheListenerEventQueue + { + /// + /// Determines if the queue is enabled for the given user state + /// + /// The unique state of the connection + /// True if event queuing is enabled + bool IsEnabled(object userState); + + /// + /// Attempts to dequeue a single event from the queue without blocking + /// + /// A user state object to associate with the wait operation + /// The dequeued event if successfully dequeued + /// True if an event was waiting and could be dequeued, false otherwise + bool TryDequeue(object userState, out ChangeEvent changeEvent); + + /// + /// Waits asynchronously for an event to be dequeued + /// + /// A user state object to associate with the wait operation + /// A token to cancel the wait operation + /// The that as a result of the dequeue operation + ValueTask DequeueAsync(object userState, CancellationToken cancellation); + + /// + /// Publishes an event to the queue + /// + /// The change event to publish + void PublishEvent(ChangeEvent changeEvent); + } +} -- cgit