From dc7ad57c845cc9b1b502e5e8b12ce96af4183dc4 Mon Sep 17 00:00:00 2001 From: vman Date: Fri, 18 Nov 2022 17:15:36 -0500 Subject: Add project files. --- VNLib.Data.Caching/src/BlobCache.cs | 118 +++++++ VNLib.Data.Caching/src/BlobItem.cs | 185 +++++++++++ VNLib.Data.Caching/src/CacheListener.cs | 40 +++ VNLib.Data.Caching/src/ClientExtensions.cs | 310 ++++++++++++++++++ VNLib.Data.Caching/src/ClientRetryManager.cs | 83 +++++ VNLib.Data.Caching/src/Constants.cs | 32 ++ .../src/Exceptions/InvalidStatusException.cs | 39 +++ .../src/Exceptions/MessageTooLargeException.cs | 26 ++ .../src/Exceptions/ObjectNotFoundException.cs | 23 ++ VNLib.Data.Caching/src/VNLib.Data.Caching.csproj | 34 ++ VNLib.Data.Caching/src/VNLib.Data.Caching.xml | 354 +++++++++++++++++++++ VNLib.Data.Caching/src/WaitForChangeResult.cs | 21 ++ 12 files changed, 1265 insertions(+) create mode 100644 VNLib.Data.Caching/src/BlobCache.cs create mode 100644 VNLib.Data.Caching/src/BlobItem.cs create mode 100644 VNLib.Data.Caching/src/CacheListener.cs create mode 100644 VNLib.Data.Caching/src/ClientExtensions.cs create mode 100644 VNLib.Data.Caching/src/ClientRetryManager.cs create mode 100644 VNLib.Data.Caching/src/Constants.cs create mode 100644 VNLib.Data.Caching/src/Exceptions/InvalidStatusException.cs create mode 100644 VNLib.Data.Caching/src/Exceptions/MessageTooLargeException.cs create mode 100644 VNLib.Data.Caching/src/Exceptions/ObjectNotFoundException.cs create mode 100644 VNLib.Data.Caching/src/VNLib.Data.Caching.csproj create mode 100644 VNLib.Data.Caching/src/VNLib.Data.Caching.xml create mode 100644 VNLib.Data.Caching/src/WaitForChangeResult.cs (limited to 'VNLib.Data.Caching/src') diff --git a/VNLib.Data.Caching/src/BlobCache.cs b/VNLib.Data.Caching/src/BlobCache.cs new file mode 100644 index 0000000..89818be --- /dev/null +++ b/VNLib.Data.Caching/src/BlobCache.cs @@ -0,0 +1,118 @@ +using System; +using System.IO; +using System.Linq; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; + +using VNLib.Utils.IO; +using VNLib.Utils.Logging; +using VNLib.Utils.Memory; +using VNLib.Utils.Memory.Caching; + + +#nullable enable + +namespace VNLib.Data.Caching +{ + /// + /// A general purpose binary data storage + /// + public class BlobCache : LRUCache> + { + readonly IUnmangedHeap Heap; + readonly DirectoryInfo SwapDir; + readonly ILogProvider Log; + /// + public override bool IsReadOnly { get; } + /// + protected override int MaxCapacity { get; } + + + /// + /// Initializes a new store + /// + /// The to swap blob data to when cache + /// The maximum number of items to keep in memory + /// A to write log data to + /// A to allocate buffers and store data in memory + public BlobCache(DirectoryInfo swapDir, int maxCapacity, ILogProvider log, IUnmangedHeap heap) + :base(StringComparer.Ordinal) + { + IsReadOnly = false; + MaxCapacity = maxCapacity; + SwapDir = swapDir; + //Update the lookup table size + LookupTable.EnsureCapacity(maxCapacity); + //Set default heap if not specified + Heap = heap; + Log = log; + } + /// + protected override bool CacheMiss(string key, [NotNullWhen(true)] out MemoryHandle? value) + { + value = null; + return false; + } + /// + protected override void Evicted(KeyValuePair> evicted) + { + //Dispose the blob + evicted.Value.Dispose(); + } + /// + /// If the is found in the store, changes the key + /// that referrences the blob. + /// + /// The key that currently referrences the blob in the store + /// The new key that will referrence the blob + /// The if its found in the store + /// True if the record was found and the key was changes + public bool TryChangeKey(string currentKey, string newKey, [NotNullWhen(true)] out MemoryHandle? blob) + { + if (LookupTable.Remove(currentKey, out LinkedListNode>>? node)) + { + //Remove the node from the ll + List.Remove(node); + //Update the node kvp + blob = node.Value.Value; + node.Value = new KeyValuePair>(newKey, blob); + //Add to end of list + List.AddLast(node); + //Re-add to lookup table with new key + LookupTable.Add(newKey, node); + return true; + } + blob = null; + return false; + } + /// + /// Removes the from the store without disposing the blobl + /// + /// The key that referrences the in the store + /// A value indicating if the blob was removed + public override bool Remove(string key) + { + //Remove the item from the lookup table and if it exists, remove the node from the list + if (LookupTable.Remove(key, out LinkedListNode>>? node)) + { + //Remove the new from the list + List.Remove(node); + //dispose the buffer + node.Value.Value.Dispose(); + return true; + } + return false; + } + /// + /// Removes and disposes all blobl elements in cache (or in the backing store) + /// + public override void Clear() + { + foreach (MemoryHandle blob in List.Select(kp => kp.Value)) + { + blob.Dispose(); + } + base.Clear(); + } + } +} diff --git a/VNLib.Data.Caching/src/BlobItem.cs b/VNLib.Data.Caching/src/BlobItem.cs new file mode 100644 index 0000000..a5630e9 --- /dev/null +++ b/VNLib.Data.Caching/src/BlobItem.cs @@ -0,0 +1,185 @@ +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Utils; +using VNLib.Utils.IO; +using VNLib.Utils.Memory; +using VNLib.Utils.Logging; +using VNLib.Utils.Extensions; + +#nullable enable + +namespace VNLib.Data.Caching +{ + /// + /// A general purpose binary storage item + /// + public class BlobItem //: VnDisposeable + { + /* + private static readonly JoinableTaskContext JTX = new(); + private static readonly Semaphore CentralSwapLock = new(Environment.ProcessorCount, Environment.ProcessorCount); + + private readonly VnMemoryStream _loadedData; + private bool _loaded; + + /// + /// The time the blob was last modified + /// + public DateTimeOffset LastAccessed { get; private set; } + + + /// + /// Gets the current size of the file (in bytes) as an atomic operation + /// + public int FileSize => (int)_loadedData.Length; + /// + /// The operation synchronization lock + /// + public AsyncReaderWriterLock OpLock { get; } + /// + /// Initializes a new + /// + /// The heap to allocate buffers from + internal BlobItem(IUnmangedHeap heap) + { + _loadedData = new(heap); + OpLock = new AsyncReaderWriterLock(JTX); + _loaded = true; + LastAccessed = DateTimeOffset.UtcNow; + } + /// + protected override void Free() + { + _loadedData.Dispose(); + OpLock.Dispose(); + } + + /// + /// Reads data from the internal buffer and copies it to the specified buffer. + /// Use the property to obtain the size of the internal buffer + /// + /// The buffer to copy data to + /// When completed, the number of bytes copied to the buffer + public int Read(Span buffer) + { + //Make sure the blob has been swapped back into memory + if (!_loaded) + { + throw new InvalidOperationException("The blob was not loaded from the disk"); + } + //Read all data from the buffer and write it to the output buffer + _loadedData.AsSpan().CopyTo(buffer); + //Update last-accessed + LastAccessed = DateTimeOffset.UtcNow; + return (int)_loadedData.Length; + } + /// + /// Overwrites the internal buffer with the contents of the supplied buffer + /// + /// The buffer containing data to store within the blob + /// A that completes when write access has been granted and copied + /// + public void Write(ReadOnlySpan buffer) + { + //Make sure the blob has been swapped back into memory + if (!_loaded) + { + throw new InvalidOperationException("The blob was not loaded from the disk"); + } + //Reset the buffer + _loadedData.SetLength(buffer.Length); + _loadedData.Seek(0, SeekOrigin.Begin); + _loadedData.Write(buffer); + LastAccessed = DateTimeOffset.UtcNow; + } + + /// + /// Writes the contents of the memory buffer to its designated file on the disk + /// + /// The heap to allocate buffers from + /// The that stores the file + /// The name of the file to write data do + /// A log to write errors to + /// A task that completes when the swap to disk is complete + internal async Task SwapToDiskAsync(IUnmangedHeap heap, DirectoryInfo swapDir, string filename, ILogProvider log) + { + try + { + //Wait for write lock + await using (AsyncReaderWriterLock.Releaser releaser = await OpLock.WriteLockAsync()) + { + //Enter swap lock + await CentralSwapLock; + try + { + //Open swap file data stream + await using FileStream swapFile = swapDir.OpenFile(filename, FileMode.OpenOrCreate, FileAccess.ReadWrite, bufferSize: 8128); + //reset swap file + swapFile.SetLength(0); + //Seek loaded-data back to 0 before writing + _loadedData.Seek(0, SeekOrigin.Begin); + //Write loaded data to disk + await _loadedData.CopyToAsync(swapFile, 8128, heap); + } + finally + { + CentralSwapLock.Release(); + } + //Release memory held by stream + _loadedData.SetLength(0); + //Clear loaded flag + _loaded = false; + LastAccessed = DateTimeOffset.UtcNow; + } + log.Debug("Blob {name} swapped to disk", filename); + } + catch(Exception ex) + { + log.Error(ex, "Blob swap to disk error"); + } + } + /// + /// Reads the contents of the blob into a memory buffer from its designated file on disk + /// + /// The heap to allocate buffers from + /// The that stores the file + /// The name of the file to write the blob data to + /// A log to write errors to + /// A task that completes when the swap from disk is complete + internal async Task SwapFromDiskAsync(IUnmangedHeap heap, DirectoryInfo swapDir, string filename, ILogProvider log) + { + try + { + //Wait for write lock + await using (AsyncReaderWriterLock.Releaser releaser = await OpLock.WriteLockAsync()) + { + //Enter swap lock + await CentralSwapLock; + try + { + //Open swap file data stream + await using FileStream swapFile = swapDir.OpenFile(filename, FileMode.OpenOrCreate, FileAccess.Read, bufferSize:8128); + //Copy from disk to memory + await swapFile.CopyToAsync(_loadedData, 8128, heap); + } + finally + { + CentralSwapLock.Release(); + } + //Set loaded flag + _loaded = true; + LastAccessed = DateTimeOffset.UtcNow; + } + log.Debug("Blob {name} swapped from disk", filename); + } + catch(Exception ex) + { + log.Error(ex, "Blob swap from disk error"); + } + } + */ + } +} diff --git a/VNLib.Data.Caching/src/CacheListener.cs b/VNLib.Data.Caching/src/CacheListener.cs new file mode 100644 index 0000000..18e9684 --- /dev/null +++ b/VNLib.Data.Caching/src/CacheListener.cs @@ -0,0 +1,40 @@ +using System; +using System.IO; + +using VNLib.Utils.Memory; +using VNLib.Net.Messaging.FBM.Server; + +namespace VNLib.Data.Caching +{ + /// + /// A base implementation of a memory/disk LRU data cache FBM listener + /// + public abstract class CacheListener : FBMListenerBase + { + /// + /// The directory swap files will be stored + /// + public DirectoryInfo? Directory { get; private set; } + /// + /// The Cache store to access data blobs + /// + protected BlobCache? Cache { get; private set; } + /// + /// The to allocate buffers from + /// + protected IUnmangedHeap? Heap { get; private set; } + + /// + /// Initializes the data store + /// + /// The directory to swap cache records to + /// The size of the LRU cache + /// The heap to allocate buffers from + protected void InitCache(DirectoryInfo dir, int cacheSize, IUnmangedHeap heap) + { + Heap = heap; + Cache = new(dir, cacheSize, Log, Heap); + Directory = dir; + } + } +} diff --git a/VNLib.Data.Caching/src/ClientExtensions.cs b/VNLib.Data.Caching/src/ClientExtensions.cs new file mode 100644 index 0000000..18a1aa9 --- /dev/null +++ b/VNLib.Data.Caching/src/ClientExtensions.cs @@ -0,0 +1,310 @@ +using System; +using System.IO; +using System.Linq; +using System.Buffers; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using System.Collections.Generic; +using System.Text.Json.Serialization; +using System.Runtime.CompilerServices; + +using VNLib.Utils.Logging; +using VNLib.Net.Messaging.FBM; +using VNLib.Net.Messaging.FBM.Client; +using VNLib.Net.Messaging.FBM.Server; +using VNLib.Data.Caching.Exceptions; + +using static VNLib.Data.Caching.Constants; + +namespace VNLib.Data.Caching +{ + + /// + /// Provides caching extension methods for + /// + public static class ClientExtensions + { + private static readonly JsonSerializerOptions LocalOptions = new() + { + DictionaryKeyPolicy = JsonNamingPolicy.CamelCase, + NumberHandling = JsonNumberHandling.Strict, + ReadCommentHandling = JsonCommentHandling.Disallow, + WriteIndented = false, + DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull, + IgnoreReadOnlyFields = true, + PropertyNameCaseInsensitive = true, + IncludeFields = false, + + //Use small buffers + DefaultBufferSize = 128 + }; + + + private static readonly ConditionalWeakTable GetLock = new(); + private static readonly ConditionalWeakTable UpdateLock = new(); + + private static SemaphoreSlim GetLockCtor(FBMClient client) => new (50); + + private static SemaphoreSlim UpdateLockCtor(FBMClient client) => new (25); + + /// + /// Gets an object from the server if it exists + /// + /// + /// + /// The id of the object to get + /// A token to cancel the operation + /// A task that completes to return the results of the response payload + /// + /// + /// + /// + /// + public static async Task GetObjectAsync(this FBMClient client, string objectId, CancellationToken cancellationToken = default) + { + client.Config.DebugLog?.Debug("[DEBUG] Getting object {id}", objectId); + SemaphoreSlim getLock = GetLock.GetValue(client, GetLockCtor); + //Wait for entry + await getLock.WaitAsync(cancellationToken); + //Rent a new request + FBMRequest request = client.RentRequest(); + try + { + //Set action as get/create + request.WriteHeader(HeaderCommand.Action, Actions.Get); + //Set session-id header + request.WriteHeader(Constants.ObjectId, objectId); + + //Make request + using FBMResponse response = await client.SendAsync(request, cancellationToken); + + response.ThrowIfNotSet(); + //Get the status code + ReadOnlyMemory status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value; + if (status.Span.Equals(ResponseCodes.Okay, StringComparison.Ordinal)) + { + return JsonSerializer.Deserialize(response.ResponseBody, LocalOptions); + } + //Session may not exist on the server yet + if (status.Span.Equals(ResponseCodes.NotFound, StringComparison.Ordinal)) + { + return default; + } + throw new InvalidStatusException("Invalid status code recived for object get request", status.ToString()); + } + finally + { + getLock.Release(); + client.ReturnRequest(request); + } + } + + /// + /// Updates the state of the object, and optionally updates the ID of the object. The data + /// parameter is serialized, buffered, and streamed to the remote server + /// + /// + /// + /// The id of the object to update or replace + /// An optional parameter to specify a new ID for the old object + /// The payload data to serialize and set as the data state of the session + /// A token to cancel the operation + /// A task that resolves when the server responds + /// + /// + /// + /// + /// + /// + /// + public static async Task AddOrUpdateObjectAsync(this FBMClient client, string objectId, string? newId, T data, CancellationToken cancellationToken = default) + { + client.Config.DebugLog?.Debug("[DEBUG] Updating object {id}, newid {nid}", objectId, newId); + SemaphoreSlim updateLock = UpdateLock.GetValue(client, UpdateLockCtor); + //Wait for entry + await updateLock.WaitAsync(cancellationToken); + //Rent a new request + FBMRequest request = client.RentRequest(); + try + { + //Set action as get/create + request.WriteHeader(HeaderCommand.Action, Actions.AddOrUpdate); + //Set session-id header + request.WriteHeader(Constants.ObjectId, objectId); + //if new-id set, set the new-id header + if (!string.IsNullOrWhiteSpace(newId)) + { + request.WriteHeader(Constants.NewObjectId, newId); + } + //Get the body writer for the message + IBufferWriter bodyWriter = request.GetBodyWriter(); + //Write json data to the message + using (Utf8JsonWriter jsonWriter = new(bodyWriter)) + { + JsonSerializer.Serialize(jsonWriter, data, LocalOptions); + } + + //Make request + using FBMResponse response = await client.SendAsync(request, cancellationToken); + + response.ThrowIfNotSet(); + //Get the status code + ReadOnlyMemory status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value; + //Check status code + if (status.Span.Equals(ResponseCodes.Okay, StringComparison.OrdinalIgnoreCase)) + { + return; + } + else if(status.Span.Equals(ResponseCodes.NotFound, StringComparison.OrdinalIgnoreCase)) + { + throw new ObjectNotFoundException($"object {objectId} not found on remote server"); + } + //Invalid status + throw new InvalidStatusException("Invalid status code recived for object upsert request", status.ToString()); + } + finally + { + updateLock.Release(); + //Return the request(clears data and reset) + client.ReturnRequest(request); + } + } + + /// + /// Asynchronously deletes an object in the remote store + /// + /// + /// The id of the object to update or replace + /// A token to cancel the operation + /// A task that resolves when the operation has completed + /// + /// + /// + /// + public static async Task DeleteObjectAsync(this FBMClient client, string objectId, CancellationToken cancellationToken = default) + { + client.Config.DebugLog?.Debug("[DEBUG] Deleting object {id}", objectId); + + SemaphoreSlim updateLock = UpdateLock.GetValue(client, UpdateLockCtor); + //Wait for entry + await updateLock.WaitAsync(cancellationToken); + //Rent a new request + FBMRequest request = client.RentRequest(); + try + { + //Set action as delete + request.WriteHeader(HeaderCommand.Action, Actions.Delete); + //Set session-id header + request.WriteHeader(Constants.ObjectId, objectId); + + //Make request + using FBMResponse response = await client.SendAsync(request, cancellationToken); + + response.ThrowIfNotSet(); + //Get the status code + ReadOnlyMemory status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value; + if (status.Span.Equals(ResponseCodes.Okay, StringComparison.Ordinal)) + { + return; + } + else if(status.Span.Equals(ResponseCodes.NotFound, StringComparison.OrdinalIgnoreCase)) + { + throw new ObjectNotFoundException($"object {objectId} not found on remote server"); + } + throw new InvalidStatusException("Invalid status code recived for object get request", status.ToString()); + } + finally + { + updateLock.Release(); + client.ReturnRequest(request); + } + } + + /// + /// Dequeues a change event from the server event queue for the current connection, or waits until a change happens + /// + /// + /// A token to cancel the deuque operation + /// A that contains the modified object id and optionally its new id + public static async Task WaitForChangeAsync(this FBMClient client, CancellationToken cancellationToken = default) + { + //Rent a new request + FBMRequest request = client.RentRequest(); + try + { + //Set action as event dequeue to dequeue a change event + request.WriteHeader(HeaderCommand.Action, Actions.Dequeue); + + //Make request + using FBMResponse response = await client.SendAsync(request, cancellationToken); + + response.ThrowIfNotSet(); + + return new() + { + Status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value.ToString(), + CurrentId = response.Headers.SingleOrDefault(static v => v.Key == Constants.ObjectId).Value.ToString(), + NewId = response.Headers.SingleOrDefault(static v => v.Key == Constants.NewObjectId).Value.ToString() + }; + } + finally + { + client.ReturnRequest(request); + } + } + + /// + /// Gets the Object-id for the request message, or throws an if not specified + /// + /// + /// The id of the object requested + /// + public static string ObjectId(this FBMContext context) + { + return context.Request.Headers.First(static kvp => kvp.Key == Constants.ObjectId).Value.ToString(); + } + /// + /// Gets the new ID of the object if specified from the request. Null if the request did not specify an id update + /// + /// + /// The new ID of the object if speicifed, null otherwise + public static string? NewObjectId(this FBMContext context) + { + return context.Request.Headers.FirstOrDefault(static kvp => kvp.Key == Constants.NewObjectId).Value.ToString(); + } + /// + /// Gets the request method for the request + /// + /// + /// The request method string + public static string Method(this FBMContext context) + { + return context.Request.Headers.First(static kvp => kvp.Key == HeaderCommand.Action).Value.ToString(); + } + /// + /// Closes a response with a status code + /// + /// + /// The status code to send to the client + public static void CloseResponse(this FBMContext context, string responseCode) + { + context.Response.WriteHeader(HeaderCommand.Status, responseCode); + } + + + /// + /// Initializes the worker for a reconnect policy and returns an object that can listen for changes + /// and configure the connection as necessary + /// + /// + /// The amount of time to wait between retries + /// The uri to reconnect the client to + /// A for listening for retry events + public static ClientRetryManager SetReconnectPolicy(this T worker, TimeSpan retryDelay, Uri serverUri) where T: IStatefulConnection + { + //Return new manager + return new (worker, retryDelay, serverUri); + } + } +} diff --git a/VNLib.Data.Caching/src/ClientRetryManager.cs b/VNLib.Data.Caching/src/ClientRetryManager.cs new file mode 100644 index 0000000..97d3d3a --- /dev/null +++ b/VNLib.Data.Caching/src/ClientRetryManager.cs @@ -0,0 +1,83 @@ +using System; +using System.Threading.Tasks; +using System.Security.Cryptography; + +using VNLib.Utils; +using VNLib.Net.Messaging.FBM.Client; + +namespace VNLib.Data.Caching +{ + /// + /// Manages a reconnect policy + /// + public class ClientRetryManager : VnDisposeable where T: IStatefulConnection + { + const int RetryRandMaxMsDelay = 1000; + + private readonly TimeSpan RetryDelay; + private readonly T Client; + private readonly Uri ServerUri; + + internal ClientRetryManager(T worker, TimeSpan delay, Uri serverUri) + { + this.Client = worker; + this.RetryDelay = delay; + this.ServerUri = serverUri; + //Register disconnect listener + worker.ConnectionClosed += Worker_Disconnected; + } + + private void Worker_Disconnected(object? sender, EventArgs args) + { + //Exec retry on exit + _ = RetryAsync().ConfigureAwait(false); + } + + + /// + /// Raised before client is to be reconnected + /// + public event Action? OnBeforeReconnect; + /// + /// Raised when the client fails to reconnect. Should return a value that instructs the + /// manager to reconnect + /// + public event Func? OnReconnectFailed; + + async Task RetryAsync() + { + + //Begin random delay with retry ms + int randomDelayMs = (int)RetryDelay.TotalMilliseconds; + //random delay to add to prevent retry-storm + randomDelayMs += RandomNumberGenerator.GetInt32(RetryRandMaxMsDelay); + //Retry loop + bool retry = true; + while (retry) + { + try + { + //Inform Listener for the retry + OnBeforeReconnect?.Invoke(Client); + //wait for delay before reconnecting + await Task.Delay(randomDelayMs); + //Reconnect async + await Client.ConnectAsync(ServerUri).ConfigureAwait(false); + break; + } + catch (Exception Ex) + { + //Invoke error handler, may be null, incase exit + retry = OnReconnectFailed?.Invoke(Client, Ex) ?? false; + } + } + } + + /// + protected override void Free() + { + //Unregister the event listener + Client.ConnectionClosed -= Worker_Disconnected; + } + } +} diff --git a/VNLib.Data.Caching/src/Constants.cs b/VNLib.Data.Caching/src/Constants.cs new file mode 100644 index 0000000..b06ec1f --- /dev/null +++ b/VNLib.Data.Caching/src/Constants.cs @@ -0,0 +1,32 @@ +using System; + +using VNLib.Net.Messaging.FBM; + +namespace VNLib.Data.Caching +{ + public static class Constants + { + /// + /// Contains constants the define actions + /// + public static class Actions + { + public const string Get= "g"; + public const string AddOrUpdate = "u"; + public const string Delete = "d"; + public const string Dequeue = "dq"; + } + /// + /// Containts constants for operation response codes + /// + public static class ResponseCodes + { + public const string Okay = "ok"; + public const string Error = "err"; + public const string NotFound = "nf"; + } + + public const HeaderCommand ObjectId = (HeaderCommand)0xAA; + public const HeaderCommand NewObjectId = (HeaderCommand)0xAB; + } +} diff --git a/VNLib.Data.Caching/src/Exceptions/InvalidStatusException.cs b/VNLib.Data.Caching/src/Exceptions/InvalidStatusException.cs new file mode 100644 index 0000000..f5e35f4 --- /dev/null +++ b/VNLib.Data.Caching/src/Exceptions/InvalidStatusException.cs @@ -0,0 +1,39 @@ +using System; + +using VNLib.Net.Messaging.FBM; + +namespace VNLib.Data.Caching.Exceptions +{ + /// + /// Raised when the response status code of an FBM Request message is not valid for + /// the specified request + /// + public class InvalidStatusException : InvalidResponseException + { + private readonly string? StatusCode; + /// + /// Initalizes a new with the specfied status code + /// + /// + /// + public InvalidStatusException(string message, string statusCode):this(message) + { + this.StatusCode = statusCode; + } + + /// + public InvalidStatusException() + { + } + /// + public InvalidStatusException(string message) : base(message) + { + } + /// + public InvalidStatusException(string message, Exception innerException) : base(message, innerException) + { + } + /// + public override string Message => $"InvalidStatusException: Status Code {StatusCode} \r\n {base.Message}"; + } +} diff --git a/VNLib.Data.Caching/src/Exceptions/MessageTooLargeException.cs b/VNLib.Data.Caching/src/Exceptions/MessageTooLargeException.cs new file mode 100644 index 0000000..e8f19c5 --- /dev/null +++ b/VNLib.Data.Caching/src/Exceptions/MessageTooLargeException.cs @@ -0,0 +1,26 @@ +using System; +using System.Runtime.Serialization; + +using VNLib.Net.Messaging.FBM; + +namespace VNLib.Data.Caching.Exceptions +{ + /// + /// Raised when a request (or server response) calculates the size of the message to be too large to proccess + /// + public class MessageTooLargeException : FBMException + { + /// + public MessageTooLargeException() + {} + /// + public MessageTooLargeException(string message) : base(message) + {} + /// + public MessageTooLargeException(string message, Exception innerException) : base(message, innerException) + {} + /// + protected MessageTooLargeException(SerializationInfo info, StreamingContext context) : base(info, context) + {} + } +} diff --git a/VNLib.Data.Caching/src/Exceptions/ObjectNotFoundException.cs b/VNLib.Data.Caching/src/Exceptions/ObjectNotFoundException.cs new file mode 100644 index 0000000..fb284f3 --- /dev/null +++ b/VNLib.Data.Caching/src/Exceptions/ObjectNotFoundException.cs @@ -0,0 +1,23 @@ +using System; + +namespace VNLib.Data.Caching.Exceptions +{ + /// + /// Raised when a command was executed on a desired object in the remote cache + /// but the object was not found + /// + public class ObjectNotFoundException : InvalidStatusException + { + internal ObjectNotFoundException() + {} + + internal ObjectNotFoundException(string message) : base(message) + {} + + internal ObjectNotFoundException(string message, string statusCode) : base(message, statusCode) + {} + + internal ObjectNotFoundException(string message, Exception innerException) : base(message, innerException) + {} + } +} diff --git a/VNLib.Data.Caching/src/VNLib.Data.Caching.csproj b/VNLib.Data.Caching/src/VNLib.Data.Caching.csproj new file mode 100644 index 0000000..b12da09 --- /dev/null +++ b/VNLib.Data.Caching/src/VNLib.Data.Caching.csproj @@ -0,0 +1,34 @@ + + + + net6.0 + AnyCPU;x64 + Vaughn Nugent + Copyright © 2022 Vaughn Nugent + 1.0.0.1 + True + x64 + enable + + + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + + diff --git a/VNLib.Data.Caching/src/VNLib.Data.Caching.xml b/VNLib.Data.Caching/src/VNLib.Data.Caching.xml new file mode 100644 index 0000000..f1ec423 --- /dev/null +++ b/VNLib.Data.Caching/src/VNLib.Data.Caching.xml @@ -0,0 +1,354 @@ + + + + VNLib.Data.Caching + + + + + A general purpose binary data storage + + + + + + + + + + + Initializes a new store + + The to swap blob data to when cache + The maximum number of items to keep in memory + A to write log data to + A to allocate buffers and store data in memory + + + + Swaps all s that are cached in memory + to disk. + + + + + + + + + + + If the is found in the store, changes the key + that referrences the blob. + + The key that currently referrences the blob in the store + The new key that will referrence the blob + The if its found in the store + True if the record was found and the key was changes + + + + Removes the from the store without disposing the blobl + + The key that referrences the in the store + A value indicating if the blob was removed + + + + Removes and disposes all blobl elements in cache (or in the backing store) + + + + + A general purpose binary storage item + + + + + The time the blob was last modified + + + + + Gets the current size of the file (in bytes) as an atomic operation + + + + + The operation synchronization lock + + + + + Initializes a new + + The heap to allocate buffers from + + + + + + + Reads data from the internal buffer and copies it to the specified buffer. + Use the property to obtain the size of the internal buffer + + The buffer to copy data to + When completed, the number of bytes copied to the buffer + + + + Overwrites the internal buffer with the contents of the supplied buffer + + The buffer containing data to store within the blob + A that completes when write access has been granted and copied + + + + + Writes the contents of the memory buffer to its designated file on the disk + + The heap to allocate buffers from + The that stores the file + The name of the file to write data do + A log to write errors to + A task that completes when the swap to disk is complete + + + + Reads the contents of the blob into a memory buffer from its designated file on disk + + The heap to allocate buffers from + The that stores the file + The name of the file to write the blob data to + A log to write errors to + A task that completes when the swap from disk is complete + + + + A base implementation of a memory/disk LRU data cache FBM listener + + + + + The Cache store to access data blobs + + + + + The to allocate buffers from + + + + + Initializes the data store + + + + + + + + Asynchronously swaps all blobs to the disk + + + + + + + + + + + + Provides caching extension methods for + + + + + Gets an object from the server if it exists + + + + The id of the object to get + The to use for serialization + A token to cancel the operation + A task that completes to return the results of the response payload + + + + + + + + + Updates the state of the session, and optionally updates the ID of the session. The data + property is buffered and streamed to the remote server + + + + The id of the object to update or replace + An optional parameter to specify a new ID for the old object + The payload data to serialize and set as the data state of the session + Optional + A token to cancel the operation + A task that resolves when the server responds + + + + + + + + + + Asynchronously deletes an object in the remote store + + + The id of the object to update or replace + A token to cancel the operation + A task that resolves when the operation has completed + + + + + + + Gets the Object-id for the request message, or throws an if not specified + + + The id of the object requested + + + + + Gets the new ID of the object if specified from the request. Null if the request did not specify an id update + + + The new ID of the object if speicifed, null otherwise + + + + Gets the request method for the request + + + The request method string + + + + Closes a response with a status code + + + The status code to send to the client + The payload to send to the client + + + + Closes a response with a status code + + + The status code to send to the client + + + + Computes the authorization headers for the initial client connection + + + The pre-shared secret used to compute a secure hash of a random token + The size (in bytes) of the salt to compute the hash of + + + + Determines if the client has the proper authorization by verifying the client data can compute the same hash result with + the specified secret + + + The pre-shared secret used to verify the data + True if the authorization headers compute to the proper hash result (keys match), false otherwise + The verification is fixed-time + + + + + Initializes the worker for a reconnect policy and returns an object that can listen for changes + and configure the connection as necessary + + + The amount of time to wait between retries + The uri to reconnect the client to + A for listening for retry events + + + + Manages a reconnect policy + + + + + Raised before client is to be reconnected + + + + + Raised when the client fails to reconnect. Should return a value that instructs the + manager to reconnect + + + + + Raised when the client websocket is successfully reconnected + + + + + Contains constants the define actions + + + + + Containts constants for operation response codes + + + + + Raised when the response status code of an FBM Request message is not valid for + the specified request + + + + + Initalizes a new with the specfied status code + + + + + + + + + + + + + + + + + + + Raised when a request (or server response) calculates the size of the message to be too large to proccess + + + + + + + + + + + + + + + + diff --git a/VNLib.Data.Caching/src/WaitForChangeResult.cs b/VNLib.Data.Caching/src/WaitForChangeResult.cs new file mode 100644 index 0000000..a309c7c --- /dev/null +++ b/VNLib.Data.Caching/src/WaitForChangeResult.cs @@ -0,0 +1,21 @@ +namespace VNLib.Data.Caching +{ + /// + /// The result of a cache server change event + /// + public readonly struct WaitForChangeResult + { + /// + /// The operation status code + /// + public readonly string Status { get; init; } + /// + /// The current (or old) id of the element that changed + /// + public readonly string CurrentId { get; init; } + /// + /// The new id of the element that changed + /// + public readonly string NewId { get; init; } + } +} -- cgit