diff options
Diffstat (limited to 'lib/VNLib.Data.Caching.ObjectCache/src')
-rw-r--r-- | lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs | 106 | ||||
-rw-r--r-- | lib/VNLib.Data.Caching.ObjectCache/src/BlobItem.cs | 207 | ||||
-rw-r--r-- | lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs | 227 | ||||
-rw-r--r-- | lib/VNLib.Data.Caching.ObjectCache/src/CacheListener.cs | 64 | ||||
-rw-r--r-- | lib/VNLib.Data.Caching.ObjectCache/src/ChangeEvent.cs | 20 | ||||
-rw-r--r-- | lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs | 357 |
6 files changed, 584 insertions, 397 deletions
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs index 26df351..fbb2dcc 100644 --- a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs +++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs @@ -23,116 +23,134 @@ */ using System; -using System.IO; -using System.Linq; using System.Collections.Generic; -using System.Diagnostics.CodeAnalysis; -using VNLib.Utils.IO; -using VNLib.Utils.Logging; -using VNLib.Utils.Memory; using VNLib.Utils.Memory.Caching; namespace VNLib.Data.Caching { + /// <summary> /// A general purpose binary data storage /// </summary> - public class BlobCache : LRUCache<string, MemoryHandle<byte>> + public class BlobCache : LRUCache<string, CacheEntry> { - readonly IUnmangedHeap Heap; - readonly DirectoryInfo SwapDir; - readonly ILogProvider Log; ///<inheritdoc/> public override bool IsReadOnly { get; } + ///<inheritdoc/> protected override int MaxCapacity { get; } - - + + /// <summary> /// Initializes a new <see cref="BlobCache"/> store /// </summary> - /// <param name="swapDir">The <see cref="IsolatedStorageDirectory"/> to swap blob data to when cache</param> /// <param name="maxCapacity">The maximum number of items to keep in memory</param> - /// <param name="log">A <see cref="ILogProvider"/> to write log data to</param> - /// <param name="heap">A <see cref="IUnmangedHeap"/> to allocate buffers and store <see cref="BlobItem"/> data in memory</param> - public BlobCache(DirectoryInfo swapDir, int maxCapacity, ILogProvider log, IUnmangedHeap heap) + /// <exception cref="ArgumentException"></exception> + public BlobCache(int maxCapacity) :base(StringComparer.Ordinal) { - IsReadOnly = false; + if(maxCapacity < 1) + { + throw new ArgumentException("The maxium capacity of the store must be a positive integer larger than 0", nameof(maxCapacity)); + } + MaxCapacity = maxCapacity; - SwapDir = swapDir; + //Update the lookup table size LookupTable.EnsureCapacity(maxCapacity); - //Set default heap if not specified - Heap = heap; - Log = log; } + ///<inheritdoc/> - protected override bool CacheMiss(string key, [NotNullWhen(true)] out MemoryHandle<byte>? value) + protected override bool CacheMiss(string key, out CacheEntry value) { - value = null; + value = default; return false; } + ///<inheritdoc/> - protected override void Evicted(KeyValuePair<string, MemoryHandle<byte>> evicted) + protected override void Evicted(ref KeyValuePair<string, CacheEntry> evicted) { - //Dispose the blob + //Dispose the cache item evicted.Value.Dispose(); } + /// <summary> - /// If the <see cref="BlobItem"/> is found in the store, changes the key + /// If the <see cref="CacheEntry"/> is found in the store, changes the key /// that referrences the blob. /// </summary> /// <param name="currentKey">The key that currently referrences the blob in the store</param> /// <param name="newKey">The new key that will referrence the blob</param> - /// <param name="blob">The <see cref="BlobItem"/> if its found in the store</param> + /// <param name="blob">The <see cref="CacheEntry"/> if its found in the store</param> /// <returns>True if the record was found and the key was changes</returns> - public bool TryChangeKey(string currentKey, string newKey, [NotNullWhen(true)] out MemoryHandle<byte>? blob) + public bool TryChangeKey(string currentKey, string newKey, out CacheEntry blob) { - if (LookupTable.Remove(currentKey, out LinkedListNode<KeyValuePair<string, MemoryHandle<byte>>>? node)) + //Try to get the node at the current key + if (LookupTable.Remove(currentKey, out LinkedListNode<KeyValuePair<string, CacheEntry>> ? node)) { //Remove the node from the ll List.Remove(node); - //Update the node kvp - blob = node.Value.Value; - node.Value = new KeyValuePair<string, MemoryHandle<byte>>(newKey, blob); + + //Get the stored blob + blob = node.ValueRef.Value; + + //Update the + node.Value = new KeyValuePair<string, CacheEntry>(newKey, blob); + //Add to end of list List.AddLast(node); + //Re-add to lookup table with new key LookupTable.Add(newKey, node); + return true; } - blob = null; + + blob = default; return false; } + /// <summary> - /// Removes the <see cref="BlobItem"/> from the store without disposing the blobl + /// Removes the <see cref="CacheEntry"/> from the store, and frees its resources /// </summary> - /// <param name="key">The key that referrences the <see cref="BlobItem"/> in the store</param> + /// <param name="key">The key that referrences the <see cref="CacheEntry"/> in the store</param> /// <returns>A value indicating if the blob was removed</returns> public override bool Remove(string key) { //Remove the item from the lookup table and if it exists, remove the node from the list - if (LookupTable.Remove(key, out LinkedListNode<KeyValuePair<string, MemoryHandle<byte>>>? node)) + if (!LookupTable.Remove(key, out LinkedListNode<KeyValuePair<string, CacheEntry>>? node)) + { + return false; + } + + //always dispose blob + using (node.ValueRef.Value) { - //Remove the new from the list + //Remove the node from the list List.Remove(node); - //dispose the buffer - node.Value.Value.Dispose(); - return true; } - return false; + return true; } + /// <summary> - /// Removes and disposes all blobl elements in cache (or in the backing store) + /// Removes all cache entires and disposes their held resources /// </summary> public override void Clear() { - foreach (MemoryHandle<byte> blob in List.Select(kp => kp.Value)) + //Start from first node + LinkedListNode<KeyValuePair<string, CacheEntry>>? node = List.First; + + //Classic ll node itteration + while(node != null) { - blob.Dispose(); + //Dispose the cache entry + node.ValueRef.Value.Dispose(); + + //Move to next node + node = node.Next; } + + //empty all cache entires in the store base.Clear(); } } diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobItem.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobItem.cs deleted file mode 100644 index 728875f..0000000 --- a/lib/VNLib.Data.Caching.ObjectCache/src/BlobItem.cs +++ /dev/null @@ -1,207 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Data.Caching.ObjectCache -* File: BlobItem.cs -* -* BlobItem.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.IO; -using System.Threading; -using System.Threading.Tasks; - -using VNLib.Utils; -using VNLib.Utils.IO; -using VNLib.Utils.Memory; -using VNLib.Utils.Logging; -using VNLib.Utils.Extensions; - -namespace VNLib.Data.Caching -{ - /// <summary> - /// A general purpose binary storage item - /// </summary> - public class BlobItem //: VnDisposeable - { - /* - private static readonly JoinableTaskContext JTX = new(); - private static readonly Semaphore CentralSwapLock = new(Environment.ProcessorCount, Environment.ProcessorCount); - - private readonly VnMemoryStream _loadedData; - private bool _loaded; - - /// <summary> - /// The time the blob was last modified - /// </summary> - public DateTimeOffset LastAccessed { get; private set; } - - - /// <summary> - /// Gets the current size of the file (in bytes) as an atomic operation - /// </summary> - public int FileSize => (int)_loadedData.Length; - /// <summary> - /// The operation synchronization lock - /// </summary> - public AsyncReaderWriterLock OpLock { get; } - /// <summary> - /// Initializes a new <see cref="BlobItem"/> - /// </summary> - /// <param name="heap">The heap to allocate buffers from</param> - internal BlobItem(IUnmangedHeap heap) - { - _loadedData = new(heap); - OpLock = new AsyncReaderWriterLock(JTX); - _loaded = true; - LastAccessed = DateTimeOffset.UtcNow; - } - ///<inheritdoc/> - protected override void Free() - { - _loadedData.Dispose(); - OpLock.Dispose(); - } - - /// <summary> - /// Reads data from the internal buffer and copies it to the specified buffer. - /// Use the <see cref="FileSize"/> property to obtain the size of the internal buffer - /// </summary> - /// <param name="buffer">The buffer to copy data to</param> - /// <returns>When completed, the number of bytes copied to the buffer</returns> - public int Read(Span<byte> buffer) - { - //Make sure the blob has been swapped back into memory - if (!_loaded) - { - throw new InvalidOperationException("The blob was not loaded from the disk"); - } - //Read all data from the buffer and write it to the output buffer - _loadedData.AsSpan().CopyTo(buffer); - //Update last-accessed - LastAccessed = DateTimeOffset.UtcNow; - return (int)_loadedData.Length; - } - /// <summary> - /// Overwrites the internal buffer with the contents of the supplied buffer - /// </summary> - /// <param name="buffer">The buffer containing data to store within the blob</param> - /// <returns>A <see cref="ValueTask"/> that completes when write access has been granted and copied</returns> - /// <exception cref="InvalidOperationException"></exception> - public void Write(ReadOnlySpan<byte> buffer) - { - //Make sure the blob has been swapped back into memory - if (!_loaded) - { - throw new InvalidOperationException("The blob was not loaded from the disk"); - } - //Reset the buffer - _loadedData.SetLength(buffer.Length); - _loadedData.Seek(0, SeekOrigin.Begin); - _loadedData.Write(buffer); - LastAccessed = DateTimeOffset.UtcNow; - } - - /// <summary> - /// Writes the contents of the memory buffer to its designated file on the disk - /// </summary> - /// <param name="heap">The heap to allocate buffers from</param> - /// <param name="swapDir">The <see cref="IsolatedStorageDirectory"/> that stores the file</param> - /// <param name="filename">The name of the file to write data do</param> - /// <param name="log">A log to write errors to</param> - /// <returns>A task that completes when the swap to disk is complete</returns> - internal async Task SwapToDiskAsync(IUnmangedHeap heap, DirectoryInfo swapDir, string filename, ILogProvider log) - { - try - { - //Wait for write lock - await using (AsyncReaderWriterLock.Releaser releaser = await OpLock.WriteLockAsync()) - { - //Enter swap lock - await CentralSwapLock; - try - { - //Open swap file data stream - await using FileStream swapFile = swapDir.OpenFile(filename, FileMode.OpenOrCreate, FileAccess.ReadWrite, bufferSize: 8128); - //reset swap file - swapFile.SetLength(0); - //Seek loaded-data back to 0 before writing - _loadedData.Seek(0, SeekOrigin.Begin); - //Write loaded data to disk - await _loadedData.CopyToAsync(swapFile, 8128, heap); - } - finally - { - CentralSwapLock.Release(); - } - //Release memory held by stream - _loadedData.SetLength(0); - //Clear loaded flag - _loaded = false; - LastAccessed = DateTimeOffset.UtcNow; - } - log.Debug("Blob {name} swapped to disk", filename); - } - catch(Exception ex) - { - log.Error(ex, "Blob swap to disk error"); - } - } - /// <summary> - /// Reads the contents of the blob into a memory buffer from its designated file on disk - /// </summary> - /// <param name="heap">The heap to allocate buffers from</param> - /// <param name="swapDir">The <see cref="IsolatedStorageDirectory"/> that stores the file</param> - /// <param name="filename">The name of the file to write the blob data to</param> - /// <param name="log">A log to write errors to</param> - /// <returns>A task that completes when the swap from disk is complete</returns> - internal async Task SwapFromDiskAsync(IUnmangedHeap heap, DirectoryInfo swapDir, string filename, ILogProvider log) - { - try - { - //Wait for write lock - await using (AsyncReaderWriterLock.Releaser releaser = await OpLock.WriteLockAsync()) - { - //Enter swap lock - await CentralSwapLock; - try - { - //Open swap file data stream - await using FileStream swapFile = swapDir.OpenFile(filename, FileMode.OpenOrCreate, FileAccess.Read, bufferSize:8128); - //Copy from disk to memory - await swapFile.CopyToAsync(_loadedData, 8128, heap); - } - finally - { - CentralSwapLock.Release(); - } - //Set loaded flag - _loaded = true; - LastAccessed = DateTimeOffset.UtcNow; - } - log.Debug("Blob {name} swapped from disk", filename); - } - catch(Exception ex) - { - log.Error(ex, "Blob swap from disk error"); - } - } - */ - } -} diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs b/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs new file mode 100644 index 0000000..8644d1d --- /dev/null +++ b/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs @@ -0,0 +1,227 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.ObjectCache +* File: BlobCache.cs +* +* BlobCache.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Buffers.Binary; +using System.Runtime.CompilerServices; + +using VNLib.Utils.Memory; +using VNLib.Utils.Extensions; + +namespace VNLib.Data.Caching +{ + /// <summary> + /// A structure that represents an item in cache + /// </summary> + public readonly struct CacheEntry : IDisposable, IEquatable<CacheEntry> + { + private const int TIME_SEGMENT_SIZE = sizeof(long); + + private const int LENGTH_SEGMENT_SIZE = sizeof(int); + + private const int DATA_SEGMENT_START = TIME_SEGMENT_SIZE + LENGTH_SEGMENT_SIZE; + + + //Only contain ref to backing handle to keep struct size small + private readonly MemoryHandle<byte> _handle; + + + /// <summary> + /// Creates a new <see cref="CacheEntry"/> and copies the initial data to the internal buffer + /// </summary> + /// <param name="data">The initial data to store</param> + /// <param name="heap">The heap to allocate the buffer from</param> + /// <returns>The new <see cref="CacheEntry"/></returns> + public static CacheEntry Create(ReadOnlySpan<byte> data, IUnmangedHeap heap) + { + //Calc buffer size + int bufferSize = GetRequiredHandleSize(data.Length); + + //Alloc buffer + MemoryHandle<byte> handle = heap.Alloc<byte>(bufferSize); + + //Create new entry from handle + CacheEntry entry = new (handle, data.Length); + + //Get the data segment + Span<byte> segment = entry.GetDataSegment(); + + //Copy data segment + data.CopyTo(segment); + + return entry; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static int GetRequiredHandleSize(int size) + { + //Caculate the minimum handle size to store all required information, rounded to nearest page + return (int)MemoryUtil.NearestPage(size + DATA_SEGMENT_START); + } + + private CacheEntry(MemoryHandle<byte> handle, int length) + { + _handle = handle; + //Store data length, assumes the handle is large enough to store it + SetLength(length); + } + + + ///<inheritdoc/> + public readonly void Dispose() => _handle.Dispose(); + + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private readonly Span<byte> GetTimeSegment() => _handle.AsSpan(0, TIME_SEGMENT_SIZE); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private readonly Span<byte> GetLengthSegment() => _handle.AsSpan(TIME_SEGMENT_SIZE, LENGTH_SEGMENT_SIZE); + + /// <summary> + /// Gets the size of the block of memory held by the underlying handle + /// </summary> + /// <returns>The size of the block held by the current entry</returns> + /// <exception cref="ObjectDisposedException"></exception> + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public readonly nuint GetMemoryUsage() + { + _handle.ThrowIfClosed(); + return _handle.ByteLength; + } + + + /// <summary> + /// Gets the last set time + /// </summary> + /// <returns>The last date stored</returns> + /// <exception cref="ObjectDisposedException"></exception> + public readonly DateTime GetCreatedTime() + { + //Get the time segment and write the value in big endian + ReadOnlySpan<byte> segment = GetTimeSegment(); + + long ticks = BinaryPrimitives.ReadInt64BigEndian(segment); + + //ticks back to + return new(ticks); + } + + /// <summary> + /// Sets the last modified time + /// </summary> + /// <param name="time">The new time to set the handle to</param> + /// <exception cref="ObjectDisposedException"></exception> + public readonly void SetTime(DateTime time) + { + //Get native ticks value + long timeData = time.Ticks; + + //Get the time segment and write the value in big endian + Span<byte> segment = GetTimeSegment(); + + BinaryPrimitives.WriteInt64BigEndian(segment, timeData); + } + + /// <summary> + /// Gets the length of the data segment + /// </summary> + /// <returns>The length of the data segment</returns> + /// <exception cref="ObjectDisposedException"></exception> + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public readonly int GetLength() + { + //Get the length segment + ReadOnlySpan<byte> segment = GetLengthSegment(); + //Recover the integer + return BinaryPrimitives.ReadInt32BigEndian(segment); + } + + private readonly void SetLength(int length) + { + //Get the length segment + Span<byte> segment = GetLengthSegment(); + + //Update the length value + BinaryPrimitives.WriteInt32BigEndian(segment, length); + } + + /// <summary> + /// Gets the stored data segment + /// </summary> + /// <returns>The data segment</returns> + /// <exception cref="ObjectDisposedException"></exception> + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public readonly Span<byte> GetDataSegment() + { + //Get the actual length of the segment + int length = GetLength(); + //Get the segment from its begining offset and + return _handle.AsSpan(DATA_SEGMENT_START, length); + } + + /// <summary> + /// Writes the specified segment to the internal buffer and resizes the buffer if necessary. + /// This operation overwrites any previously stored data + /// </summary> + /// <param name="data">The data segment to store</param> + /// <exception cref="ObjectDisposedException"></exception> + public readonly void UpdateData(ReadOnlySpan<byte> data) + { + //Calc required buffer size + int bufferSize = GetRequiredHandleSize(data.Length); + + //Resize handle if required + _handle.ResizeIfSmaller(bufferSize); + + //Reset data length + SetLength(data.Length); + + //Get the data segment + Span<byte> segment = GetDataSegment(); + +#if DEBUG + //Test segment length is equvalent to the requested data length + System.Diagnostics.Debug.Assert(segment.Length == data.Length); +#endif + //Copy data segment + data.CopyTo(segment); + } + + + ///<inheritdoc/> + public override bool Equals(object? obj) => obj is CacheEntry entry && Equals(entry); + + ///<inheritdoc/> + public override int GetHashCode() => _handle.GetHashCode(); + + ///<inheritdoc/> + public static bool operator ==(CacheEntry left, CacheEntry right) => left.Equals(right); + + ///<inheritdoc/> + public static bool operator !=(CacheEntry left, CacheEntry right) => !(left == right); + + ///<inheritdoc/> + public bool Equals(CacheEntry other) => other.GetHashCode() == GetHashCode(); + } +} diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/CacheListener.cs b/lib/VNLib.Data.Caching.ObjectCache/src/CacheListener.cs deleted file mode 100644 index 7f544e1..0000000 --- a/lib/VNLib.Data.Caching.ObjectCache/src/CacheListener.cs +++ /dev/null @@ -1,64 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Data.Caching.ObjectCache -* File: CacheListener.cs -* -* CacheListener.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.IO; - -using VNLib.Utils.Memory; -using VNLib.Net.Messaging.FBM.Server; - -namespace VNLib.Data.Caching -{ - /// <summary> - /// A base implementation of a memory/disk LRU data cache FBM listener - /// </summary> - public abstract class CacheListener : FBMListenerBase - { - /// <summary> - /// The directory swap files will be stored - /// </summary> - public DirectoryInfo? Directory { get; private set; } - /// <summary> - /// The Cache store to access data blobs - /// </summary> - protected BlobCache? Cache { get; private set; } - /// <summary> - /// The <see cref="IUnmangedHeap"/> to allocate buffers from - /// </summary> - protected IUnmangedHeap? Heap { get; private set; } - - /// <summary> - /// Initializes the <see cref="Cache"/> data store - /// </summary> - /// <param name="dir">The directory to swap cache records to</param> - /// <param name="cacheSize">The size of the LRU cache</param> - /// <param name="heap">The heap to allocate buffers from</param> - protected void InitCache(DirectoryInfo dir, int cacheSize, IUnmangedHeap heap) - { - Heap = heap; - Cache = new(dir, cacheSize, Log, Heap); - Directory = dir; - } - } -} diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/ChangeEvent.cs b/lib/VNLib.Data.Caching.ObjectCache/src/ChangeEvent.cs index 7ee4427..2e8641e 100644 --- a/lib/VNLib.Data.Caching.ObjectCache/src/ChangeEvent.cs +++ b/lib/VNLib.Data.Caching.ObjectCache/src/ChangeEvent.cs @@ -27,11 +27,23 @@ namespace VNLib.Data.Caching.ObjectCache /// <summary> /// An event object that is passed when change events occur /// </summary> - public class ChangeEvent + public sealed class ChangeEvent { - public readonly string CurrentId; - public readonly string? AlternateId; - public readonly bool Deleted; + /// <summary> + /// The current id of the changed object + /// </summary> + public string CurrentId { get; } + + /// <summary> + /// The alternate id of the changed object if specified + /// </summary> + public string? AlternateId { get; } + + /// <summary> + /// A value that indicates if the object was deleted + /// </summary> + public bool Deleted { get; } + internal ChangeEvent(string id, string? alternate, bool deleted) { CurrentId = id; diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs b/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs index 514d00b..05a4f02 100644 --- a/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs +++ b/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs @@ -34,6 +34,9 @@ using VNLib.Utils.Extensions; using VNLib.Net.Messaging.FBM.Server; using static VNLib.Data.Caching.Constants; + +#pragma warning disable CA1849 // Call async methods when in an async method + namespace VNLib.Data.Caching.ObjectCache { public delegate ReadOnlySpan<byte> GetBodyDataCallback<T>(T state); @@ -41,7 +44,7 @@ namespace VNLib.Data.Caching.ObjectCache /// <summary> /// A <see cref="FBMListener"/> implementation of a <see cref="CacheListener"/> /// </summary> - public class ObjectCacheStore : CacheListener, IDisposable + public class ObjectCacheStore : FBMListenerBase, IDisposable { private readonly SemaphoreSlim StoreLock; private bool disposedValue; @@ -55,6 +58,14 @@ namespace VNLib.Data.Caching.ObjectCache public AsyncQueue<ChangeEvent> EventQueue { get; } /// <summary> + /// The Cache store to access data blobs + /// </summary> + private readonly BlobCache Cache; + + private readonly IUnmangedHeap Heap; + + + /// <summary> /// Initialzies a new <see cref="ObjectCacheStore"/> /// </summary> /// <param name="dir">The <see cref="DirectoryInfo"/> to store blob files to</param> @@ -62,23 +73,25 @@ namespace VNLib.Data.Caching.ObjectCache /// <param name="log"></param> /// <param name="heap"></param> /// <param name="singleReader">A value that indicates if a single thread is processing events</param> - public ObjectCacheStore(DirectoryInfo dir, int cacheMax, ILogProvider log, IUnmangedHeap heap, bool singleReader) + public ObjectCacheStore(int cacheMax, ILogProvider log, IUnmangedHeap heap, bool singleReader) { Log = log; //We can use a single writer and single reader in this context EventQueue = new(true, singleReader); - InitCache(dir, cacheMax, heap); + Cache = new(cacheMax); + Heap = heap; InitListener(heap); StoreLock = new(1,1); } ///<inheritdoc/> - protected override async Task ProcessAsync(FBMContext context, object? userState, CancellationToken cancellationToken) + protected override Task ProcessAsync(FBMContext context, object? userState, CancellationToken exitToken) { try { //Get the action header string action = context.Method(); + //Optional newid header string? alternateId = context.NewObjectId(); @@ -88,82 +101,159 @@ namespace VNLib.Data.Caching.ObjectCache { //Get the object-id header string objectId = context.ObjectId(); - //Take lock on store - using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellationToken: cancellationToken); - if (Cache!.TryGetValue(objectId, out MemoryHandle<byte>? data)) + + //Try read sync + if (StoreLock.Wait(0)) { - //Set the status code and write the buffered data to the response buffer - context.CloseResponse(ResponseCodes.Okay); - //Copy data to response buffer - context.Response.WriteBody(data.Span); + try + { + UnsafeReadEntry(context, objectId); + } + finally + { + StoreLock.Release(); + } + + return Task.CompletedTask; } else { - context.CloseResponse(ResponseCodes.NotFound); + //Read entry async + return InternalReadEntryAsync(context, objectId, exitToken); } } - break; + case Actions.AddOrUpdate: { //Get the object-id header string objectId = context.ObjectId(); - //Add/update a blob async - await AddOrUpdateBlobAsync(objectId, alternateId, static context => context.Request.BodyData, context); - //Notify update the event bus - await EventQueue.EnqueueAsync(new(objectId, alternateId, false), cancellationToken); - //Set status code - context.CloseResponse(ResponseCodes.Okay); + + //Create change event for the object + ChangeEvent change = new(objectId, alternateId, false); + + //Attempt to aquire lock sync + if (StoreLock.Wait(0)) + { + //aquired sync + try + { + //Update the item + UnsafeAddOrUpdate(objectId, alternateId, GetBodyData, context); + } + finally + { + StoreLock.Release(); + } + + //Add to event queue + EnqueEvent(change); + + //Set status code + context.CloseResponse(ResponseCodes.Okay); + + return Task.CompletedTask; + } + else + { + //Lock will be awaited async and + return InternalAddOrUpdateAsync(context, change, exitToken); + } } - break; case Actions.Delete: { //Get the object-id header string objectId = context.ObjectId(); - - if (await DeleteItemAsync(objectId)) + + //Create change event + ChangeEvent change = new(objectId, alternateId, true); + + //See if lock can be entered without waiting + if (StoreLock.Wait(0)) { - //Notify deleted - await EventQueue.EnqueueAsync(new(objectId, null, true), cancellationToken); - //Set status header - context.CloseResponse(ResponseCodes.Okay); + bool found = false; + + try + { + //Sync + found = UnsafeDeleteEntry(objectId); + } + finally + { + StoreLock.Release(); + } + + //Notify change + EnqueEvent(change); + + //Set status code if found + context.CloseResponse(found ? ResponseCodes.Okay : ResponseCodes.NotFound); + + return Task.CompletedTask; } else { - //Set status header - context.CloseResponse(ResponseCodes.NotFound); + //lock will yeild async + return InternalDeleteAsync(context, change, exitToken); } } - break; // event queue dequeue request case Actions.Dequeue: { + static void SetResponse(ChangeEvent change, FBMContext context) + { + if (change.Deleted) + { + context.CloseResponse("deleted"); + context.Response.WriteHeader(ObjectId, change.CurrentId); + } + else + { + //Changed + context.CloseResponse("modified"); + context.Response.WriteHeader(ObjectId, change.CurrentId); + + //Set old id if an old id is set + if (change.CurrentId != null) + { + context.Response.WriteHeader(NewObjectId, change.AlternateId); + } + } + } + + static async Task DequeAsync(AsyncQueue<ChangeEvent> queue, FBMContext context, CancellationToken exitToken) + { + //Wait for a new message to process + ChangeEvent ev = await queue.DequeueAsync(exitToken); + + //Set the response + SetResponse(ev, context); + } + //If no event bus is registered, then this is not a legal command if (userState is not AsyncQueue<ChangeEvent> eventBus) { context.CloseResponse(ResponseCodes.NotFound); - break; + + return Task.CompletedTask; } - //Wait for a new message to process - ChangeEvent ev = await eventBus.DequeueAsync(cancellationToken); - if (ev.Deleted) + + //try to deq without awaiting + if (eventBus.TryDequeue(out ChangeEvent? change)) { - context.CloseResponse("deleted"); - context.Response.WriteHeader(ObjectId, ev.CurrentId); + SetResponse(change, context); + + return Task.CompletedTask; } else { - //Changed - context.CloseResponse("modified"); - context.Response.WriteHeader(ObjectId, ev.CurrentId); - //Set old id if an old id is set - if (ev.CurrentId != null) - { - context.Response.WriteHeader(NewObjectId, ev.AlternateId); - } + //Process async + return DequeAsync(eventBus, context, exitToken); } } - break; + } + + Log.Error("Unhandled cache event!"); } catch (OperationCanceledException) { @@ -175,68 +265,178 @@ namespace VNLib.Data.Caching.ObjectCache Log.Error(ex); context.CloseResponse(ResponseCodes.Error); } + + return Task.CompletedTask; } + + + private static ReadOnlySpan<byte> GetBodyData(FBMContext ctx) => ctx.Request.BodyData; - /// <summary> - /// Asynchronously deletes a previously stored item - /// </summary> - /// <param name="id">The id of the object to delete</param> - /// <returns>A task that completes when the item has been deleted</returns> - public async Task<bool> DeleteItemAsync(string id) + private void EnqueEvent(ChangeEvent change) { - using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(); - return Cache!.Remove(id); + if (!EventQueue.TryEnque(change)) + { + Log.Warn("Change event {ev} was not enqued because the event queue is overflowing!", change.CurrentId); + } } - - /// <summary> - /// Asynchronously adds or updates an object in the store and optionally update's its id - /// </summary> - /// <param name="objectId">The current (or old) id of the object</param> - /// <param name="alternateId">An optional id to update the blob to</param> - /// <param name="bodyData">A callback that returns the data for the blob</param> - /// <param name="state">The state parameter to pass to the data callback</param> - /// <returns></returns> - public async Task AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state) + + private void UnsafeReadEntry(FBMContext context, string objectId) { - MemoryHandle<byte>? blob; + if (Cache!.TryGetValue(objectId, out CacheEntry data)) + { + //Set the status code and write the buffered data to the response buffer + context.CloseResponse(ResponseCodes.Okay); + + //Copy data to response buffer + context.Response.WriteBody(data.GetDataSegment()); + } + else + { + context.CloseResponse(ResponseCodes.NotFound); + } + } + + async Task InternalReadEntryAsync(FBMContext context, string objectId, CancellationToken cancellation) + { + //enter lock async + using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation); + + UnsafeReadEntry(context, objectId); + } + + private async Task InternalAddOrUpdateAsync(FBMContext context, ChangeEvent change, CancellationToken cancellation) + { + //Wait for lock since we know it will yeild async + using (SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation)) + { + UnsafeAddOrUpdate(change.CurrentId, change.AlternateId, GetBodyData, context); + } + + //Add to event queue + EnqueEvent(change); + + //Set status code + context.CloseResponse(ResponseCodes.Okay); + } + + private void UnsafeAddOrUpdate<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state) + { + CacheEntry entry; + //See if new/alt session id was specified if (string.IsNullOrWhiteSpace(alternateId)) { - //Take lock on store - using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(); //See if blob exists - if (!Cache!.TryGetValue(objectId, out blob)) + if (!Cache!.TryGetValue(objectId, out entry)) { - //If not, create new blob and add to store - blob = Heap.AllocAndCopy(bodyData(state)); - Cache.Add(objectId, blob); + //Create the new cache entry since it does not exist + entry = CacheEntry.Create(bodyData(state), Heap); + + //Add to cache + Cache.Add(objectId, entry); } else { //Reset the buffer state - blob.WriteAndResize(bodyData(state)); + entry.UpdateData(bodyData(state)); } } //Need to change the id of the record else { - //Take lock on store - using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(); //Try to change the blob key - if (!Cache!.TryChangeKey(objectId, alternateId, out blob)) + if (!Cache!.TryChangeKey(objectId, alternateId, out entry)) { - //Blob not found, create new blob - blob = Heap.AllocAndCopy(bodyData(state)); - Cache.Add(alternateId, blob); + //Create the new cache entry since it does not exist + entry = CacheEntry.Create(bodyData(state), Heap); + + //Add to cache by its alternate id + Cache.Add(alternateId, entry); } else { //Reset the buffer state - blob.WriteAndResize(bodyData(state)); + entry.UpdateData(bodyData(state)); } } + + //Update modified time to current utc time + entry.SetTime(DateTime.UtcNow); + } + + private async Task InternalDeleteAsync(FBMContext context, ChangeEvent change, CancellationToken cancellation) + { + bool found = false; + + //enter the lock + using(SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation)) + { + //Sync + found = UnsafeDeleteEntry(change.CurrentId); + } + + //Notify change + EnqueEvent(change); + + //Set status code if found + context.CloseResponse(found ? ResponseCodes.Okay : ResponseCodes.NotFound); } - + + private bool UnsafeDeleteEntry(string id) => Cache!.Remove(id); + + + /// <summary> + /// Asynchronously adds or updates an object in the store and optionally update's its id + /// </summary> + /// <param name="objectId">The current (or old) id of the object</param> + /// <param name="alternateId">An optional id to update the blob to</param> + /// <param name="bodyData">A callback that returns the data for the blob</param> + /// <param name="state">The state parameter to pass to the data callback</param> + /// <param name="token">A token to cancel the async operation</param> + /// <returns>A value task that represents the async operation</returns> + public async ValueTask AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token = default) + { + //Test the lock before waiting async + if (!StoreLock.Wait(0)) + { + //Wait async to avoid task alloc + await StoreLock.WaitAsync(token); + } + try + { + UnsafeAddOrUpdate(objectId, alternateId, bodyData, state); + } + finally + { + StoreLock.Release(); + } + } + + /// <summary> + /// Asynchronously deletes a previously stored item + /// </summary> + /// <param name="id">The id of the object to delete</param> + /// <param name="token">A token to cancel the async lock await</param> + /// <returns>A task that completes when the item has been deleted</returns> + public async ValueTask<bool> DeleteItemAsync(string id, CancellationToken token = default) + { + //Test the lock before waiting async + if (!StoreLock.Wait(0)) + { + //Wait async to avoid task alloc + await StoreLock.WaitAsync(token); + } + try + { + return UnsafeDeleteEntry(id); + } + finally + { + StoreLock.Release(); + } + } + + ///<inheritdoc/> protected virtual void Dispose(bool disposing) { @@ -252,6 +452,7 @@ namespace VNLib.Data.Caching.ObjectCache disposedValue = true; } } + ///<inheritdoc/> public void Dispose() { |