diff options
Diffstat (limited to 'lib/Utils/src')
-rw-r--r-- | lib/Utils/src/Async/AsyncAccessSerializer.cs | 277 | ||||
-rw-r--r-- | lib/Utils/src/Memory/MemoryUtil.cs | 7 |
2 files changed, 216 insertions, 68 deletions
diff --git a/lib/Utils/src/Async/AsyncAccessSerializer.cs b/lib/Utils/src/Async/AsyncAccessSerializer.cs index 0ee5a57..bdd8114 100644 --- a/lib/Utils/src/Async/AsyncAccessSerializer.cs +++ b/lib/Utils/src/Async/AsyncAccessSerializer.cs @@ -81,24 +81,87 @@ namespace VNLib.Utils.Async cancellation.ThrowIfCancellationRequested(); WaitEnterToken token; + WaitEntry? wait; - lock (StoreLock) + if (cancellation.CanBeCanceled) + { + lock (StoreLock) + { + //See if the entry already exists, otherwise get a new wait entry + if (!WaitTable.TryGetValue(moniker, out wait)) + { + GetWaitEntry(ref wait, moniker); + + //Add entry to store + WaitTable[moniker] = wait; + } + + //Get waiter before leaving lock + wait.ScheduleWait(cancellation, out token); + } + + //Enter wait and setup cancellation continuation + return EnterCancellableWait(in token, wait); + } + else { - //See if the entry already exists, otherwise get a new wait entry - if (!WaitTable.TryGetValue(moniker, out WaitEntry? wait)) + lock (StoreLock) { - GetWaitEntry(ref wait, moniker); + //See if the entry already exists, otherwise get a new wait entry + if (!WaitTable.TryGetValue(moniker, out wait)) + { + GetWaitEntry(ref wait, moniker); + + //Add entry to store + WaitTable[moniker] = wait; + } - //Add entry to store - WaitTable[moniker] = wait; + //Get waiter before leaving lock + wait.ScheduleWait(out token); } - //Get waiter before leaving lock - wait.GetWaiter(out token); + //Enter the waiter without any cancellation support + return token.EnterWaitAsync(); } + } + + /// <summary> + /// Enters a cancellable wait and sets up a continuation to release the wait entry + /// if a cancellation occurs + /// </summary> + /// <param name="token"></param> + /// <param name="entry"></param> + /// <returns></returns> + protected Task EnterCancellableWait(in WaitEnterToken token, WaitEntry entry) + { + //Inspect for a task that is already completed + if (token.MayYield) + { + Task awaitable = token.EnterWaitAsync(); + _ = awaitable.ContinueWith(OnCancellableTaskContinuation, entry, TaskScheduler.Default); + return awaitable; + } + else + { + return token.EnterWaitAsync(); + } + } - return token.EnterWaitAsync(cancellation); + private void OnCancellableTaskContinuation(Task task, object? state) + { + if (!task.IsCompletedSuccessfully) + { + Debug.Assert(task.IsCanceled, "A wait task did not complete successfully but was not cancelled, this is an unexpected condition"); + + //store lock must be held during wait entry transition + lock (StoreLock) + { + //Release the wait entry + (state as WaitEntry)!.OnCancelled(task); + } + } } + ///<inheritdoc/> public virtual void Release(TMoniker moniker) @@ -123,7 +186,7 @@ namespace VNLib.Utils.Async WaitEntry entry = WaitTable[moniker]; //Call release while holding store lock - if (entry.Release(out releaser) == 0) + if (entry.ExitWait(out releaser) == 0) { //No more waiters WaitTable.Remove(moniker); @@ -132,6 +195,9 @@ namespace VNLib.Utils.Async * We must release the semaphore before returning to pool, * its safe because there are no more waiters */ + + Debug.Assert(!releaser.WillTransition, "The wait entry referrence count was 0 but a release token was issued that would cause a lock transision"); + releaser.Release(); ReturnEntry(entry); @@ -140,6 +206,7 @@ namespace VNLib.Utils.Async releaser = default; } } + //Release sem outside of lock releaser.Release(); } @@ -177,11 +244,6 @@ namespace VNLib.Utils.Async { EntryPool.Push(entry); } - else - { - //Dispose entry since were not storing it - entry.Dispose(); - } } /// <summary> @@ -204,19 +266,20 @@ namespace VNLib.Utils.Async //Cleanup the wait store WaitTable.TrimExcess(); } - - //Dispose entires - Array.ForEach(pooled, static pooled => pooled.Dispose()); } /// <summary> /// An entry within the wait table representing a serializer entry /// for a given moniker /// </summary> - protected class WaitEntry : VnDisposeable + protected class WaitEntry { private uint _waitCount; - private readonly SemaphoreSlim _waitHandle = new(1, 1); + + /* + * Head of the waiting task queue + */ + private TaskNode? _head; /// <summary> /// A stored referrnece to the moniker while the wait exists @@ -226,12 +289,46 @@ namespace VNLib.Utils.Async /// <summary> /// Gets a token used to enter the lock which may block, or yield async /// outside of a nested lock + /// <para>This method and release method are not thread safe</para> + /// </summary> + /// <param name="enterToken">A referrence to the wait entry token</param> + /// <returns> + /// The incremented reference count. + /// </returns> + public uint ScheduleWait(out WaitEnterToken enterToken) + { + /* + * Increment wait count before entering the lock + * A cancellation is the only way out, so cover that + * during the async, only if the token is cancelable + */ + + _waitCount++; + + if (_waitCount != 1) + { + TaskNode waiter = InitAndEnqueueWaiter(default); + enterToken = new(waiter); + return _waitCount; + } + else + { + enterToken = default; + return _waitCount; + } + } + + /// <summary> + /// Gets a token used to enter the lock which may block, or yield async + /// outside of a nested lock, with cancellation support + /// <para>This method and release method are not thread safe</para> /// </summary> + /// <param name="cancellation"></param> /// <param name="enterToken">A referrence to the wait entry token</param> /// <returns> /// The incremented reference count. /// </returns> - public uint GetWaiter(out WaitEnterToken enterToken) + public uint ScheduleWait(CancellationToken cancellation, out WaitEnterToken enterToken) { /* * Increment wait count before entering the lock @@ -239,25 +336,48 @@ namespace VNLib.Utils.Async * during the async, only if the token is cancelable */ - enterToken = new(this); - return Interlocked.Increment(ref _waitCount); + _waitCount++; + + if (_waitCount != 1) + { + TaskNode waiter = InitAndEnqueueWaiter(cancellation); + enterToken = new(waiter); + return _waitCount; + } + else + { + enterToken = default; + return _waitCount; + } } /// <summary> /// Prepares a release token and atomically decrements the waiter count /// and returns the remaining number of waiters. + /// <para>This method and enter method are not thread safe</para> /// </summary> /// <param name="releaser"> /// The token that should be used to release the exclusive lock held on /// a moniker /// </param> /// <returns>The number of remaining waiters</returns> - public uint Release(out WaitReleaseToken releaser) + public uint ExitWait(out WaitReleaseToken releaser) { - releaser = new(_waitHandle); - //Decrement release count before leaving - return Interlocked.Decrement(ref _waitCount); + --_waitCount; + + TaskNode? next = _head; + + if(next != null) + { + //Remove task from queue + _head = next.Next; + } + + //Init releaser + releaser = new(next); + + return _waitCount; } /// <summary> @@ -270,7 +390,7 @@ namespace VNLib.Utils.Async Moniker = moniker; //Wait count should be 0 on calls to prepare, its a bug if not - Debug.Assert(_waitCount == 0); + Debug.Assert(_waitCount == 0, "Async serializer wait count should have been reset before pooling"); } /* @@ -278,50 +398,70 @@ namespace VNLib.Utils.Async * outside a nested lock */ - internal Task WaitAsync(CancellationToken cancellation) - { - - //See if lock can be entered synchronously - if (_waitHandle.Wait(0, CancellationToken.None)) - { - //Lock was entered successfully without async yield - return Task.CompletedTask; - } - //Lock must be entered async + private TaskNode InitAndEnqueueWaiter(CancellationToken cancellation) + { + TaskNode newNode = new(OnTaskCompleted, this, cancellation); - //Check to confirm cancellation may happen - if (cancellation.CanBeCanceled) + //find the tail + TaskNode? tail = _head; + if (tail == null) { - //Task may be cancelled, so we need to monitor the results to properly set waiting count - Task wait = _waitHandle.WaitAsync(cancellation); - return WaitForLockEntryWithCancellationAsync(wait); + _head = newNode; } else { - //Task cannot be canceled, so we dont need to monitor the results - return _waitHandle.WaitAsync(CancellationToken.None); + //Find end of queue + while (tail.Next != null) + { + tail = tail.Next; + } + //Store new tail + tail.Next = newNode; } + return newNode; } - private async Task WaitForLockEntryWithCancellationAsync(Task wait) + private void RemoveTask(TaskNode task) { - try + //search entire queue for task + TaskNode? node = _head; + while (node != null) { - await wait.ConfigureAwait(false); - } - catch - { - //Decrement wait count on error entering lock async - _ = Interlocked.Decrement(ref _waitCount); - throw; + if (node.Next == task) + { + //Remove task from queue + node.Next = task.Next; + break; + } + node = node.Next; } } - ///<inheritdoc/> - protected override void Free() + internal uint OnCancelled(Task instance) + { + RemoveTask((instance as TaskNode)!); + + //Decrement release count before leaving + return --_waitCount; + } + + private static void OnTaskCompleted(object? state) + { } + + + /* + * A linked list style task node that is used to store the + * next task in the queue and be awaitable as a task + */ + + private sealed class TaskNode : Task { - _waitHandle.Dispose(); + public TaskNode(Action<object?> callback, object item, CancellationToken cancellation) : base(callback, item, cancellation) + { } + + public TaskNode? Next { get; set; } + } } @@ -331,16 +471,21 @@ namespace VNLib.Utils.Async /// </summary> protected readonly ref struct WaitReleaseToken { - private readonly SemaphoreSlim? _sem; + private readonly Task? _nextWaiter; - internal WaitReleaseToken(SemaphoreSlim sem) => _sem = sem; + /// <summary> + /// Indicates if releasing the lock will cause scheduling of another thread + /// </summary> + public readonly bool WillTransition => _nextWaiter != null; + + internal WaitReleaseToken(Task? nextWaiter) => _nextWaiter = nextWaiter; /// <summary> /// Releases the exclusive lock held by the token. NOTE: /// this method may only be called ONCE after a wait has been /// released /// </summary> - public readonly void Release() => _sem?.Release(); + public readonly void Release() => _nextWaiter?.Start(); } /// <summary> @@ -348,17 +493,21 @@ namespace VNLib.Utils.Async /// </summary> protected readonly ref struct WaitEnterToken { - private readonly WaitEntry _entry; + private readonly Task? _waiter; + + /// <summary> + /// Indicates if a call to EnterWaitAsync will cause an awaiter to yield + /// </summary> + public bool MayYield => _waiter != null; - internal WaitEnterToken(WaitEntry entry) => _entry = entry; + internal WaitEnterToken(Task wait) => _waiter = wait; /// <summary> /// Enters the wait for the WaitEntry. This method may not block /// or yield (IE Return <see cref="Task.CompletedTask"/>) /// </summary> - /// <param name="cancellation">A token to cancel the wait for the resource</param> /// <returns></returns> - public Task EnterWaitAsync(CancellationToken cancellation) => _entry.WaitAsync(cancellation); + public Task EnterWaitAsync() => _waiter ?? Task.CompletedTask; } } }
\ No newline at end of file diff --git a/lib/Utils/src/Memory/MemoryUtil.cs b/lib/Utils/src/Memory/MemoryUtil.cs index f4482c0..f671da0 100644 --- a/lib/Utils/src/Memory/MemoryUtil.cs +++ b/lib/Utils/src/Memory/MemoryUtil.cs @@ -672,6 +672,8 @@ namespace VNLib.Utils.Memory //Get array base address void* basePtr = (void*)arrHandle.AddrOfPinnedObject(); + Debug.Assert(basePtr != null); + //Get element offset void* indexOffet = Unsafe.Add<T>(basePtr, elementOffset); @@ -686,10 +688,7 @@ namespace VNLib.Utils.Memory /// <param name="pinnable">An optional <see cref="IPinnable"/> instace to wrap with the handle</param> /// <returns>The <see cref="MemoryHandle"/> wrapper</returns> [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static MemoryHandle GetMemoryHandleFromPointer(IntPtr value, GCHandle handle = default, IPinnable? pinnable = null) - { - return new MemoryHandle(value.ToPointer(), handle, pinnable); - } + public static MemoryHandle GetMemoryHandleFromPointer(IntPtr value, GCHandle handle = default, IPinnable? pinnable = null) => new (value.ToPointer(), handle, pinnable); /// <summary> /// Gets a <see cref="Span{T}"/> from the supplied address |