aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-01-27 21:13:16 -0500
committerLibravatar vnugent <public@vaughnnugent.com>2023-01-27 21:13:16 -0500
commit892bbaaa5c1f62631070cc74820f349c4c80f55d (patch)
treec04dfa8c5a3ead1522502635d4bc9696102d28dc
parent0ea612dde50e82d722b0654e0e569fd4e7469978 (diff)
Object cache overhaul and logger updates
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs20
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs106
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/BlobItem.cs207
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs227
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/CacheListener.cs64
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/ChangeEvent.cs20
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs357
-rw-r--r--lib/VNLib.Data.Caching/src/ClientExtensions.cs30
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs2
-rw-r--r--plugins/CacheBroker/src/CacheBroker.csproj26
-rw-r--r--plugins/CacheBroker/src/Endpoints/BrokerRegistrationEndpoint.cs13
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs4
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs54
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs220
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs56
-rw-r--r--plugins/ObjectCacheServer/src/ObjectCacheServer.csproj31
-rw-r--r--plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs150
17 files changed, 1012 insertions, 575 deletions
diff --git a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
index ebdfd5b..222240a 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
@@ -81,14 +81,28 @@ namespace VNLib.Data.Caching.Extensions
/// <returns>A preconfigured <see cref="FBMClientConfig"/> for object caching</returns>
public static FBMClientConfig GetDefaultConfig(IUnmangedHeap heap, int maxMessageSize, ILogProvider? debugLog = null)
{
+ /*
+ * Max message size (for server) should account for max data + the additional header buffer
+ */
+ int maxExtra = (int)Helpers.ToNearestKb((int)(maxMessageSize * 1.2) + MAX_FBM_MESSAGE_HEADER_SIZE);
+
return new()
{
BufferHeap = heap,
- MaxMessageSize = maxMessageSize * 2,
- RecvBufferSize = maxMessageSize,
- MessageBufferSize = maxMessageSize,
+
+ //Max message size is referrences
+ MaxMessageSize = maxExtra,
+
+ //The size of the buffer used for buffering incoming messages
+ RecvBufferSize = maxExtra,
+ //Message buffer should be max message + headers
+ MessageBufferSize = (int)Helpers.ToNearestKb(maxMessageSize + MAX_FBM_MESSAGE_HEADER_SIZE),
+
+ //Caching only requires a fixed number of request headers, so we can used a fixed buffer size
MaxHeaderBufferSize = MAX_FBM_MESSAGE_HEADER_SIZE,
+
+ //Set the optional cache sub-protocol
SubProtocol = CACHE_WS_SUB_PROCOL,
HeaderEncoding = Helpers.DefaultEncoding,
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs
index 26df351..fbb2dcc 100644
--- a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs
@@ -23,116 +23,134 @@
*/
using System;
-using System.IO;
-using System.Linq;
using System.Collections.Generic;
-using System.Diagnostics.CodeAnalysis;
-using VNLib.Utils.IO;
-using VNLib.Utils.Logging;
-using VNLib.Utils.Memory;
using VNLib.Utils.Memory.Caching;
namespace VNLib.Data.Caching
{
+
/// <summary>
/// A general purpose binary data storage
/// </summary>
- public class BlobCache : LRUCache<string, MemoryHandle<byte>>
+ public class BlobCache : LRUCache<string, CacheEntry>
{
- readonly IUnmangedHeap Heap;
- readonly DirectoryInfo SwapDir;
- readonly ILogProvider Log;
///<inheritdoc/>
public override bool IsReadOnly { get; }
+
///<inheritdoc/>
protected override int MaxCapacity { get; }
-
-
+
+
/// <summary>
/// Initializes a new <see cref="BlobCache"/> store
/// </summary>
- /// <param name="swapDir">The <see cref="IsolatedStorageDirectory"/> to swap blob data to when cache</param>
/// <param name="maxCapacity">The maximum number of items to keep in memory</param>
- /// <param name="log">A <see cref="ILogProvider"/> to write log data to</param>
- /// <param name="heap">A <see cref="IUnmangedHeap"/> to allocate buffers and store <see cref="BlobItem"/> data in memory</param>
- public BlobCache(DirectoryInfo swapDir, int maxCapacity, ILogProvider log, IUnmangedHeap heap)
+ /// <exception cref="ArgumentException"></exception>
+ public BlobCache(int maxCapacity)
:base(StringComparer.Ordinal)
{
- IsReadOnly = false;
+ if(maxCapacity < 1)
+ {
+ throw new ArgumentException("The maxium capacity of the store must be a positive integer larger than 0", nameof(maxCapacity));
+ }
+
MaxCapacity = maxCapacity;
- SwapDir = swapDir;
+
//Update the lookup table size
LookupTable.EnsureCapacity(maxCapacity);
- //Set default heap if not specified
- Heap = heap;
- Log = log;
}
+
///<inheritdoc/>
- protected override bool CacheMiss(string key, [NotNullWhen(true)] out MemoryHandle<byte>? value)
+ protected override bool CacheMiss(string key, out CacheEntry value)
{
- value = null;
+ value = default;
return false;
}
+
///<inheritdoc/>
- protected override void Evicted(KeyValuePair<string, MemoryHandle<byte>> evicted)
+ protected override void Evicted(ref KeyValuePair<string, CacheEntry> evicted)
{
- //Dispose the blob
+ //Dispose the cache item
evicted.Value.Dispose();
}
+
/// <summary>
- /// If the <see cref="BlobItem"/> is found in the store, changes the key
+ /// If the <see cref="CacheEntry"/> is found in the store, changes the key
/// that referrences the blob.
/// </summary>
/// <param name="currentKey">The key that currently referrences the blob in the store</param>
/// <param name="newKey">The new key that will referrence the blob</param>
- /// <param name="blob">The <see cref="BlobItem"/> if its found in the store</param>
+ /// <param name="blob">The <see cref="CacheEntry"/> if its found in the store</param>
/// <returns>True if the record was found and the key was changes</returns>
- public bool TryChangeKey(string currentKey, string newKey, [NotNullWhen(true)] out MemoryHandle<byte>? blob)
+ public bool TryChangeKey(string currentKey, string newKey, out CacheEntry blob)
{
- if (LookupTable.Remove(currentKey, out LinkedListNode<KeyValuePair<string, MemoryHandle<byte>>>? node))
+ //Try to get the node at the current key
+ if (LookupTable.Remove(currentKey, out LinkedListNode<KeyValuePair<string, CacheEntry>> ? node))
{
//Remove the node from the ll
List.Remove(node);
- //Update the node kvp
- blob = node.Value.Value;
- node.Value = new KeyValuePair<string, MemoryHandle<byte>>(newKey, blob);
+
+ //Get the stored blob
+ blob = node.ValueRef.Value;
+
+ //Update the
+ node.Value = new KeyValuePair<string, CacheEntry>(newKey, blob);
+
//Add to end of list
List.AddLast(node);
+
//Re-add to lookup table with new key
LookupTable.Add(newKey, node);
+
return true;
}
- blob = null;
+
+ blob = default;
return false;
}
+
/// <summary>
- /// Removes the <see cref="BlobItem"/> from the store without disposing the blobl
+ /// Removes the <see cref="CacheEntry"/> from the store, and frees its resources
/// </summary>
- /// <param name="key">The key that referrences the <see cref="BlobItem"/> in the store</param>
+ /// <param name="key">The key that referrences the <see cref="CacheEntry"/> in the store</param>
/// <returns>A value indicating if the blob was removed</returns>
public override bool Remove(string key)
{
//Remove the item from the lookup table and if it exists, remove the node from the list
- if (LookupTable.Remove(key, out LinkedListNode<KeyValuePair<string, MemoryHandle<byte>>>? node))
+ if (!LookupTable.Remove(key, out LinkedListNode<KeyValuePair<string, CacheEntry>>? node))
+ {
+ return false;
+ }
+
+ //always dispose blob
+ using (node.ValueRef.Value)
{
- //Remove the new from the list
+ //Remove the node from the list
List.Remove(node);
- //dispose the buffer
- node.Value.Value.Dispose();
- return true;
}
- return false;
+ return true;
}
+
/// <summary>
- /// Removes and disposes all blobl elements in cache (or in the backing store)
+ /// Removes all cache entires and disposes their held resources
/// </summary>
public override void Clear()
{
- foreach (MemoryHandle<byte> blob in List.Select(kp => kp.Value))
+ //Start from first node
+ LinkedListNode<KeyValuePair<string, CacheEntry>>? node = List.First;
+
+ //Classic ll node itteration
+ while(node != null)
{
- blob.Dispose();
+ //Dispose the cache entry
+ node.ValueRef.Value.Dispose();
+
+ //Move to next node
+ node = node.Next;
}
+
+ //empty all cache entires in the store
base.Clear();
}
}
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobItem.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobItem.cs
deleted file mode 100644
index 728875f..0000000
--- a/lib/VNLib.Data.Caching.ObjectCache/src/BlobItem.cs
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
-* Copyright (c) 2022 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Data.Caching.ObjectCache
-* File: BlobItem.cs
-*
-* BlobItem.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.IO;
-using System.Threading;
-using System.Threading.Tasks;
-
-using VNLib.Utils;
-using VNLib.Utils.IO;
-using VNLib.Utils.Memory;
-using VNLib.Utils.Logging;
-using VNLib.Utils.Extensions;
-
-namespace VNLib.Data.Caching
-{
- /// <summary>
- /// A general purpose binary storage item
- /// </summary>
- public class BlobItem //: VnDisposeable
- {
- /*
- private static readonly JoinableTaskContext JTX = new();
- private static readonly Semaphore CentralSwapLock = new(Environment.ProcessorCount, Environment.ProcessorCount);
-
- private readonly VnMemoryStream _loadedData;
- private bool _loaded;
-
- /// <summary>
- /// The time the blob was last modified
- /// </summary>
- public DateTimeOffset LastAccessed { get; private set; }
-
-
- /// <summary>
- /// Gets the current size of the file (in bytes) as an atomic operation
- /// </summary>
- public int FileSize => (int)_loadedData.Length;
- /// <summary>
- /// The operation synchronization lock
- /// </summary>
- public AsyncReaderWriterLock OpLock { get; }
- /// <summary>
- /// Initializes a new <see cref="BlobItem"/>
- /// </summary>
- /// <param name="heap">The heap to allocate buffers from</param>
- internal BlobItem(IUnmangedHeap heap)
- {
- _loadedData = new(heap);
- OpLock = new AsyncReaderWriterLock(JTX);
- _loaded = true;
- LastAccessed = DateTimeOffset.UtcNow;
- }
- ///<inheritdoc/>
- protected override void Free()
- {
- _loadedData.Dispose();
- OpLock.Dispose();
- }
-
- /// <summary>
- /// Reads data from the internal buffer and copies it to the specified buffer.
- /// Use the <see cref="FileSize"/> property to obtain the size of the internal buffer
- /// </summary>
- /// <param name="buffer">The buffer to copy data to</param>
- /// <returns>When completed, the number of bytes copied to the buffer</returns>
- public int Read(Span<byte> buffer)
- {
- //Make sure the blob has been swapped back into memory
- if (!_loaded)
- {
- throw new InvalidOperationException("The blob was not loaded from the disk");
- }
- //Read all data from the buffer and write it to the output buffer
- _loadedData.AsSpan().CopyTo(buffer);
- //Update last-accessed
- LastAccessed = DateTimeOffset.UtcNow;
- return (int)_loadedData.Length;
- }
- /// <summary>
- /// Overwrites the internal buffer with the contents of the supplied buffer
- /// </summary>
- /// <param name="buffer">The buffer containing data to store within the blob</param>
- /// <returns>A <see cref="ValueTask"/> that completes when write access has been granted and copied</returns>
- /// <exception cref="InvalidOperationException"></exception>
- public void Write(ReadOnlySpan<byte> buffer)
- {
- //Make sure the blob has been swapped back into memory
- if (!_loaded)
- {
- throw new InvalidOperationException("The blob was not loaded from the disk");
- }
- //Reset the buffer
- _loadedData.SetLength(buffer.Length);
- _loadedData.Seek(0, SeekOrigin.Begin);
- _loadedData.Write(buffer);
- LastAccessed = DateTimeOffset.UtcNow;
- }
-
- /// <summary>
- /// Writes the contents of the memory buffer to its designated file on the disk
- /// </summary>
- /// <param name="heap">The heap to allocate buffers from</param>
- /// <param name="swapDir">The <see cref="IsolatedStorageDirectory"/> that stores the file</param>
- /// <param name="filename">The name of the file to write data do</param>
- /// <param name="log">A log to write errors to</param>
- /// <returns>A task that completes when the swap to disk is complete</returns>
- internal async Task SwapToDiskAsync(IUnmangedHeap heap, DirectoryInfo swapDir, string filename, ILogProvider log)
- {
- try
- {
- //Wait for write lock
- await using (AsyncReaderWriterLock.Releaser releaser = await OpLock.WriteLockAsync())
- {
- //Enter swap lock
- await CentralSwapLock;
- try
- {
- //Open swap file data stream
- await using FileStream swapFile = swapDir.OpenFile(filename, FileMode.OpenOrCreate, FileAccess.ReadWrite, bufferSize: 8128);
- //reset swap file
- swapFile.SetLength(0);
- //Seek loaded-data back to 0 before writing
- _loadedData.Seek(0, SeekOrigin.Begin);
- //Write loaded data to disk
- await _loadedData.CopyToAsync(swapFile, 8128, heap);
- }
- finally
- {
- CentralSwapLock.Release();
- }
- //Release memory held by stream
- _loadedData.SetLength(0);
- //Clear loaded flag
- _loaded = false;
- LastAccessed = DateTimeOffset.UtcNow;
- }
- log.Debug("Blob {name} swapped to disk", filename);
- }
- catch(Exception ex)
- {
- log.Error(ex, "Blob swap to disk error");
- }
- }
- /// <summary>
- /// Reads the contents of the blob into a memory buffer from its designated file on disk
- /// </summary>
- /// <param name="heap">The heap to allocate buffers from</param>
- /// <param name="swapDir">The <see cref="IsolatedStorageDirectory"/> that stores the file</param>
- /// <param name="filename">The name of the file to write the blob data to</param>
- /// <param name="log">A log to write errors to</param>
- /// <returns>A task that completes when the swap from disk is complete</returns>
- internal async Task SwapFromDiskAsync(IUnmangedHeap heap, DirectoryInfo swapDir, string filename, ILogProvider log)
- {
- try
- {
- //Wait for write lock
- await using (AsyncReaderWriterLock.Releaser releaser = await OpLock.WriteLockAsync())
- {
- //Enter swap lock
- await CentralSwapLock;
- try
- {
- //Open swap file data stream
- await using FileStream swapFile = swapDir.OpenFile(filename, FileMode.OpenOrCreate, FileAccess.Read, bufferSize:8128);
- //Copy from disk to memory
- await swapFile.CopyToAsync(_loadedData, 8128, heap);
- }
- finally
- {
- CentralSwapLock.Release();
- }
- //Set loaded flag
- _loaded = true;
- LastAccessed = DateTimeOffset.UtcNow;
- }
- log.Debug("Blob {name} swapped from disk", filename);
- }
- catch(Exception ex)
- {
- log.Error(ex, "Blob swap from disk error");
- }
- }
- */
- }
-}
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs b/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs
new file mode 100644
index 0000000..8644d1d
--- /dev/null
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs
@@ -0,0 +1,227 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.ObjectCache
+* File: BlobCache.cs
+*
+* BlobCache.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Buffers.Binary;
+using System.Runtime.CompilerServices;
+
+using VNLib.Utils.Memory;
+using VNLib.Utils.Extensions;
+
+namespace VNLib.Data.Caching
+{
+ /// <summary>
+ /// A structure that represents an item in cache
+ /// </summary>
+ public readonly struct CacheEntry : IDisposable, IEquatable<CacheEntry>
+ {
+ private const int TIME_SEGMENT_SIZE = sizeof(long);
+
+ private const int LENGTH_SEGMENT_SIZE = sizeof(int);
+
+ private const int DATA_SEGMENT_START = TIME_SEGMENT_SIZE + LENGTH_SEGMENT_SIZE;
+
+
+ //Only contain ref to backing handle to keep struct size small
+ private readonly MemoryHandle<byte> _handle;
+
+
+ /// <summary>
+ /// Creates a new <see cref="CacheEntry"/> and copies the initial data to the internal buffer
+ /// </summary>
+ /// <param name="data">The initial data to store</param>
+ /// <param name="heap">The heap to allocate the buffer from</param>
+ /// <returns>The new <see cref="CacheEntry"/></returns>
+ public static CacheEntry Create(ReadOnlySpan<byte> data, IUnmangedHeap heap)
+ {
+ //Calc buffer size
+ int bufferSize = GetRequiredHandleSize(data.Length);
+
+ //Alloc buffer
+ MemoryHandle<byte> handle = heap.Alloc<byte>(bufferSize);
+
+ //Create new entry from handle
+ CacheEntry entry = new (handle, data.Length);
+
+ //Get the data segment
+ Span<byte> segment = entry.GetDataSegment();
+
+ //Copy data segment
+ data.CopyTo(segment);
+
+ return entry;
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private static int GetRequiredHandleSize(int size)
+ {
+ //Caculate the minimum handle size to store all required information, rounded to nearest page
+ return (int)MemoryUtil.NearestPage(size + DATA_SEGMENT_START);
+ }
+
+ private CacheEntry(MemoryHandle<byte> handle, int length)
+ {
+ _handle = handle;
+ //Store data length, assumes the handle is large enough to store it
+ SetLength(length);
+ }
+
+
+ ///<inheritdoc/>
+ public readonly void Dispose() => _handle.Dispose();
+
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private readonly Span<byte> GetTimeSegment() => _handle.AsSpan(0, TIME_SEGMENT_SIZE);
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private readonly Span<byte> GetLengthSegment() => _handle.AsSpan(TIME_SEGMENT_SIZE, LENGTH_SEGMENT_SIZE);
+
+ /// <summary>
+ /// Gets the size of the block of memory held by the underlying handle
+ /// </summary>
+ /// <returns>The size of the block held by the current entry</returns>
+ /// <exception cref="ObjectDisposedException"></exception>
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public readonly nuint GetMemoryUsage()
+ {
+ _handle.ThrowIfClosed();
+ return _handle.ByteLength;
+ }
+
+
+ /// <summary>
+ /// Gets the last set time
+ /// </summary>
+ /// <returns>The last date stored</returns>
+ /// <exception cref="ObjectDisposedException"></exception>
+ public readonly DateTime GetCreatedTime()
+ {
+ //Get the time segment and write the value in big endian
+ ReadOnlySpan<byte> segment = GetTimeSegment();
+
+ long ticks = BinaryPrimitives.ReadInt64BigEndian(segment);
+
+ //ticks back to
+ return new(ticks);
+ }
+
+ /// <summary>
+ /// Sets the last modified time
+ /// </summary>
+ /// <param name="time">The new time to set the handle to</param>
+ /// <exception cref="ObjectDisposedException"></exception>
+ public readonly void SetTime(DateTime time)
+ {
+ //Get native ticks value
+ long timeData = time.Ticks;
+
+ //Get the time segment and write the value in big endian
+ Span<byte> segment = GetTimeSegment();
+
+ BinaryPrimitives.WriteInt64BigEndian(segment, timeData);
+ }
+
+ /// <summary>
+ /// Gets the length of the data segment
+ /// </summary>
+ /// <returns>The length of the data segment</returns>
+ /// <exception cref="ObjectDisposedException"></exception>
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public readonly int GetLength()
+ {
+ //Get the length segment
+ ReadOnlySpan<byte> segment = GetLengthSegment();
+ //Recover the integer
+ return BinaryPrimitives.ReadInt32BigEndian(segment);
+ }
+
+ private readonly void SetLength(int length)
+ {
+ //Get the length segment
+ Span<byte> segment = GetLengthSegment();
+
+ //Update the length value
+ BinaryPrimitives.WriteInt32BigEndian(segment, length);
+ }
+
+ /// <summary>
+ /// Gets the stored data segment
+ /// </summary>
+ /// <returns>The data segment</returns>
+ /// <exception cref="ObjectDisposedException"></exception>
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public readonly Span<byte> GetDataSegment()
+ {
+ //Get the actual length of the segment
+ int length = GetLength();
+ //Get the segment from its begining offset and
+ return _handle.AsSpan(DATA_SEGMENT_START, length);
+ }
+
+ /// <summary>
+ /// Writes the specified segment to the internal buffer and resizes the buffer if necessary.
+ /// This operation overwrites any previously stored data
+ /// </summary>
+ /// <param name="data">The data segment to store</param>
+ /// <exception cref="ObjectDisposedException"></exception>
+ public readonly void UpdateData(ReadOnlySpan<byte> data)
+ {
+ //Calc required buffer size
+ int bufferSize = GetRequiredHandleSize(data.Length);
+
+ //Resize handle if required
+ _handle.ResizeIfSmaller(bufferSize);
+
+ //Reset data length
+ SetLength(data.Length);
+
+ //Get the data segment
+ Span<byte> segment = GetDataSegment();
+
+#if DEBUG
+ //Test segment length is equvalent to the requested data length
+ System.Diagnostics.Debug.Assert(segment.Length == data.Length);
+#endif
+ //Copy data segment
+ data.CopyTo(segment);
+ }
+
+
+ ///<inheritdoc/>
+ public override bool Equals(object? obj) => obj is CacheEntry entry && Equals(entry);
+
+ ///<inheritdoc/>
+ public override int GetHashCode() => _handle.GetHashCode();
+
+ ///<inheritdoc/>
+ public static bool operator ==(CacheEntry left, CacheEntry right) => left.Equals(right);
+
+ ///<inheritdoc/>
+ public static bool operator !=(CacheEntry left, CacheEntry right) => !(left == right);
+
+ ///<inheritdoc/>
+ public bool Equals(CacheEntry other) => other.GetHashCode() == GetHashCode();
+ }
+}
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/CacheListener.cs b/lib/VNLib.Data.Caching.ObjectCache/src/CacheListener.cs
deleted file mode 100644
index 7f544e1..0000000
--- a/lib/VNLib.Data.Caching.ObjectCache/src/CacheListener.cs
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
-* Copyright (c) 2022 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Data.Caching.ObjectCache
-* File: CacheListener.cs
-*
-* CacheListener.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.IO;
-
-using VNLib.Utils.Memory;
-using VNLib.Net.Messaging.FBM.Server;
-
-namespace VNLib.Data.Caching
-{
- /// <summary>
- /// A base implementation of a memory/disk LRU data cache FBM listener
- /// </summary>
- public abstract class CacheListener : FBMListenerBase
- {
- /// <summary>
- /// The directory swap files will be stored
- /// </summary>
- public DirectoryInfo? Directory { get; private set; }
- /// <summary>
- /// The Cache store to access data blobs
- /// </summary>
- protected BlobCache? Cache { get; private set; }
- /// <summary>
- /// The <see cref="IUnmangedHeap"/> to allocate buffers from
- /// </summary>
- protected IUnmangedHeap? Heap { get; private set; }
-
- /// <summary>
- /// Initializes the <see cref="Cache"/> data store
- /// </summary>
- /// <param name="dir">The directory to swap cache records to</param>
- /// <param name="cacheSize">The size of the LRU cache</param>
- /// <param name="heap">The heap to allocate buffers from</param>
- protected void InitCache(DirectoryInfo dir, int cacheSize, IUnmangedHeap heap)
- {
- Heap = heap;
- Cache = new(dir, cacheSize, Log, Heap);
- Directory = dir;
- }
- }
-}
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/ChangeEvent.cs b/lib/VNLib.Data.Caching.ObjectCache/src/ChangeEvent.cs
index 7ee4427..2e8641e 100644
--- a/lib/VNLib.Data.Caching.ObjectCache/src/ChangeEvent.cs
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/ChangeEvent.cs
@@ -27,11 +27,23 @@ namespace VNLib.Data.Caching.ObjectCache
/// <summary>
/// An event object that is passed when change events occur
/// </summary>
- public class ChangeEvent
+ public sealed class ChangeEvent
{
- public readonly string CurrentId;
- public readonly string? AlternateId;
- public readonly bool Deleted;
+ /// <summary>
+ /// The current id of the changed object
+ /// </summary>
+ public string CurrentId { get; }
+
+ /// <summary>
+ /// The alternate id of the changed object if specified
+ /// </summary>
+ public string? AlternateId { get; }
+
+ /// <summary>
+ /// A value that indicates if the object was deleted
+ /// </summary>
+ public bool Deleted { get; }
+
internal ChangeEvent(string id, string? alternate, bool deleted)
{
CurrentId = id;
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs b/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs
index 514d00b..05a4f02 100644
--- a/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs
@@ -34,6 +34,9 @@ using VNLib.Utils.Extensions;
using VNLib.Net.Messaging.FBM.Server;
using static VNLib.Data.Caching.Constants;
+
+#pragma warning disable CA1849 // Call async methods when in an async method
+
namespace VNLib.Data.Caching.ObjectCache
{
public delegate ReadOnlySpan<byte> GetBodyDataCallback<T>(T state);
@@ -41,7 +44,7 @@ namespace VNLib.Data.Caching.ObjectCache
/// <summary>
/// A <see cref="FBMListener"/> implementation of a <see cref="CacheListener"/>
/// </summary>
- public class ObjectCacheStore : CacheListener, IDisposable
+ public class ObjectCacheStore : FBMListenerBase, IDisposable
{
private readonly SemaphoreSlim StoreLock;
private bool disposedValue;
@@ -55,6 +58,14 @@ namespace VNLib.Data.Caching.ObjectCache
public AsyncQueue<ChangeEvent> EventQueue { get; }
/// <summary>
+ /// The Cache store to access data blobs
+ /// </summary>
+ private readonly BlobCache Cache;
+
+ private readonly IUnmangedHeap Heap;
+
+
+ /// <summary>
/// Initialzies a new <see cref="ObjectCacheStore"/>
/// </summary>
/// <param name="dir">The <see cref="DirectoryInfo"/> to store blob files to</param>
@@ -62,23 +73,25 @@ namespace VNLib.Data.Caching.ObjectCache
/// <param name="log"></param>
/// <param name="heap"></param>
/// <param name="singleReader">A value that indicates if a single thread is processing events</param>
- public ObjectCacheStore(DirectoryInfo dir, int cacheMax, ILogProvider log, IUnmangedHeap heap, bool singleReader)
+ public ObjectCacheStore(int cacheMax, ILogProvider log, IUnmangedHeap heap, bool singleReader)
{
Log = log;
//We can use a single writer and single reader in this context
EventQueue = new(true, singleReader);
- InitCache(dir, cacheMax, heap);
+ Cache = new(cacheMax);
+ Heap = heap;
InitListener(heap);
StoreLock = new(1,1);
}
///<inheritdoc/>
- protected override async Task ProcessAsync(FBMContext context, object? userState, CancellationToken cancellationToken)
+ protected override Task ProcessAsync(FBMContext context, object? userState, CancellationToken exitToken)
{
try
{
//Get the action header
string action = context.Method();
+
//Optional newid header
string? alternateId = context.NewObjectId();
@@ -88,82 +101,159 @@ namespace VNLib.Data.Caching.ObjectCache
{
//Get the object-id header
string objectId = context.ObjectId();
- //Take lock on store
- using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellationToken: cancellationToken);
- if (Cache!.TryGetValue(objectId, out MemoryHandle<byte>? data))
+
+ //Try read sync
+ if (StoreLock.Wait(0))
{
- //Set the status code and write the buffered data to the response buffer
- context.CloseResponse(ResponseCodes.Okay);
- //Copy data to response buffer
- context.Response.WriteBody(data.Span);
+ try
+ {
+ UnsafeReadEntry(context, objectId);
+ }
+ finally
+ {
+ StoreLock.Release();
+ }
+
+ return Task.CompletedTask;
}
else
{
- context.CloseResponse(ResponseCodes.NotFound);
+ //Read entry async
+ return InternalReadEntryAsync(context, objectId, exitToken);
}
}
- break;
+
case Actions.AddOrUpdate:
{
//Get the object-id header
string objectId = context.ObjectId();
- //Add/update a blob async
- await AddOrUpdateBlobAsync(objectId, alternateId, static context => context.Request.BodyData, context);
- //Notify update the event bus
- await EventQueue.EnqueueAsync(new(objectId, alternateId, false), cancellationToken);
- //Set status code
- context.CloseResponse(ResponseCodes.Okay);
+
+ //Create change event for the object
+ ChangeEvent change = new(objectId, alternateId, false);
+
+ //Attempt to aquire lock sync
+ if (StoreLock.Wait(0))
+ {
+ //aquired sync
+ try
+ {
+ //Update the item
+ UnsafeAddOrUpdate(objectId, alternateId, GetBodyData, context);
+ }
+ finally
+ {
+ StoreLock.Release();
+ }
+
+ //Add to event queue
+ EnqueEvent(change);
+
+ //Set status code
+ context.CloseResponse(ResponseCodes.Okay);
+
+ return Task.CompletedTask;
+ }
+ else
+ {
+ //Lock will be awaited async and
+ return InternalAddOrUpdateAsync(context, change, exitToken);
+ }
}
- break;
case Actions.Delete:
{
//Get the object-id header
string objectId = context.ObjectId();
-
- if (await DeleteItemAsync(objectId))
+
+ //Create change event
+ ChangeEvent change = new(objectId, alternateId, true);
+
+ //See if lock can be entered without waiting
+ if (StoreLock.Wait(0))
{
- //Notify deleted
- await EventQueue.EnqueueAsync(new(objectId, null, true), cancellationToken);
- //Set status header
- context.CloseResponse(ResponseCodes.Okay);
+ bool found = false;
+
+ try
+ {
+ //Sync
+ found = UnsafeDeleteEntry(objectId);
+ }
+ finally
+ {
+ StoreLock.Release();
+ }
+
+ //Notify change
+ EnqueEvent(change);
+
+ //Set status code if found
+ context.CloseResponse(found ? ResponseCodes.Okay : ResponseCodes.NotFound);
+
+ return Task.CompletedTask;
}
else
{
- //Set status header
- context.CloseResponse(ResponseCodes.NotFound);
+ //lock will yeild async
+ return InternalDeleteAsync(context, change, exitToken);
}
}
- break;
// event queue dequeue request
case Actions.Dequeue:
{
+ static void SetResponse(ChangeEvent change, FBMContext context)
+ {
+ if (change.Deleted)
+ {
+ context.CloseResponse("deleted");
+ context.Response.WriteHeader(ObjectId, change.CurrentId);
+ }
+ else
+ {
+ //Changed
+ context.CloseResponse("modified");
+ context.Response.WriteHeader(ObjectId, change.CurrentId);
+
+ //Set old id if an old id is set
+ if (change.CurrentId != null)
+ {
+ context.Response.WriteHeader(NewObjectId, change.AlternateId);
+ }
+ }
+ }
+
+ static async Task DequeAsync(AsyncQueue<ChangeEvent> queue, FBMContext context, CancellationToken exitToken)
+ {
+ //Wait for a new message to process
+ ChangeEvent ev = await queue.DequeueAsync(exitToken);
+
+ //Set the response
+ SetResponse(ev, context);
+ }
+
//If no event bus is registered, then this is not a legal command
if (userState is not AsyncQueue<ChangeEvent> eventBus)
{
context.CloseResponse(ResponseCodes.NotFound);
- break;
+
+ return Task.CompletedTask;
}
- //Wait for a new message to process
- ChangeEvent ev = await eventBus.DequeueAsync(cancellationToken);
- if (ev.Deleted)
+
+ //try to deq without awaiting
+ if (eventBus.TryDequeue(out ChangeEvent? change))
{
- context.CloseResponse("deleted");
- context.Response.WriteHeader(ObjectId, ev.CurrentId);
+ SetResponse(change, context);
+
+ return Task.CompletedTask;
}
else
{
- //Changed
- context.CloseResponse("modified");
- context.Response.WriteHeader(ObjectId, ev.CurrentId);
- //Set old id if an old id is set
- if (ev.CurrentId != null)
- {
- context.Response.WriteHeader(NewObjectId, ev.AlternateId);
- }
+ //Process async
+ return DequeAsync(eventBus, context, exitToken);
}
}
- break;
+
}
+
+ Log.Error("Unhandled cache event!");
}
catch (OperationCanceledException)
{
@@ -175,68 +265,178 @@ namespace VNLib.Data.Caching.ObjectCache
Log.Error(ex);
context.CloseResponse(ResponseCodes.Error);
}
+
+ return Task.CompletedTask;
}
+
+
+ private static ReadOnlySpan<byte> GetBodyData(FBMContext ctx) => ctx.Request.BodyData;
- /// <summary>
- /// Asynchronously deletes a previously stored item
- /// </summary>
- /// <param name="id">The id of the object to delete</param>
- /// <returns>A task that completes when the item has been deleted</returns>
- public async Task<bool> DeleteItemAsync(string id)
+ private void EnqueEvent(ChangeEvent change)
{
- using SemSlimReleaser rel = await StoreLock.GetReleaserAsync();
- return Cache!.Remove(id);
+ if (!EventQueue.TryEnque(change))
+ {
+ Log.Warn("Change event {ev} was not enqued because the event queue is overflowing!", change.CurrentId);
+ }
}
-
- /// <summary>
- /// Asynchronously adds or updates an object in the store and optionally update's its id
- /// </summary>
- /// <param name="objectId">The current (or old) id of the object</param>
- /// <param name="alternateId">An optional id to update the blob to</param>
- /// <param name="bodyData">A callback that returns the data for the blob</param>
- /// <param name="state">The state parameter to pass to the data callback</param>
- /// <returns></returns>
- public async Task AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state)
+
+ private void UnsafeReadEntry(FBMContext context, string objectId)
{
- MemoryHandle<byte>? blob;
+ if (Cache!.TryGetValue(objectId, out CacheEntry data))
+ {
+ //Set the status code and write the buffered data to the response buffer
+ context.CloseResponse(ResponseCodes.Okay);
+
+ //Copy data to response buffer
+ context.Response.WriteBody(data.GetDataSegment());
+ }
+ else
+ {
+ context.CloseResponse(ResponseCodes.NotFound);
+ }
+ }
+
+ async Task InternalReadEntryAsync(FBMContext context, string objectId, CancellationToken cancellation)
+ {
+ //enter lock async
+ using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation);
+
+ UnsafeReadEntry(context, objectId);
+ }
+
+ private async Task InternalAddOrUpdateAsync(FBMContext context, ChangeEvent change, CancellationToken cancellation)
+ {
+ //Wait for lock since we know it will yeild async
+ using (SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation))
+ {
+ UnsafeAddOrUpdate(change.CurrentId, change.AlternateId, GetBodyData, context);
+ }
+
+ //Add to event queue
+ EnqueEvent(change);
+
+ //Set status code
+ context.CloseResponse(ResponseCodes.Okay);
+ }
+
+ private void UnsafeAddOrUpdate<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state)
+ {
+ CacheEntry entry;
+
//See if new/alt session id was specified
if (string.IsNullOrWhiteSpace(alternateId))
{
- //Take lock on store
- using SemSlimReleaser rel = await StoreLock.GetReleaserAsync();
//See if blob exists
- if (!Cache!.TryGetValue(objectId, out blob))
+ if (!Cache!.TryGetValue(objectId, out entry))
{
- //If not, create new blob and add to store
- blob = Heap.AllocAndCopy(bodyData(state));
- Cache.Add(objectId, blob);
+ //Create the new cache entry since it does not exist
+ entry = CacheEntry.Create(bodyData(state), Heap);
+
+ //Add to cache
+ Cache.Add(objectId, entry);
}
else
{
//Reset the buffer state
- blob.WriteAndResize(bodyData(state));
+ entry.UpdateData(bodyData(state));
}
}
//Need to change the id of the record
else
{
- //Take lock on store
- using SemSlimReleaser rel = await StoreLock.GetReleaserAsync();
//Try to change the blob key
- if (!Cache!.TryChangeKey(objectId, alternateId, out blob))
+ if (!Cache!.TryChangeKey(objectId, alternateId, out entry))
{
- //Blob not found, create new blob
- blob = Heap.AllocAndCopy(bodyData(state));
- Cache.Add(alternateId, blob);
+ //Create the new cache entry since it does not exist
+ entry = CacheEntry.Create(bodyData(state), Heap);
+
+ //Add to cache by its alternate id
+ Cache.Add(alternateId, entry);
}
else
{
//Reset the buffer state
- blob.WriteAndResize(bodyData(state));
+ entry.UpdateData(bodyData(state));
}
}
+
+ //Update modified time to current utc time
+ entry.SetTime(DateTime.UtcNow);
+ }
+
+ private async Task InternalDeleteAsync(FBMContext context, ChangeEvent change, CancellationToken cancellation)
+ {
+ bool found = false;
+
+ //enter the lock
+ using(SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation))
+ {
+ //Sync
+ found = UnsafeDeleteEntry(change.CurrentId);
+ }
+
+ //Notify change
+ EnqueEvent(change);
+
+ //Set status code if found
+ context.CloseResponse(found ? ResponseCodes.Okay : ResponseCodes.NotFound);
}
-
+
+ private bool UnsafeDeleteEntry(string id) => Cache!.Remove(id);
+
+
+ /// <summary>
+ /// Asynchronously adds or updates an object in the store and optionally update's its id
+ /// </summary>
+ /// <param name="objectId">The current (or old) id of the object</param>
+ /// <param name="alternateId">An optional id to update the blob to</param>
+ /// <param name="bodyData">A callback that returns the data for the blob</param>
+ /// <param name="state">The state parameter to pass to the data callback</param>
+ /// <param name="token">A token to cancel the async operation</param>
+ /// <returns>A value task that represents the async operation</returns>
+ public async ValueTask AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token = default)
+ {
+ //Test the lock before waiting async
+ if (!StoreLock.Wait(0))
+ {
+ //Wait async to avoid task alloc
+ await StoreLock.WaitAsync(token);
+ }
+ try
+ {
+ UnsafeAddOrUpdate(objectId, alternateId, bodyData, state);
+ }
+ finally
+ {
+ StoreLock.Release();
+ }
+ }
+
+ /// <summary>
+ /// Asynchronously deletes a previously stored item
+ /// </summary>
+ /// <param name="id">The id of the object to delete</param>
+ /// <param name="token">A token to cancel the async lock await</param>
+ /// <returns>A task that completes when the item has been deleted</returns>
+ public async ValueTask<bool> DeleteItemAsync(string id, CancellationToken token = default)
+ {
+ //Test the lock before waiting async
+ if (!StoreLock.Wait(0))
+ {
+ //Wait async to avoid task alloc
+ await StoreLock.WaitAsync(token);
+ }
+ try
+ {
+ return UnsafeDeleteEntry(id);
+ }
+ finally
+ {
+ StoreLock.Release();
+ }
+ }
+
+
///<inheritdoc/>
protected virtual void Dispose(bool disposing)
{
@@ -252,6 +452,7 @@ namespace VNLib.Data.Caching.ObjectCache
disposedValue = true;
}
}
+
///<inheritdoc/>
public void Dispose()
{
diff --git a/lib/VNLib.Data.Caching/src/ClientExtensions.cs b/lib/VNLib.Data.Caching/src/ClientExtensions.cs
index 0a24a83..353bf67 100644
--- a/lib/VNLib.Data.Caching/src/ClientExtensions.cs
+++ b/lib/VNLib.Data.Caching/src/ClientExtensions.cs
@@ -105,16 +105,16 @@ namespace VNLib.Data.Caching
response.ThrowIfNotSet();
//Get the status code
- ReadOnlyMemory<char> status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value;
+ FBMMessageHeader status = response.Headers.FirstOrDefault(static a => a.Header == HeaderCommand.Status);
//Check ok status code, then its safe to deserialize
- if (status.Span.Equals(ResponseCodes.Okay, StringComparison.Ordinal))
+ if (status.Value.Equals(ResponseCodes.Okay, StringComparison.Ordinal))
{
return JsonSerializer.Deserialize<T>(response.ResponseBody, LocalOptions);
}
//Object may not exist on the server yet
- if (status.Span.Equals(ResponseCodes.NotFound, StringComparison.Ordinal))
+ if (status.Value.Equals(ResponseCodes.NotFound, StringComparison.Ordinal))
{
return default;
}
@@ -181,14 +181,14 @@ namespace VNLib.Data.Caching
response.ThrowIfNotSet();
//Get the status code
- ReadOnlyMemory<char> status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value;
+ FBMMessageHeader status = response.Headers.FirstOrDefault(static a => a.Header == HeaderCommand.Status);
//Check status code
- if (status.Span.Equals(ResponseCodes.Okay, StringComparison.OrdinalIgnoreCase))
+ if (status.Value.Equals(ResponseCodes.Okay, StringComparison.OrdinalIgnoreCase))
{
return;
}
- else if(status.Span.Equals(ResponseCodes.NotFound, StringComparison.OrdinalIgnoreCase))
+ else if(status.Value.Equals(ResponseCodes.NotFound, StringComparison.OrdinalIgnoreCase))
{
throw new ObjectNotFoundException($"object {objectId} not found on remote server");
}
@@ -236,13 +236,13 @@ namespace VNLib.Data.Caching
response.ThrowIfNotSet();
//Get the status code
- ReadOnlyMemory<char> status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value;
+ FBMMessageHeader status = response.Headers.FirstOrDefault(static a => a.Header == HeaderCommand.Status);
- if (status.Span.Equals(ResponseCodes.Okay, StringComparison.Ordinal))
+ if (status.Value.Equals(ResponseCodes.Okay, StringComparison.Ordinal))
{
return;
}
- else if(status.Span.Equals(ResponseCodes.NotFound, StringComparison.OrdinalIgnoreCase))
+ else if(status.Value.Equals(ResponseCodes.NotFound, StringComparison.OrdinalIgnoreCase))
{
throw new ObjectNotFoundException($"object {objectId} not found on remote server");
}
@@ -277,9 +277,9 @@ namespace VNLib.Data.Caching
return new()
{
- Status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value.ToString(),
- CurrentId = response.Headers.SingleOrDefault(static v => v.Key == Constants.ObjectId).Value.ToString(),
- NewId = response.Headers.SingleOrDefault(static v => v.Key == Constants.NewObjectId).Value.ToString()
+ Status = response.Headers.FirstOrDefault(static a => a.Header == HeaderCommand.Status).Value.ToString(),
+ CurrentId = response.Headers.SingleOrDefault(static v => v.Header == Constants.ObjectId).Value.ToString(),
+ NewId = response.Headers.SingleOrDefault(static v => v.Header == Constants.NewObjectId).Value.ToString()
};
}
finally
@@ -297,7 +297,7 @@ namespace VNLib.Data.Caching
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static string ObjectId(this FBMContext context)
{
- return context.Request.Headers.First(static kvp => kvp.Key == Constants.ObjectId).Value.ToString();
+ return context.Request.Headers.First(static kvp => kvp.Header == Constants.ObjectId).Value.ToString();
}
/// <summary>
@@ -308,7 +308,7 @@ namespace VNLib.Data.Caching
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static string? NewObjectId(this FBMContext context)
{
- return context.Request.Headers.FirstOrDefault(static kvp => kvp.Key == Constants.NewObjectId).Value.ToString();
+ return context.Request.Headers.FirstOrDefault(static kvp => kvp.Header == Constants.NewObjectId).Value.ToString();
}
/// <summary>
@@ -319,7 +319,7 @@ namespace VNLib.Data.Caching
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static string Method(this FBMContext context)
{
- return context.Request.Headers.First(static kvp => kvp.Key == HeaderCommand.Action).Value.ToString();
+ return context.Request.Headers.First(static kvp => kvp.Header == HeaderCommand.Action).Value.ToString();
}
/// <summary>
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs
index e5e6ee3..75b9bd4 100644
--- a/lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs
@@ -67,7 +67,7 @@ namespace VNLib.Plugins.Extensions.VNCache
VnCacheClient client = new(debugLog);
//Begin cache connections by scheduling a task on the plugin's scheduler
- _ = pbase.DeferTask(() => RunClientAsync(pbase, config, localized, client), 250);
+ _ = pbase.ObserveTask(() => RunClientAsync(pbase, config, localized, client), 250);
return client;
}
diff --git a/plugins/CacheBroker/src/CacheBroker.csproj b/plugins/CacheBroker/src/CacheBroker.csproj
index 285d797..fa28dce 100644
--- a/plugins/CacheBroker/src/CacheBroker.csproj
+++ b/plugins/CacheBroker/src/CacheBroker.csproj
@@ -2,21 +2,15 @@
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
- <Copyright>Copyright © 2023 Vaughn Nugent</Copyright>
<RootNamespace>VNLib.Plugins.Cache.Broker</RootNamespace>
- <Authors>Vaughn Nugent</Authors>
<Version>1.0.1.2</Version>
+ <GenerateDocumentationFile>True</GenerateDocumentationFile>
+ <AnalysisLevel>latest-all</AnalysisLevel>
<SignAssembly>True</SignAssembly>
<AssemblyOriginatorKeyFile>\\vaughnnugent.com\Internal\Folder Redirection\vman\Documents\Programming\Software\StrongNameingKey.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>
<ItemGroup>
- <Compile Remove="liveplugin\**" />
- <EmbeddedResource Remove="liveplugin\**" />
- <None Remove="liveplugin\**" />
- </ItemGroup>
-
- <ItemGroup>
<PackageReference Include="ErrorProne.NET.CoreAnalyzers" Version="0.1.2">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
@@ -26,14 +20,13 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>
-
+
<PropertyGroup>
<!--Enable dynamic loading-->
<EnableDynamicLoading>true</EnableDynamicLoading>
- <GenerateDocumentationFile>True</GenerateDocumentationFile>
+ <Authors>Vaughn Nugent</Authors>
+ <Copyright>Copyright © 2023 Vaughn Nugent</Copyright>
<PackageProjectUrl>https://www.vaughnnugent.com/resources/software</PackageProjectUrl>
- <AnalysisLevel>latest-all</AnalysisLevel>
-
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
@@ -50,12 +43,5 @@
<ProjectReference Include="..\..\..\..\..\core\lib\Utils\src\VNLib.Utils.csproj" />
<ProjectReference Include="..\..\..\..\Extensions\lib\VNLib.Plugins.Extensions.Loading\src\VNLib.Plugins.Extensions.Loading.csproj" />
</ItemGroup>
-
-
- <ItemGroup>
- <None Update="CacheBroker.json">
- <CopyToOutputDirectory>Always</CopyToOutputDirectory>
- </None>
- </ItemGroup>
-
+
</Project>
diff --git a/plugins/CacheBroker/src/Endpoints/BrokerRegistrationEndpoint.cs b/plugins/CacheBroker/src/Endpoints/BrokerRegistrationEndpoint.cs
index c2e0b84..8f983ac 100644
--- a/plugins/CacheBroker/src/Endpoints/BrokerRegistrationEndpoint.cs
+++ b/plugins/CacheBroker/src/Endpoints/BrokerRegistrationEndpoint.cs
@@ -56,11 +56,12 @@ using VNLib.Net.Rest.Client;
namespace VNLib.Plugins.Cache.Broker.Endpoints
{
[ConfigurationName("broker_endpoint")]
- public sealed class BrokerRegistrationEndpoint : ResourceEndpointBase
+ public sealed class BrokerRegistrationEndpoint : ResourceEndpointBase, IDisposable
{
const string HEARTBEAT_PATH = "/heartbeat";
- private static readonly RestClientPool ClientPool = new(10,new RestClientOptions()
+ //Client pool is instance based and may be disposed when the plugin is unloaded
+ private readonly RestClientPool ClientPool = new(10,new RestClientOptions()
{
Encoding = Encoding.UTF8,
FollowRedirects = false,
@@ -69,6 +70,7 @@ namespace VNLib.Plugins.Cache.Broker.Endpoints
}, null);
+ //Matches the json schema set by the FBM caching extensions library
private class ActiveServer
{
[JsonIgnore]
@@ -408,5 +410,12 @@ namespace VNLib.Plugins.Cache.Broker.Endpoints
_ = ActiveServers.Remove(server.ServerId!);
}
}
+
+
+ void IDisposable.Dispose()
+ {
+ //Cleanup client pool when exiting
+ ClientPool.Dispose();
+ }
}
}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs b/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs
index bd1233e..3930f90 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs
@@ -30,12 +30,14 @@ using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
+using VNLib.Plugins;
+using VNLib.Plugins.Essentials;
using VNLib.Hashing.IdentityUtility;
using VNLib.Plugins.Essentials.Endpoints;
using VNLib.Plugins.Essentials.Extensions;
using VNLib.Plugins.Extensions.Loading;
-namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
+namespace VNLib.Data.Caching.ObjectCache.Server
{
internal sealed class BrokerHeartBeat : ResourceEndpointBase
{
diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs
new file mode 100644
index 0000000..e9584b6
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs
@@ -0,0 +1,54 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: ConnectEndpoint.cs
+*
+* ConnectEndpoint.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Text.Json.Serialization;
+
+namespace VNLib.Data.Caching.ObjectCache.Server
+{
+ internal sealed class CacheConfiguration
+ {
+ [JsonPropertyName("buffer_recv_max")]
+ public int MaxRecvBufferSize { get; set; } = 1000 * 1024;
+ [JsonPropertyName("buffer_recv_min")]
+ public int MinRecvBufferSize { get; set; } = 8 * 1024;
+
+
+ [JsonPropertyName("buffer_header_max")]
+ public int MaxHeaderBufferSize { get; set; } = 2 * 1024;
+ [JsonPropertyName("buffer_header_min")]
+ public int MinHeaderBufferSize { get; set; } = 128;
+
+
+ [JsonPropertyName("max_message_size")]
+ public int MaxMessageSize { get; set; } = 1000 * 1024;
+
+
+ [JsonPropertyName("change_queue_max_depth")]
+ public int MaxEventQueueDepth { get; set; } = 10 * 1000;
+
+
+ [JsonPropertyName("max_cache")]
+ public int MaxCacheEntries { get; set; } = 10000;
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
index 15cc086..9a1ece0 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
@@ -31,34 +31,29 @@ using System.Threading.Channels;
using System.Collections.Generic;
using System.Collections.Concurrent;
-using VNLib.Net.Http;
+using VNLib.Plugins;
using VNLib.Hashing;
+using VNLib.Net.Http;
using VNLib.Utils.Async;
+using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
using VNLib.Hashing.IdentityUtility;
using VNLib.Net.Messaging.FBM;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Net.Messaging.FBM.Server;
-using VNLib.Data.Caching.ObjectCache;
+using VNLib.Plugins.Essentials;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Essentials.Endpoints;
using VNLib.Plugins.Essentials.Extensions;
+using System.Text.Json.Serialization;
-
-namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
+namespace VNLib.Data.Caching.ObjectCache.Server
{
- internal sealed class ConnectEndpoint : ResourceEndpointBase
- {
- const int MAX_RECV_BUF_SIZE = 1000 * 1024;
- const int MIN_RECV_BUF_SIZE = 8 * 1024;
- const int MAX_HEAD_BUF_SIZE = 2048;
- const int MIN_MESSAGE_SIZE = 10 * 1024;
- const int MAX_MESSAGE_SIZE = 1000 * 1024;
- const int MIN_HEAD_BUF_SIZE = 128;
- const int MAX_EVENT_QUEUE_SIZE = 10000;
- const int MAX_RESPONSE_BUFFER_SIZE = 10 * 1024;
- private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
+ [ConfigurationName("store")]
+ internal sealed class ConnectEndpoint : ResourceEndpointBase, IDisposable, IAsyncBackgroundWork
+ {
+ private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
private readonly string AudienceLocalServerId;
private readonly ObjectCacheStore Store;
@@ -68,8 +63,16 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
private uint _connectedClients;
+ /// <summary>
+ /// Gets the number of active connections
+ /// </summary>
public uint ConnectedClients => _connectedClients;
+ /// <summary>
+ /// The cache store configuration
+ /// </summary>
+ public CacheConfiguration CacheConfig { get; }
+
//Loosen up protection settings
protected override ProtectionSettings EndpointProtectionSettings { get; } = new()
{
@@ -78,20 +81,89 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
DisableCrossSiteDenied = true
};
- public ConnectEndpoint(string path, ObjectCacheStore store, PluginBase pbase)
+ public ConnectEndpoint(PluginBase plugin, IReadOnlyDictionary<string, JsonElement> config)
{
- InitPathAndLog(path, pbase.Log);
- Store = store;//Load client public key to verify signed messages
- Pbase = pbase;
+ string? path = config["path"].GetString();
+ InitPathAndLog(path, plugin.Log);
+
+ Pbase = plugin;
+
+ //Parse cache config or use default
+ if(config.TryGetValue("cache", out JsonElement confEl))
+ {
+ CacheConfig = confEl.Deserialize<CacheConfiguration>()!;
+ }
+ else
+ {
+ //Init default config if not fount
+ CacheConfig = new();
+
+ Log.Verbose("Loading default cache buffer configuration");
+ }
+
+ //Create event queue client lookup table
StatefulEventQueue = new(StringComparer.OrdinalIgnoreCase);
- //Start the queue worker
- _ = pbase.DeferTask(() => ChangeWorkerAsync(pbase.UnloadToken), 10);
+ //Init the cache store
+ Store = InitializeCache((ObjectCacheServerEntry)plugin, CacheConfig.MaxCacheEntries);
+ /*
+ * Generate a random guid for the current server when created so we
+ * know client tokens belong to us when singed by the same key
+ */
AudienceLocalServerId = Guid.NewGuid().ToString("N");
+
+ //Schedule the queue worker to be run
+ _ = plugin.ObserveWork(this, 100);
+ }
+
+ private static ObjectCacheStore InitializeCache(ObjectCacheServerEntry plugin, int maxCache)
+ {
+ if(maxCache < 2)
+ {
+ throw new ArgumentException("You must configure a 'max_cache' size larger than 1 item");
+ }
+
+ //Suggestion
+ if(maxCache < 200)
+ {
+ plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache");
+ }
+
+ //Endpoint only allows for a single reader
+ return new (maxCache, plugin.Log, plugin.CacheHeap, true);
}
+ /// <summary>
+ /// Gets the configured cache store
+ /// </summary>
+ /// <returns></returns>
+ public ICacheStore GetCacheStore() => new CacheStore(Store);
+
+
+ //Dispose will be called by the host plugin on unload
+ void IDisposable.Dispose()
+ {
+ //Dispose the store on cleanup
+ Store.Dispose();
+ }
+
+
+ private async Task<ReadOnlyJsonWebKey> GetClientPubAsync()
+ {
+ return await Pbase.TryGetSecretAsync("client_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
+ }
+ private async Task<ReadOnlyJsonWebKey> GetCachePubAsync()
+ {
+ return await Pbase.TryGetSecretAsync("cache_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
+ }
+ private async Task<ReadOnlyJsonWebKey> GetCachePrivateKeyAsync()
+ {
+ return await Pbase.TryGetSecretAsync("cache_private_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
+ }
+
+
/*
* Used as a client negotiation and verification request
*
@@ -132,7 +204,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
{
verified = true;
}
- //May be signed by a cahce server
+ //May be signed by a cache server
else
{
using ReadOnlyJsonWebKey cacheCert = await GetCachePubAsync();
@@ -163,8 +235,10 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
}
Log.Debug("Received negotiation request from node {node}", nodeId);
+
//Verified, now we can create an auth message with a short expiration
using JsonWebToken auth = new();
+
//Sign the auth message from the cache certificate's private key
using (ReadOnlyJsonWebKey cert = await GetCachePrivateKeyAsync())
{
@@ -179,9 +253,9 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
//Specify the server's node id if set
.AddClaim("sub", nodeId!)
//Add negotiaion args
- .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, MAX_HEAD_BUF_SIZE)
- .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, MAX_RECV_BUF_SIZE)
- .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, MAX_MESSAGE_SIZE)
+ .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, CacheConfig.MaxHeaderBufferSize)
+ .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, CacheConfig.MaxRecvBufferSize)
+ .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, CacheConfig.MaxMessageSize)
.CommitClaims();
auth.SignFromJwk(cert);
@@ -192,27 +266,17 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
return VfReturnType.VirtualSkip;
}
- private async Task<ReadOnlyJsonWebKey> GetClientPubAsync()
- {
- return await Pbase.TryGetSecretAsync("client_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
- }
- private async Task<ReadOnlyJsonWebKey> GetCachePubAsync()
- {
- return await Pbase.TryGetSecretAsync("cache_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
- }
- private async Task<ReadOnlyJsonWebKey> GetCachePrivateKeyAsync()
- {
- return await Pbase.TryGetSecretAsync("cache_private_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
- }
- private async Task ChangeWorkerAsync(CancellationToken cancellation)
+ //Background worker to process event queue items
+ async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
{
try
{
//Listen for changes
while (true)
{
- ChangeEvent ev = await Store.EventQueue.DequeueAsync(cancellation);
+ ChangeEvent ev = await Store.EventQueue.DequeueAsync(exitToken);
+
//Add event to queues
foreach (AsyncQueue<ChangeEvent> queue in StatefulEventQueue.Values)
{
@@ -224,10 +288,8 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
}
}
catch (OperationCanceledException)
- { }
- catch (Exception ex)
{
- Log.Error(ex);
+ //Normal exit
}
}
@@ -238,6 +300,12 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
public int MaxMessageSize { get; init; }
public int MaxResponseBufferSize { get; init; }
public AsyncQueue<ChangeEvent>? SyncQueue { get; init; }
+
+ public override string ToString()
+ {
+ return
+ $"{nameof(RecvBufferSize)}:{RecvBufferSize}, {nameof(MaxHeaderBufferSize)}: {MaxHeaderBufferSize}, {nameof(MaxMessageSize)}:{MaxMessageSize}, {nameof(MaxResponseBufferSize)}:{MaxResponseBufferSize}";
+ }
}
protected override async ValueTask<VfReturnType> WebsocketRequestedAsync(HttpEntity entity)
@@ -246,6 +314,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
{
//Parse jwt from authorization
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
+
if (string.IsNullOrWhiteSpace(jwtAuth))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
@@ -253,6 +322,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
}
string? nodeId = null;
+
//Parse jwt
using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
{
@@ -301,11 +371,12 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
string maxMessageSizeCmd = entity.QueryArgs[FBMClient.REQ_MAX_MESS_QUERY_ARG];
//Parse recv buffer size
- int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : MIN_RECV_BUF_SIZE;
- int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : MIN_HEAD_BUF_SIZE;
- int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : MIN_MESSAGE_SIZE;
+ int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : CacheConfig.MinRecvBufferSize;
+ int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : CacheConfig.MinHeaderBufferSize;
+ int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : CacheConfig.MaxMessageSize;
AsyncQueue<ChangeEvent>? nodeQueue = null;
+
//The connection may be a caching server node, so get its node-id
if (!string.IsNullOrWhiteSpace(nodeId))
{
@@ -317,7 +388,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
* and change events may be processed on mutliple threads.
*/
- BoundedChannelOptions queueOptions = new(MAX_EVENT_QUEUE_SIZE)
+ BoundedChannelOptions queueOptions = new(CacheConfig.MaxEventQueueDepth)
{
AllowSynchronousContinuations = true,
SingleReader = false,
@@ -327,21 +398,41 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
};
_ = StatefulEventQueue.TryAdd(nodeId, new(queueOptions));
+
//Get the queue
nodeQueue = StatefulEventQueue[nodeId];
}
-
+
+ /*
+ * Buffer sizing can get messy as the response/resquest sizes can vary
+ * and will include headers, this is a drawback of the FBM protocol
+ * so we need to properly calculate efficient buffer sizes as
+ * negotiated with the client.
+ */
+
+ int maxMessageSizeClamp = Math.Clamp(maxMessageSize, CacheConfig.MinRecvBufferSize, CacheConfig.MaxRecvBufferSize);
+
//Init new ws state object and clamp the suggested buffer sizes
WsUserState state = new()
{
- RecvBufferSize = Math.Clamp(recvBufSize, MIN_RECV_BUF_SIZE, MAX_RECV_BUF_SIZE),
- MaxHeaderBufferSize = Math.Clamp(maxHeadBufSize, MIN_HEAD_BUF_SIZE, MAX_HEAD_BUF_SIZE),
- MaxMessageSize = Math.Clamp(maxMessageSize, MIN_MESSAGE_SIZE, MAX_MESSAGE_SIZE),
- MaxResponseBufferSize = Math.Min(maxMessageSize, MAX_RESPONSE_BUFFER_SIZE),
+ RecvBufferSize = Math.Clamp(recvBufSize, CacheConfig.MinRecvBufferSize, CacheConfig.MaxRecvBufferSize),
+ MaxHeaderBufferSize = Math.Clamp(maxHeadBufSize, CacheConfig.MinHeaderBufferSize, CacheConfig.MaxHeaderBufferSize),
+
+ MaxMessageSize = maxMessageSizeClamp,
+
+ /*
+ * Response buffer needs to be large enough to store a max message
+ * as a response along with all response headers
+ */
+ MaxResponseBufferSize = (int)MemoryUtil.NearestPage(maxMessageSizeClamp),
+
SyncQueue = nodeQueue
};
Log.Debug("Client recv buffer suggestion {recv}, header buffer size {head}, response buffer size {r}", recvBufCmd, maxHeaderCharCmd, state.MaxResponseBufferSize);
+
+ //Print state message to console
+ Log.Verbose("Client buffer state {state}", state);
//Accept socket and pass state object
entity.AcceptWebSocket(WebsocketAcceptedAsync, state);
@@ -370,6 +461,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
RecvBufferSize = state.RecvBufferSize,
ResponseBufferSize = state.MaxResponseBufferSize,
MaxHeaderBufferSize = state.MaxHeaderBufferSize,
+
HeaderEncoding = Helpers.DefaultEncoding,
};
@@ -395,5 +487,31 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
}
Log.Debug("Server websocket exited");
}
+
+
+ private sealed class CacheStore : ICacheStore
+ {
+ private readonly ObjectCacheStore _cache;
+
+ public CacheStore(ObjectCacheStore cache)
+ {
+ _cache = cache;
+ }
+
+ ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token)
+ {
+ return _cache.AddOrUpdateBlobAsync(objectId, alternateId, bodyData, state, token);
+ }
+
+ void ICacheStore.Clear()
+ {
+ throw new NotImplementedException();
+ }
+
+ ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token)
+ {
+ return _cache.DeleteItemAsync(id, token);
+ }
+ }
}
}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs b/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs
new file mode 100644
index 0000000..3776269
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs
@@ -0,0 +1,56 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: ConnectEndpoint.cs
+*
+* ConnectEndpoint.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace VNLib.Data.Caching.ObjectCache.Server
+{
+ internal interface ICacheStore
+ {
+ /// <summary>
+ /// Asynchronously adds or updates an object in the store and optionally update's its id
+ /// </summary>
+ /// <param name="objectId">The current (or old) id of the object</param>
+ /// <param name="alternateId">An optional id to update the blob to</param>
+ /// <param name="bodyData">A callback that returns the data for the blob</param>
+ /// <param name="state">The state parameter to pass to the data callback</param>
+ /// <param name="token">A token to cancel the async operation</param>
+ /// <returns>A value task that represents the async operation</returns>
+ ValueTask AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token = default);
+
+ /// <summary>
+ /// Clears all items from the store
+ /// </summary>
+ void Clear();
+
+ /// <summary>
+ /// Asynchronously deletes a previously stored item
+ /// </summary>
+ /// <param name="id">The id of the object to delete</param>
+ /// <param name="token">A token to cancel the async lock await</param>
+ /// <returns>A task that completes when the item has been deleted</returns>
+ ValueTask<bool> DeleteItemAsync(string id, CancellationToken token = default);
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj
index 672597b..38f5b97 100644
--- a/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj
+++ b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj
@@ -2,32 +2,23 @@
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<Nullable>enable</Nullable>
- <Authors>Vaughn Nugent</Authors>
+ <RootNamespace>VNLib.Data.Caching.ObjectCache.Server</RootNamespace>
<Version>1.0.1.1</Version>
- <RootNamespace>VNLib.Plugins.Essentials.Sessions.Server</RootNamespace>
- <Copyright>Copyright © 2023 Vaughn Nugent</Copyright>
<SignAssembly>True</SignAssembly>
<AssemblyOriginatorKeyFile>\\vaughnnugent.com\Internal\Folder Redirection\vman\Documents\Programming\Software\StrongNameingKey.snk</AssemblyOriginatorKeyFile>
+ <EnableDynamicLoading>true</EnableDynamicLoading>
+ <AnalysisLevel>latest-all</AnalysisLevel>
+ <ProduceReferenceAssembly>True</ProduceReferenceAssembly>
+ <GenerateDocumentationFile>False</GenerateDocumentationFile>
</PropertyGroup>
<!-- Resolve nuget dll files and store them in the output dir -->
<PropertyGroup>
- <EnableDynamicLoading>true</EnableDynamicLoading>
- <GenerateDocumentationFile>False</GenerateDocumentationFile>
- <PackageProjectUrl>https://www.vaughnnugent.com/resources</PackageProjectUrl>
- <AnalysisLevel>latest-all</AnalysisLevel>
- <ProduceReferenceAssembly>True</ProduceReferenceAssembly>
-
+ <Authors>Vaughn Nugent</Authors>
+ <Copyright>Copyright © 2023 Vaughn Nugent</Copyright>
+ <PackageProjectUrl>https://www.vaughnnugent.com/resources/software</PackageProjectUrl>
</PropertyGroup>
<ItemGroup>
- <Compile Remove="liveplugin2\**" />
- <Compile Remove="liveplugin\**" />
- <EmbeddedResource Remove="liveplugin2\**" />
- <EmbeddedResource Remove="liveplugin\**" />
- <None Remove="liveplugin2\**" />
- <None Remove="liveplugin\**" />
- </ItemGroup>
- <ItemGroup>
<PackageReference Include="ErrorProne.NET.CoreAnalyzers" Version="0.1.2">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
@@ -42,10 +33,4 @@
<ProjectReference Include="..\..\..\lib\VNLib.Data.Caching.ObjectCache\src\VNLib.Data.Caching.ObjectCache.csproj" />
<ProjectReference Include="..\..\CacheBroker\src\CacheBroker.csproj" />
</ItemGroup>
- <ItemGroup>
- <None Update="ObjectCacheServer.json">
- <CopyToOutputDirectory>Always</CopyToOutputDirectory>
- </None>
- </ItemGroup>
-
</Project>
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
index 7b7b9fb..d6dbd9b 100644
--- a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
+++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
@@ -34,34 +34,67 @@ using System.Threading.Tasks;
using System.Collections.Generic;
using System.Security.Cryptography;
+using VNLib.Plugins;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
-using VNLib.Utils.Extensions;
+using VNLib.Utils.Memory.Diagnostics;
using VNLib.Hashing;
using VNLib.Hashing.IdentityUtility;
-using VNLib.Data.Caching;
using VNLib.Data.Caching.Extensions;
-using VNLib.Data.Caching.ObjectCache;
using static VNLib.Data.Caching.Constants;
using VNLib.Net.Messaging.FBM;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Cache.Broker.Endpoints;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Extensions.Loading.Routing;
-using VNLib.Plugins.Essentials.Sessions.Server.Endpoints;
-namespace VNLib.Plugins.Essentials.Sessions.Server
+namespace VNLib.Data.Caching.ObjectCache.Server
{
public sealed class ObjectCacheServerEntry : PluginBase
{
public override string PluginName => "ObjectCache.Service";
- private string? BrokerHeartBeatToken;
+ private readonly Lazy<IUnmangedHeap> _cacheHeap;
+ private readonly object ServerLock;
+ private readonly HashSet<ActiveServer> ListeningServers;
+ private readonly ManualResetEvent BrokerSyncHandle;
+
+ /// <summary>
+ /// Gets the shared heap for the plugin
+ /// </summary>
+ internal IUnmangedHeap CacheHeap => _cacheHeap.Value;
- private readonly object ServerLock = new();
- private readonly HashSet<ActiveServer> ListeningServers = new();
+ public ObjectCacheServerEntry()
+ {
+ //Init heap
+ _cacheHeap = new Lazy<IUnmangedHeap>(InitializeHeap, LazyThreadSafetyMode.PublicationOnly);
+
+ ServerLock = new();
+ ListeningServers = new();
+ //Set sync handle
+ BrokerSyncHandle = new(false);
+ }
+
+ private IUnmangedHeap InitializeHeap()
+ {
+ //Create default heap
+ IUnmangedHeap _heap = MemoryUtil.InitializeNewHeapForProcess();
+ try
+ {
+ //If the plugin is in debug mode enable heap tracking
+ return this.IsDebug() ? new TrackedHeapWrapper(_heap) : _heap;
+ }
+ catch
+ {
+ _heap.Dispose();
+ throw;
+ }
+ }
+
+
+ private string? BrokerHeartBeatToken;
private void RemoveServer(ActiveServer server)
{
@@ -71,55 +104,45 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
}
}
+ private FBMClientConfig ClientConfig;
+
+
protected override void OnLoad()
{
- //Create default heap
- IUnmangedHeap CacheHeap = MemoryUtil.InitializeNewHeapForProcess();
try
{
IReadOnlyDictionary<string, JsonElement> clusterConf = this.GetConfig("cluster");
- string brokerAddress = clusterConf["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'");
-
- string swapDir = PluginConfig.GetProperty("swap_dir").GetString() ?? throw new KeyNotFoundException("Missing required key 'swap_dir' for config");
- int cacheSize = PluginConfig.GetProperty("max_cache").GetInt32();
- string connectPath = PluginConfig.GetProperty("connect_path").GetString() ?? throw new KeyNotFoundException("Missing required element 'connect_path' for config 'cluster'");
- //TimeSpan cleanupInterval = PluginConfig.GetProperty("cleanup_interval_sec").GetTimeSpan(TimeParseType.Seconds);
- //TimeSpan validFor = PluginConfig.GetProperty("valid_for_sec").GetTimeSpan(TimeParseType.Seconds);
- int maxMessageSize = PluginConfig.GetProperty("max_blob_size").GetInt32();
+ Uri brokerAddress = new(clusterConf["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'"));
- //Init dir
- DirectoryInfo dir = new(swapDir);
- dir.Create();
- //Init cache listener, single threaded reader
- ObjectCacheStore CacheListener = new(dir, cacheSize, Log, CacheHeap, true);
-
+
//Init connect endpoint
- {
- //Init connect endpoint
- ConnectEndpoint endpoint = new(connectPath, CacheListener, this);
- Route(endpoint);
- }
-
+ ConnectEndpoint endpoint = this.Route<ConnectEndpoint>();
+
+ //Get the cache store from the connection endpoint
+ ICacheStore store = endpoint.GetCacheStore();
+
+ //Log max memory usage
+ Log.Debug("Maxium memory consumption {mx}Mb", ((ulong)endpoint.CacheConfig.MaxCacheEntries * (ulong)endpoint.CacheConfig.MaxMessageSize) / (ulong)(1024 * 1000));
+
//Setup broker and regitration
{
- //init mre to pass the broker heartbeat signal to the registration worker
- ManualResetEvent mre = new(false);
+
//Route the broker endpoint
- BrokerHeartBeat brokerEp = new(() => BrokerHeartBeatToken!, mre, new Uri(brokerAddress), this);
+ BrokerHeartBeat brokerEp = new(() => BrokerHeartBeatToken!, BrokerSyncHandle, brokerAddress, this);
Route(brokerEp);
//start registration
- _ = this.DeferTask(() => RegisterServerAsync(mre), 200);
+ _ = this.ObserveTask(() => RegisterServerAsync(endpoint.Path), 200);
}
//Setup cluster worker
{
//Get pre-configured fbm client config for caching
- FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(CacheHeap, maxMessageSize, this.IsDebug() ? Log : null);
+ ClientConfig = FBMDataCacheExtensions.GetDefaultConfig(CacheHeap, endpoint.CacheConfig.MaxMessageSize / 2, this.IsDebug() ? Log : null);
//Start Client runner
- _ = this.DeferTask(() => RunClientAsync(CacheListener, new Uri(brokerAddress), conf), 300);
+ _ = this.ObserveTask(() => RunClientAsync(store, brokerAddress), 300);
}
//Load a cache broker to the current server if the config is defined
@@ -129,38 +152,32 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
this.Route<BrokerRegistrationEndpoint>();
}
}
-
- void Cleanup()
- {
- CacheHeap.Dispose();
- CacheListener.Dispose();
- }
-
- //Regsiter cleanup
- _ = UnloadToken.RegisterUnobserved(Cleanup);
Log.Information("Plugin loaded");
}
catch (KeyNotFoundException kne)
{
- CacheHeap.Dispose();
Log.Error("Missing required configuration variables {m}", kne.Message);
}
- catch
- {
- CacheHeap.Dispose();
- throw;
- }
}
protected override void OnUnLoad()
{
+ //dispose heap if initialized
+ if(_cacheHeap.IsValueCreated)
+ {
+ _cacheHeap.Value.Dispose();
+ }
+
+ //Dispose mre sync handle
+ BrokerSyncHandle.Dispose();
+
Log.Information("Plugin unloaded");
}
#region Registration
- private async Task RegisterServerAsync(ManualResetEvent keepaliveWait)
+ private async Task RegisterServerAsync(string connectPath)
{
try
{
@@ -169,9 +186,9 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
//Server id is just dns name for now
string serverId = Dns.GetHostName();
- int heartBeatDelayMs = clusterConfig["heartbeat_timeout_sec"].GetInt32() * 1000;
- string? connectPath = PluginConfig.GetProperty("connect_path").GetString();
+ int heartBeatDelayMs = clusterConfig["heartbeat_timeout_sec"].GetInt32() * 1000;
+
//Get the port of the primary webserver
int port;
@@ -227,15 +244,17 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
while (true)
{
await Task.Delay(heartBeatDelayMs, UnloadToken);
+
//Set the timeout to 0 to it will just check the status without blocking
- if (!keepaliveWait.WaitOne(0))
+ if (!BrokerSyncHandle.WaitOne(0))
{
//server miseed a keepalive event, time to break the loop and retry
Log.Debug("Broker missed a heartbeat request, attempting to re-register");
break;
}
+
//Reset the msr
- keepaliveWait.Reset();
+ BrokerSyncHandle.Reset();
}
}
catch (TaskCanceledException)
@@ -275,7 +294,6 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
}
finally
{
- keepaliveWait.Dispose();
BrokerHeartBeatToken = null;
}
Log.Debug("Registration worker exited");
@@ -305,20 +323,25 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
/// <param name="serverId">The node-id of the current server</param>
/// <param name="clientConf">The configuration to use when initializing synchronization clients</param>
/// <returns>A task that resolves when the plugin unloads</returns>
- private async Task RunClientAsync(ObjectCacheStore cacheStore, Uri brokerAddress, FBMClientConfig clientConf)
+ private async Task RunClientAsync(ICacheStore cacheStore, Uri brokerAddress)
{
TimeSpan noServerDelay = TimeSpan.FromSeconds(10);
+
+ //The node id is just the dns hostname of the current machine
string nodeId = Dns.GetHostName();
+
ListServerRequest listRequest = new(brokerAddress);
try
{
//Get the broker config element
IReadOnlyDictionary<string, JsonElement> clusterConf = this.GetConfig("cluster");
+
int serverCheckMs = clusterConf["update_interval_sec"].GetInt32() * 1000;
//Setup signing and verification certificates
ReadOnlyJsonWebKey cacheSig = await GetCachePrivate();
ReadOnlyJsonWebKey brokerPub = await GetBrokerPublic();
+
//Import certificates
listRequest.WithVerificationKey(brokerPub)
.WithSigningKey(cacheSig);
@@ -340,6 +363,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
//Get server list
servers = await FBMDataCacheExtensions.ListServersAsync(listRequest, UnloadToken);
+
//Servers are loaded, so continue
break;
}
@@ -355,8 +379,10 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
{
Log.Warn(ex, "Failed to get server list from broker");
}
+
//Gen random ms delay
int randomMsDelay = RandomNumberGenerator.GetInt32(1000, 2000);
+
//Delay
await Task.Delay(randomMsDelay, UnloadToken);
}
@@ -386,7 +412,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
ListeningServers.Add(server);
//Run listener background task
- _ = this.DeferTask(() => RunSyncTaskAsync(server, cacheStore, clientConf, nodeId));
+ _ = this.ObserveTask(() => RunSyncTaskAsync(server, cacheStore, nodeId));
}
}
}
@@ -417,10 +443,10 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
Log.Debug("Cluster sync worker exited");
}
- private async Task RunSyncTaskAsync(ActiveServer server, ObjectCacheStore cacheStore, FBMClientConfig conf, string nodeId)
+ private async Task RunSyncTaskAsync(ActiveServer server, ICacheStore cacheStore, string nodeId)
{
//Setup client
- FBMClient client = new(conf);
+ FBMClient client = new(ClientConfig);
try
{
async Task UpdateRecordAsync(string objectId, string newId)
@@ -440,7 +466,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
response.ThrowIfNotSet();
//Check response code
- string status = response.Headers.First(static s => s.Key == HeaderCommand.Status).Value.ToString();
+ string status = response.Headers.First(static s => s.Header == HeaderCommand.Status).Value.ToString();
if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal))
{
//Update the record