aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-01-27 21:13:16 -0500
committerLibravatar vnugent <public@vaughnnugent.com>2023-01-27 21:13:16 -0500
commit892bbaaa5c1f62631070cc74820f349c4c80f55d (patch)
treec04dfa8c5a3ead1522502635d4bc9696102d28dc /lib
parent0ea612dde50e82d722b0654e0e569fd4e7469978 (diff)
Object cache overhaul and logger updates
Diffstat (limited to 'lib')
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs20
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs106
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/BlobItem.cs207
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs227
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/CacheListener.cs64
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/ChangeEvent.cs20
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs357
-rw-r--r--lib/VNLib.Data.Caching/src/ClientExtensions.cs30
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs2
9 files changed, 617 insertions, 416 deletions
diff --git a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
index ebdfd5b..222240a 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
@@ -81,14 +81,28 @@ namespace VNLib.Data.Caching.Extensions
/// <returns>A preconfigured <see cref="FBMClientConfig"/> for object caching</returns>
public static FBMClientConfig GetDefaultConfig(IUnmangedHeap heap, int maxMessageSize, ILogProvider? debugLog = null)
{
+ /*
+ * Max message size (for server) should account for max data + the additional header buffer
+ */
+ int maxExtra = (int)Helpers.ToNearestKb((int)(maxMessageSize * 1.2) + MAX_FBM_MESSAGE_HEADER_SIZE);
+
return new()
{
BufferHeap = heap,
- MaxMessageSize = maxMessageSize * 2,
- RecvBufferSize = maxMessageSize,
- MessageBufferSize = maxMessageSize,
+
+ //Max message size is referrences
+ MaxMessageSize = maxExtra,
+
+ //The size of the buffer used for buffering incoming messages
+ RecvBufferSize = maxExtra,
+ //Message buffer should be max message + headers
+ MessageBufferSize = (int)Helpers.ToNearestKb(maxMessageSize + MAX_FBM_MESSAGE_HEADER_SIZE),
+
+ //Caching only requires a fixed number of request headers, so we can used a fixed buffer size
MaxHeaderBufferSize = MAX_FBM_MESSAGE_HEADER_SIZE,
+
+ //Set the optional cache sub-protocol
SubProtocol = CACHE_WS_SUB_PROCOL,
HeaderEncoding = Helpers.DefaultEncoding,
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs
index 26df351..fbb2dcc 100644
--- a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs
@@ -23,116 +23,134 @@
*/
using System;
-using System.IO;
-using System.Linq;
using System.Collections.Generic;
-using System.Diagnostics.CodeAnalysis;
-using VNLib.Utils.IO;
-using VNLib.Utils.Logging;
-using VNLib.Utils.Memory;
using VNLib.Utils.Memory.Caching;
namespace VNLib.Data.Caching
{
+
/// <summary>
/// A general purpose binary data storage
/// </summary>
- public class BlobCache : LRUCache<string, MemoryHandle<byte>>
+ public class BlobCache : LRUCache<string, CacheEntry>
{
- readonly IUnmangedHeap Heap;
- readonly DirectoryInfo SwapDir;
- readonly ILogProvider Log;
///<inheritdoc/>
public override bool IsReadOnly { get; }
+
///<inheritdoc/>
protected override int MaxCapacity { get; }
-
-
+
+
/// <summary>
/// Initializes a new <see cref="BlobCache"/> store
/// </summary>
- /// <param name="swapDir">The <see cref="IsolatedStorageDirectory"/> to swap blob data to when cache</param>
/// <param name="maxCapacity">The maximum number of items to keep in memory</param>
- /// <param name="log">A <see cref="ILogProvider"/> to write log data to</param>
- /// <param name="heap">A <see cref="IUnmangedHeap"/> to allocate buffers and store <see cref="BlobItem"/> data in memory</param>
- public BlobCache(DirectoryInfo swapDir, int maxCapacity, ILogProvider log, IUnmangedHeap heap)
+ /// <exception cref="ArgumentException"></exception>
+ public BlobCache(int maxCapacity)
:base(StringComparer.Ordinal)
{
- IsReadOnly = false;
+ if(maxCapacity < 1)
+ {
+ throw new ArgumentException("The maxium capacity of the store must be a positive integer larger than 0", nameof(maxCapacity));
+ }
+
MaxCapacity = maxCapacity;
- SwapDir = swapDir;
+
//Update the lookup table size
LookupTable.EnsureCapacity(maxCapacity);
- //Set default heap if not specified
- Heap = heap;
- Log = log;
}
+
///<inheritdoc/>
- protected override bool CacheMiss(string key, [NotNullWhen(true)] out MemoryHandle<byte>? value)
+ protected override bool CacheMiss(string key, out CacheEntry value)
{
- value = null;
+ value = default;
return false;
}
+
///<inheritdoc/>
- protected override void Evicted(KeyValuePair<string, MemoryHandle<byte>> evicted)
+ protected override void Evicted(ref KeyValuePair<string, CacheEntry> evicted)
{
- //Dispose the blob
+ //Dispose the cache item
evicted.Value.Dispose();
}
+
/// <summary>
- /// If the <see cref="BlobItem"/> is found in the store, changes the key
+ /// If the <see cref="CacheEntry"/> is found in the store, changes the key
/// that referrences the blob.
/// </summary>
/// <param name="currentKey">The key that currently referrences the blob in the store</param>
/// <param name="newKey">The new key that will referrence the blob</param>
- /// <param name="blob">The <see cref="BlobItem"/> if its found in the store</param>
+ /// <param name="blob">The <see cref="CacheEntry"/> if its found in the store</param>
/// <returns>True if the record was found and the key was changes</returns>
- public bool TryChangeKey(string currentKey, string newKey, [NotNullWhen(true)] out MemoryHandle<byte>? blob)
+ public bool TryChangeKey(string currentKey, string newKey, out CacheEntry blob)
{
- if (LookupTable.Remove(currentKey, out LinkedListNode<KeyValuePair<string, MemoryHandle<byte>>>? node))
+ //Try to get the node at the current key
+ if (LookupTable.Remove(currentKey, out LinkedListNode<KeyValuePair<string, CacheEntry>> ? node))
{
//Remove the node from the ll
List.Remove(node);
- //Update the node kvp
- blob = node.Value.Value;
- node.Value = new KeyValuePair<string, MemoryHandle<byte>>(newKey, blob);
+
+ //Get the stored blob
+ blob = node.ValueRef.Value;
+
+ //Update the
+ node.Value = new KeyValuePair<string, CacheEntry>(newKey, blob);
+
//Add to end of list
List.AddLast(node);
+
//Re-add to lookup table with new key
LookupTable.Add(newKey, node);
+
return true;
}
- blob = null;
+
+ blob = default;
return false;
}
+
/// <summary>
- /// Removes the <see cref="BlobItem"/> from the store without disposing the blobl
+ /// Removes the <see cref="CacheEntry"/> from the store, and frees its resources
/// </summary>
- /// <param name="key">The key that referrences the <see cref="BlobItem"/> in the store</param>
+ /// <param name="key">The key that referrences the <see cref="CacheEntry"/> in the store</param>
/// <returns>A value indicating if the blob was removed</returns>
public override bool Remove(string key)
{
//Remove the item from the lookup table and if it exists, remove the node from the list
- if (LookupTable.Remove(key, out LinkedListNode<KeyValuePair<string, MemoryHandle<byte>>>? node))
+ if (!LookupTable.Remove(key, out LinkedListNode<KeyValuePair<string, CacheEntry>>? node))
+ {
+ return false;
+ }
+
+ //always dispose blob
+ using (node.ValueRef.Value)
{
- //Remove the new from the list
+ //Remove the node from the list
List.Remove(node);
- //dispose the buffer
- node.Value.Value.Dispose();
- return true;
}
- return false;
+ return true;
}
+
/// <summary>
- /// Removes and disposes all blobl elements in cache (or in the backing store)
+ /// Removes all cache entires and disposes their held resources
/// </summary>
public override void Clear()
{
- foreach (MemoryHandle<byte> blob in List.Select(kp => kp.Value))
+ //Start from first node
+ LinkedListNode<KeyValuePair<string, CacheEntry>>? node = List.First;
+
+ //Classic ll node itteration
+ while(node != null)
{
- blob.Dispose();
+ //Dispose the cache entry
+ node.ValueRef.Value.Dispose();
+
+ //Move to next node
+ node = node.Next;
}
+
+ //empty all cache entires in the store
base.Clear();
}
}
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobItem.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobItem.cs
deleted file mode 100644
index 728875f..0000000
--- a/lib/VNLib.Data.Caching.ObjectCache/src/BlobItem.cs
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
-* Copyright (c) 2022 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Data.Caching.ObjectCache
-* File: BlobItem.cs
-*
-* BlobItem.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.IO;
-using System.Threading;
-using System.Threading.Tasks;
-
-using VNLib.Utils;
-using VNLib.Utils.IO;
-using VNLib.Utils.Memory;
-using VNLib.Utils.Logging;
-using VNLib.Utils.Extensions;
-
-namespace VNLib.Data.Caching
-{
- /// <summary>
- /// A general purpose binary storage item
- /// </summary>
- public class BlobItem //: VnDisposeable
- {
- /*
- private static readonly JoinableTaskContext JTX = new();
- private static readonly Semaphore CentralSwapLock = new(Environment.ProcessorCount, Environment.ProcessorCount);
-
- private readonly VnMemoryStream _loadedData;
- private bool _loaded;
-
- /// <summary>
- /// The time the blob was last modified
- /// </summary>
- public DateTimeOffset LastAccessed { get; private set; }
-
-
- /// <summary>
- /// Gets the current size of the file (in bytes) as an atomic operation
- /// </summary>
- public int FileSize => (int)_loadedData.Length;
- /// <summary>
- /// The operation synchronization lock
- /// </summary>
- public AsyncReaderWriterLock OpLock { get; }
- /// <summary>
- /// Initializes a new <see cref="BlobItem"/>
- /// </summary>
- /// <param name="heap">The heap to allocate buffers from</param>
- internal BlobItem(IUnmangedHeap heap)
- {
- _loadedData = new(heap);
- OpLock = new AsyncReaderWriterLock(JTX);
- _loaded = true;
- LastAccessed = DateTimeOffset.UtcNow;
- }
- ///<inheritdoc/>
- protected override void Free()
- {
- _loadedData.Dispose();
- OpLock.Dispose();
- }
-
- /// <summary>
- /// Reads data from the internal buffer and copies it to the specified buffer.
- /// Use the <see cref="FileSize"/> property to obtain the size of the internal buffer
- /// </summary>
- /// <param name="buffer">The buffer to copy data to</param>
- /// <returns>When completed, the number of bytes copied to the buffer</returns>
- public int Read(Span<byte> buffer)
- {
- //Make sure the blob has been swapped back into memory
- if (!_loaded)
- {
- throw new InvalidOperationException("The blob was not loaded from the disk");
- }
- //Read all data from the buffer and write it to the output buffer
- _loadedData.AsSpan().CopyTo(buffer);
- //Update last-accessed
- LastAccessed = DateTimeOffset.UtcNow;
- return (int)_loadedData.Length;
- }
- /// <summary>
- /// Overwrites the internal buffer with the contents of the supplied buffer
- /// </summary>
- /// <param name="buffer">The buffer containing data to store within the blob</param>
- /// <returns>A <see cref="ValueTask"/> that completes when write access has been granted and copied</returns>
- /// <exception cref="InvalidOperationException"></exception>
- public void Write(ReadOnlySpan<byte> buffer)
- {
- //Make sure the blob has been swapped back into memory
- if (!_loaded)
- {
- throw new InvalidOperationException("The blob was not loaded from the disk");
- }
- //Reset the buffer
- _loadedData.SetLength(buffer.Length);
- _loadedData.Seek(0, SeekOrigin.Begin);
- _loadedData.Write(buffer);
- LastAccessed = DateTimeOffset.UtcNow;
- }
-
- /// <summary>
- /// Writes the contents of the memory buffer to its designated file on the disk
- /// </summary>
- /// <param name="heap">The heap to allocate buffers from</param>
- /// <param name="swapDir">The <see cref="IsolatedStorageDirectory"/> that stores the file</param>
- /// <param name="filename">The name of the file to write data do</param>
- /// <param name="log">A log to write errors to</param>
- /// <returns>A task that completes when the swap to disk is complete</returns>
- internal async Task SwapToDiskAsync(IUnmangedHeap heap, DirectoryInfo swapDir, string filename, ILogProvider log)
- {
- try
- {
- //Wait for write lock
- await using (AsyncReaderWriterLock.Releaser releaser = await OpLock.WriteLockAsync())
- {
- //Enter swap lock
- await CentralSwapLock;
- try
- {
- //Open swap file data stream
- await using FileStream swapFile = swapDir.OpenFile(filename, FileMode.OpenOrCreate, FileAccess.ReadWrite, bufferSize: 8128);
- //reset swap file
- swapFile.SetLength(0);
- //Seek loaded-data back to 0 before writing
- _loadedData.Seek(0, SeekOrigin.Begin);
- //Write loaded data to disk
- await _loadedData.CopyToAsync(swapFile, 8128, heap);
- }
- finally
- {
- CentralSwapLock.Release();
- }
- //Release memory held by stream
- _loadedData.SetLength(0);
- //Clear loaded flag
- _loaded = false;
- LastAccessed = DateTimeOffset.UtcNow;
- }
- log.Debug("Blob {name} swapped to disk", filename);
- }
- catch(Exception ex)
- {
- log.Error(ex, "Blob swap to disk error");
- }
- }
- /// <summary>
- /// Reads the contents of the blob into a memory buffer from its designated file on disk
- /// </summary>
- /// <param name="heap">The heap to allocate buffers from</param>
- /// <param name="swapDir">The <see cref="IsolatedStorageDirectory"/> that stores the file</param>
- /// <param name="filename">The name of the file to write the blob data to</param>
- /// <param name="log">A log to write errors to</param>
- /// <returns>A task that completes when the swap from disk is complete</returns>
- internal async Task SwapFromDiskAsync(IUnmangedHeap heap, DirectoryInfo swapDir, string filename, ILogProvider log)
- {
- try
- {
- //Wait for write lock
- await using (AsyncReaderWriterLock.Releaser releaser = await OpLock.WriteLockAsync())
- {
- //Enter swap lock
- await CentralSwapLock;
- try
- {
- //Open swap file data stream
- await using FileStream swapFile = swapDir.OpenFile(filename, FileMode.OpenOrCreate, FileAccess.Read, bufferSize:8128);
- //Copy from disk to memory
- await swapFile.CopyToAsync(_loadedData, 8128, heap);
- }
- finally
- {
- CentralSwapLock.Release();
- }
- //Set loaded flag
- _loaded = true;
- LastAccessed = DateTimeOffset.UtcNow;
- }
- log.Debug("Blob {name} swapped from disk", filename);
- }
- catch(Exception ex)
- {
- log.Error(ex, "Blob swap from disk error");
- }
- }
- */
- }
-}
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs b/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs
new file mode 100644
index 0000000..8644d1d
--- /dev/null
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs
@@ -0,0 +1,227 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.ObjectCache
+* File: BlobCache.cs
+*
+* BlobCache.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Buffers.Binary;
+using System.Runtime.CompilerServices;
+
+using VNLib.Utils.Memory;
+using VNLib.Utils.Extensions;
+
+namespace VNLib.Data.Caching
+{
+ /// <summary>
+ /// A structure that represents an item in cache
+ /// </summary>
+ public readonly struct CacheEntry : IDisposable, IEquatable<CacheEntry>
+ {
+ private const int TIME_SEGMENT_SIZE = sizeof(long);
+
+ private const int LENGTH_SEGMENT_SIZE = sizeof(int);
+
+ private const int DATA_SEGMENT_START = TIME_SEGMENT_SIZE + LENGTH_SEGMENT_SIZE;
+
+
+ //Only contain ref to backing handle to keep struct size small
+ private readonly MemoryHandle<byte> _handle;
+
+
+ /// <summary>
+ /// Creates a new <see cref="CacheEntry"/> and copies the initial data to the internal buffer
+ /// </summary>
+ /// <param name="data">The initial data to store</param>
+ /// <param name="heap">The heap to allocate the buffer from</param>
+ /// <returns>The new <see cref="CacheEntry"/></returns>
+ public static CacheEntry Create(ReadOnlySpan<byte> data, IUnmangedHeap heap)
+ {
+ //Calc buffer size
+ int bufferSize = GetRequiredHandleSize(data.Length);
+
+ //Alloc buffer
+ MemoryHandle<byte> handle = heap.Alloc<byte>(bufferSize);
+
+ //Create new entry from handle
+ CacheEntry entry = new (handle, data.Length);
+
+ //Get the data segment
+ Span<byte> segment = entry.GetDataSegment();
+
+ //Copy data segment
+ data.CopyTo(segment);
+
+ return entry;
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private static int GetRequiredHandleSize(int size)
+ {
+ //Caculate the minimum handle size to store all required information, rounded to nearest page
+ return (int)MemoryUtil.NearestPage(size + DATA_SEGMENT_START);
+ }
+
+ private CacheEntry(MemoryHandle<byte> handle, int length)
+ {
+ _handle = handle;
+ //Store data length, assumes the handle is large enough to store it
+ SetLength(length);
+ }
+
+
+ ///<inheritdoc/>
+ public readonly void Dispose() => _handle.Dispose();
+
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private readonly Span<byte> GetTimeSegment() => _handle.AsSpan(0, TIME_SEGMENT_SIZE);
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private readonly Span<byte> GetLengthSegment() => _handle.AsSpan(TIME_SEGMENT_SIZE, LENGTH_SEGMENT_SIZE);
+
+ /// <summary>
+ /// Gets the size of the block of memory held by the underlying handle
+ /// </summary>
+ /// <returns>The size of the block held by the current entry</returns>
+ /// <exception cref="ObjectDisposedException"></exception>
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public readonly nuint GetMemoryUsage()
+ {
+ _handle.ThrowIfClosed();
+ return _handle.ByteLength;
+ }
+
+
+ /// <summary>
+ /// Gets the last set time
+ /// </summary>
+ /// <returns>The last date stored</returns>
+ /// <exception cref="ObjectDisposedException"></exception>
+ public readonly DateTime GetCreatedTime()
+ {
+ //Get the time segment and write the value in big endian
+ ReadOnlySpan<byte> segment = GetTimeSegment();
+
+ long ticks = BinaryPrimitives.ReadInt64BigEndian(segment);
+
+ //ticks back to
+ return new(ticks);
+ }
+
+ /// <summary>
+ /// Sets the last modified time
+ /// </summary>
+ /// <param name="time">The new time to set the handle to</param>
+ /// <exception cref="ObjectDisposedException"></exception>
+ public readonly void SetTime(DateTime time)
+ {
+ //Get native ticks value
+ long timeData = time.Ticks;
+
+ //Get the time segment and write the value in big endian
+ Span<byte> segment = GetTimeSegment();
+
+ BinaryPrimitives.WriteInt64BigEndian(segment, timeData);
+ }
+
+ /// <summary>
+ /// Gets the length of the data segment
+ /// </summary>
+ /// <returns>The length of the data segment</returns>
+ /// <exception cref="ObjectDisposedException"></exception>
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public readonly int GetLength()
+ {
+ //Get the length segment
+ ReadOnlySpan<byte> segment = GetLengthSegment();
+ //Recover the integer
+ return BinaryPrimitives.ReadInt32BigEndian(segment);
+ }
+
+ private readonly void SetLength(int length)
+ {
+ //Get the length segment
+ Span<byte> segment = GetLengthSegment();
+
+ //Update the length value
+ BinaryPrimitives.WriteInt32BigEndian(segment, length);
+ }
+
+ /// <summary>
+ /// Gets the stored data segment
+ /// </summary>
+ /// <returns>The data segment</returns>
+ /// <exception cref="ObjectDisposedException"></exception>
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public readonly Span<byte> GetDataSegment()
+ {
+ //Get the actual length of the segment
+ int length = GetLength();
+ //Get the segment from its begining offset and
+ return _handle.AsSpan(DATA_SEGMENT_START, length);
+ }
+
+ /// <summary>
+ /// Writes the specified segment to the internal buffer and resizes the buffer if necessary.
+ /// This operation overwrites any previously stored data
+ /// </summary>
+ /// <param name="data">The data segment to store</param>
+ /// <exception cref="ObjectDisposedException"></exception>
+ public readonly void UpdateData(ReadOnlySpan<byte> data)
+ {
+ //Calc required buffer size
+ int bufferSize = GetRequiredHandleSize(data.Length);
+
+ //Resize handle if required
+ _handle.ResizeIfSmaller(bufferSize);
+
+ //Reset data length
+ SetLength(data.Length);
+
+ //Get the data segment
+ Span<byte> segment = GetDataSegment();
+
+#if DEBUG
+ //Test segment length is equvalent to the requested data length
+ System.Diagnostics.Debug.Assert(segment.Length == data.Length);
+#endif
+ //Copy data segment
+ data.CopyTo(segment);
+ }
+
+
+ ///<inheritdoc/>
+ public override bool Equals(object? obj) => obj is CacheEntry entry && Equals(entry);
+
+ ///<inheritdoc/>
+ public override int GetHashCode() => _handle.GetHashCode();
+
+ ///<inheritdoc/>
+ public static bool operator ==(CacheEntry left, CacheEntry right) => left.Equals(right);
+
+ ///<inheritdoc/>
+ public static bool operator !=(CacheEntry left, CacheEntry right) => !(left == right);
+
+ ///<inheritdoc/>
+ public bool Equals(CacheEntry other) => other.GetHashCode() == GetHashCode();
+ }
+}
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/CacheListener.cs b/lib/VNLib.Data.Caching.ObjectCache/src/CacheListener.cs
deleted file mode 100644
index 7f544e1..0000000
--- a/lib/VNLib.Data.Caching.ObjectCache/src/CacheListener.cs
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
-* Copyright (c) 2022 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Data.Caching.ObjectCache
-* File: CacheListener.cs
-*
-* CacheListener.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.IO;
-
-using VNLib.Utils.Memory;
-using VNLib.Net.Messaging.FBM.Server;
-
-namespace VNLib.Data.Caching
-{
- /// <summary>
- /// A base implementation of a memory/disk LRU data cache FBM listener
- /// </summary>
- public abstract class CacheListener : FBMListenerBase
- {
- /// <summary>
- /// The directory swap files will be stored
- /// </summary>
- public DirectoryInfo? Directory { get; private set; }
- /// <summary>
- /// The Cache store to access data blobs
- /// </summary>
- protected BlobCache? Cache { get; private set; }
- /// <summary>
- /// The <see cref="IUnmangedHeap"/> to allocate buffers from
- /// </summary>
- protected IUnmangedHeap? Heap { get; private set; }
-
- /// <summary>
- /// Initializes the <see cref="Cache"/> data store
- /// </summary>
- /// <param name="dir">The directory to swap cache records to</param>
- /// <param name="cacheSize">The size of the LRU cache</param>
- /// <param name="heap">The heap to allocate buffers from</param>
- protected void InitCache(DirectoryInfo dir, int cacheSize, IUnmangedHeap heap)
- {
- Heap = heap;
- Cache = new(dir, cacheSize, Log, Heap);
- Directory = dir;
- }
- }
-}
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/ChangeEvent.cs b/lib/VNLib.Data.Caching.ObjectCache/src/ChangeEvent.cs
index 7ee4427..2e8641e 100644
--- a/lib/VNLib.Data.Caching.ObjectCache/src/ChangeEvent.cs
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/ChangeEvent.cs
@@ -27,11 +27,23 @@ namespace VNLib.Data.Caching.ObjectCache
/// <summary>
/// An event object that is passed when change events occur
/// </summary>
- public class ChangeEvent
+ public sealed class ChangeEvent
{
- public readonly string CurrentId;
- public readonly string? AlternateId;
- public readonly bool Deleted;
+ /// <summary>
+ /// The current id of the changed object
+ /// </summary>
+ public string CurrentId { get; }
+
+ /// <summary>
+ /// The alternate id of the changed object if specified
+ /// </summary>
+ public string? AlternateId { get; }
+
+ /// <summary>
+ /// A value that indicates if the object was deleted
+ /// </summary>
+ public bool Deleted { get; }
+
internal ChangeEvent(string id, string? alternate, bool deleted)
{
CurrentId = id;
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs b/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs
index 514d00b..05a4f02 100644
--- a/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs
@@ -34,6 +34,9 @@ using VNLib.Utils.Extensions;
using VNLib.Net.Messaging.FBM.Server;
using static VNLib.Data.Caching.Constants;
+
+#pragma warning disable CA1849 // Call async methods when in an async method
+
namespace VNLib.Data.Caching.ObjectCache
{
public delegate ReadOnlySpan<byte> GetBodyDataCallback<T>(T state);
@@ -41,7 +44,7 @@ namespace VNLib.Data.Caching.ObjectCache
/// <summary>
/// A <see cref="FBMListener"/> implementation of a <see cref="CacheListener"/>
/// </summary>
- public class ObjectCacheStore : CacheListener, IDisposable
+ public class ObjectCacheStore : FBMListenerBase, IDisposable
{
private readonly SemaphoreSlim StoreLock;
private bool disposedValue;
@@ -55,6 +58,14 @@ namespace VNLib.Data.Caching.ObjectCache
public AsyncQueue<ChangeEvent> EventQueue { get; }
/// <summary>
+ /// The Cache store to access data blobs
+ /// </summary>
+ private readonly BlobCache Cache;
+
+ private readonly IUnmangedHeap Heap;
+
+
+ /// <summary>
/// Initialzies a new <see cref="ObjectCacheStore"/>
/// </summary>
/// <param name="dir">The <see cref="DirectoryInfo"/> to store blob files to</param>
@@ -62,23 +73,25 @@ namespace VNLib.Data.Caching.ObjectCache
/// <param name="log"></param>
/// <param name="heap"></param>
/// <param name="singleReader">A value that indicates if a single thread is processing events</param>
- public ObjectCacheStore(DirectoryInfo dir, int cacheMax, ILogProvider log, IUnmangedHeap heap, bool singleReader)
+ public ObjectCacheStore(int cacheMax, ILogProvider log, IUnmangedHeap heap, bool singleReader)
{
Log = log;
//We can use a single writer and single reader in this context
EventQueue = new(true, singleReader);
- InitCache(dir, cacheMax, heap);
+ Cache = new(cacheMax);
+ Heap = heap;
InitListener(heap);
StoreLock = new(1,1);
}
///<inheritdoc/>
- protected override async Task ProcessAsync(FBMContext context, object? userState, CancellationToken cancellationToken)
+ protected override Task ProcessAsync(FBMContext context, object? userState, CancellationToken exitToken)
{
try
{
//Get the action header
string action = context.Method();
+
//Optional newid header
string? alternateId = context.NewObjectId();
@@ -88,82 +101,159 @@ namespace VNLib.Data.Caching.ObjectCache
{
//Get the object-id header
string objectId = context.ObjectId();
- //Take lock on store
- using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellationToken: cancellationToken);
- if (Cache!.TryGetValue(objectId, out MemoryHandle<byte>? data))
+
+ //Try read sync
+ if (StoreLock.Wait(0))
{
- //Set the status code and write the buffered data to the response buffer
- context.CloseResponse(ResponseCodes.Okay);
- //Copy data to response buffer
- context.Response.WriteBody(data.Span);
+ try
+ {
+ UnsafeReadEntry(context, objectId);
+ }
+ finally
+ {
+ StoreLock.Release();
+ }
+
+ return Task.CompletedTask;
}
else
{
- context.CloseResponse(ResponseCodes.NotFound);
+ //Read entry async
+ return InternalReadEntryAsync(context, objectId, exitToken);
}
}
- break;
+
case Actions.AddOrUpdate:
{
//Get the object-id header
string objectId = context.ObjectId();
- //Add/update a blob async
- await AddOrUpdateBlobAsync(objectId, alternateId, static context => context.Request.BodyData, context);
- //Notify update the event bus
- await EventQueue.EnqueueAsync(new(objectId, alternateId, false), cancellationToken);
- //Set status code
- context.CloseResponse(ResponseCodes.Okay);
+
+ //Create change event for the object
+ ChangeEvent change = new(objectId, alternateId, false);
+
+ //Attempt to aquire lock sync
+ if (StoreLock.Wait(0))
+ {
+ //aquired sync
+ try
+ {
+ //Update the item
+ UnsafeAddOrUpdate(objectId, alternateId, GetBodyData, context);
+ }
+ finally
+ {
+ StoreLock.Release();
+ }
+
+ //Add to event queue
+ EnqueEvent(change);
+
+ //Set status code
+ context.CloseResponse(ResponseCodes.Okay);
+
+ return Task.CompletedTask;
+ }
+ else
+ {
+ //Lock will be awaited async and
+ return InternalAddOrUpdateAsync(context, change, exitToken);
+ }
}
- break;
case Actions.Delete:
{
//Get the object-id header
string objectId = context.ObjectId();
-
- if (await DeleteItemAsync(objectId))
+
+ //Create change event
+ ChangeEvent change = new(objectId, alternateId, true);
+
+ //See if lock can be entered without waiting
+ if (StoreLock.Wait(0))
{
- //Notify deleted
- await EventQueue.EnqueueAsync(new(objectId, null, true), cancellationToken);
- //Set status header
- context.CloseResponse(ResponseCodes.Okay);
+ bool found = false;
+
+ try
+ {
+ //Sync
+ found = UnsafeDeleteEntry(objectId);
+ }
+ finally
+ {
+ StoreLock.Release();
+ }
+
+ //Notify change
+ EnqueEvent(change);
+
+ //Set status code if found
+ context.CloseResponse(found ? ResponseCodes.Okay : ResponseCodes.NotFound);
+
+ return Task.CompletedTask;
}
else
{
- //Set status header
- context.CloseResponse(ResponseCodes.NotFound);
+ //lock will yeild async
+ return InternalDeleteAsync(context, change, exitToken);
}
}
- break;
// event queue dequeue request
case Actions.Dequeue:
{
+ static void SetResponse(ChangeEvent change, FBMContext context)
+ {
+ if (change.Deleted)
+ {
+ context.CloseResponse("deleted");
+ context.Response.WriteHeader(ObjectId, change.CurrentId);
+ }
+ else
+ {
+ //Changed
+ context.CloseResponse("modified");
+ context.Response.WriteHeader(ObjectId, change.CurrentId);
+
+ //Set old id if an old id is set
+ if (change.CurrentId != null)
+ {
+ context.Response.WriteHeader(NewObjectId, change.AlternateId);
+ }
+ }
+ }
+
+ static async Task DequeAsync(AsyncQueue<ChangeEvent> queue, FBMContext context, CancellationToken exitToken)
+ {
+ //Wait for a new message to process
+ ChangeEvent ev = await queue.DequeueAsync(exitToken);
+
+ //Set the response
+ SetResponse(ev, context);
+ }
+
//If no event bus is registered, then this is not a legal command
if (userState is not AsyncQueue<ChangeEvent> eventBus)
{
context.CloseResponse(ResponseCodes.NotFound);
- break;
+
+ return Task.CompletedTask;
}
- //Wait for a new message to process
- ChangeEvent ev = await eventBus.DequeueAsync(cancellationToken);
- if (ev.Deleted)
+
+ //try to deq without awaiting
+ if (eventBus.TryDequeue(out ChangeEvent? change))
{
- context.CloseResponse("deleted");
- context.Response.WriteHeader(ObjectId, ev.CurrentId);
+ SetResponse(change, context);
+
+ return Task.CompletedTask;
}
else
{
- //Changed
- context.CloseResponse("modified");
- context.Response.WriteHeader(ObjectId, ev.CurrentId);
- //Set old id if an old id is set
- if (ev.CurrentId != null)
- {
- context.Response.WriteHeader(NewObjectId, ev.AlternateId);
- }
+ //Process async
+ return DequeAsync(eventBus, context, exitToken);
}
}
- break;
+
}
+
+ Log.Error("Unhandled cache event!");
}
catch (OperationCanceledException)
{
@@ -175,68 +265,178 @@ namespace VNLib.Data.Caching.ObjectCache
Log.Error(ex);
context.CloseResponse(ResponseCodes.Error);
}
+
+ return Task.CompletedTask;
}
+
+
+ private static ReadOnlySpan<byte> GetBodyData(FBMContext ctx) => ctx.Request.BodyData;
- /// <summary>
- /// Asynchronously deletes a previously stored item
- /// </summary>
- /// <param name="id">The id of the object to delete</param>
- /// <returns>A task that completes when the item has been deleted</returns>
- public async Task<bool> DeleteItemAsync(string id)
+ private void EnqueEvent(ChangeEvent change)
{
- using SemSlimReleaser rel = await StoreLock.GetReleaserAsync();
- return Cache!.Remove(id);
+ if (!EventQueue.TryEnque(change))
+ {
+ Log.Warn("Change event {ev} was not enqued because the event queue is overflowing!", change.CurrentId);
+ }
}
-
- /// <summary>
- /// Asynchronously adds or updates an object in the store and optionally update's its id
- /// </summary>
- /// <param name="objectId">The current (or old) id of the object</param>
- /// <param name="alternateId">An optional id to update the blob to</param>
- /// <param name="bodyData">A callback that returns the data for the blob</param>
- /// <param name="state">The state parameter to pass to the data callback</param>
- /// <returns></returns>
- public async Task AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state)
+
+ private void UnsafeReadEntry(FBMContext context, string objectId)
{
- MemoryHandle<byte>? blob;
+ if (Cache!.TryGetValue(objectId, out CacheEntry data))
+ {
+ //Set the status code and write the buffered data to the response buffer
+ context.CloseResponse(ResponseCodes.Okay);
+
+ //Copy data to response buffer
+ context.Response.WriteBody(data.GetDataSegment());
+ }
+ else
+ {
+ context.CloseResponse(ResponseCodes.NotFound);
+ }
+ }
+
+ async Task InternalReadEntryAsync(FBMContext context, string objectId, CancellationToken cancellation)
+ {
+ //enter lock async
+ using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation);
+
+ UnsafeReadEntry(context, objectId);
+ }
+
+ private async Task InternalAddOrUpdateAsync(FBMContext context, ChangeEvent change, CancellationToken cancellation)
+ {
+ //Wait for lock since we know it will yeild async
+ using (SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation))
+ {
+ UnsafeAddOrUpdate(change.CurrentId, change.AlternateId, GetBodyData, context);
+ }
+
+ //Add to event queue
+ EnqueEvent(change);
+
+ //Set status code
+ context.CloseResponse(ResponseCodes.Okay);
+ }
+
+ private void UnsafeAddOrUpdate<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state)
+ {
+ CacheEntry entry;
+
//See if new/alt session id was specified
if (string.IsNullOrWhiteSpace(alternateId))
{
- //Take lock on store
- using SemSlimReleaser rel = await StoreLock.GetReleaserAsync();
//See if blob exists
- if (!Cache!.TryGetValue(objectId, out blob))
+ if (!Cache!.TryGetValue(objectId, out entry))
{
- //If not, create new blob and add to store
- blob = Heap.AllocAndCopy(bodyData(state));
- Cache.Add(objectId, blob);
+ //Create the new cache entry since it does not exist
+ entry = CacheEntry.Create(bodyData(state), Heap);
+
+ //Add to cache
+ Cache.Add(objectId, entry);
}
else
{
//Reset the buffer state
- blob.WriteAndResize(bodyData(state));
+ entry.UpdateData(bodyData(state));
}
}
//Need to change the id of the record
else
{
- //Take lock on store
- using SemSlimReleaser rel = await StoreLock.GetReleaserAsync();
//Try to change the blob key
- if (!Cache!.TryChangeKey(objectId, alternateId, out blob))
+ if (!Cache!.TryChangeKey(objectId, alternateId, out entry))
{
- //Blob not found, create new blob
- blob = Heap.AllocAndCopy(bodyData(state));
- Cache.Add(alternateId, blob);
+ //Create the new cache entry since it does not exist
+ entry = CacheEntry.Create(bodyData(state), Heap);
+
+ //Add to cache by its alternate id
+ Cache.Add(alternateId, entry);
}
else
{
//Reset the buffer state
- blob.WriteAndResize(bodyData(state));
+ entry.UpdateData(bodyData(state));
}
}
+
+ //Update modified time to current utc time
+ entry.SetTime(DateTime.UtcNow);
+ }
+
+ private async Task InternalDeleteAsync(FBMContext context, ChangeEvent change, CancellationToken cancellation)
+ {
+ bool found = false;
+
+ //enter the lock
+ using(SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation))
+ {
+ //Sync
+ found = UnsafeDeleteEntry(change.CurrentId);
+ }
+
+ //Notify change
+ EnqueEvent(change);
+
+ //Set status code if found
+ context.CloseResponse(found ? ResponseCodes.Okay : ResponseCodes.NotFound);
}
-
+
+ private bool UnsafeDeleteEntry(string id) => Cache!.Remove(id);
+
+
+ /// <summary>
+ /// Asynchronously adds or updates an object in the store and optionally update's its id
+ /// </summary>
+ /// <param name="objectId">The current (or old) id of the object</param>
+ /// <param name="alternateId">An optional id to update the blob to</param>
+ /// <param name="bodyData">A callback that returns the data for the blob</param>
+ /// <param name="state">The state parameter to pass to the data callback</param>
+ /// <param name="token">A token to cancel the async operation</param>
+ /// <returns>A value task that represents the async operation</returns>
+ public async ValueTask AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token = default)
+ {
+ //Test the lock before waiting async
+ if (!StoreLock.Wait(0))
+ {
+ //Wait async to avoid task alloc
+ await StoreLock.WaitAsync(token);
+ }
+ try
+ {
+ UnsafeAddOrUpdate(objectId, alternateId, bodyData, state);
+ }
+ finally
+ {
+ StoreLock.Release();
+ }
+ }
+
+ /// <summary>
+ /// Asynchronously deletes a previously stored item
+ /// </summary>
+ /// <param name="id">The id of the object to delete</param>
+ /// <param name="token">A token to cancel the async lock await</param>
+ /// <returns>A task that completes when the item has been deleted</returns>
+ public async ValueTask<bool> DeleteItemAsync(string id, CancellationToken token = default)
+ {
+ //Test the lock before waiting async
+ if (!StoreLock.Wait(0))
+ {
+ //Wait async to avoid task alloc
+ await StoreLock.WaitAsync(token);
+ }
+ try
+ {
+ return UnsafeDeleteEntry(id);
+ }
+ finally
+ {
+ StoreLock.Release();
+ }
+ }
+
+
///<inheritdoc/>
protected virtual void Dispose(bool disposing)
{
@@ -252,6 +452,7 @@ namespace VNLib.Data.Caching.ObjectCache
disposedValue = true;
}
}
+
///<inheritdoc/>
public void Dispose()
{
diff --git a/lib/VNLib.Data.Caching/src/ClientExtensions.cs b/lib/VNLib.Data.Caching/src/ClientExtensions.cs
index 0a24a83..353bf67 100644
--- a/lib/VNLib.Data.Caching/src/ClientExtensions.cs
+++ b/lib/VNLib.Data.Caching/src/ClientExtensions.cs
@@ -105,16 +105,16 @@ namespace VNLib.Data.Caching
response.ThrowIfNotSet();
//Get the status code
- ReadOnlyMemory<char> status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value;
+ FBMMessageHeader status = response.Headers.FirstOrDefault(static a => a.Header == HeaderCommand.Status);
//Check ok status code, then its safe to deserialize
- if (status.Span.Equals(ResponseCodes.Okay, StringComparison.Ordinal))
+ if (status.Value.Equals(ResponseCodes.Okay, StringComparison.Ordinal))
{
return JsonSerializer.Deserialize<T>(response.ResponseBody, LocalOptions);
}
//Object may not exist on the server yet
- if (status.Span.Equals(ResponseCodes.NotFound, StringComparison.Ordinal))
+ if (status.Value.Equals(ResponseCodes.NotFound, StringComparison.Ordinal))
{
return default;
}
@@ -181,14 +181,14 @@ namespace VNLib.Data.Caching
response.ThrowIfNotSet();
//Get the status code
- ReadOnlyMemory<char> status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value;
+ FBMMessageHeader status = response.Headers.FirstOrDefault(static a => a.Header == HeaderCommand.Status);
//Check status code
- if (status.Span.Equals(ResponseCodes.Okay, StringComparison.OrdinalIgnoreCase))
+ if (status.Value.Equals(ResponseCodes.Okay, StringComparison.OrdinalIgnoreCase))
{
return;
}
- else if(status.Span.Equals(ResponseCodes.NotFound, StringComparison.OrdinalIgnoreCase))
+ else if(status.Value.Equals(ResponseCodes.NotFound, StringComparison.OrdinalIgnoreCase))
{
throw new ObjectNotFoundException($"object {objectId} not found on remote server");
}
@@ -236,13 +236,13 @@ namespace VNLib.Data.Caching
response.ThrowIfNotSet();
//Get the status code
- ReadOnlyMemory<char> status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value;
+ FBMMessageHeader status = response.Headers.FirstOrDefault(static a => a.Header == HeaderCommand.Status);
- if (status.Span.Equals(ResponseCodes.Okay, StringComparison.Ordinal))
+ if (status.Value.Equals(ResponseCodes.Okay, StringComparison.Ordinal))
{
return;
}
- else if(status.Span.Equals(ResponseCodes.NotFound, StringComparison.OrdinalIgnoreCase))
+ else if(status.Value.Equals(ResponseCodes.NotFound, StringComparison.OrdinalIgnoreCase))
{
throw new ObjectNotFoundException($"object {objectId} not found on remote server");
}
@@ -277,9 +277,9 @@ namespace VNLib.Data.Caching
return new()
{
- Status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value.ToString(),
- CurrentId = response.Headers.SingleOrDefault(static v => v.Key == Constants.ObjectId).Value.ToString(),
- NewId = response.Headers.SingleOrDefault(static v => v.Key == Constants.NewObjectId).Value.ToString()
+ Status = response.Headers.FirstOrDefault(static a => a.Header == HeaderCommand.Status).Value.ToString(),
+ CurrentId = response.Headers.SingleOrDefault(static v => v.Header == Constants.ObjectId).Value.ToString(),
+ NewId = response.Headers.SingleOrDefault(static v => v.Header == Constants.NewObjectId).Value.ToString()
};
}
finally
@@ -297,7 +297,7 @@ namespace VNLib.Data.Caching
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static string ObjectId(this FBMContext context)
{
- return context.Request.Headers.First(static kvp => kvp.Key == Constants.ObjectId).Value.ToString();
+ return context.Request.Headers.First(static kvp => kvp.Header == Constants.ObjectId).Value.ToString();
}
/// <summary>
@@ -308,7 +308,7 @@ namespace VNLib.Data.Caching
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static string? NewObjectId(this FBMContext context)
{
- return context.Request.Headers.FirstOrDefault(static kvp => kvp.Key == Constants.NewObjectId).Value.ToString();
+ return context.Request.Headers.FirstOrDefault(static kvp => kvp.Header == Constants.NewObjectId).Value.ToString();
}
/// <summary>
@@ -319,7 +319,7 @@ namespace VNLib.Data.Caching
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static string Method(this FBMContext context)
{
- return context.Request.Headers.First(static kvp => kvp.Key == HeaderCommand.Action).Value.ToString();
+ return context.Request.Headers.First(static kvp => kvp.Header == HeaderCommand.Action).Value.ToString();
}
/// <summary>
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs
index e5e6ee3..75b9bd4 100644
--- a/lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs
@@ -67,7 +67,7 @@ namespace VNLib.Plugins.Extensions.VNCache
VnCacheClient client = new(debugLog);
//Begin cache connections by scheduling a task on the plugin's scheduler
- _ = pbase.DeferTask(() => RunClientAsync(pbase, config, localized, client), 250);
+ _ = pbase.ObserveTask(() => RunClientAsync(pbase, config, localized, client), 250);
return client;
}