aboutsummaryrefslogtreecommitdiff
path: root/lib/Utils
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-09-27 01:09:11 -0400
committerLibravatar vnugent <public@vaughnnugent.com>2023-09-27 01:09:11 -0400
commit5150e2ad6d07a8b12c6678329d18fdb8acacf7f0 (patch)
tree81321ccac14a359ec3b087632fc7f3fdfa7ac689 /lib/Utils
parent3f2982f9b48dcef1a7434d396c45d57bb61d0a3c (diff)
serializer overhaul, plugin cleanup round 3 or 4, more tests
Diffstat (limited to 'lib/Utils')
-rw-r--r--lib/Utils/src/Async/AsyncAccessSerializer.cs277
-rw-r--r--lib/Utils/src/Memory/MemoryUtil.cs7
-rw-r--r--lib/Utils/tests/.runsettings1
-rw-r--r--lib/Utils/tests/Async/AsyncAccessSerializerTests.cs149
-rw-r--r--lib/Utils/tests/Memory/MemoryUtilTests.cs1
-rw-r--r--lib/Utils/tests/Memory/NativeHeapTests.cs6
-rw-r--r--lib/Utils/tests/VnEncodingTests.cs2
7 files changed, 369 insertions, 74 deletions
diff --git a/lib/Utils/src/Async/AsyncAccessSerializer.cs b/lib/Utils/src/Async/AsyncAccessSerializer.cs
index 0ee5a57..bdd8114 100644
--- a/lib/Utils/src/Async/AsyncAccessSerializer.cs
+++ b/lib/Utils/src/Async/AsyncAccessSerializer.cs
@@ -81,24 +81,87 @@ namespace VNLib.Utils.Async
cancellation.ThrowIfCancellationRequested();
WaitEnterToken token;
+ WaitEntry? wait;
- lock (StoreLock)
+ if (cancellation.CanBeCanceled)
+ {
+ lock (StoreLock)
+ {
+ //See if the entry already exists, otherwise get a new wait entry
+ if (!WaitTable.TryGetValue(moniker, out wait))
+ {
+ GetWaitEntry(ref wait, moniker);
+
+ //Add entry to store
+ WaitTable[moniker] = wait;
+ }
+
+ //Get waiter before leaving lock
+ wait.ScheduleWait(cancellation, out token);
+ }
+
+ //Enter wait and setup cancellation continuation
+ return EnterCancellableWait(in token, wait);
+ }
+ else
{
- //See if the entry already exists, otherwise get a new wait entry
- if (!WaitTable.TryGetValue(moniker, out WaitEntry? wait))
+ lock (StoreLock)
{
- GetWaitEntry(ref wait, moniker);
+ //See if the entry already exists, otherwise get a new wait entry
+ if (!WaitTable.TryGetValue(moniker, out wait))
+ {
+ GetWaitEntry(ref wait, moniker);
+
+ //Add entry to store
+ WaitTable[moniker] = wait;
+ }
- //Add entry to store
- WaitTable[moniker] = wait;
+ //Get waiter before leaving lock
+ wait.ScheduleWait(out token);
}
- //Get waiter before leaving lock
- wait.GetWaiter(out token);
+ //Enter the waiter without any cancellation support
+ return token.EnterWaitAsync();
}
+ }
+
+ /// <summary>
+ /// Enters a cancellable wait and sets up a continuation to release the wait entry
+ /// if a cancellation occurs
+ /// </summary>
+ /// <param name="token"></param>
+ /// <param name="entry"></param>
+ /// <returns></returns>
+ protected Task EnterCancellableWait(in WaitEnterToken token, WaitEntry entry)
+ {
+ //Inspect for a task that is already completed
+ if (token.MayYield)
+ {
+ Task awaitable = token.EnterWaitAsync();
+ _ = awaitable.ContinueWith(OnCancellableTaskContinuation, entry, TaskScheduler.Default);
+ return awaitable;
+ }
+ else
+ {
+ return token.EnterWaitAsync();
+ }
+ }
- return token.EnterWaitAsync(cancellation);
+ private void OnCancellableTaskContinuation(Task task, object? state)
+ {
+ if (!task.IsCompletedSuccessfully)
+ {
+ Debug.Assert(task.IsCanceled, "A wait task did not complete successfully but was not cancelled, this is an unexpected condition");
+
+ //store lock must be held during wait entry transition
+ lock (StoreLock)
+ {
+ //Release the wait entry
+ (state as WaitEntry)!.OnCancelled(task);
+ }
+ }
}
+
///<inheritdoc/>
public virtual void Release(TMoniker moniker)
@@ -123,7 +186,7 @@ namespace VNLib.Utils.Async
WaitEntry entry = WaitTable[moniker];
//Call release while holding store lock
- if (entry.Release(out releaser) == 0)
+ if (entry.ExitWait(out releaser) == 0)
{
//No more waiters
WaitTable.Remove(moniker);
@@ -132,6 +195,9 @@ namespace VNLib.Utils.Async
* We must release the semaphore before returning to pool,
* its safe because there are no more waiters
*/
+
+ Debug.Assert(!releaser.WillTransition, "The wait entry referrence count was 0 but a release token was issued that would cause a lock transision");
+
releaser.Release();
ReturnEntry(entry);
@@ -140,6 +206,7 @@ namespace VNLib.Utils.Async
releaser = default;
}
}
+
//Release sem outside of lock
releaser.Release();
}
@@ -177,11 +244,6 @@ namespace VNLib.Utils.Async
{
EntryPool.Push(entry);
}
- else
- {
- //Dispose entry since were not storing it
- entry.Dispose();
- }
}
/// <summary>
@@ -204,19 +266,20 @@ namespace VNLib.Utils.Async
//Cleanup the wait store
WaitTable.TrimExcess();
}
-
- //Dispose entires
- Array.ForEach(pooled, static pooled => pooled.Dispose());
}
/// <summary>
/// An entry within the wait table representing a serializer entry
/// for a given moniker
/// </summary>
- protected class WaitEntry : VnDisposeable
+ protected class WaitEntry
{
private uint _waitCount;
- private readonly SemaphoreSlim _waitHandle = new(1, 1);
+
+ /*
+ * Head of the waiting task queue
+ */
+ private TaskNode? _head;
/// <summary>
/// A stored referrnece to the moniker while the wait exists
@@ -226,12 +289,46 @@ namespace VNLib.Utils.Async
/// <summary>
/// Gets a token used to enter the lock which may block, or yield async
/// outside of a nested lock
+ /// <para>This method and release method are not thread safe</para>
+ /// </summary>
+ /// <param name="enterToken">A referrence to the wait entry token</param>
+ /// <returns>
+ /// The incremented reference count.
+ /// </returns>
+ public uint ScheduleWait(out WaitEnterToken enterToken)
+ {
+ /*
+ * Increment wait count before entering the lock
+ * A cancellation is the only way out, so cover that
+ * during the async, only if the token is cancelable
+ */
+
+ _waitCount++;
+
+ if (_waitCount != 1)
+ {
+ TaskNode waiter = InitAndEnqueueWaiter(default);
+ enterToken = new(waiter);
+ return _waitCount;
+ }
+ else
+ {
+ enterToken = default;
+ return _waitCount;
+ }
+ }
+
+ /// <summary>
+ /// Gets a token used to enter the lock which may block, or yield async
+ /// outside of a nested lock, with cancellation support
+ /// <para>This method and release method are not thread safe</para>
/// </summary>
+ /// <param name="cancellation"></param>
/// <param name="enterToken">A referrence to the wait entry token</param>
/// <returns>
/// The incremented reference count.
/// </returns>
- public uint GetWaiter(out WaitEnterToken enterToken)
+ public uint ScheduleWait(CancellationToken cancellation, out WaitEnterToken enterToken)
{
/*
* Increment wait count before entering the lock
@@ -239,25 +336,48 @@ namespace VNLib.Utils.Async
* during the async, only if the token is cancelable
*/
- enterToken = new(this);
- return Interlocked.Increment(ref _waitCount);
+ _waitCount++;
+
+ if (_waitCount != 1)
+ {
+ TaskNode waiter = InitAndEnqueueWaiter(cancellation);
+ enterToken = new(waiter);
+ return _waitCount;
+ }
+ else
+ {
+ enterToken = default;
+ return _waitCount;
+ }
}
/// <summary>
/// Prepares a release token and atomically decrements the waiter count
/// and returns the remaining number of waiters.
+ /// <para>This method and enter method are not thread safe</para>
/// </summary>
/// <param name="releaser">
/// The token that should be used to release the exclusive lock held on
/// a moniker
/// </param>
/// <returns>The number of remaining waiters</returns>
- public uint Release(out WaitReleaseToken releaser)
+ public uint ExitWait(out WaitReleaseToken releaser)
{
- releaser = new(_waitHandle);
-
//Decrement release count before leaving
- return Interlocked.Decrement(ref _waitCount);
+ --_waitCount;
+
+ TaskNode? next = _head;
+
+ if(next != null)
+ {
+ //Remove task from queue
+ _head = next.Next;
+ }
+
+ //Init releaser
+ releaser = new(next);
+
+ return _waitCount;
}
/// <summary>
@@ -270,7 +390,7 @@ namespace VNLib.Utils.Async
Moniker = moniker;
//Wait count should be 0 on calls to prepare, its a bug if not
- Debug.Assert(_waitCount == 0);
+ Debug.Assert(_waitCount == 0, "Async serializer wait count should have been reset before pooling");
}
/*
@@ -278,50 +398,70 @@ namespace VNLib.Utils.Async
* outside a nested lock
*/
- internal Task WaitAsync(CancellationToken cancellation)
- {
-
- //See if lock can be entered synchronously
- if (_waitHandle.Wait(0, CancellationToken.None))
- {
- //Lock was entered successfully without async yield
- return Task.CompletedTask;
- }
- //Lock must be entered async
+ private TaskNode InitAndEnqueueWaiter(CancellationToken cancellation)
+ {
+ TaskNode newNode = new(OnTaskCompleted, this, cancellation);
- //Check to confirm cancellation may happen
- if (cancellation.CanBeCanceled)
+ //find the tail
+ TaskNode? tail = _head;
+ if (tail == null)
{
- //Task may be cancelled, so we need to monitor the results to properly set waiting count
- Task wait = _waitHandle.WaitAsync(cancellation);
- return WaitForLockEntryWithCancellationAsync(wait);
+ _head = newNode;
}
else
{
- //Task cannot be canceled, so we dont need to monitor the results
- return _waitHandle.WaitAsync(CancellationToken.None);
+ //Find end of queue
+ while (tail.Next != null)
+ {
+ tail = tail.Next;
+ }
+ //Store new tail
+ tail.Next = newNode;
}
+ return newNode;
}
- private async Task WaitForLockEntryWithCancellationAsync(Task wait)
+ private void RemoveTask(TaskNode task)
{
- try
+ //search entire queue for task
+ TaskNode? node = _head;
+ while (node != null)
{
- await wait.ConfigureAwait(false);
- }
- catch
- {
- //Decrement wait count on error entering lock async
- _ = Interlocked.Decrement(ref _waitCount);
- throw;
+ if (node.Next == task)
+ {
+ //Remove task from queue
+ node.Next = task.Next;
+ break;
+ }
+ node = node.Next;
}
}
- ///<inheritdoc/>
- protected override void Free()
+ internal uint OnCancelled(Task instance)
+ {
+ RemoveTask((instance as TaskNode)!);
+
+ //Decrement release count before leaving
+ return --_waitCount;
+ }
+
+ private static void OnTaskCompleted(object? state)
+ { }
+
+
+ /*
+ * A linked list style task node that is used to store the
+ * next task in the queue and be awaitable as a task
+ */
+
+ private sealed class TaskNode : Task
{
- _waitHandle.Dispose();
+ public TaskNode(Action<object?> callback, object item, CancellationToken cancellation) : base(callback, item, cancellation)
+ { }
+
+ public TaskNode? Next { get; set; }
+
}
}
@@ -331,16 +471,21 @@ namespace VNLib.Utils.Async
/// </summary>
protected readonly ref struct WaitReleaseToken
{
- private readonly SemaphoreSlim? _sem;
+ private readonly Task? _nextWaiter;
- internal WaitReleaseToken(SemaphoreSlim sem) => _sem = sem;
+ /// <summary>
+ /// Indicates if releasing the lock will cause scheduling of another thread
+ /// </summary>
+ public readonly bool WillTransition => _nextWaiter != null;
+
+ internal WaitReleaseToken(Task? nextWaiter) => _nextWaiter = nextWaiter;
/// <summary>
/// Releases the exclusive lock held by the token. NOTE:
/// this method may only be called ONCE after a wait has been
/// released
/// </summary>
- public readonly void Release() => _sem?.Release();
+ public readonly void Release() => _nextWaiter?.Start();
}
/// <summary>
@@ -348,17 +493,21 @@ namespace VNLib.Utils.Async
/// </summary>
protected readonly ref struct WaitEnterToken
{
- private readonly WaitEntry _entry;
+ private readonly Task? _waiter;
+
+ /// <summary>
+ /// Indicates if a call to EnterWaitAsync will cause an awaiter to yield
+ /// </summary>
+ public bool MayYield => _waiter != null;
- internal WaitEnterToken(WaitEntry entry) => _entry = entry;
+ internal WaitEnterToken(Task wait) => _waiter = wait;
/// <summary>
/// Enters the wait for the WaitEntry. This method may not block
/// or yield (IE Return <see cref="Task.CompletedTask"/>)
/// </summary>
- /// <param name="cancellation">A token to cancel the wait for the resource</param>
/// <returns></returns>
- public Task EnterWaitAsync(CancellationToken cancellation) => _entry.WaitAsync(cancellation);
+ public Task EnterWaitAsync() => _waiter ?? Task.CompletedTask;
}
}
} \ No newline at end of file
diff --git a/lib/Utils/src/Memory/MemoryUtil.cs b/lib/Utils/src/Memory/MemoryUtil.cs
index f4482c0..f671da0 100644
--- a/lib/Utils/src/Memory/MemoryUtil.cs
+++ b/lib/Utils/src/Memory/MemoryUtil.cs
@@ -672,6 +672,8 @@ namespace VNLib.Utils.Memory
//Get array base address
void* basePtr = (void*)arrHandle.AddrOfPinnedObject();
+ Debug.Assert(basePtr != null);
+
//Get element offset
void* indexOffet = Unsafe.Add<T>(basePtr, elementOffset);
@@ -686,10 +688,7 @@ namespace VNLib.Utils.Memory
/// <param name="pinnable">An optional <see cref="IPinnable"/> instace to wrap with the handle</param>
/// <returns>The <see cref="MemoryHandle"/> wrapper</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- public static MemoryHandle GetMemoryHandleFromPointer(IntPtr value, GCHandle handle = default, IPinnable? pinnable = null)
- {
- return new MemoryHandle(value.ToPointer(), handle, pinnable);
- }
+ public static MemoryHandle GetMemoryHandleFromPointer(IntPtr value, GCHandle handle = default, IPinnable? pinnable = null) => new (value.ToPointer(), handle, pinnable);
/// <summary>
/// Gets a <see cref="Span{T}"/> from the supplied address
diff --git a/lib/Utils/tests/.runsettings b/lib/Utils/tests/.runsettings
index 977a261..0e7a703 100644
--- a/lib/Utils/tests/.runsettings
+++ b/lib/Utils/tests/.runsettings
@@ -2,7 +2,6 @@
<RunSettings>
<RunConfiguration>
<EnvironmentVariables>
- <VNLIB_SHARED_HEAP_FILE_PATH></VNLIB_SHARED_HEAP_FILE_PATH>
<VNLIB_SHARED_HEAP_DIAGNOSTICS>1</VNLIB_SHARED_HEAP_DIAGNOSTICS>
</EnvironmentVariables>
</RunConfiguration>
diff --git a/lib/Utils/tests/Async/AsyncAccessSerializerTests.cs b/lib/Utils/tests/Async/AsyncAccessSerializerTests.cs
index 5bb8b8a..7119d21 100644
--- a/lib/Utils/tests/Async/AsyncAccessSerializerTests.cs
+++ b/lib/Utils/tests/Async/AsyncAccessSerializerTests.cs
@@ -5,6 +5,9 @@ using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+using System.Linq;
namespace VNLib.Utils.Async.Tests
{
@@ -84,7 +87,151 @@ namespace VNLib.Utils.Async.Tests
cts.Cancel();
//Confirm the task raises cancellation
- Assert.ThrowsException<OperationCanceledException>(() => reentry.GetAwaiter().GetResult());
+ Assert.ThrowsException<TaskCanceledException>(() => reentry.GetAwaiter().GetResult());
+ }
+
+ [TestMethod()]
+ [MethodImpl(MethodImplOptions.NoOptimization)]
+ public void MultiThreadedAASTest()
+ {
+ const string DEFAULT_KEY = "default";
+
+ //Alloc serailzer base on string
+ IAsyncAccessSerializer<string> serializer = new AsyncAccessSerializer<string>(100, 100, StringComparer.Ordinal);
+
+ int maxCount = 64;
+
+ Task[] asyncArr = new int[maxCount].Select(async p =>
+ {
+ //Take a lock then random delay, then release
+ Task entry = serializer.WaitAsync(DEFAULT_KEY);
+
+ bool isCompleted = entry.IsCompleted;
+
+ Trace.WriteLineIf(isCompleted, "Wait was entered synchronously");
+
+ await Task.Delay(Random.Shared.Next(0, 10));
+
+ Trace.WriteLineIf(isCompleted != entry.IsCompleted, "Wait has transitioned to completed while waiting");
+ Trace.WriteLineIf(!entry.IsCompleted, "A call to wait will yield");
+
+ await entry;
+
+ serializer.Release(DEFAULT_KEY);
+
+ }).ToArray();
+
+ Task.WaitAll(asyncArr);
+ }
+
+ [TestMethod()]
+ [MethodImpl(MethodImplOptions.NoOptimization)]
+ public void RaceProtectionAASTest()
+ {
+ /*
+ * A very basic critical section to confirm threading consistency.
+ *
+ * Mutuating a string should be a reasonably large number of instructions due
+ * to the allocation copy, and assignment, to cause a race condition.
+ *
+ * Testing to make sure the string is updated consitently during a multi threaded
+ * process, and that a race condition occured when not using the serializer is a
+ * non-ideal test, but it is a simple test to confirm the serializer is working.
+ */
+
+ const string DEFAULT_KEY = "default";
+
+ //Alloc serailzer base on string
+ IAsyncAccessSerializer<string> serializer = new AsyncAccessSerializer<string>(100, 100, StringComparer.Ordinal);
+
+ int maxCount = 128;
+ string serialized = "";
+
+ using CancellationTokenSource cts = new(500);
+
+ Task[] asyncArr = new int[maxCount].Select(async p =>
+ {
+ //Take a lock then random delay, then release
+ await serializer.WaitAsync(DEFAULT_KEY, cts.Token);
+
+ //Increment count
+ serialized += "0";
+
+ serializer.Release(DEFAULT_KEY);
+
+ }).ToArray();
+
+ Task.WaitAll(asyncArr);
+
+ //Make sure count did not encounter any race conditions
+ Assert.AreEqual(maxCount, serialized.Length);
+ }
+
+ [TestMethod()]
+ public void SimplePerformanceComparisonTest()
+ {
+ const string DEFAULT_KEY = "default";
+
+ //Alloc serailzer base on string
+ IAsyncAccessSerializer<string> serializer = new AsyncAccessSerializer<string>(100, 100, StringComparer.Ordinal);
+
+ int maxCount = 128;
+ string test = "";
+ Stopwatch timer = new();
+
+ using CancellationTokenSource cts = new(500);
+
+ for (int i = 0; i < 10; i++)
+ {
+ test = "";
+ timer.Restart();
+
+ Task[] asyncArr = new int[maxCount].Select(async p =>
+ {
+ //Take a lock then random delay, then release
+ await serializer.WaitAsync(DEFAULT_KEY, cts.Token);
+
+ //Increment count
+ test += "0";
+
+ serializer.Release(DEFAULT_KEY);
+
+ }).ToArray();
+
+ Task.WaitAll(asyncArr);
+
+ timer.Stop();
+
+ Trace.WriteLine($"Async serialzier test completed in {timer.ElapsedTicks / 10} microseconds");
+ Assert.AreEqual(maxCount, test.Length);
+ }
+
+ using SemaphoreSlim slim = new(1,1);
+
+ for (int i = 0; i < 10; i++)
+ {
+ test = "";
+ timer.Restart();
+
+ Task[] asyncArr = new int[maxCount].Select(async p =>
+ {
+ //Take a lock then random delay, then release
+ await slim.WaitAsync(cts.Token);
+
+ //Increment count
+ test += "0";
+
+ slim.Release();
+ }).ToArray();
+
+ Task.WaitAll(asyncArr);
+
+ timer.Stop();
+
+ Trace.WriteLine($"SemaphoreSlim test completed in {timer.ElapsedTicks / 10} microseconds");
+
+ Assert.AreEqual(maxCount, test.Length);
+ }
}
}
} \ No newline at end of file
diff --git a/lib/Utils/tests/Memory/MemoryUtilTests.cs b/lib/Utils/tests/Memory/MemoryUtilTests.cs
index 85d3a60..2166eea 100644
--- a/lib/Utils/tests/Memory/MemoryUtilTests.cs
+++ b/lib/Utils/tests/Memory/MemoryUtilTests.cs
@@ -366,6 +366,7 @@ namespace VNLib.Utils.Memory.Tests
//Get stats
HeapStatistics postTest = MemoryUtil.GetSharedHeapStats();
+ Assert.IsFalse(postTest == default);
Assert.IsTrue(postTest.AllocatedBytes == preTest.AllocatedBytes + 1024);
Assert.IsTrue(postTest.AllocatedBlocks == preTest.AllocatedBlocks + 1);
diff --git a/lib/Utils/tests/Memory/NativeHeapTests.cs b/lib/Utils/tests/Memory/NativeHeapTests.cs
index d27d5fd..dde12cd 100644
--- a/lib/Utils/tests/Memory/NativeHeapTests.cs
+++ b/lib/Utils/tests/Memory/NativeHeapTests.cs
@@ -7,13 +7,13 @@ namespace VNLib.Utils.Memory.Tests
[TestClass()]
public class NativeHeapTests
{
+ const string RpMallocLibPath = "../../../../../Utils.Memory/vnlib_rpmalloc/build/Debug/vnlib_rpmalloc.dll";
+
[TestMethod()]
public void LoadHeapTest()
{
- const string TEST_HEAP_FILENAME = @"rpmalloc.dll";
-
//Try to load the global heap
- using NativeHeap heap = NativeHeap.LoadHeap(TEST_HEAP_FILENAME, System.Runtime.InteropServices.DllImportSearchPath.SafeDirectories, HeapCreation.None, 0);
+ using NativeHeap heap = NativeHeap.LoadHeap(RpMallocLibPath, System.Runtime.InteropServices.DllImportSearchPath.SafeDirectories, HeapCreation.None, 0);
Assert.IsFalse(heap.IsInvalid);
diff --git a/lib/Utils/tests/VnEncodingTests.cs b/lib/Utils/tests/VnEncodingTests.cs
index f2b5e85..a467843 100644
--- a/lib/Utils/tests/VnEncodingTests.cs
+++ b/lib/Utils/tests/VnEncodingTests.cs
@@ -26,11 +26,11 @@ using System;
using System.Linq;
using System.Text;
using System.Buffers;
+using System.Diagnostics;
using System.Buffers.Text;
using System.Security.Cryptography;
using Microsoft.VisualStudio.TestTools.UnitTesting;
-using System.Diagnostics;
namespace VNLib.Utils.Tests
{