aboutsummaryrefslogtreecommitdiff
path: root/lib/Utils/src/Async
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-10-20 12:18:44 -0400
committerLibravatar vnugent <public@vaughnnugent.com>2023-10-20 12:18:44 -0400
commitd997950a29ec3ce29cd652298e678d708218fdad (patch)
tree65733374ba223b08c99c98629a51c3789c0aa2e6 /lib/Utils/src/Async
parentf44cdf8f2685c37e5a1d77018a5227942b578863 (diff)
compression rpmalloc static linking, encoder memory callbacks, and more tweaks
Diffstat (limited to 'lib/Utils/src/Async')
-rw-r--r--lib/Utils/src/Async/AsyncQueue.cs45
-rw-r--r--lib/Utils/src/Async/IAsyncEventSource.cs45
-rw-r--r--lib/Utils/src/Async/IAsyncQueue.cs76
3 files changed, 136 insertions, 30 deletions
diff --git a/lib/Utils/src/Async/AsyncQueue.cs b/lib/Utils/src/Async/AsyncQueue.cs
index ba45513..45f1219 100644
--- a/lib/Utils/src/Async/AsyncQueue.cs
+++ b/lib/Utils/src/Async/AsyncQueue.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Utils
@@ -30,11 +30,12 @@ using System.Diagnostics.CodeAnalysis;
namespace VNLib.Utils.Async
{
+
/// <summary>
/// Provides a <see cref="Channel{T}"/> based asynchronous queue
/// </summary>
/// <typeparam name="T">The event object type</typeparam>
- public class AsyncQueue<T>
+ public class AsyncQueue<T> : IAsyncQueue<T>
{
private readonly Channel<T> _channel;
@@ -45,12 +46,13 @@ namespace VNLib.Utils.Async
/// </summary>
/// <param name="capacity">The maxium number of items to allow in the queue</param>
public AsyncQueue(int capacity):this(false, false, capacity)
- {}
+ { }
+
/// <summary>
/// Initalizes a new multi-threaded unbound channel queue
/// </summary>
public AsyncQueue():this(false, false)
- {}
+ { }
/// <summary>
/// Initalizes a new queue that allows specifying concurrency requirements
@@ -105,39 +107,22 @@ namespace VNLib.Utils.Async
_channel = Channel.CreateBounded<T>(options);
}
- /// <summary>
- /// Attemts to enqeue an item if the queue has the capacity
- /// </summary>
- /// <param name="item">The item to eqneue</param>
- /// <returns>True if the queue can accept another item, false otherwise</returns>
+ /// <inheritdoc/>
public bool TryEnque(T item) => _channel.Writer.TryWrite(item);
- /// <summary>
- /// Enqueues an item to the end of the queue and notifies a waiter that an item was enqueued
- /// </summary>
- /// <param name="item">The item to enqueue</param>
- /// <param name="cancellationToken"></param>
+
+ /// <inheritdoc/>
/// <exception cref="ObjectDisposedException"></exception>
public ValueTask EnqueueAsync(T item, CancellationToken cancellationToken = default) => _channel.Writer.WriteAsync(item, cancellationToken);
- /// <summary>
- /// Asynchronously waits for an item to be Enqueued to the end of the queue.
- /// </summary>
- /// <returns>The item at the begining of the queue</returns>
+
+ /// <inheritdoc/>
/// <exception cref="ObjectDisposedException"></exception>
public ValueTask<T> DequeueAsync(CancellationToken cancellationToken = default) => _channel.Reader.ReadAsync(cancellationToken);
- /// <summary>
- /// Removes the object at the beginning of the queue and stores it to the result parameter. Without waiting for a change
- /// event.
- /// </summary>
- /// <param name="result">The item that was at the begining of the queue</param>
- /// <returns>True if the queue could be read synchronously, false if the lock could not be entered, or the queue contains no items</returns>
+
+ /// <inheritdoc/>
/// <exception cref="ObjectDisposedException"></exception>
public bool TryDequeue([MaybeNullWhen(false)] out T result) => _channel.Reader.TryRead(out result);
- /// <summary>
- /// Peeks the object at the beginning of the queue and stores it to the result parameter. Without waiting for a change
- /// event.
- /// </summary>
- /// <param name="result">The item that was at the begining of the queue</param>
- /// <returns>True if the queue could be read synchronously, false if the lock could not be entered, or the queue contains no items</returns>
+
+ /// <inheritdoc/>
/// <exception cref="ObjectDisposedException"></exception>
public bool TryPeek([MaybeNullWhen(false)] out T result) => _channel.Reader.TryPeek(out result);
}
diff --git a/lib/Utils/src/Async/IAsyncEventSource.cs b/lib/Utils/src/Async/IAsyncEventSource.cs
new file mode 100644
index 0000000..968f94c
--- /dev/null
+++ b/lib/Utils/src/Async/IAsyncEventSource.cs
@@ -0,0 +1,45 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Utils
+* File: IAsyncEventSource.cs
+*
+* IAsyncEventSource.cs is part of VNLib.Utils which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Utils is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published
+* by the Free Software Foundation, either version 2 of the License,
+* or (at your option) any later version.
+*
+* VNLib.Utils is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with VNLib.Utils. If not, see http://www.gnu.org/licenses/.
+*/
+
+namespace VNLib.Utils.Async
+{
+ /// <summary>
+ /// A type that publishes events to asynchronous event queues
+ /// </summary>
+ /// <typeparam name="T">The event item type to publish</typeparam>
+ public interface IAsyncEventSource<T>
+ {
+ /// <summary>
+ /// Subscribes a new queue to publish events to
+ /// </summary>
+ /// <param name="queue">The queue instance to publish new events to</param>
+ void Subscribe(IAsyncQueue<T> queue);
+
+ /// <summary>
+ /// Unsubscribes a previously subscribed queue from receiving events
+ /// </summary>
+ /// <param name="queue">The queue instance to unregister from events</param>
+ void Unsubscribe(IAsyncQueue<T> queue);
+ }
+}
diff --git a/lib/Utils/src/Async/IAsyncQueue.cs b/lib/Utils/src/Async/IAsyncQueue.cs
new file mode 100644
index 0000000..ab786f1
--- /dev/null
+++ b/lib/Utils/src/Async/IAsyncQueue.cs
@@ -0,0 +1,76 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Utils
+* File: IAsyncQueue.cs
+*
+* IAsyncQueue.cs is part of VNLib.Utils which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Utils is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published
+* by the Free Software Foundation, either version 2 of the License,
+* or (at your option) any later version.
+*
+* VNLib.Utils is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with VNLib.Utils. If not, see http://www.gnu.org/licenses/.
+*/
+
+using System.Threading;
+using System.Threading.Tasks;
+using System.Diagnostics.CodeAnalysis;
+
+namespace VNLib.Utils.Async
+{
+ /// <summary>
+ /// Provides a generic asynchronous queue
+ /// </summary>
+ /// <typeparam name="T">The item message type</typeparam>
+#pragma warning disable CA1711 // Identifiers should not have incorrect suffix
+ public interface IAsyncQueue<T>
+#pragma warning restore CA1711 // Identifiers should not have incorrect suffix
+ {
+ /// <summary>
+ /// Attemts to enqueue an item if the queue has the capacity
+ /// </summary>
+ /// <param name="item">The item to eqneue</param>
+ /// <returns>True if the queue can accept another item, false otherwise</returns>
+ bool TryEnque(T item);
+
+ /// <summary>
+ /// Enqueues an item to the end of the queue and notifies a waiter that an item was enqueued
+ /// </summary>
+ /// <param name="item">The item to enqueue</param>
+ /// <param name="cancellationToken">A token to cancel the operation</param>
+ ValueTask EnqueueAsync(T item, CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Asynchronously waits for an item to be Enqueued to the end of the queue.
+ /// </summary>
+ /// <param name="cancellationToken">A token to cancel the operation</param>
+ /// <returns>The item at the begining of the queue</returns>
+ ValueTask<T> DequeueAsync(CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Removes the object at the beginning of the queue and stores it to the result parameter. Without waiting for a change
+ /// event.
+ /// </summary>
+ /// <param name="result">The item that was at the begining of the queue</param>
+ /// <returns>True if the queue could be read synchronously, false if the lock could not be entered, or the queue contains no items</returns>
+ bool TryDequeue([MaybeNullWhen(false)] out T result);
+
+ /// <summary>
+ /// Peeks the object at the beginning of the queue and stores it to the result parameter. Without waiting for a change
+ /// event.
+ /// </summary>
+ /// <param name="result">The item that was at the begining of the queue</param>
+ /// <returns>True if the queue could be read synchronously, false if the lock could not be entered, or the queue contains no items</returns>
+ bool TryPeek([MaybeNullWhen(false)] out T result);
+ }
+}