aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-11-29 00:17:13 -0500
committerLibravatar vnugent <public@vaughnnugent.com>2023-11-29 00:17:13 -0500
commite25666bbf408ff33c09dc8e2c5fe2d052363595f (patch)
treece7a7290e8f013afdc38529b1be7fd8de0827ffd
parent78a47dd6887fbbe33d9526194b6af300a72448fa (diff)
Integrate FBM core changes for immutable client instances
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs42
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs4
-rw-r--r--lib/VNLib.Data.Caching/src/ClientExtensions.cs6
-rw-r--r--lib/VNLib.Data.Caching/src/ClientRetryManager.cs5
-rw-r--r--lib/VNLib.Data.Caching/src/JsonCacheObjectSerializer.cs35
-rw-r--r--lib/VNLib.Data.Caching/src/ReusableJsonWriter.cs69
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityCacheExtensions.cs77
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs180
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs10
-rw-r--r--plugins/VNLib.Data.Caching.Providers.Redis/src/RedisClientCacheEntry.cs4
-rw-r--r--plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs80
-rw-r--r--plugins/VNLib.Data.Caching.Providers.VNCache/src/MemoryCache.cs1
-rw-r--r--plugins/VNLib.Data.Caching.Providers.VNCache/src/RemoteBackedMemoryCache.cs2
-rw-r--r--plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheBase.cs4
-rw-r--r--plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs6
15 files changed, 222 insertions, 303 deletions
diff --git a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
index 708b3f5..bd86461 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
@@ -88,7 +88,7 @@ namespace VNLib.Data.Caching.Extensions
return adapter;
}
- private static readonly ConditionalWeakTable<FBMClient, CacheClientConfiguration> ClientCacheConfig = new();
+ private static readonly ConditionalWeakTable<FBMClientFactory, CacheClientConfiguration> ClientCacheConfig = new();
/// <summary>
/// Gets a <see cref="FBMClientConfig"/> preconfigured object caching
@@ -358,7 +358,7 @@ namespace VNLib.Data.Caching.Extensions
/// </summary>
/// <param name="client"></param>
/// <returns>A fluent api configuration builder for the current client</returns>
- public static CacheClientConfiguration GetCacheConfiguration(this FBMClient client) => ClientCacheConfig.GetOrCreateValue(client);
+ public static CacheClientConfiguration GetCacheConfiguration(this FBMClientFactory client) => ClientCacheConfig.GetOrCreateValue(client);
/// <summary>
/// Explicitly set the client cache configuration for the current client
@@ -366,7 +366,7 @@ namespace VNLib.Data.Caching.Extensions
/// <param name="client"></param>
/// <param name="config">The cache node configuration</param>
/// <returns>The config instance</returns>
- public static CacheClientConfiguration SetCacheConfiguration(this FBMClient client, CacheClientConfiguration config)
+ public static CacheClientConfiguration SetCacheConfiguration(this FBMClientFactory client, CacheClientConfiguration config)
{
ClientCacheConfig.AddOrUpdate(client, config);
return config;
@@ -378,7 +378,7 @@ namespace VNLib.Data.Caching.Extensions
/// <param name="client"></param>
/// <param name="nodeConfig">The cache node configuration</param>
/// <returns>The config instance</returns>
- public static CacheNodeConfiguration SetCacheConfiguration(this FBMClient client, CacheNodeConfiguration nodeConfig)
+ public static CacheNodeConfiguration SetCacheConfiguration(this FBMClientFactory client, CacheNodeConfiguration nodeConfig)
{
ClientCacheConfig.AddOrUpdate(client, nodeConfig);
return nodeConfig;
@@ -425,7 +425,7 @@ namespace VNLib.Data.Caching.Extensions
/// <param name="client"></param>
/// <param name="cancellation">A token to cancel the operation</param>
/// <returns>A task that completes when all nodes have been discovered</returns>
- public static Task DiscoverAvailableNodesAsync(this FBMClient client, CancellationToken cancellation = default)
+ public static Task DiscoverAvailableNodesAsync(this FBMClientFactory client, CancellationToken cancellation = default)
{
//Get stored config
CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
@@ -446,7 +446,7 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="SecurityException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
- public static async Task<CacheNodeAdvertisment> ConnectToRandomCacheAsync(this FBMClient client, CancellationToken cancellation = default)
+ public static async Task<CacheNodeAdvertisment> ConnectToRandomCacheAsync(this FBMClientFactory client, CancellationToken cancellation = default)
{
//Get stored config
CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
@@ -467,7 +467,7 @@ namespace VNLib.Data.Caching.Extensions
/// <summary>
/// Connects to the specified server on the configured cache client
/// </summary>
- /// <param name="client"></param>
+ /// <param name="factory"></param>
/// <param name="server">The server to connect to</param>
/// <param name="token">A token to cancel the operation</param>
/// <returns>A task that resolves when the client is connected to the cache server</returns>
@@ -477,16 +477,28 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="SecurityException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
- public static Task ConnectToCacheAsync(this FBMClient client, CacheNodeAdvertisment server, CancellationToken token = default)
+ public static async Task<FBMClient> ConnectToCacheAsync(this FBMClientFactory factory, CacheNodeAdvertisment server, CancellationToken token = default)
{
- _ = client ?? throw new ArgumentNullException(nameof(client));
+ _ = factory ?? throw new ArgumentNullException(nameof(factory));
_ = server ?? throw new ArgumentNullException(nameof(server));
//Get stored config
- CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
+ CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(factory);
- //Connect to server (no server id because client not replication server)
- return ConnectToCacheAsync(client, conf, server, token);
+ //Create new client
+ FBMClient client = factory.CreateClient();
+
+ try
+ {
+ //Connect to server (no server id because client not replication server)
+ await ConnectToCacheAsync(client, conf, server, token);
+ return client;
+ }
+ catch
+ {
+ client.Dispose();
+ throw;
+ }
}
/// <summary>
@@ -549,19 +561,19 @@ namespace VNLib.Data.Caching.Extensions
client.LogDebug("Server negotiation validated, connecting to server");
//The client authorization header is the exact response
- client.ClientSocket.Headers[HttpRequestHeader.Authorization] = response.Content!;
+ client.Headers[HttpRequestHeader.Authorization] = response.Content!;
//See if the supplied config is for a cache node
CacheNodeConfiguration? cnc = config as CacheNodeConfiguration;
//Compute the signature of the upgrade token
- client.ClientSocket.Headers[X_UPGRADE_SIG_HEADER] = config.AuthManager.GetBase64UpgradeSignature(response.Content, cnc != null);
+ client.Headers[X_UPGRADE_SIG_HEADER] = config.AuthManager.GetBase64UpgradeSignature(response.Content, cnc != null);
//Check to see if adversize self is enabled
if (cnc?.BroadcastAdverisment == true)
{
//Set advertisment header
- client.ClientSocket.Headers[X_NODE_DISCOVERY_HEADER] = GetAdvertismentHeader(cnc);
+ client.Headers[X_NODE_DISCOVERY_HEADER] = GetAdvertismentHeader(cnc);
}
//build ws uri from the connect endpoint
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs b/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs
index 917052f..9370901 100644
--- a/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs
@@ -191,10 +191,8 @@ namespace VNLib.Data.Caching
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public readonly Span<byte> GetDataSegment()
{
- //Get the actual length of the segment
- uint length = GetLength();
//Get the segment from its begining offset and
- return _manager.GetSpan(_handle, DATA_SEGMENT_START, length);
+ return _manager.GetSpan(_handle, DATA_SEGMENT_START, GetLength());
}
/// <summary>
diff --git a/lib/VNLib.Data.Caching/src/ClientExtensions.cs b/lib/VNLib.Data.Caching/src/ClientExtensions.cs
index a2ec27d..8273486 100644
--- a/lib/VNLib.Data.Caching/src/ClientExtensions.cs
+++ b/lib/VNLib.Data.Caching/src/ClientExtensions.cs
@@ -45,7 +45,7 @@ namespace VNLib.Data.Caching
public static class ClientExtensions
{
- private static readonly JsonCacheObjectSerializer DefaultSerializer = new();
+ private static readonly JsonCacheObjectSerializer DefaultSerializer = new(256);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void LogDebug(this FBMClient client, string message, params object?[] args)
@@ -192,7 +192,7 @@ namespace VNLib.Data.Caching
/// <exception cref="InvalidResponseException"></exception>
/// <exception cref="MessageTooLargeException"></exception>
/// <exception cref="ObjectNotFoundException"></exception>
- public async static Task AddOrUpdateObjectAsync<T>(this FBMClient client, string objectId, string? newId, ObjectDataReader<T> callback, T state, CancellationToken cancellationToken = default)
+ public static async Task AddOrUpdateObjectAsync<T>(this FBMClient client, string objectId, string? newId, ObjectDataReader<T> callback, T state, CancellationToken cancellationToken = default)
{
_ = client ?? throw new ArgumentNullException(nameof(client));
_ = callback ?? throw new ArgumentNullException(nameof(callback));
@@ -306,7 +306,7 @@ namespace VNLib.Data.Caching
return deserialzer.Deserialize<T>(response.ResponseBody);
}
- //Object may not exist on the server yet
+ //Object may not exist on the server yet
if (status.Value.Equals(ResponseCodes.NotFound, StringComparison.Ordinal))
{
return default;
diff --git a/lib/VNLib.Data.Caching/src/ClientRetryManager.cs b/lib/VNLib.Data.Caching/src/ClientRetryManager.cs
index 5cee583..b7a5f2a 100644
--- a/lib/VNLib.Data.Caching/src/ClientRetryManager.cs
+++ b/lib/VNLib.Data.Caching/src/ClientRetryManager.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Data.Caching
@@ -32,7 +32,7 @@ using VNLib.Net.Messaging.FBM.Client;
namespace VNLib.Data.Caching
{
/// <summary>
- /// Manages a <see cref="FBMClientWorkerBase"/> reconnect policy
+ /// Manages a <see cref="IStatefulConnection"/> reconnect policy
/// </summary>
public class ClientRetryManager<T> : VnDisposeable where T: IStatefulConnection
{
@@ -62,6 +62,7 @@ namespace VNLib.Data.Caching
/// Raised before client is to be reconnected
/// </summary>
public event Action<T>? OnBeforeReconnect;
+
/// <summary>
/// Raised when the client fails to reconnect. Should return a value that instructs the
/// manager to reconnect
diff --git a/lib/VNLib.Data.Caching/src/JsonCacheObjectSerializer.cs b/lib/VNLib.Data.Caching/src/JsonCacheObjectSerializer.cs
index 85d1184..09fbd6c 100644
--- a/lib/VNLib.Data.Caching/src/JsonCacheObjectSerializer.cs
+++ b/lib/VNLib.Data.Caching/src/JsonCacheObjectSerializer.cs
@@ -23,12 +23,13 @@
*/
using System;
+
+using System.IO;
using System.Buffers;
+using System.Threading;
using System.Text.Json;
using System.Text.Json.Serialization;
-using VNLib.Utils.Memory.Caching;
-
namespace VNLib.Data.Caching
{
/// <summary>
@@ -37,8 +38,7 @@ namespace VNLib.Data.Caching
/// </summary>
public class JsonCacheObjectSerializer : ICacheObjectSerializer, ICacheObjectDeserializer
{
- //Create threadlocal writer for attempted lock-free writer reuse
- private static readonly ObjectRental<ReusableJsonWriter> JsonWriterPool = ObjectRental.CreateThreadLocal<ReusableJsonWriter>();
+ private static readonly ThreadLocal<Utf8JsonWriter> _writer = new(static () => new(Stream.Null));
private readonly JsonSerializerOptions? _options;
@@ -46,16 +46,13 @@ namespace VNLib.Data.Caching
/// Initializes a new <see cref="JsonCacheObjectSerializer"/>
/// </summary>
/// <param name="options">JSON serialization/deserialization options</param>
- public JsonCacheObjectSerializer(JsonSerializerOptions options)
- {
- _options = options;
- }
+ public JsonCacheObjectSerializer(JsonSerializerOptions options) => _options = options;
/// <summary>
/// Initializes a new <see cref="JsonCacheObjectSerializer"/> using
/// the default serialization rules
/// </summary>
- public JsonCacheObjectSerializer()
+ public JsonCacheObjectSerializer(int bufferSize)
{
//Configure default serialzation options
_options = new()
@@ -68,9 +65,7 @@ namespace VNLib.Data.Caching
IgnoreReadOnlyFields = true,
PropertyNameCaseInsensitive = true,
IncludeFields = false,
-
- //Use small buffers
- DefaultBufferSize = 128
+ DefaultBufferSize = bufferSize
};
}
@@ -80,17 +75,23 @@ namespace VNLib.Data.Caching
///<inheritdoc/>
public virtual void Serialize<T>(T obj, IBufferWriter<byte> finiteWriter)
{
- //Rent new json writer
- ReusableJsonWriter writer = JsonWriterPool.Rent();
+ //Read thread-local writer
+ Utf8JsonWriter localWriter = _writer.Value!;
+ //Init the writer with the new buffer writer
+ localWriter.Reset(finiteWriter);
try
{
- //Serialize the message
- writer.Serialize(finiteWriter, obj, _options);
+ //Serialize message
+ JsonSerializer.Serialize(localWriter, obj, _options);
+
+ //Flush writer to underlying buffer
+ localWriter.Flush();
}
finally
{
- JsonWriterPool.Return(writer);
+ //Unlink the writer
+ localWriter.Reset(Stream.Null);
}
}
}
diff --git a/lib/VNLib.Data.Caching/src/ReusableJsonWriter.cs b/lib/VNLib.Data.Caching/src/ReusableJsonWriter.cs
deleted file mode 100644
index c763f91..0000000
--- a/lib/VNLib.Data.Caching/src/ReusableJsonWriter.cs
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
-* Copyright (c) 2022 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Data.Caching
-* File: ReusableJsonWriter.cs
-*
-* ReusableJsonWriter.cs is part of VNLib.Data.Caching which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Data.Caching is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Data.Caching is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System.IO;
-using System.Buffers;
-using System.Text.Json;
-
-using VNLib.Utils;
-
-namespace VNLib.Data.Caching
-{
- internal sealed class ReusableJsonWriter : VnDisposeable
- {
- private readonly Utf8JsonWriter _writer;
-
- public ReusableJsonWriter()
- {
- _writer = new(Stream.Null);
- }
-
- /// <summary>
- /// Serializes the message and writes the serialzied data to the buffer writer
- /// </summary>
- /// <typeparam name="T"></typeparam>
- /// <param name="writer">The buffer writer to store data at</param>
- /// <param name="value">The object to serialize</param>
- /// <param name="options">Optional - serializer options</param>
- public void Serialize<T>(IBufferWriter<byte> writer, T value, JsonSerializerOptions? options = null)
- {
- //Init the writer with the new buffer writer
- _writer.Reset(writer);
- try
- {
- //Serialize message
- JsonSerializer.Serialize(_writer, value, options);
- //Flush writer to underlying buffer
- _writer.Flush();
- }
- finally
- {
- //Unlink the writer
- _writer.Reset(Stream.Null);
- }
- }
-
- protected override void Free() => _writer.Dispose();
- }
-}
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityCacheExtensions.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityCacheExtensions.cs
index bf2fa2a..562c220 100644
--- a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityCacheExtensions.cs
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityCacheExtensions.cs
@@ -107,15 +107,88 @@ namespace VNLib.Plugins.Extensions.VNCache.DataModel
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="cache"></param>
+ /// <param name="bufferSize">The default serializer buffer size</param>
/// <returns>The new <see cref="IEntityCache{T}"/> wrapper using json serialization</returns>
/// <exception cref="ArgumentNullException"></exception>
- public static IEntityCache<T> CreateJsonEntityCache<T>(this IGlobalCacheProvider cache) where T: class
+ public static IEntityCache<T> CreateJsonEntityCache<T>(this IGlobalCacheProvider cache, int bufferSize) where T: class
{
_ = cache ?? throw new ArgumentNullException(nameof(cache));
- JsonCacheObjectSerializer json = new();
+ JsonCacheObjectSerializer json = new(bufferSize);
return CreateEntityCache<T>(cache, json, json);
}
+ /// <summary>
+ /// Attemts to recover an entity from cache if possible, if a miss occurs, the
+ /// factory function is called to produce a value from a backing store. If the store
+ /// returns a result it is writen back to the cache before this method returns
+ /// </summary>
+ /// <typeparam name="T"></typeparam>
+ /// <param name="cache"></param>
+ /// <param name="id">The id of the entity to get or laod</param>
+ /// <param name="factory">The factory callback function to produce a value when a cache miss occurs</param>
+ /// <param name="cancellation">A token to cancel the operation</param>
+ /// <returns>A task that completes by returning the entity</returns>
+ /// <exception cref="ArgumentNullException"></exception>
+ public static async Task<T?> GetOrLoadAsync<T>(this IEntityCache<T> cache, string id, Func<string, Task<T?>> factory, CancellationToken cancellation = default) where T : class
+ {
+ _ = cache ?? throw new ArgumentNullException(nameof(cache));
+ _ = id ?? throw new ArgumentNullException(nameof(id));
+ _ = factory ?? throw new ArgumentNullException(nameof(factory));
+
+ //try to load the value from cache
+ T? record = await cache.GetAsync(id, cancellation);
+
+ //If record was not found in cache, load it from the factory
+ if (record is null)
+ {
+ record = await factory(id);
+
+ //If new record found, write to cache
+ if (record is not null)
+ {
+ await cache.UpsertAsync(id, record, cancellation);
+ }
+ }
+
+ return record;
+ }
+
+ /// <summary>
+ /// Attemts to recover an entity from cache if possible, if a miss occurs, the
+ /// factory function is called to produce a value from a backing store. If the store
+ /// returns a result it is writen back to the cache before this method returns
+ /// </summary>
+ /// <typeparam name="T"></typeparam>
+ /// <param name="cache"></param>
+ /// <param name="id">The id of the entity to get or laod</param>
+ /// <param name="factory">The factory callback function to produce a value when a cache miss occurs</param>
+ /// <param name="cancellation">A token to cancel the operation</param>
+ /// <returns>A task that completes by returning the entity</returns>
+ /// <exception cref="ArgumentNullException"></exception>
+ public static async Task<T?> GetOrLoadAsync<T>(this IEntityCache<T> cache, string id, Func<string, CancellationToken, Task<T?>> factory, CancellationToken cancellation = default) where T : class
+ {
+ _ = cache ?? throw new ArgumentNullException(nameof(cache));
+ _ = id ?? throw new ArgumentNullException(nameof(id));
+ _ = factory ?? throw new ArgumentNullException(nameof(factory));
+
+ //try to load the value from cache
+ T? record = await cache.GetAsync(id, cancellation);
+
+ //If record was not found in cache, load it from the factory
+ if (record is null)
+ {
+ record = await factory(id, cancellation);
+
+ //If new record found, write to cache
+ if(record is not null)
+ {
+ await cache.UpsertAsync(id, record, cancellation);
+ }
+ }
+
+ return record;
+ }
+
private sealed class EntityCacheImpl<T> : IEntityCache<T> where T : class
{
private readonly IGlobalCacheProvider _cacheProvider;
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs
index a02fe75..b7bf83f 100644
--- a/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs
+++ b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs
@@ -25,10 +25,8 @@
using System;
using System.IO;
using System.Text.Json;
-using System.Collections;
-using System.Collections.Generic;
-using System.Runtime.CompilerServices;
+using VNLib.Utils.Resources;
using VNLib.Plugins;
using VNLib.Plugins.Extensions.Loading;
@@ -38,8 +36,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
{
const string PERSISTANT_ASM_CONFIF_KEY = "persistant_cache_asm";
const string USER_CACHE_ASM_CONFIG_KEY = "custom_cache_impl_asm";
- const string LOAD_METHOD_NAME = "OnRuntimeLoad";
- const string TEARDOWN_METHOD_NAME = "OnSystemDetach";
+ const string INITIALIZE_METHOD_NAME = "OnInitializeForBucket";
+ const string LOAD_METHOD_NAME = "OnRuntimeLoad";
/// <summary>
/// Loads the <see cref="IBlobCacheTable"/> implementation (dynamic or default) into the process
@@ -53,8 +51,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
/// <exception cref="FileNotFoundException"></exception>
public static IBlobCacheTable LoadMemoryCacheSystem(this PluginBase plugin, IConfigScope config, ICacheMemoryManagerFactory heap, CacheConfiguration cacheConf)
{
+#pragma warning disable CA2000 // Dispose objects before losing scope
+
//First, try to load persitant cache store
- PersistantCacheManager? pCManager = GetPersistantStore(plugin, config);
+ IPersistantCacheStore? pCManager = GetPersistantStore(plugin, config);
+
+#pragma warning restore CA2000 // Dispose objects before losing scope
IBlobCacheTable table;
@@ -64,46 +66,42 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
string asmName = customEl.GetString() ?? throw new FileNotFoundException("User defined a custom blob cache assembly but the file name was null");
//Return the runtime loaded table
- table = LoadCustomMemCacheTable(plugin, asmName, pCManager);
+ table = plugin.CreateServiceExternal<IBlobCacheTable>(asmName);
+
+ //Try to get the load method and pass the persistant cache instance
+ ManagedLibrary.TryGetMethod<Action<PluginBase, IPersistantCacheStore?>>(table, LOAD_METHOD_NAME)?.Invoke(plugin, pCManager);
}
else
{
//Default type
- table = GetInternalBlobCache(heap, cacheConf, pCManager);
+ table = new BlobCacheTable(cacheConf.BucketCount, cacheConf.MaxCacheEntries, heap, pCManager);
}
- //Initialize the subsystem from the cache table
- pCManager?.InitializeSubsystem(table);
+ if(pCManager != null)
+ {
+ //Initialize the subsystem from the cache table
+ InitializeSubsystem(pCManager, table);
+ }
return table;
}
- private static IBlobCacheTable GetInternalBlobCache(ICacheMemoryManagerFactory heap, CacheConfiguration config, IPersistantCacheStore? store)
- {
- return new BlobCacheTable(config.BucketCount, config.MaxCacheEntries, heap, store);
- }
-
- private static IBlobCacheTable LoadCustomMemCacheTable(PluginBase plugin, string asmName, IPersistantCacheStore? store)
+ private static void InitializeSubsystem(IPersistantCacheStore store, IBlobCacheTable table)
{
- //Load the custom assembly
- AssemblyLoader<IBlobCacheTable> customTable = plugin.LoadAssembly<IBlobCacheTable>(asmName);
+ //Try to get the Initialize method
+ Action<uint>? initMethod = ManagedLibrary.TryGetMethod<Action<uint>>(store, INITIALIZE_METHOD_NAME);
- try
+ if(initMethod != null)
{
- //Try get onload method and pass the persistant cache instance
- Action<PluginBase, IPersistantCacheStore?>? onLoad = customTable.TryGetMethod<Action<PluginBase, IPersistantCacheStore?>>(LOAD_METHOD_NAME);
- onLoad?.Invoke(plugin, store);
- }
- catch
- {
- customTable.Dispose();
- throw;
+ //Itterate all buckets
+ foreach (IBlobCacheBucket bucket in table)
+ {
+ initMethod.Invoke(bucket.Id);
+ }
}
-
- return new RuntimeBlobCacheTable(customTable);
}
- private static PersistantCacheManager? GetPersistantStore(PluginBase plugin, IConfigScope config)
+ private static IPersistantCacheStore? GetPersistantStore(PluginBase plugin, IConfigScope config)
{
//Get the persistant assembly
if (!config.TryGetValue(PERSISTANT_ASM_CONFIF_KEY, out JsonElement asmEl))
@@ -112,130 +110,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
}
string? asmName = asmEl.GetString();
- if (asmName == null)
+ if (string.IsNullOrWhiteSpace(asmName))
{
return null;
}
- //Load the dynamic assembly into the alc
- AssemblyLoader<IPersistantCacheStore> loader = plugin.LoadAssembly<IPersistantCacheStore>(asmName);
- try
- {
- //Call the OnLoad method
- Action<PluginBase, IConfigScope>? loadMethod = loader.TryGetMethod<Action<PluginBase, IConfigScope>>(LOAD_METHOD_NAME);
-
- loadMethod?.Invoke(plugin, config);
- }
- catch
- {
- loader.Dispose();
- throw;
- }
-
//Return the
- return new(loader);
- }
-
-
- private sealed class RuntimeBlobCacheTable : IBlobCacheTable
- {
-
- private readonly IBlobCacheTable _table;
- private readonly Action? OnDetatch;
-
- public RuntimeBlobCacheTable(AssemblyLoader<IBlobCacheTable> loader)
- {
- OnDetatch = loader.TryGetMethod<Action>(TEARDOWN_METHOD_NAME);
- _table = loader.Resource;
- }
-
- public void Dispose()
- {
- //We can let the loader dispose the cache table, but we can notify of detatch
- OnDetatch?.Invoke();
- }
-
-
- ///<inheritdoc/>
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- IBlobCacheBucket IBlobCacheTable.GetBucket(ReadOnlySpan<char> objectId) => _table.GetBucket(objectId);
-
- ///<inheritdoc/>
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- public IEnumerator<IBlobCacheBucket> GetEnumerator() => _table.GetEnumerator();
-
- ///<inheritdoc/>
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- IEnumerator IEnumerable.GetEnumerator() => ((IEnumerable)_table).GetEnumerator();
- }
-
- internal sealed class PersistantCacheManager : IPersistantCacheStore
- {
- const string INITIALIZE_METHOD_NAME = "OnInitializeForBucket";
-
-
- /*
- * Our referrence can be technically unloaded, but so will
- * this instance, since its loaded into the current ALC, so
- * this referrence may exist for the lifetime of this instance.
- *
- * It also implements IDisposable, which the assembly loader class
- * will call when this plugin is unloaded, we dont need to call
- * it here, but we can signal a detach.
- *
- * Since the store implements IDisposable, its likely going to
- * check for dispose on each call, so we don't need to add
- * and additional disposed check since the method calls must be fast.
- */
-
- private readonly IPersistantCacheStore store;
-
- private readonly Action<uint>? InitMethod;
- private readonly Action? OnServiceDetatch;
-
- public PersistantCacheManager(AssemblyLoader<IPersistantCacheStore> loader)
- {
- //Try to get the Initialize method
- InitMethod = loader.TryGetMethod<Action<uint>>(INITIALIZE_METHOD_NAME);
-
- //Get the optional detatch method
- OnServiceDetatch = loader.TryGetMethod<Action>(TEARDOWN_METHOD_NAME);
-
- store = loader.Resource;
- }
-
- /// <summary>
- /// Optionally initializes the backing store by publishing the table's bucket
- /// id's so it's made aware of the memory cache bucket system.
- /// </summary>
- /// <param name="table">The table containing buckets to publish</param>
- public void InitializeSubsystem(IBlobCacheTable table)
- {
- //Itterate all buckets
- foreach (IBlobCacheBucket bucket in table)
- {
- InitMethod?.Invoke(bucket.Id);
- }
- }
-
- void IDisposable.Dispose()
- {
- //Assembly loader will dispose the type, we can just signal a detach
-
- OnServiceDetatch?.Invoke();
- }
-
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- bool IPersistantCacheStore.OnCacheMiss(uint bucketId, string key, IMemoryCacheEntryFactory factory, out CacheEntry entry)
- {
- return store.OnCacheMiss(bucketId, key, factory, out entry);
- }
-
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- void IPersistantCacheStore.OnEntryDeleted(uint bucketId, string key) => store.OnEntryDeleted(bucketId, key);
-
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- void IPersistantCacheStore.OnEntryEvicted(uint bucketId, string key, in CacheEntry entry) => store.OnEntryEvicted(bucketId, key, in entry);
+ return plugin.CreateServiceExternal<IPersistantCacheStore>(asmName);
}
+
}
}
diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
index dbfd091..a240dde 100644
--- a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
@@ -65,7 +65,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
private readonly NodeConfig _nodeConfig;
private readonly ICacheStore _cacheStore;
private readonly ICachePeerAdapter _peerAdapter;
- private readonly FBMClientConfig _replicationClientConfig;
+ private readonly FBMClientFactory _clientFactory;
private readonly bool _isDebug;
@@ -79,12 +79,16 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
_peerAdapter = plugin.GetOrCreateSingleton<PeerDiscoveryManager>();
//Init fbm config with fixed message size
- _replicationClientConfig = FBMDataCacheExtensions.GetDefaultConfig(
+ FBMClientConfig clientConfig = FBMDataCacheExtensions.GetDefaultConfig(
(plugin as ObjectCacheServerEntry)!.ListenerHeap,
MAX_MESSAGE_SIZE,
debugLog: plugin.IsDebug() ? plugin.Log : null
);
+ //Init ws fallback factory and client factory
+ FBMFallbackClientWsFactory wsFactory = new();
+ _clientFactory = new(in clientConfig, wsFactory);
+
_plugin = plugin;
_isDebug = plugin.IsDebug();
_log = plugin.Log.CreateScope(LOG_SCOPE_NAME);
@@ -149,7 +153,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
_ = newPeer ?? throw new ArgumentNullException(nameof(newPeer));
//Setup client
- FBMClient client = new(_replicationClientConfig);
+ FBMClient client = _clientFactory.CreateClient();
//Add peer to monitor
_peerAdapter.OnPeerListenerAttached(newPeer);
diff --git a/plugins/VNLib.Data.Caching.Providers.Redis/src/RedisClientCacheEntry.cs b/plugins/VNLib.Data.Caching.Providers.Redis/src/RedisClientCacheEntry.cs
index 72e6020..7f2f3ed 100644
--- a/plugins/VNLib.Data.Caching.Providers.Redis/src/RedisClientCacheEntry.cs
+++ b/plugins/VNLib.Data.Caching.Providers.Redis/src/RedisClientCacheEntry.cs
@@ -69,8 +69,8 @@ namespace VNLib.Data.Caching.Providers.Redis
public RedisClientCacheEntry(PluginBase plugin, IConfigScope config)
{
_defaultHeap = MemoryUtil.Shared;
- DefaultDeserializer = new JsonCacheObjectSerializer();
- DefaultSerializer = new JsonCacheObjectSerializer();
+ DefaultDeserializer = new JsonCacheObjectSerializer(256);
+ DefaultSerializer = new JsonCacheObjectSerializer(256);
ILogProvider redisLog = plugin.Log.CreateScope("REDIS");
diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
index 5fbebcd..e21cf4a 100644
--- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
+++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs
@@ -59,13 +59,16 @@ namespace VNLib.Data.Caching.Providers.VNCache
private readonly VnCacheClientConfig _config;
private readonly IClusterNodeIndex _index;
+ private readonly FBMClientFactory _clientFactory;
+ private readonly TimeSpan _initNodeDelay;
private bool _isConnected;
+ private FBMClient? _client;
/// <summary>
- /// The internal client
+ /// The internal heap used for FBMClients
/// </summary>
- public FBMClient Client { get; }
+ public IUnmangedHeap BufferHeap { get; } = MemoryUtil.Shared;
/// <summary>
/// Gets a value that determines if the client is currently connected to a server
@@ -73,30 +76,30 @@ namespace VNLib.Data.Caching.Providers.VNCache
public override bool IsConnected => _isConnected;
public FBMCacheClient(PluginBase plugin, IConfigScope config)
- : this(
- config.Deserialze<VnCacheClientConfig>(),
- plugin.IsDebug() ? plugin.Log : null
- )
+ : this(
+ config.Deserialze<VnCacheClientConfig>(),
+ plugin.IsDebug() ? plugin.Log : null
+ )
{
ILogProvider scoped = plugin.Log.CreateScope(LOG_NAME);
//Set authenticator and error handler
- Client.GetCacheConfiguration()
+ _clientFactory.GetCacheConfiguration()
.WithAuthenticator(new AuthManager(plugin))
.WithErrorHandler(new DiscoveryErrHAndler(scoped));
//Only the master index is schedulable
- if(_index is IIntervalScheduleable sch)
+ if (_index is IIntervalScheduleable sch)
{
//Schedule discovery interval
plugin.ScheduleInterval(sch, _config.DiscoveryInterval);
//Run discovery after initial delay if interval is greater than initial delay
- if (_config.DiscoveryInterval > InitialDelay)
+ if (_config.DiscoveryInterval > _initNodeDelay)
{
//Run a manual initial load
- scoped.Information("Running initial discovery in {delay}", InitialDelay);
- _ = plugin.ObserveWork(() => sch.OnIntervalAsync(scoped, plugin.UnloadToken), (int)InitialDelay.TotalMilliseconds);
+ scoped.Information("Running initial discovery in {delay}", _initNodeDelay);
+ _ = plugin.ObserveWork(() => sch.OnIntervalAsync(scoped, plugin.UnloadToken), (int)_initNodeDelay.TotalMilliseconds);
}
}
}
@@ -108,18 +111,22 @@ namespace VNLib.Data.Caching.Providers.VNCache
_config = config;
+ //Set a default node delay if null
+ _initNodeDelay = _config.InitialNodeDelay.HasValue ? TimeSpan.FromSeconds(_config.InitialNodeDelay.Value) : InitialDelay;
+
//Init the client with default settings
- FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(MemoryUtil.Shared, (int)config.MaxBlobSize, config.RequestTimeout, debugLog);
+ FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(BufferHeap, (int)config.MaxBlobSize, config.RequestTimeout, debugLog);
- Client = new(conf);
+ FBMFallbackClientWsFactory wsFactory = new();
+ _clientFactory = new(in conf, wsFactory);
//Add the configuration to the client
- Client.GetCacheConfiguration()
+ _clientFactory.GetCacheConfiguration()
.WithTls(config.UseTls)
.WithInitialPeers(config.GetInitialNodeUris());
//Init index
- _index = ClusterNodeIndex.CreateIndex(Client.GetCacheConfiguration());
+ _index = ClusterNodeIndex.CreateIndex(_clientFactory.GetCacheConfiguration());
}
/*
@@ -135,13 +142,17 @@ namespace VNLib.Data.Caching.Providers.VNCache
{
//Initial delay
pluginLog.Debug("Worker started, waiting for startup delay");
- await Task.Delay((int)InitialDelay.TotalMilliseconds + 1000, exitToken);
+ await Task.Delay(_initNodeDelay, exitToken);
CacheNodeAdvertisment? node = null;
while (true)
{
- //Check for master index
+ /*
+ * The cache node index is shared across plugin boundries. If the current
+ * instance is holding the master index, it will be scheduleable, and
+ * can be manually invoked if no nodes are found
+ */
if (_index is IIntervalScheduleable sch)
{
try
@@ -204,8 +215,8 @@ namespace VNLib.Data.Caching.Providers.VNCache
{
pluginLog.Debug("Connecting to {node}", node);
- //Connect to the node
- await Client.ConnectToCacheAsync(node, exitToken);
+ //Connect to the node and save new client
+ _client = await _clientFactory.ConnectToCacheAsync(node, exitToken);
if (pluginLog.IsEnabled(LogLevel.Debug))
{
@@ -220,7 +231,7 @@ namespace VNLib.Data.Caching.Providers.VNCache
_isConnected = true;
//Wait for disconnect
- await Client.WaitForExitAsync(exitToken);
+ await _client.WaitForExitAsync(exitToken);
pluginLog.Information("Cache server disconnected");
}
@@ -248,6 +259,9 @@ namespace VNLib.Data.Caching.Providers.VNCache
finally
{
_isConnected = false;
+
+ //Cleanup client
+ _client?.Dispose();
}
//Loop again
@@ -265,11 +279,7 @@ namespace VNLib.Data.Caching.Providers.VNCache
{
pluginLog.Error(ex, "Unhandled exception occured in background cache client listening task");
}
- finally
- {
- //Dispose the client on exit
- Client.Dispose();
- }
+
pluginLog.Information("Cache client exited");
}
@@ -279,43 +289,43 @@ namespace VNLib.Data.Caching.Providers.VNCache
{
return !IsConnected
? throw new InvalidOperationException("The underlying client is not connected to a cache node")
- : Client!.DeleteObjectAsync(key, cancellation);
+ : _client!.DeleteObjectAsync(key, cancellation);
}
///<inheritdoc/>
public override Task<T> GetAsync<T>(string key, ICacheObjectDeserializer deserializer, CancellationToken cancellation)
{
return !IsConnected
- ? throw new InvalidOperationException("The underlying client is not connected to a cache node")
- : Client!.GetObjectAsync<T>(key, deserializer, cancellation);
+ ? throw new InvalidOperationException("The underlying client is not connected to a cache node")
+ : _client!.GetObjectAsync<T>(key, deserializer, cancellation);
}
///<inheritdoc/>
public override Task AddOrUpdateAsync<T>(string key, string? newKey, T value, ICacheObjectSerializer serialzer, CancellationToken cancellation)
{
return !IsConnected
- ? throw new InvalidOperationException("The underlying client is not connected to a cache node")
- : Client!.AddOrUpdateObjectAsync(key, newKey, value, serialzer, cancellation);
+ ? throw new InvalidOperationException("The underlying client is not connected to a cache node")
+ : _client!.AddOrUpdateObjectAsync(key, newKey, value, serialzer, cancellation);
}
///<inheritdoc/>
public override Task GetAsync<T>(string key, ObjectDataSet<T> callback, T state, CancellationToken cancellation)
{
return !IsConnected
- ? throw new InvalidOperationException("The underlying client is not connected to a cache node")
- : Client!.GetObjectAsync(key, callback, state, cancellation);
+ ? throw new InvalidOperationException("The underlying client is not connected to a cache node")
+ : _client!.GetObjectAsync(key, callback, state, cancellation);
}
///<inheritdoc/>
public override Task AddOrUpdateAsync<T>(string key, string? newKey, ObjectDataReader<T> callback, T state, CancellationToken cancellation)
{
return !IsConnected
- ? throw new InvalidOperationException("The underlying client is not connected to a cache node")
- : Client!.AddOrUpdateObjectAsync(key, newKey, callback, state, cancellation);
+ ? throw new InvalidOperationException("The underlying client is not connected to a cache node")
+ : _client!.AddOrUpdateObjectAsync(key, newKey, callback, state, cancellation);
}
///<inheritdoc/>
- public override object GetUnderlyingStore() => Client; //Client is the underlying "store"
+ public override object GetUnderlyingStore() => _client ?? throw new InvalidOperationException("The client is not currently connected");
private sealed class AuthManager : ICacheAuthManager
{
diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/MemoryCache.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/MemoryCache.cs
index 79348f0..98f6a3d 100644
--- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/MemoryCache.cs
+++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/MemoryCache.cs
@@ -26,6 +26,7 @@ using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
+
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
using VNLib.Utils.Memory.Diagnostics;
diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/RemoteBackedMemoryCache.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/RemoteBackedMemoryCache.cs
index 2068805..c7952b4 100644
--- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/RemoteBackedMemoryCache.cs
+++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/RemoteBackedMemoryCache.cs
@@ -100,7 +100,7 @@ namespace VNLib.Data.Caching.Providers.VNCache
_memCache = new BlobCacheTable(memCache.TableSize, memCache.BucketSize, factory, null);
//If backing store is a VnCacheClient, steal it's buffer heap
- _bufferHeap = backingStore is FBMCacheClient client && client.Client.Config.MemoryManager.TryGetHeap(out IUnmangedHeap? heap) ? heap : MemoryUtil.Shared;
+ _bufferHeap = backingStore is FBMCacheClient client ? client.BufferHeap : MemoryUtil.Shared;
_cacheConfig = memCache;
_backing = backingStore;
diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheBase.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheBase.cs
index fde2e44..f8a9ca6 100644
--- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheBase.cs
+++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheBase.cs
@@ -41,8 +41,8 @@ namespace VNLib.Data.Caching.Providers.VNCache
protected VNCacheBase(VNCacheConfig config)
{
//Set default serializers
- DefaultDeserializer = config.CacheObjectDeserializer ?? new JsonCacheObjectSerializer();
- DefaultSerializer = config.CacheObjectSerializer ?? new JsonCacheObjectSerializer();
+ DefaultDeserializer = config.CacheObjectDeserializer ?? new JsonCacheObjectSerializer(256);
+ DefaultSerializer = config.CacheObjectSerializer ?? new JsonCacheObjectSerializer(256);
}
///<inheritdoc/>
diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs
index 9a21c79..383c979 100644
--- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs
+++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VnCacheClientConfig.cs
@@ -69,6 +69,12 @@ namespace VNLib.Data.Caching.Providers.VNCache
internal TimeSpan RequestTimeout => TimeSpan.FromSeconds(RequestTimeoutSeconds!.Value);
/// <summary>
+ /// The time in milliseconds for the initial node delay
+ /// </summary>
+ [JsonPropertyName("intial_delay_ms")]
+ public uint? InitialNodeDelay { get; set; }
+
+ /// <summary>
/// The initial peers to connect to
/// </summary>
[JsonPropertyName("initial_nodes")]