diff options
17 files changed, 1012 insertions, 575 deletions
diff --git a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs index ebdfd5b..222240a 100644 --- a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs +++ b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs @@ -81,14 +81,28 @@ namespace VNLib.Data.Caching.Extensions /// <returns>A preconfigured <see cref="FBMClientConfig"/> for object caching</returns> public static FBMClientConfig GetDefaultConfig(IUnmangedHeap heap, int maxMessageSize, ILogProvider? debugLog = null) { + /* + * Max message size (for server) should account for max data + the additional header buffer + */ + int maxExtra = (int)Helpers.ToNearestKb((int)(maxMessageSize * 1.2) + MAX_FBM_MESSAGE_HEADER_SIZE); + return new() { BufferHeap = heap, - MaxMessageSize = maxMessageSize * 2, - RecvBufferSize = maxMessageSize, - MessageBufferSize = maxMessageSize, + + //Max message size is referrences + MaxMessageSize = maxExtra, + + //The size of the buffer used for buffering incoming messages + RecvBufferSize = maxExtra, + //Message buffer should be max message + headers + MessageBufferSize = (int)Helpers.ToNearestKb(maxMessageSize + MAX_FBM_MESSAGE_HEADER_SIZE), + + //Caching only requires a fixed number of request headers, so we can used a fixed buffer size MaxHeaderBufferSize = MAX_FBM_MESSAGE_HEADER_SIZE, + + //Set the optional cache sub-protocol SubProtocol = CACHE_WS_SUB_PROCOL, HeaderEncoding = Helpers.DefaultEncoding, diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs index 26df351..fbb2dcc 100644 --- a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs +++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs @@ -23,116 +23,134 @@ */ using System; -using System.IO; -using System.Linq; using System.Collections.Generic; -using System.Diagnostics.CodeAnalysis; -using VNLib.Utils.IO; -using VNLib.Utils.Logging; -using VNLib.Utils.Memory; using VNLib.Utils.Memory.Caching; namespace VNLib.Data.Caching { + /// <summary> /// A general purpose binary data storage /// </summary> - public class BlobCache : LRUCache<string, MemoryHandle<byte>> + public class BlobCache : LRUCache<string, CacheEntry> { - readonly IUnmangedHeap Heap; - readonly DirectoryInfo SwapDir; - readonly ILogProvider Log; ///<inheritdoc/> public override bool IsReadOnly { get; } + ///<inheritdoc/> protected override int MaxCapacity { get; } - - + + /// <summary> /// Initializes a new <see cref="BlobCache"/> store /// </summary> - /// <param name="swapDir">The <see cref="IsolatedStorageDirectory"/> to swap blob data to when cache</param> /// <param name="maxCapacity">The maximum number of items to keep in memory</param> - /// <param name="log">A <see cref="ILogProvider"/> to write log data to</param> - /// <param name="heap">A <see cref="IUnmangedHeap"/> to allocate buffers and store <see cref="BlobItem"/> data in memory</param> - public BlobCache(DirectoryInfo swapDir, int maxCapacity, ILogProvider log, IUnmangedHeap heap) + /// <exception cref="ArgumentException"></exception> + public BlobCache(int maxCapacity) :base(StringComparer.Ordinal) { - IsReadOnly = false; + if(maxCapacity < 1) + { + throw new ArgumentException("The maxium capacity of the store must be a positive integer larger than 0", nameof(maxCapacity)); + } + MaxCapacity = maxCapacity; - SwapDir = swapDir; + //Update the lookup table size LookupTable.EnsureCapacity(maxCapacity); - //Set default heap if not specified - Heap = heap; - Log = log; } + ///<inheritdoc/> - protected override bool CacheMiss(string key, [NotNullWhen(true)] out MemoryHandle<byte>? value) + protected override bool CacheMiss(string key, out CacheEntry value) { - value = null; + value = default; return false; } + ///<inheritdoc/> - protected override void Evicted(KeyValuePair<string, MemoryHandle<byte>> evicted) + protected override void Evicted(ref KeyValuePair<string, CacheEntry> evicted) { - //Dispose the blob + //Dispose the cache item evicted.Value.Dispose(); } + /// <summary> - /// If the <see cref="BlobItem"/> is found in the store, changes the key + /// If the <see cref="CacheEntry"/> is found in the store, changes the key /// that referrences the blob. /// </summary> /// <param name="currentKey">The key that currently referrences the blob in the store</param> /// <param name="newKey">The new key that will referrence the blob</param> - /// <param name="blob">The <see cref="BlobItem"/> if its found in the store</param> + /// <param name="blob">The <see cref="CacheEntry"/> if its found in the store</param> /// <returns>True if the record was found and the key was changes</returns> - public bool TryChangeKey(string currentKey, string newKey, [NotNullWhen(true)] out MemoryHandle<byte>? blob) + public bool TryChangeKey(string currentKey, string newKey, out CacheEntry blob) { - if (LookupTable.Remove(currentKey, out LinkedListNode<KeyValuePair<string, MemoryHandle<byte>>>? node)) + //Try to get the node at the current key + if (LookupTable.Remove(currentKey, out LinkedListNode<KeyValuePair<string, CacheEntry>> ? node)) { //Remove the node from the ll List.Remove(node); - //Update the node kvp - blob = node.Value.Value; - node.Value = new KeyValuePair<string, MemoryHandle<byte>>(newKey, blob); + + //Get the stored blob + blob = node.ValueRef.Value; + + //Update the + node.Value = new KeyValuePair<string, CacheEntry>(newKey, blob); + //Add to end of list List.AddLast(node); + //Re-add to lookup table with new key LookupTable.Add(newKey, node); + return true; } - blob = null; + + blob = default; return false; } + /// <summary> - /// Removes the <see cref="BlobItem"/> from the store without disposing the blobl + /// Removes the <see cref="CacheEntry"/> from the store, and frees its resources /// </summary> - /// <param name="key">The key that referrences the <see cref="BlobItem"/> in the store</param> + /// <param name="key">The key that referrences the <see cref="CacheEntry"/> in the store</param> /// <returns>A value indicating if the blob was removed</returns> public override bool Remove(string key) { //Remove the item from the lookup table and if it exists, remove the node from the list - if (LookupTable.Remove(key, out LinkedListNode<KeyValuePair<string, MemoryHandle<byte>>>? node)) + if (!LookupTable.Remove(key, out LinkedListNode<KeyValuePair<string, CacheEntry>>? node)) + { + return false; + } + + //always dispose blob + using (node.ValueRef.Value) { - //Remove the new from the list + //Remove the node from the list List.Remove(node); - //dispose the buffer - node.Value.Value.Dispose(); - return true; } - return false; + return true; } + /// <summary> - /// Removes and disposes all blobl elements in cache (or in the backing store) + /// Removes all cache entires and disposes their held resources /// </summary> public override void Clear() { - foreach (MemoryHandle<byte> blob in List.Select(kp => kp.Value)) + //Start from first node + LinkedListNode<KeyValuePair<string, CacheEntry>>? node = List.First; + + //Classic ll node itteration + while(node != null) { - blob.Dispose(); + //Dispose the cache entry + node.ValueRef.Value.Dispose(); + + //Move to next node + node = node.Next; } + + //empty all cache entires in the store base.Clear(); } } diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobItem.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobItem.cs deleted file mode 100644 index 728875f..0000000 --- a/lib/VNLib.Data.Caching.ObjectCache/src/BlobItem.cs +++ /dev/null @@ -1,207 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Data.Caching.ObjectCache -* File: BlobItem.cs -* -* BlobItem.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.IO; -using System.Threading; -using System.Threading.Tasks; - -using VNLib.Utils; -using VNLib.Utils.IO; -using VNLib.Utils.Memory; -using VNLib.Utils.Logging; -using VNLib.Utils.Extensions; - -namespace VNLib.Data.Caching -{ - /// <summary> - /// A general purpose binary storage item - /// </summary> - public class BlobItem //: VnDisposeable - { - /* - private static readonly JoinableTaskContext JTX = new(); - private static readonly Semaphore CentralSwapLock = new(Environment.ProcessorCount, Environment.ProcessorCount); - - private readonly VnMemoryStream _loadedData; - private bool _loaded; - - /// <summary> - /// The time the blob was last modified - /// </summary> - public DateTimeOffset LastAccessed { get; private set; } - - - /// <summary> - /// Gets the current size of the file (in bytes) as an atomic operation - /// </summary> - public int FileSize => (int)_loadedData.Length; - /// <summary> - /// The operation synchronization lock - /// </summary> - public AsyncReaderWriterLock OpLock { get; } - /// <summary> - /// Initializes a new <see cref="BlobItem"/> - /// </summary> - /// <param name="heap">The heap to allocate buffers from</param> - internal BlobItem(IUnmangedHeap heap) - { - _loadedData = new(heap); - OpLock = new AsyncReaderWriterLock(JTX); - _loaded = true; - LastAccessed = DateTimeOffset.UtcNow; - } - ///<inheritdoc/> - protected override void Free() - { - _loadedData.Dispose(); - OpLock.Dispose(); - } - - /// <summary> - /// Reads data from the internal buffer and copies it to the specified buffer. - /// Use the <see cref="FileSize"/> property to obtain the size of the internal buffer - /// </summary> - /// <param name="buffer">The buffer to copy data to</param> - /// <returns>When completed, the number of bytes copied to the buffer</returns> - public int Read(Span<byte> buffer) - { - //Make sure the blob has been swapped back into memory - if (!_loaded) - { - throw new InvalidOperationException("The blob was not loaded from the disk"); - } - //Read all data from the buffer and write it to the output buffer - _loadedData.AsSpan().CopyTo(buffer); - //Update last-accessed - LastAccessed = DateTimeOffset.UtcNow; - return (int)_loadedData.Length; - } - /// <summary> - /// Overwrites the internal buffer with the contents of the supplied buffer - /// </summary> - /// <param name="buffer">The buffer containing data to store within the blob</param> - /// <returns>A <see cref="ValueTask"/> that completes when write access has been granted and copied</returns> - /// <exception cref="InvalidOperationException"></exception> - public void Write(ReadOnlySpan<byte> buffer) - { - //Make sure the blob has been swapped back into memory - if (!_loaded) - { - throw new InvalidOperationException("The blob was not loaded from the disk"); - } - //Reset the buffer - _loadedData.SetLength(buffer.Length); - _loadedData.Seek(0, SeekOrigin.Begin); - _loadedData.Write(buffer); - LastAccessed = DateTimeOffset.UtcNow; - } - - /// <summary> - /// Writes the contents of the memory buffer to its designated file on the disk - /// </summary> - /// <param name="heap">The heap to allocate buffers from</param> - /// <param name="swapDir">The <see cref="IsolatedStorageDirectory"/> that stores the file</param> - /// <param name="filename">The name of the file to write data do</param> - /// <param name="log">A log to write errors to</param> - /// <returns>A task that completes when the swap to disk is complete</returns> - internal async Task SwapToDiskAsync(IUnmangedHeap heap, DirectoryInfo swapDir, string filename, ILogProvider log) - { - try - { - //Wait for write lock - await using (AsyncReaderWriterLock.Releaser releaser = await OpLock.WriteLockAsync()) - { - //Enter swap lock - await CentralSwapLock; - try - { - //Open swap file data stream - await using FileStream swapFile = swapDir.OpenFile(filename, FileMode.OpenOrCreate, FileAccess.ReadWrite, bufferSize: 8128); - //reset swap file - swapFile.SetLength(0); - //Seek loaded-data back to 0 before writing - _loadedData.Seek(0, SeekOrigin.Begin); - //Write loaded data to disk - await _loadedData.CopyToAsync(swapFile, 8128, heap); - } - finally - { - CentralSwapLock.Release(); - } - //Release memory held by stream - _loadedData.SetLength(0); - //Clear loaded flag - _loaded = false; - LastAccessed = DateTimeOffset.UtcNow; - } - log.Debug("Blob {name} swapped to disk", filename); - } - catch(Exception ex) - { - log.Error(ex, "Blob swap to disk error"); - } - } - /// <summary> - /// Reads the contents of the blob into a memory buffer from its designated file on disk - /// </summary> - /// <param name="heap">The heap to allocate buffers from</param> - /// <param name="swapDir">The <see cref="IsolatedStorageDirectory"/> that stores the file</param> - /// <param name="filename">The name of the file to write the blob data to</param> - /// <param name="log">A log to write errors to</param> - /// <returns>A task that completes when the swap from disk is complete</returns> - internal async Task SwapFromDiskAsync(IUnmangedHeap heap, DirectoryInfo swapDir, string filename, ILogProvider log) - { - try - { - //Wait for write lock - await using (AsyncReaderWriterLock.Releaser releaser = await OpLock.WriteLockAsync()) - { - //Enter swap lock - await CentralSwapLock; - try - { - //Open swap file data stream - await using FileStream swapFile = swapDir.OpenFile(filename, FileMode.OpenOrCreate, FileAccess.Read, bufferSize:8128); - //Copy from disk to memory - await swapFile.CopyToAsync(_loadedData, 8128, heap); - } - finally - { - CentralSwapLock.Release(); - } - //Set loaded flag - _loaded = true; - LastAccessed = DateTimeOffset.UtcNow; - } - log.Debug("Blob {name} swapped from disk", filename); - } - catch(Exception ex) - { - log.Error(ex, "Blob swap from disk error"); - } - } - */ - } -} diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs b/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs new file mode 100644 index 0000000..8644d1d --- /dev/null +++ b/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs @@ -0,0 +1,227 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.ObjectCache +* File: BlobCache.cs +* +* BlobCache.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Buffers.Binary; +using System.Runtime.CompilerServices; + +using VNLib.Utils.Memory; +using VNLib.Utils.Extensions; + +namespace VNLib.Data.Caching +{ + /// <summary> + /// A structure that represents an item in cache + /// </summary> + public readonly struct CacheEntry : IDisposable, IEquatable<CacheEntry> + { + private const int TIME_SEGMENT_SIZE = sizeof(long); + + private const int LENGTH_SEGMENT_SIZE = sizeof(int); + + private const int DATA_SEGMENT_START = TIME_SEGMENT_SIZE + LENGTH_SEGMENT_SIZE; + + + //Only contain ref to backing handle to keep struct size small + private readonly MemoryHandle<byte> _handle; + + + /// <summary> + /// Creates a new <see cref="CacheEntry"/> and copies the initial data to the internal buffer + /// </summary> + /// <param name="data">The initial data to store</param> + /// <param name="heap">The heap to allocate the buffer from</param> + /// <returns>The new <see cref="CacheEntry"/></returns> + public static CacheEntry Create(ReadOnlySpan<byte> data, IUnmangedHeap heap) + { + //Calc buffer size + int bufferSize = GetRequiredHandleSize(data.Length); + + //Alloc buffer + MemoryHandle<byte> handle = heap.Alloc<byte>(bufferSize); + + //Create new entry from handle + CacheEntry entry = new (handle, data.Length); + + //Get the data segment + Span<byte> segment = entry.GetDataSegment(); + + //Copy data segment + data.CopyTo(segment); + + return entry; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static int GetRequiredHandleSize(int size) + { + //Caculate the minimum handle size to store all required information, rounded to nearest page + return (int)MemoryUtil.NearestPage(size + DATA_SEGMENT_START); + } + + private CacheEntry(MemoryHandle<byte> handle, int length) + { + _handle = handle; + //Store data length, assumes the handle is large enough to store it + SetLength(length); + } + + + ///<inheritdoc/> + public readonly void Dispose() => _handle.Dispose(); + + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private readonly Span<byte> GetTimeSegment() => _handle.AsSpan(0, TIME_SEGMENT_SIZE); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private readonly Span<byte> GetLengthSegment() => _handle.AsSpan(TIME_SEGMENT_SIZE, LENGTH_SEGMENT_SIZE); + + /// <summary> + /// Gets the size of the block of memory held by the underlying handle + /// </summary> + /// <returns>The size of the block held by the current entry</returns> + /// <exception cref="ObjectDisposedException"></exception> + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public readonly nuint GetMemoryUsage() + { + _handle.ThrowIfClosed(); + return _handle.ByteLength; + } + + + /// <summary> + /// Gets the last set time + /// </summary> + /// <returns>The last date stored</returns> + /// <exception cref="ObjectDisposedException"></exception> + public readonly DateTime GetCreatedTime() + { + //Get the time segment and write the value in big endian + ReadOnlySpan<byte> segment = GetTimeSegment(); + + long ticks = BinaryPrimitives.ReadInt64BigEndian(segment); + + //ticks back to + return new(ticks); + } + + /// <summary> + /// Sets the last modified time + /// </summary> + /// <param name="time">The new time to set the handle to</param> + /// <exception cref="ObjectDisposedException"></exception> + public readonly void SetTime(DateTime time) + { + //Get native ticks value + long timeData = time.Ticks; + + //Get the time segment and write the value in big endian + Span<byte> segment = GetTimeSegment(); + + BinaryPrimitives.WriteInt64BigEndian(segment, timeData); + } + + /// <summary> + /// Gets the length of the data segment + /// </summary> + /// <returns>The length of the data segment</returns> + /// <exception cref="ObjectDisposedException"></exception> + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public readonly int GetLength() + { + //Get the length segment + ReadOnlySpan<byte> segment = GetLengthSegment(); + //Recover the integer + return BinaryPrimitives.ReadInt32BigEndian(segment); + } + + private readonly void SetLength(int length) + { + //Get the length segment + Span<byte> segment = GetLengthSegment(); + + //Update the length value + BinaryPrimitives.WriteInt32BigEndian(segment, length); + } + + /// <summary> + /// Gets the stored data segment + /// </summary> + /// <returns>The data segment</returns> + /// <exception cref="ObjectDisposedException"></exception> + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public readonly Span<byte> GetDataSegment() + { + //Get the actual length of the segment + int length = GetLength(); + //Get the segment from its begining offset and + return _handle.AsSpan(DATA_SEGMENT_START, length); + } + + /// <summary> + /// Writes the specified segment to the internal buffer and resizes the buffer if necessary. + /// This operation overwrites any previously stored data + /// </summary> + /// <param name="data">The data segment to store</param> + /// <exception cref="ObjectDisposedException"></exception> + public readonly void UpdateData(ReadOnlySpan<byte> data) + { + //Calc required buffer size + int bufferSize = GetRequiredHandleSize(data.Length); + + //Resize handle if required + _handle.ResizeIfSmaller(bufferSize); + + //Reset data length + SetLength(data.Length); + + //Get the data segment + Span<byte> segment = GetDataSegment(); + +#if DEBUG + //Test segment length is equvalent to the requested data length + System.Diagnostics.Debug.Assert(segment.Length == data.Length); +#endif + //Copy data segment + data.CopyTo(segment); + } + + + ///<inheritdoc/> + public override bool Equals(object? obj) => obj is CacheEntry entry && Equals(entry); + + ///<inheritdoc/> + public override int GetHashCode() => _handle.GetHashCode(); + + ///<inheritdoc/> + public static bool operator ==(CacheEntry left, CacheEntry right) => left.Equals(right); + + ///<inheritdoc/> + public static bool operator !=(CacheEntry left, CacheEntry right) => !(left == right); + + ///<inheritdoc/> + public bool Equals(CacheEntry other) => other.GetHashCode() == GetHashCode(); + } +} diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/CacheListener.cs b/lib/VNLib.Data.Caching.ObjectCache/src/CacheListener.cs deleted file mode 100644 index 7f544e1..0000000 --- a/lib/VNLib.Data.Caching.ObjectCache/src/CacheListener.cs +++ /dev/null @@ -1,64 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: VNLib.Data.Caching.ObjectCache -* File: CacheListener.cs -* -* CacheListener.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger -* VNLib collection of libraries and utilities. -* -* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.IO; - -using VNLib.Utils.Memory; -using VNLib.Net.Messaging.FBM.Server; - -namespace VNLib.Data.Caching -{ - /// <summary> - /// A base implementation of a memory/disk LRU data cache FBM listener - /// </summary> - public abstract class CacheListener : FBMListenerBase - { - /// <summary> - /// The directory swap files will be stored - /// </summary> - public DirectoryInfo? Directory { get; private set; } - /// <summary> - /// The Cache store to access data blobs - /// </summary> - protected BlobCache? Cache { get; private set; } - /// <summary> - /// The <see cref="IUnmangedHeap"/> to allocate buffers from - /// </summary> - protected IUnmangedHeap? Heap { get; private set; } - - /// <summary> - /// Initializes the <see cref="Cache"/> data store - /// </summary> - /// <param name="dir">The directory to swap cache records to</param> - /// <param name="cacheSize">The size of the LRU cache</param> - /// <param name="heap">The heap to allocate buffers from</param> - protected void InitCache(DirectoryInfo dir, int cacheSize, IUnmangedHeap heap) - { - Heap = heap; - Cache = new(dir, cacheSize, Log, Heap); - Directory = dir; - } - } -} diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/ChangeEvent.cs b/lib/VNLib.Data.Caching.ObjectCache/src/ChangeEvent.cs index 7ee4427..2e8641e 100644 --- a/lib/VNLib.Data.Caching.ObjectCache/src/ChangeEvent.cs +++ b/lib/VNLib.Data.Caching.ObjectCache/src/ChangeEvent.cs @@ -27,11 +27,23 @@ namespace VNLib.Data.Caching.ObjectCache /// <summary> /// An event object that is passed when change events occur /// </summary> - public class ChangeEvent + public sealed class ChangeEvent { - public readonly string CurrentId; - public readonly string? AlternateId; - public readonly bool Deleted; + /// <summary> + /// The current id of the changed object + /// </summary> + public string CurrentId { get; } + + /// <summary> + /// The alternate id of the changed object if specified + /// </summary> + public string? AlternateId { get; } + + /// <summary> + /// A value that indicates if the object was deleted + /// </summary> + public bool Deleted { get; } + internal ChangeEvent(string id, string? alternate, bool deleted) { CurrentId = id; diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs b/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs index 514d00b..05a4f02 100644 --- a/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs +++ b/lib/VNLib.Data.Caching.ObjectCache/src/ObjectCacheStore.cs @@ -34,6 +34,9 @@ using VNLib.Utils.Extensions; using VNLib.Net.Messaging.FBM.Server; using static VNLib.Data.Caching.Constants; + +#pragma warning disable CA1849 // Call async methods when in an async method + namespace VNLib.Data.Caching.ObjectCache { public delegate ReadOnlySpan<byte> GetBodyDataCallback<T>(T state); @@ -41,7 +44,7 @@ namespace VNLib.Data.Caching.ObjectCache /// <summary> /// A <see cref="FBMListener"/> implementation of a <see cref="CacheListener"/> /// </summary> - public class ObjectCacheStore : CacheListener, IDisposable + public class ObjectCacheStore : FBMListenerBase, IDisposable { private readonly SemaphoreSlim StoreLock; private bool disposedValue; @@ -55,6 +58,14 @@ namespace VNLib.Data.Caching.ObjectCache public AsyncQueue<ChangeEvent> EventQueue { get; } /// <summary> + /// The Cache store to access data blobs + /// </summary> + private readonly BlobCache Cache; + + private readonly IUnmangedHeap Heap; + + + /// <summary> /// Initialzies a new <see cref="ObjectCacheStore"/> /// </summary> /// <param name="dir">The <see cref="DirectoryInfo"/> to store blob files to</param> @@ -62,23 +73,25 @@ namespace VNLib.Data.Caching.ObjectCache /// <param name="log"></param> /// <param name="heap"></param> /// <param name="singleReader">A value that indicates if a single thread is processing events</param> - public ObjectCacheStore(DirectoryInfo dir, int cacheMax, ILogProvider log, IUnmangedHeap heap, bool singleReader) + public ObjectCacheStore(int cacheMax, ILogProvider log, IUnmangedHeap heap, bool singleReader) { Log = log; //We can use a single writer and single reader in this context EventQueue = new(true, singleReader); - InitCache(dir, cacheMax, heap); + Cache = new(cacheMax); + Heap = heap; InitListener(heap); StoreLock = new(1,1); } ///<inheritdoc/> - protected override async Task ProcessAsync(FBMContext context, object? userState, CancellationToken cancellationToken) + protected override Task ProcessAsync(FBMContext context, object? userState, CancellationToken exitToken) { try { //Get the action header string action = context.Method(); + //Optional newid header string? alternateId = context.NewObjectId(); @@ -88,82 +101,159 @@ namespace VNLib.Data.Caching.ObjectCache { //Get the object-id header string objectId = context.ObjectId(); - //Take lock on store - using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellationToken: cancellationToken); - if (Cache!.TryGetValue(objectId, out MemoryHandle<byte>? data)) + + //Try read sync + if (StoreLock.Wait(0)) { - //Set the status code and write the buffered data to the response buffer - context.CloseResponse(ResponseCodes.Okay); - //Copy data to response buffer - context.Response.WriteBody(data.Span); + try + { + UnsafeReadEntry(context, objectId); + } + finally + { + StoreLock.Release(); + } + + return Task.CompletedTask; } else { - context.CloseResponse(ResponseCodes.NotFound); + //Read entry async + return InternalReadEntryAsync(context, objectId, exitToken); } } - break; + case Actions.AddOrUpdate: { //Get the object-id header string objectId = context.ObjectId(); - //Add/update a blob async - await AddOrUpdateBlobAsync(objectId, alternateId, static context => context.Request.BodyData, context); - //Notify update the event bus - await EventQueue.EnqueueAsync(new(objectId, alternateId, false), cancellationToken); - //Set status code - context.CloseResponse(ResponseCodes.Okay); + + //Create change event for the object + ChangeEvent change = new(objectId, alternateId, false); + + //Attempt to aquire lock sync + if (StoreLock.Wait(0)) + { + //aquired sync + try + { + //Update the item + UnsafeAddOrUpdate(objectId, alternateId, GetBodyData, context); + } + finally + { + StoreLock.Release(); + } + + //Add to event queue + EnqueEvent(change); + + //Set status code + context.CloseResponse(ResponseCodes.Okay); + + return Task.CompletedTask; + } + else + { + //Lock will be awaited async and + return InternalAddOrUpdateAsync(context, change, exitToken); + } } - break; case Actions.Delete: { //Get the object-id header string objectId = context.ObjectId(); - - if (await DeleteItemAsync(objectId)) + + //Create change event + ChangeEvent change = new(objectId, alternateId, true); + + //See if lock can be entered without waiting + if (StoreLock.Wait(0)) { - //Notify deleted - await EventQueue.EnqueueAsync(new(objectId, null, true), cancellationToken); - //Set status header - context.CloseResponse(ResponseCodes.Okay); + bool found = false; + + try + { + //Sync + found = UnsafeDeleteEntry(objectId); + } + finally + { + StoreLock.Release(); + } + + //Notify change + EnqueEvent(change); + + //Set status code if found + context.CloseResponse(found ? ResponseCodes.Okay : ResponseCodes.NotFound); + + return Task.CompletedTask; } else { - //Set status header - context.CloseResponse(ResponseCodes.NotFound); + //lock will yeild async + return InternalDeleteAsync(context, change, exitToken); } } - break; // event queue dequeue request case Actions.Dequeue: { + static void SetResponse(ChangeEvent change, FBMContext context) + { + if (change.Deleted) + { + context.CloseResponse("deleted"); + context.Response.WriteHeader(ObjectId, change.CurrentId); + } + else + { + //Changed + context.CloseResponse("modified"); + context.Response.WriteHeader(ObjectId, change.CurrentId); + + //Set old id if an old id is set + if (change.CurrentId != null) + { + context.Response.WriteHeader(NewObjectId, change.AlternateId); + } + } + } + + static async Task DequeAsync(AsyncQueue<ChangeEvent> queue, FBMContext context, CancellationToken exitToken) + { + //Wait for a new message to process + ChangeEvent ev = await queue.DequeueAsync(exitToken); + + //Set the response + SetResponse(ev, context); + } + //If no event bus is registered, then this is not a legal command if (userState is not AsyncQueue<ChangeEvent> eventBus) { context.CloseResponse(ResponseCodes.NotFound); - break; + + return Task.CompletedTask; } - //Wait for a new message to process - ChangeEvent ev = await eventBus.DequeueAsync(cancellationToken); - if (ev.Deleted) + + //try to deq without awaiting + if (eventBus.TryDequeue(out ChangeEvent? change)) { - context.CloseResponse("deleted"); - context.Response.WriteHeader(ObjectId, ev.CurrentId); + SetResponse(change, context); + + return Task.CompletedTask; } else { - //Changed - context.CloseResponse("modified"); - context.Response.WriteHeader(ObjectId, ev.CurrentId); - //Set old id if an old id is set - if (ev.CurrentId != null) - { - context.Response.WriteHeader(NewObjectId, ev.AlternateId); - } + //Process async + return DequeAsync(eventBus, context, exitToken); } } - break; + } + + Log.Error("Unhandled cache event!"); } catch (OperationCanceledException) { @@ -175,68 +265,178 @@ namespace VNLib.Data.Caching.ObjectCache Log.Error(ex); context.CloseResponse(ResponseCodes.Error); } + + return Task.CompletedTask; } + + + private static ReadOnlySpan<byte> GetBodyData(FBMContext ctx) => ctx.Request.BodyData; - /// <summary> - /// Asynchronously deletes a previously stored item - /// </summary> - /// <param name="id">The id of the object to delete</param> - /// <returns>A task that completes when the item has been deleted</returns> - public async Task<bool> DeleteItemAsync(string id) + private void EnqueEvent(ChangeEvent change) { - using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(); - return Cache!.Remove(id); + if (!EventQueue.TryEnque(change)) + { + Log.Warn("Change event {ev} was not enqued because the event queue is overflowing!", change.CurrentId); + } } - - /// <summary> - /// Asynchronously adds or updates an object in the store and optionally update's its id - /// </summary> - /// <param name="objectId">The current (or old) id of the object</param> - /// <param name="alternateId">An optional id to update the blob to</param> - /// <param name="bodyData">A callback that returns the data for the blob</param> - /// <param name="state">The state parameter to pass to the data callback</param> - /// <returns></returns> - public async Task AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state) + + private void UnsafeReadEntry(FBMContext context, string objectId) { - MemoryHandle<byte>? blob; + if (Cache!.TryGetValue(objectId, out CacheEntry data)) + { + //Set the status code and write the buffered data to the response buffer + context.CloseResponse(ResponseCodes.Okay); + + //Copy data to response buffer + context.Response.WriteBody(data.GetDataSegment()); + } + else + { + context.CloseResponse(ResponseCodes.NotFound); + } + } + + async Task InternalReadEntryAsync(FBMContext context, string objectId, CancellationToken cancellation) + { + //enter lock async + using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation); + + UnsafeReadEntry(context, objectId); + } + + private async Task InternalAddOrUpdateAsync(FBMContext context, ChangeEvent change, CancellationToken cancellation) + { + //Wait for lock since we know it will yeild async + using (SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation)) + { + UnsafeAddOrUpdate(change.CurrentId, change.AlternateId, GetBodyData, context); + } + + //Add to event queue + EnqueEvent(change); + + //Set status code + context.CloseResponse(ResponseCodes.Okay); + } + + private void UnsafeAddOrUpdate<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state) + { + CacheEntry entry; + //See if new/alt session id was specified if (string.IsNullOrWhiteSpace(alternateId)) { - //Take lock on store - using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(); //See if blob exists - if (!Cache!.TryGetValue(objectId, out blob)) + if (!Cache!.TryGetValue(objectId, out entry)) { - //If not, create new blob and add to store - blob = Heap.AllocAndCopy(bodyData(state)); - Cache.Add(objectId, blob); + //Create the new cache entry since it does not exist + entry = CacheEntry.Create(bodyData(state), Heap); + + //Add to cache + Cache.Add(objectId, entry); } else { //Reset the buffer state - blob.WriteAndResize(bodyData(state)); + entry.UpdateData(bodyData(state)); } } //Need to change the id of the record else { - //Take lock on store - using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(); //Try to change the blob key - if (!Cache!.TryChangeKey(objectId, alternateId, out blob)) + if (!Cache!.TryChangeKey(objectId, alternateId, out entry)) { - //Blob not found, create new blob - blob = Heap.AllocAndCopy(bodyData(state)); - Cache.Add(alternateId, blob); + //Create the new cache entry since it does not exist + entry = CacheEntry.Create(bodyData(state), Heap); + + //Add to cache by its alternate id + Cache.Add(alternateId, entry); } else { //Reset the buffer state - blob.WriteAndResize(bodyData(state)); + entry.UpdateData(bodyData(state)); } } + + //Update modified time to current utc time + entry.SetTime(DateTime.UtcNow); + } + + private async Task InternalDeleteAsync(FBMContext context, ChangeEvent change, CancellationToken cancellation) + { + bool found = false; + + //enter the lock + using(SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellation)) + { + //Sync + found = UnsafeDeleteEntry(change.CurrentId); + } + + //Notify change + EnqueEvent(change); + + //Set status code if found + context.CloseResponse(found ? ResponseCodes.Okay : ResponseCodes.NotFound); } - + + private bool UnsafeDeleteEntry(string id) => Cache!.Remove(id); + + + /// <summary> + /// Asynchronously adds or updates an object in the store and optionally update's its id + /// </summary> + /// <param name="objectId">The current (or old) id of the object</param> + /// <param name="alternateId">An optional id to update the blob to</param> + /// <param name="bodyData">A callback that returns the data for the blob</param> + /// <param name="state">The state parameter to pass to the data callback</param> + /// <param name="token">A token to cancel the async operation</param> + /// <returns>A value task that represents the async operation</returns> + public async ValueTask AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token = default) + { + //Test the lock before waiting async + if (!StoreLock.Wait(0)) + { + //Wait async to avoid task alloc + await StoreLock.WaitAsync(token); + } + try + { + UnsafeAddOrUpdate(objectId, alternateId, bodyData, state); + } + finally + { + StoreLock.Release(); + } + } + + /// <summary> + /// Asynchronously deletes a previously stored item + /// </summary> + /// <param name="id">The id of the object to delete</param> + /// <param name="token">A token to cancel the async lock await</param> + /// <returns>A task that completes when the item has been deleted</returns> + public async ValueTask<bool> DeleteItemAsync(string id, CancellationToken token = default) + { + //Test the lock before waiting async + if (!StoreLock.Wait(0)) + { + //Wait async to avoid task alloc + await StoreLock.WaitAsync(token); + } + try + { + return UnsafeDeleteEntry(id); + } + finally + { + StoreLock.Release(); + } + } + + ///<inheritdoc/> protected virtual void Dispose(bool disposing) { @@ -252,6 +452,7 @@ namespace VNLib.Data.Caching.ObjectCache disposedValue = true; } } + ///<inheritdoc/> public void Dispose() { diff --git a/lib/VNLib.Data.Caching/src/ClientExtensions.cs b/lib/VNLib.Data.Caching/src/ClientExtensions.cs index 0a24a83..353bf67 100644 --- a/lib/VNLib.Data.Caching/src/ClientExtensions.cs +++ b/lib/VNLib.Data.Caching/src/ClientExtensions.cs @@ -105,16 +105,16 @@ namespace VNLib.Data.Caching response.ThrowIfNotSet(); //Get the status code - ReadOnlyMemory<char> status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value; + FBMMessageHeader status = response.Headers.FirstOrDefault(static a => a.Header == HeaderCommand.Status); //Check ok status code, then its safe to deserialize - if (status.Span.Equals(ResponseCodes.Okay, StringComparison.Ordinal)) + if (status.Value.Equals(ResponseCodes.Okay, StringComparison.Ordinal)) { return JsonSerializer.Deserialize<T>(response.ResponseBody, LocalOptions); } //Object may not exist on the server yet - if (status.Span.Equals(ResponseCodes.NotFound, StringComparison.Ordinal)) + if (status.Value.Equals(ResponseCodes.NotFound, StringComparison.Ordinal)) { return default; } @@ -181,14 +181,14 @@ namespace VNLib.Data.Caching response.ThrowIfNotSet(); //Get the status code - ReadOnlyMemory<char> status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value; + FBMMessageHeader status = response.Headers.FirstOrDefault(static a => a.Header == HeaderCommand.Status); //Check status code - if (status.Span.Equals(ResponseCodes.Okay, StringComparison.OrdinalIgnoreCase)) + if (status.Value.Equals(ResponseCodes.Okay, StringComparison.OrdinalIgnoreCase)) { return; } - else if(status.Span.Equals(ResponseCodes.NotFound, StringComparison.OrdinalIgnoreCase)) + else if(status.Value.Equals(ResponseCodes.NotFound, StringComparison.OrdinalIgnoreCase)) { throw new ObjectNotFoundException($"object {objectId} not found on remote server"); } @@ -236,13 +236,13 @@ namespace VNLib.Data.Caching response.ThrowIfNotSet(); //Get the status code - ReadOnlyMemory<char> status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value; + FBMMessageHeader status = response.Headers.FirstOrDefault(static a => a.Header == HeaderCommand.Status); - if (status.Span.Equals(ResponseCodes.Okay, StringComparison.Ordinal)) + if (status.Value.Equals(ResponseCodes.Okay, StringComparison.Ordinal)) { return; } - else if(status.Span.Equals(ResponseCodes.NotFound, StringComparison.OrdinalIgnoreCase)) + else if(status.Value.Equals(ResponseCodes.NotFound, StringComparison.OrdinalIgnoreCase)) { throw new ObjectNotFoundException($"object {objectId} not found on remote server"); } @@ -277,9 +277,9 @@ namespace VNLib.Data.Caching return new() { - Status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value.ToString(), - CurrentId = response.Headers.SingleOrDefault(static v => v.Key == Constants.ObjectId).Value.ToString(), - NewId = response.Headers.SingleOrDefault(static v => v.Key == Constants.NewObjectId).Value.ToString() + Status = response.Headers.FirstOrDefault(static a => a.Header == HeaderCommand.Status).Value.ToString(), + CurrentId = response.Headers.SingleOrDefault(static v => v.Header == Constants.ObjectId).Value.ToString(), + NewId = response.Headers.SingleOrDefault(static v => v.Header == Constants.NewObjectId).Value.ToString() }; } finally @@ -297,7 +297,7 @@ namespace VNLib.Data.Caching [MethodImpl(MethodImplOptions.AggressiveInlining)] public static string ObjectId(this FBMContext context) { - return context.Request.Headers.First(static kvp => kvp.Key == Constants.ObjectId).Value.ToString(); + return context.Request.Headers.First(static kvp => kvp.Header == Constants.ObjectId).Value.ToString(); } /// <summary> @@ -308,7 +308,7 @@ namespace VNLib.Data.Caching [MethodImpl(MethodImplOptions.AggressiveInlining)] public static string? NewObjectId(this FBMContext context) { - return context.Request.Headers.FirstOrDefault(static kvp => kvp.Key == Constants.NewObjectId).Value.ToString(); + return context.Request.Headers.FirstOrDefault(static kvp => kvp.Header == Constants.NewObjectId).Value.ToString(); } /// <summary> @@ -319,7 +319,7 @@ namespace VNLib.Data.Caching [MethodImpl(MethodImplOptions.AggressiveInlining)] public static string Method(this FBMContext context) { - return context.Request.Headers.First(static kvp => kvp.Key == HeaderCommand.Action).Value.ToString(); + return context.Request.Headers.First(static kvp => kvp.Header == HeaderCommand.Action).Value.ToString(); } /// <summary> diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs index e5e6ee3..75b9bd4 100644 --- a/lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs +++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VNCacheExtensions.cs @@ -67,7 +67,7 @@ namespace VNLib.Plugins.Extensions.VNCache VnCacheClient client = new(debugLog); //Begin cache connections by scheduling a task on the plugin's scheduler - _ = pbase.DeferTask(() => RunClientAsync(pbase, config, localized, client), 250); + _ = pbase.ObserveTask(() => RunClientAsync(pbase, config, localized, client), 250); return client; } diff --git a/plugins/CacheBroker/src/CacheBroker.csproj b/plugins/CacheBroker/src/CacheBroker.csproj index 285d797..fa28dce 100644 --- a/plugins/CacheBroker/src/CacheBroker.csproj +++ b/plugins/CacheBroker/src/CacheBroker.csproj @@ -2,21 +2,15 @@ <PropertyGroup> <TargetFramework>net6.0</TargetFramework> - <Copyright>Copyright © 2023 Vaughn Nugent</Copyright> <RootNamespace>VNLib.Plugins.Cache.Broker</RootNamespace> - <Authors>Vaughn Nugent</Authors> <Version>1.0.1.2</Version> + <GenerateDocumentationFile>True</GenerateDocumentationFile> + <AnalysisLevel>latest-all</AnalysisLevel> <SignAssembly>True</SignAssembly> <AssemblyOriginatorKeyFile>\\vaughnnugent.com\Internal\Folder Redirection\vman\Documents\Programming\Software\StrongNameingKey.snk</AssemblyOriginatorKeyFile> </PropertyGroup> <ItemGroup> - <Compile Remove="liveplugin\**" /> - <EmbeddedResource Remove="liveplugin\**" /> - <None Remove="liveplugin\**" /> - </ItemGroup> - - <ItemGroup> <PackageReference Include="ErrorProne.NET.CoreAnalyzers" Version="0.1.2"> <PrivateAssets>all</PrivateAssets> <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> @@ -26,14 +20,13 @@ <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> </PackageReference> </ItemGroup> - + <PropertyGroup> <!--Enable dynamic loading--> <EnableDynamicLoading>true</EnableDynamicLoading> - <GenerateDocumentationFile>True</GenerateDocumentationFile> + <Authors>Vaughn Nugent</Authors> + <Copyright>Copyright © 2023 Vaughn Nugent</Copyright> <PackageProjectUrl>https://www.vaughnnugent.com/resources/software</PackageProjectUrl> - <AnalysisLevel>latest-all</AnalysisLevel> - </PropertyGroup> <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'"> @@ -50,12 +43,5 @@ <ProjectReference Include="..\..\..\..\..\core\lib\Utils\src\VNLib.Utils.csproj" /> <ProjectReference Include="..\..\..\..\Extensions\lib\VNLib.Plugins.Extensions.Loading\src\VNLib.Plugins.Extensions.Loading.csproj" /> </ItemGroup> - - - <ItemGroup> - <None Update="CacheBroker.json"> - <CopyToOutputDirectory>Always</CopyToOutputDirectory> - </None> - </ItemGroup> - + </Project> diff --git a/plugins/CacheBroker/src/Endpoints/BrokerRegistrationEndpoint.cs b/plugins/CacheBroker/src/Endpoints/BrokerRegistrationEndpoint.cs index c2e0b84..8f983ac 100644 --- a/plugins/CacheBroker/src/Endpoints/BrokerRegistrationEndpoint.cs +++ b/plugins/CacheBroker/src/Endpoints/BrokerRegistrationEndpoint.cs @@ -56,11 +56,12 @@ using VNLib.Net.Rest.Client; namespace VNLib.Plugins.Cache.Broker.Endpoints { [ConfigurationName("broker_endpoint")] - public sealed class BrokerRegistrationEndpoint : ResourceEndpointBase + public sealed class BrokerRegistrationEndpoint : ResourceEndpointBase, IDisposable { const string HEARTBEAT_PATH = "/heartbeat"; - private static readonly RestClientPool ClientPool = new(10,new RestClientOptions() + //Client pool is instance based and may be disposed when the plugin is unloaded + private readonly RestClientPool ClientPool = new(10,new RestClientOptions() { Encoding = Encoding.UTF8, FollowRedirects = false, @@ -69,6 +70,7 @@ namespace VNLib.Plugins.Cache.Broker.Endpoints }, null); + //Matches the json schema set by the FBM caching extensions library private class ActiveServer { [JsonIgnore] @@ -408,5 +410,12 @@ namespace VNLib.Plugins.Cache.Broker.Endpoints _ = ActiveServers.Remove(server.ServerId!); } } + + + void IDisposable.Dispose() + { + //Cleanup client pool when exiting + ClientPool.Dispose(); + } } } diff --git a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs b/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs index bd1233e..3930f90 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs @@ -30,12 +30,14 @@ using System.Threading; using System.Threading.Tasks; using System.Collections.Generic; +using VNLib.Plugins; +using VNLib.Plugins.Essentials; using VNLib.Hashing.IdentityUtility; using VNLib.Plugins.Essentials.Endpoints; using VNLib.Plugins.Essentials.Extensions; using VNLib.Plugins.Extensions.Loading; -namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints +namespace VNLib.Data.Caching.ObjectCache.Server { internal sealed class BrokerHeartBeat : ResourceEndpointBase { diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs new file mode 100644 index 0000000..e9584b6 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs @@ -0,0 +1,54 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: ConnectEndpoint.cs +* +* ConnectEndpoint.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Text.Json.Serialization; + +namespace VNLib.Data.Caching.ObjectCache.Server +{ + internal sealed class CacheConfiguration + { + [JsonPropertyName("buffer_recv_max")] + public int MaxRecvBufferSize { get; set; } = 1000 * 1024; + [JsonPropertyName("buffer_recv_min")] + public int MinRecvBufferSize { get; set; } = 8 * 1024; + + + [JsonPropertyName("buffer_header_max")] + public int MaxHeaderBufferSize { get; set; } = 2 * 1024; + [JsonPropertyName("buffer_header_min")] + public int MinHeaderBufferSize { get; set; } = 128; + + + [JsonPropertyName("max_message_size")] + public int MaxMessageSize { get; set; } = 1000 * 1024; + + + [JsonPropertyName("change_queue_max_depth")] + public int MaxEventQueueDepth { get; set; } = 10 * 1000; + + + [JsonPropertyName("max_cache")] + public int MaxCacheEntries { get; set; } = 10000; + } +} diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs index 15cc086..9a1ece0 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs @@ -31,34 +31,29 @@ using System.Threading.Channels; using System.Collections.Generic; using System.Collections.Concurrent; -using VNLib.Net.Http; +using VNLib.Plugins; using VNLib.Hashing; +using VNLib.Net.Http; using VNLib.Utils.Async; +using VNLib.Utils.Memory; using VNLib.Utils.Logging; using VNLib.Hashing.IdentityUtility; using VNLib.Net.Messaging.FBM; using VNLib.Net.Messaging.FBM.Client; using VNLib.Net.Messaging.FBM.Server; -using VNLib.Data.Caching.ObjectCache; +using VNLib.Plugins.Essentials; using VNLib.Plugins.Extensions.Loading; using VNLib.Plugins.Essentials.Endpoints; using VNLib.Plugins.Essentials.Extensions; +using System.Text.Json.Serialization; - -namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints +namespace VNLib.Data.Caching.ObjectCache.Server { - internal sealed class ConnectEndpoint : ResourceEndpointBase - { - const int MAX_RECV_BUF_SIZE = 1000 * 1024; - const int MIN_RECV_BUF_SIZE = 8 * 1024; - const int MAX_HEAD_BUF_SIZE = 2048; - const int MIN_MESSAGE_SIZE = 10 * 1024; - const int MAX_MESSAGE_SIZE = 1000 * 1024; - const int MIN_HEAD_BUF_SIZE = 128; - const int MAX_EVENT_QUEUE_SIZE = 10000; - const int MAX_RESPONSE_BUFFER_SIZE = 10 * 1024; - private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30); + [ConfigurationName("store")] + internal sealed class ConnectEndpoint : ResourceEndpointBase, IDisposable, IAsyncBackgroundWork + { + private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30); private readonly string AudienceLocalServerId; private readonly ObjectCacheStore Store; @@ -68,8 +63,16 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints private uint _connectedClients; + /// <summary> + /// Gets the number of active connections + /// </summary> public uint ConnectedClients => _connectedClients; + /// <summary> + /// The cache store configuration + /// </summary> + public CacheConfiguration CacheConfig { get; } + //Loosen up protection settings protected override ProtectionSettings EndpointProtectionSettings { get; } = new() { @@ -78,20 +81,89 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints DisableCrossSiteDenied = true }; - public ConnectEndpoint(string path, ObjectCacheStore store, PluginBase pbase) + public ConnectEndpoint(PluginBase plugin, IReadOnlyDictionary<string, JsonElement> config) { - InitPathAndLog(path, pbase.Log); - Store = store;//Load client public key to verify signed messages - Pbase = pbase; + string? path = config["path"].GetString(); + InitPathAndLog(path, plugin.Log); + + Pbase = plugin; + + //Parse cache config or use default + if(config.TryGetValue("cache", out JsonElement confEl)) + { + CacheConfig = confEl.Deserialize<CacheConfiguration>()!; + } + else + { + //Init default config if not fount + CacheConfig = new(); + + Log.Verbose("Loading default cache buffer configuration"); + } + + //Create event queue client lookup table StatefulEventQueue = new(StringComparer.OrdinalIgnoreCase); - //Start the queue worker - _ = pbase.DeferTask(() => ChangeWorkerAsync(pbase.UnloadToken), 10); + //Init the cache store + Store = InitializeCache((ObjectCacheServerEntry)plugin, CacheConfig.MaxCacheEntries); + /* + * Generate a random guid for the current server when created so we + * know client tokens belong to us when singed by the same key + */ AudienceLocalServerId = Guid.NewGuid().ToString("N"); + + //Schedule the queue worker to be run + _ = plugin.ObserveWork(this, 100); + } + + private static ObjectCacheStore InitializeCache(ObjectCacheServerEntry plugin, int maxCache) + { + if(maxCache < 2) + { + throw new ArgumentException("You must configure a 'max_cache' size larger than 1 item"); + } + + //Suggestion + if(maxCache < 200) + { + plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache"); + } + + //Endpoint only allows for a single reader + return new (maxCache, plugin.Log, plugin.CacheHeap, true); } + /// <summary> + /// Gets the configured cache store + /// </summary> + /// <returns></returns> + public ICacheStore GetCacheStore() => new CacheStore(Store); + + + //Dispose will be called by the host plugin on unload + void IDisposable.Dispose() + { + //Dispose the store on cleanup + Store.Dispose(); + } + + + private async Task<ReadOnlyJsonWebKey> GetClientPubAsync() + { + return await Pbase.TryGetSecretAsync("client_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); + } + private async Task<ReadOnlyJsonWebKey> GetCachePubAsync() + { + return await Pbase.TryGetSecretAsync("cache_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); + } + private async Task<ReadOnlyJsonWebKey> GetCachePrivateKeyAsync() + { + return await Pbase.TryGetSecretAsync("cache_private_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); + } + + /* * Used as a client negotiation and verification request * @@ -132,7 +204,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints { verified = true; } - //May be signed by a cahce server + //May be signed by a cache server else { using ReadOnlyJsonWebKey cacheCert = await GetCachePubAsync(); @@ -163,8 +235,10 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints } Log.Debug("Received negotiation request from node {node}", nodeId); + //Verified, now we can create an auth message with a short expiration using JsonWebToken auth = new(); + //Sign the auth message from the cache certificate's private key using (ReadOnlyJsonWebKey cert = await GetCachePrivateKeyAsync()) { @@ -179,9 +253,9 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints //Specify the server's node id if set .AddClaim("sub", nodeId!) //Add negotiaion args - .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, MAX_HEAD_BUF_SIZE) - .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, MAX_RECV_BUF_SIZE) - .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, MAX_MESSAGE_SIZE) + .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, CacheConfig.MaxHeaderBufferSize) + .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, CacheConfig.MaxRecvBufferSize) + .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, CacheConfig.MaxMessageSize) .CommitClaims(); auth.SignFromJwk(cert); @@ -192,27 +266,17 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints return VfReturnType.VirtualSkip; } - private async Task<ReadOnlyJsonWebKey> GetClientPubAsync() - { - return await Pbase.TryGetSecretAsync("client_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); - } - private async Task<ReadOnlyJsonWebKey> GetCachePubAsync() - { - return await Pbase.TryGetSecretAsync("cache_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); - } - private async Task<ReadOnlyJsonWebKey> GetCachePrivateKeyAsync() - { - return await Pbase.TryGetSecretAsync("cache_private_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); - } - private async Task ChangeWorkerAsync(CancellationToken cancellation) + //Background worker to process event queue items + async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) { try { //Listen for changes while (true) { - ChangeEvent ev = await Store.EventQueue.DequeueAsync(cancellation); + ChangeEvent ev = await Store.EventQueue.DequeueAsync(exitToken); + //Add event to queues foreach (AsyncQueue<ChangeEvent> queue in StatefulEventQueue.Values) { @@ -224,10 +288,8 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints } } catch (OperationCanceledException) - { } - catch (Exception ex) { - Log.Error(ex); + //Normal exit } } @@ -238,6 +300,12 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints public int MaxMessageSize { get; init; } public int MaxResponseBufferSize { get; init; } public AsyncQueue<ChangeEvent>? SyncQueue { get; init; } + + public override string ToString() + { + return + $"{nameof(RecvBufferSize)}:{RecvBufferSize}, {nameof(MaxHeaderBufferSize)}: {MaxHeaderBufferSize}, {nameof(MaxMessageSize)}:{MaxMessageSize}, {nameof(MaxResponseBufferSize)}:{MaxResponseBufferSize}"; + } } protected override async ValueTask<VfReturnType> WebsocketRequestedAsync(HttpEntity entity) @@ -246,6 +314,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints { //Parse jwt from authorization string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; + if (string.IsNullOrWhiteSpace(jwtAuth)) { entity.CloseResponse(HttpStatusCode.Unauthorized); @@ -253,6 +322,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints } string? nodeId = null; + //Parse jwt using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth)) { @@ -301,11 +371,12 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints string maxMessageSizeCmd = entity.QueryArgs[FBMClient.REQ_MAX_MESS_QUERY_ARG]; //Parse recv buffer size - int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : MIN_RECV_BUF_SIZE; - int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : MIN_HEAD_BUF_SIZE; - int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : MIN_MESSAGE_SIZE; + int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : CacheConfig.MinRecvBufferSize; + int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : CacheConfig.MinHeaderBufferSize; + int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : CacheConfig.MaxMessageSize; AsyncQueue<ChangeEvent>? nodeQueue = null; + //The connection may be a caching server node, so get its node-id if (!string.IsNullOrWhiteSpace(nodeId)) { @@ -317,7 +388,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints * and change events may be processed on mutliple threads. */ - BoundedChannelOptions queueOptions = new(MAX_EVENT_QUEUE_SIZE) + BoundedChannelOptions queueOptions = new(CacheConfig.MaxEventQueueDepth) { AllowSynchronousContinuations = true, SingleReader = false, @@ -327,21 +398,41 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints }; _ = StatefulEventQueue.TryAdd(nodeId, new(queueOptions)); + //Get the queue nodeQueue = StatefulEventQueue[nodeId]; } - + + /* + * Buffer sizing can get messy as the response/resquest sizes can vary + * and will include headers, this is a drawback of the FBM protocol + * so we need to properly calculate efficient buffer sizes as + * negotiated with the client. + */ + + int maxMessageSizeClamp = Math.Clamp(maxMessageSize, CacheConfig.MinRecvBufferSize, CacheConfig.MaxRecvBufferSize); + //Init new ws state object and clamp the suggested buffer sizes WsUserState state = new() { - RecvBufferSize = Math.Clamp(recvBufSize, MIN_RECV_BUF_SIZE, MAX_RECV_BUF_SIZE), - MaxHeaderBufferSize = Math.Clamp(maxHeadBufSize, MIN_HEAD_BUF_SIZE, MAX_HEAD_BUF_SIZE), - MaxMessageSize = Math.Clamp(maxMessageSize, MIN_MESSAGE_SIZE, MAX_MESSAGE_SIZE), - MaxResponseBufferSize = Math.Min(maxMessageSize, MAX_RESPONSE_BUFFER_SIZE), + RecvBufferSize = Math.Clamp(recvBufSize, CacheConfig.MinRecvBufferSize, CacheConfig.MaxRecvBufferSize), + MaxHeaderBufferSize = Math.Clamp(maxHeadBufSize, CacheConfig.MinHeaderBufferSize, CacheConfig.MaxHeaderBufferSize), + + MaxMessageSize = maxMessageSizeClamp, + + /* + * Response buffer needs to be large enough to store a max message + * as a response along with all response headers + */ + MaxResponseBufferSize = (int)MemoryUtil.NearestPage(maxMessageSizeClamp), + SyncQueue = nodeQueue }; Log.Debug("Client recv buffer suggestion {recv}, header buffer size {head}, response buffer size {r}", recvBufCmd, maxHeaderCharCmd, state.MaxResponseBufferSize); + + //Print state message to console + Log.Verbose("Client buffer state {state}", state); //Accept socket and pass state object entity.AcceptWebSocket(WebsocketAcceptedAsync, state); @@ -370,6 +461,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints RecvBufferSize = state.RecvBufferSize, ResponseBufferSize = state.MaxResponseBufferSize, MaxHeaderBufferSize = state.MaxHeaderBufferSize, + HeaderEncoding = Helpers.DefaultEncoding, }; @@ -395,5 +487,31 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints } Log.Debug("Server websocket exited"); } + + + private sealed class CacheStore : ICacheStore + { + private readonly ObjectCacheStore _cache; + + public CacheStore(ObjectCacheStore cache) + { + _cache = cache; + } + + ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token) + { + return _cache.AddOrUpdateBlobAsync(objectId, alternateId, bodyData, state, token); + } + + void ICacheStore.Clear() + { + throw new NotImplementedException(); + } + + ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token) + { + return _cache.DeleteItemAsync(id, token); + } + } } } diff --git a/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs b/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs new file mode 100644 index 0000000..3776269 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs @@ -0,0 +1,56 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: ConnectEndpoint.cs +* +* ConnectEndpoint.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Threading; +using System.Threading.Tasks; + +namespace VNLib.Data.Caching.ObjectCache.Server +{ + internal interface ICacheStore + { + /// <summary> + /// Asynchronously adds or updates an object in the store and optionally update's its id + /// </summary> + /// <param name="objectId">The current (or old) id of the object</param> + /// <param name="alternateId">An optional id to update the blob to</param> + /// <param name="bodyData">A callback that returns the data for the blob</param> + /// <param name="state">The state parameter to pass to the data callback</param> + /// <param name="token">A token to cancel the async operation</param> + /// <returns>A value task that represents the async operation</returns> + ValueTask AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token = default); + + /// <summary> + /// Clears all items from the store + /// </summary> + void Clear(); + + /// <summary> + /// Asynchronously deletes a previously stored item + /// </summary> + /// <param name="id">The id of the object to delete</param> + /// <param name="token">A token to cancel the async lock await</param> + /// <returns>A task that completes when the item has been deleted</returns> + ValueTask<bool> DeleteItemAsync(string id, CancellationToken token = default); + } +} diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj index 672597b..38f5b97 100644 --- a/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj +++ b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj @@ -2,32 +2,23 @@ <PropertyGroup> <TargetFramework>net6.0</TargetFramework> <Nullable>enable</Nullable> - <Authors>Vaughn Nugent</Authors> + <RootNamespace>VNLib.Data.Caching.ObjectCache.Server</RootNamespace> <Version>1.0.1.1</Version> - <RootNamespace>VNLib.Plugins.Essentials.Sessions.Server</RootNamespace> - <Copyright>Copyright © 2023 Vaughn Nugent</Copyright> <SignAssembly>True</SignAssembly> <AssemblyOriginatorKeyFile>\\vaughnnugent.com\Internal\Folder Redirection\vman\Documents\Programming\Software\StrongNameingKey.snk</AssemblyOriginatorKeyFile> + <EnableDynamicLoading>true</EnableDynamicLoading> + <AnalysisLevel>latest-all</AnalysisLevel> + <ProduceReferenceAssembly>True</ProduceReferenceAssembly> + <GenerateDocumentationFile>False</GenerateDocumentationFile> </PropertyGroup> <!-- Resolve nuget dll files and store them in the output dir --> <PropertyGroup> - <EnableDynamicLoading>true</EnableDynamicLoading> - <GenerateDocumentationFile>False</GenerateDocumentationFile> - <PackageProjectUrl>https://www.vaughnnugent.com/resources</PackageProjectUrl> - <AnalysisLevel>latest-all</AnalysisLevel> - <ProduceReferenceAssembly>True</ProduceReferenceAssembly> - + <Authors>Vaughn Nugent</Authors> + <Copyright>Copyright © 2023 Vaughn Nugent</Copyright> + <PackageProjectUrl>https://www.vaughnnugent.com/resources/software</PackageProjectUrl> </PropertyGroup> <ItemGroup> - <Compile Remove="liveplugin2\**" /> - <Compile Remove="liveplugin\**" /> - <EmbeddedResource Remove="liveplugin2\**" /> - <EmbeddedResource Remove="liveplugin\**" /> - <None Remove="liveplugin2\**" /> - <None Remove="liveplugin\**" /> - </ItemGroup> - <ItemGroup> <PackageReference Include="ErrorProne.NET.CoreAnalyzers" Version="0.1.2"> <PrivateAssets>all</PrivateAssets> <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> @@ -42,10 +33,4 @@ <ProjectReference Include="..\..\..\lib\VNLib.Data.Caching.ObjectCache\src\VNLib.Data.Caching.ObjectCache.csproj" /> <ProjectReference Include="..\..\CacheBroker\src\CacheBroker.csproj" /> </ItemGroup> - <ItemGroup> - <None Update="ObjectCacheServer.json"> - <CopyToOutputDirectory>Always</CopyToOutputDirectory> - </None> - </ItemGroup> - </Project> diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs index 7b7b9fb..d6dbd9b 100644 --- a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs +++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs @@ -34,34 +34,67 @@ using System.Threading.Tasks; using System.Collections.Generic; using System.Security.Cryptography; +using VNLib.Plugins; using VNLib.Utils.Memory; using VNLib.Utils.Logging; -using VNLib.Utils.Extensions; +using VNLib.Utils.Memory.Diagnostics; using VNLib.Hashing; using VNLib.Hashing.IdentityUtility; -using VNLib.Data.Caching; using VNLib.Data.Caching.Extensions; -using VNLib.Data.Caching.ObjectCache; using static VNLib.Data.Caching.Constants; using VNLib.Net.Messaging.FBM; using VNLib.Net.Messaging.FBM.Client; using VNLib.Plugins.Cache.Broker.Endpoints; using VNLib.Plugins.Extensions.Loading; using VNLib.Plugins.Extensions.Loading.Routing; -using VNLib.Plugins.Essentials.Sessions.Server.Endpoints; -namespace VNLib.Plugins.Essentials.Sessions.Server +namespace VNLib.Data.Caching.ObjectCache.Server { public sealed class ObjectCacheServerEntry : PluginBase { public override string PluginName => "ObjectCache.Service"; - private string? BrokerHeartBeatToken; + private readonly Lazy<IUnmangedHeap> _cacheHeap; + private readonly object ServerLock; + private readonly HashSet<ActiveServer> ListeningServers; + private readonly ManualResetEvent BrokerSyncHandle; + + /// <summary> + /// Gets the shared heap for the plugin + /// </summary> + internal IUnmangedHeap CacheHeap => _cacheHeap.Value; - private readonly object ServerLock = new(); - private readonly HashSet<ActiveServer> ListeningServers = new(); + public ObjectCacheServerEntry() + { + //Init heap + _cacheHeap = new Lazy<IUnmangedHeap>(InitializeHeap, LazyThreadSafetyMode.PublicationOnly); + + ServerLock = new(); + ListeningServers = new(); + //Set sync handle + BrokerSyncHandle = new(false); + } + + private IUnmangedHeap InitializeHeap() + { + //Create default heap + IUnmangedHeap _heap = MemoryUtil.InitializeNewHeapForProcess(); + try + { + //If the plugin is in debug mode enable heap tracking + return this.IsDebug() ? new TrackedHeapWrapper(_heap) : _heap; + } + catch + { + _heap.Dispose(); + throw; + } + } + + + private string? BrokerHeartBeatToken; private void RemoveServer(ActiveServer server) { @@ -71,55 +104,45 @@ namespace VNLib.Plugins.Essentials.Sessions.Server } } + private FBMClientConfig ClientConfig; + + protected override void OnLoad() { - //Create default heap - IUnmangedHeap CacheHeap = MemoryUtil.InitializeNewHeapForProcess(); try { IReadOnlyDictionary<string, JsonElement> clusterConf = this.GetConfig("cluster"); - string brokerAddress = clusterConf["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'"); - - string swapDir = PluginConfig.GetProperty("swap_dir").GetString() ?? throw new KeyNotFoundException("Missing required key 'swap_dir' for config"); - int cacheSize = PluginConfig.GetProperty("max_cache").GetInt32(); - string connectPath = PluginConfig.GetProperty("connect_path").GetString() ?? throw new KeyNotFoundException("Missing required element 'connect_path' for config 'cluster'"); - //TimeSpan cleanupInterval = PluginConfig.GetProperty("cleanup_interval_sec").GetTimeSpan(TimeParseType.Seconds); - //TimeSpan validFor = PluginConfig.GetProperty("valid_for_sec").GetTimeSpan(TimeParseType.Seconds); - int maxMessageSize = PluginConfig.GetProperty("max_blob_size").GetInt32(); + Uri brokerAddress = new(clusterConf["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'")); - //Init dir - DirectoryInfo dir = new(swapDir); - dir.Create(); - //Init cache listener, single threaded reader - ObjectCacheStore CacheListener = new(dir, cacheSize, Log, CacheHeap, true); - + //Init connect endpoint - { - //Init connect endpoint - ConnectEndpoint endpoint = new(connectPath, CacheListener, this); - Route(endpoint); - } - + ConnectEndpoint endpoint = this.Route<ConnectEndpoint>(); + + //Get the cache store from the connection endpoint + ICacheStore store = endpoint.GetCacheStore(); + + //Log max memory usage + Log.Debug("Maxium memory consumption {mx}Mb", ((ulong)endpoint.CacheConfig.MaxCacheEntries * (ulong)endpoint.CacheConfig.MaxMessageSize) / (ulong)(1024 * 1000)); + //Setup broker and regitration { - //init mre to pass the broker heartbeat signal to the registration worker - ManualResetEvent mre = new(false); + //Route the broker endpoint - BrokerHeartBeat brokerEp = new(() => BrokerHeartBeatToken!, mre, new Uri(brokerAddress), this); + BrokerHeartBeat brokerEp = new(() => BrokerHeartBeatToken!, BrokerSyncHandle, brokerAddress, this); Route(brokerEp); //start registration - _ = this.DeferTask(() => RegisterServerAsync(mre), 200); + _ = this.ObserveTask(() => RegisterServerAsync(endpoint.Path), 200); } //Setup cluster worker { //Get pre-configured fbm client config for caching - FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(CacheHeap, maxMessageSize, this.IsDebug() ? Log : null); + ClientConfig = FBMDataCacheExtensions.GetDefaultConfig(CacheHeap, endpoint.CacheConfig.MaxMessageSize / 2, this.IsDebug() ? Log : null); //Start Client runner - _ = this.DeferTask(() => RunClientAsync(CacheListener, new Uri(brokerAddress), conf), 300); + _ = this.ObserveTask(() => RunClientAsync(store, brokerAddress), 300); } //Load a cache broker to the current server if the config is defined @@ -129,38 +152,32 @@ namespace VNLib.Plugins.Essentials.Sessions.Server this.Route<BrokerRegistrationEndpoint>(); } } - - void Cleanup() - { - CacheHeap.Dispose(); - CacheListener.Dispose(); - } - - //Regsiter cleanup - _ = UnloadToken.RegisterUnobserved(Cleanup); Log.Information("Plugin loaded"); } catch (KeyNotFoundException kne) { - CacheHeap.Dispose(); Log.Error("Missing required configuration variables {m}", kne.Message); } - catch - { - CacheHeap.Dispose(); - throw; - } } protected override void OnUnLoad() { + //dispose heap if initialized + if(_cacheHeap.IsValueCreated) + { + _cacheHeap.Value.Dispose(); + } + + //Dispose mre sync handle + BrokerSyncHandle.Dispose(); + Log.Information("Plugin unloaded"); } #region Registration - private async Task RegisterServerAsync(ManualResetEvent keepaliveWait) + private async Task RegisterServerAsync(string connectPath) { try { @@ -169,9 +186,9 @@ namespace VNLib.Plugins.Essentials.Sessions.Server //Server id is just dns name for now string serverId = Dns.GetHostName(); - int heartBeatDelayMs = clusterConfig["heartbeat_timeout_sec"].GetInt32() * 1000; - string? connectPath = PluginConfig.GetProperty("connect_path").GetString(); + int heartBeatDelayMs = clusterConfig["heartbeat_timeout_sec"].GetInt32() * 1000; + //Get the port of the primary webserver int port; @@ -227,15 +244,17 @@ namespace VNLib.Plugins.Essentials.Sessions.Server while (true) { await Task.Delay(heartBeatDelayMs, UnloadToken); + //Set the timeout to 0 to it will just check the status without blocking - if (!keepaliveWait.WaitOne(0)) + if (!BrokerSyncHandle.WaitOne(0)) { //server miseed a keepalive event, time to break the loop and retry Log.Debug("Broker missed a heartbeat request, attempting to re-register"); break; } + //Reset the msr - keepaliveWait.Reset(); + BrokerSyncHandle.Reset(); } } catch (TaskCanceledException) @@ -275,7 +294,6 @@ namespace VNLib.Plugins.Essentials.Sessions.Server } finally { - keepaliveWait.Dispose(); BrokerHeartBeatToken = null; } Log.Debug("Registration worker exited"); @@ -305,20 +323,25 @@ namespace VNLib.Plugins.Essentials.Sessions.Server /// <param name="serverId">The node-id of the current server</param> /// <param name="clientConf">The configuration to use when initializing synchronization clients</param> /// <returns>A task that resolves when the plugin unloads</returns> - private async Task RunClientAsync(ObjectCacheStore cacheStore, Uri brokerAddress, FBMClientConfig clientConf) + private async Task RunClientAsync(ICacheStore cacheStore, Uri brokerAddress) { TimeSpan noServerDelay = TimeSpan.FromSeconds(10); + + //The node id is just the dns hostname of the current machine string nodeId = Dns.GetHostName(); + ListServerRequest listRequest = new(brokerAddress); try { //Get the broker config element IReadOnlyDictionary<string, JsonElement> clusterConf = this.GetConfig("cluster"); + int serverCheckMs = clusterConf["update_interval_sec"].GetInt32() * 1000; //Setup signing and verification certificates ReadOnlyJsonWebKey cacheSig = await GetCachePrivate(); ReadOnlyJsonWebKey brokerPub = await GetBrokerPublic(); + //Import certificates listRequest.WithVerificationKey(brokerPub) .WithSigningKey(cacheSig); @@ -340,6 +363,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server //Get server list servers = await FBMDataCacheExtensions.ListServersAsync(listRequest, UnloadToken); + //Servers are loaded, so continue break; } @@ -355,8 +379,10 @@ namespace VNLib.Plugins.Essentials.Sessions.Server { Log.Warn(ex, "Failed to get server list from broker"); } + //Gen random ms delay int randomMsDelay = RandomNumberGenerator.GetInt32(1000, 2000); + //Delay await Task.Delay(randomMsDelay, UnloadToken); } @@ -386,7 +412,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server ListeningServers.Add(server); //Run listener background task - _ = this.DeferTask(() => RunSyncTaskAsync(server, cacheStore, clientConf, nodeId)); + _ = this.ObserveTask(() => RunSyncTaskAsync(server, cacheStore, nodeId)); } } } @@ -417,10 +443,10 @@ namespace VNLib.Plugins.Essentials.Sessions.Server Log.Debug("Cluster sync worker exited"); } - private async Task RunSyncTaskAsync(ActiveServer server, ObjectCacheStore cacheStore, FBMClientConfig conf, string nodeId) + private async Task RunSyncTaskAsync(ActiveServer server, ICacheStore cacheStore, string nodeId) { //Setup client - FBMClient client = new(conf); + FBMClient client = new(ClientConfig); try { async Task UpdateRecordAsync(string objectId, string newId) @@ -440,7 +466,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server response.ThrowIfNotSet(); //Check response code - string status = response.Headers.First(static s => s.Key == HeaderCommand.Status).Value.ToString(); + string status = response.Headers.First(static s => s.Header == HeaderCommand.Status).Value.ToString(); if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal)) { //Update the record |