aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-11-29 00:17:13 -0500
committerLibravatar vnugent <public@vaughnnugent.com>2023-11-29 00:17:13 -0500
commite25666bbf408ff33c09dc8e2c5fe2d052363595f (patch)
treece7a7290e8f013afdc38529b1be7fd8de0827ffd /lib
parent78a47dd6887fbbe33d9526194b6af300a72448fa (diff)
Integrate FBM core changes for immutable client instances
Diffstat (limited to 'lib')
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs42
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs4
-rw-r--r--lib/VNLib.Data.Caching/src/ClientExtensions.cs6
-rw-r--r--lib/VNLib.Data.Caching/src/ClientRetryManager.cs5
-rw-r--r--lib/VNLib.Data.Caching/src/JsonCacheObjectSerializer.cs35
-rw-r--r--lib/VNLib.Data.Caching/src/ReusableJsonWriter.cs69
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityCacheExtensions.cs77
7 files changed, 127 insertions, 111 deletions
diff --git a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
index 708b3f5..bd86461 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
@@ -88,7 +88,7 @@ namespace VNLib.Data.Caching.Extensions
return adapter;
}
- private static readonly ConditionalWeakTable<FBMClient, CacheClientConfiguration> ClientCacheConfig = new();
+ private static readonly ConditionalWeakTable<FBMClientFactory, CacheClientConfiguration> ClientCacheConfig = new();
/// <summary>
/// Gets a <see cref="FBMClientConfig"/> preconfigured object caching
@@ -358,7 +358,7 @@ namespace VNLib.Data.Caching.Extensions
/// </summary>
/// <param name="client"></param>
/// <returns>A fluent api configuration builder for the current client</returns>
- public static CacheClientConfiguration GetCacheConfiguration(this FBMClient client) => ClientCacheConfig.GetOrCreateValue(client);
+ public static CacheClientConfiguration GetCacheConfiguration(this FBMClientFactory client) => ClientCacheConfig.GetOrCreateValue(client);
/// <summary>
/// Explicitly set the client cache configuration for the current client
@@ -366,7 +366,7 @@ namespace VNLib.Data.Caching.Extensions
/// <param name="client"></param>
/// <param name="config">The cache node configuration</param>
/// <returns>The config instance</returns>
- public static CacheClientConfiguration SetCacheConfiguration(this FBMClient client, CacheClientConfiguration config)
+ public static CacheClientConfiguration SetCacheConfiguration(this FBMClientFactory client, CacheClientConfiguration config)
{
ClientCacheConfig.AddOrUpdate(client, config);
return config;
@@ -378,7 +378,7 @@ namespace VNLib.Data.Caching.Extensions
/// <param name="client"></param>
/// <param name="nodeConfig">The cache node configuration</param>
/// <returns>The config instance</returns>
- public static CacheNodeConfiguration SetCacheConfiguration(this FBMClient client, CacheNodeConfiguration nodeConfig)
+ public static CacheNodeConfiguration SetCacheConfiguration(this FBMClientFactory client, CacheNodeConfiguration nodeConfig)
{
ClientCacheConfig.AddOrUpdate(client, nodeConfig);
return nodeConfig;
@@ -425,7 +425,7 @@ namespace VNLib.Data.Caching.Extensions
/// <param name="client"></param>
/// <param name="cancellation">A token to cancel the operation</param>
/// <returns>A task that completes when all nodes have been discovered</returns>
- public static Task DiscoverAvailableNodesAsync(this FBMClient client, CancellationToken cancellation = default)
+ public static Task DiscoverAvailableNodesAsync(this FBMClientFactory client, CancellationToken cancellation = default)
{
//Get stored config
CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
@@ -446,7 +446,7 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="SecurityException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
- public static async Task<CacheNodeAdvertisment> ConnectToRandomCacheAsync(this FBMClient client, CancellationToken cancellation = default)
+ public static async Task<CacheNodeAdvertisment> ConnectToRandomCacheAsync(this FBMClientFactory client, CancellationToken cancellation = default)
{
//Get stored config
CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
@@ -467,7 +467,7 @@ namespace VNLib.Data.Caching.Extensions
/// <summary>
/// Connects to the specified server on the configured cache client
/// </summary>
- /// <param name="client"></param>
+ /// <param name="factory"></param>
/// <param name="server">The server to connect to</param>
/// <param name="token">A token to cancel the operation</param>
/// <returns>A task that resolves when the client is connected to the cache server</returns>
@@ -477,16 +477,28 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="SecurityException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
- public static Task ConnectToCacheAsync(this FBMClient client, CacheNodeAdvertisment server, CancellationToken token = default)
+ public static async Task<FBMClient> ConnectToCacheAsync(this FBMClientFactory factory, CacheNodeAdvertisment server, CancellationToken token = default)
{
- _ = client ?? throw new ArgumentNullException(nameof(client));
+ _ = factory ?? throw new ArgumentNullException(nameof(factory));
_ = server ?? throw new ArgumentNullException(nameof(server));
//Get stored config
- CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
+ CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(factory);
- //Connect to server (no server id because client not replication server)
- return ConnectToCacheAsync(client, conf, server, token);
+ //Create new client
+ FBMClient client = factory.CreateClient();
+
+ try
+ {
+ //Connect to server (no server id because client not replication server)
+ await ConnectToCacheAsync(client, conf, server, token);
+ return client;
+ }
+ catch
+ {
+ client.Dispose();
+ throw;
+ }
}
/// <summary>
@@ -549,19 +561,19 @@ namespace VNLib.Data.Caching.Extensions
client.LogDebug("Server negotiation validated, connecting to server");
//The client authorization header is the exact response
- client.ClientSocket.Headers[HttpRequestHeader.Authorization] = response.Content!;
+ client.Headers[HttpRequestHeader.Authorization] = response.Content!;
//See if the supplied config is for a cache node
CacheNodeConfiguration? cnc = config as CacheNodeConfiguration;
//Compute the signature of the upgrade token
- client.ClientSocket.Headers[X_UPGRADE_SIG_HEADER] = config.AuthManager.GetBase64UpgradeSignature(response.Content, cnc != null);
+ client.Headers[X_UPGRADE_SIG_HEADER] = config.AuthManager.GetBase64UpgradeSignature(response.Content, cnc != null);
//Check to see if adversize self is enabled
if (cnc?.BroadcastAdverisment == true)
{
//Set advertisment header
- client.ClientSocket.Headers[X_NODE_DISCOVERY_HEADER] = GetAdvertismentHeader(cnc);
+ client.Headers[X_NODE_DISCOVERY_HEADER] = GetAdvertismentHeader(cnc);
}
//build ws uri from the connect endpoint
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs b/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs
index 917052f..9370901 100644
--- a/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs
@@ -191,10 +191,8 @@ namespace VNLib.Data.Caching
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public readonly Span<byte> GetDataSegment()
{
- //Get the actual length of the segment
- uint length = GetLength();
//Get the segment from its begining offset and
- return _manager.GetSpan(_handle, DATA_SEGMENT_START, length);
+ return _manager.GetSpan(_handle, DATA_SEGMENT_START, GetLength());
}
/// <summary>
diff --git a/lib/VNLib.Data.Caching/src/ClientExtensions.cs b/lib/VNLib.Data.Caching/src/ClientExtensions.cs
index a2ec27d..8273486 100644
--- a/lib/VNLib.Data.Caching/src/ClientExtensions.cs
+++ b/lib/VNLib.Data.Caching/src/ClientExtensions.cs
@@ -45,7 +45,7 @@ namespace VNLib.Data.Caching
public static class ClientExtensions
{
- private static readonly JsonCacheObjectSerializer DefaultSerializer = new();
+ private static readonly JsonCacheObjectSerializer DefaultSerializer = new(256);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void LogDebug(this FBMClient client, string message, params object?[] args)
@@ -192,7 +192,7 @@ namespace VNLib.Data.Caching
/// <exception cref="InvalidResponseException"></exception>
/// <exception cref="MessageTooLargeException"></exception>
/// <exception cref="ObjectNotFoundException"></exception>
- public async static Task AddOrUpdateObjectAsync<T>(this FBMClient client, string objectId, string? newId, ObjectDataReader<T> callback, T state, CancellationToken cancellationToken = default)
+ public static async Task AddOrUpdateObjectAsync<T>(this FBMClient client, string objectId, string? newId, ObjectDataReader<T> callback, T state, CancellationToken cancellationToken = default)
{
_ = client ?? throw new ArgumentNullException(nameof(client));
_ = callback ?? throw new ArgumentNullException(nameof(callback));
@@ -306,7 +306,7 @@ namespace VNLib.Data.Caching
return deserialzer.Deserialize<T>(response.ResponseBody);
}
- //Object may not exist on the server yet
+ //Object may not exist on the server yet
if (status.Value.Equals(ResponseCodes.NotFound, StringComparison.Ordinal))
{
return default;
diff --git a/lib/VNLib.Data.Caching/src/ClientRetryManager.cs b/lib/VNLib.Data.Caching/src/ClientRetryManager.cs
index 5cee583..b7a5f2a 100644
--- a/lib/VNLib.Data.Caching/src/ClientRetryManager.cs
+++ b/lib/VNLib.Data.Caching/src/ClientRetryManager.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2022 Vaughn Nugent
+* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Data.Caching
@@ -32,7 +32,7 @@ using VNLib.Net.Messaging.FBM.Client;
namespace VNLib.Data.Caching
{
/// <summary>
- /// Manages a <see cref="FBMClientWorkerBase"/> reconnect policy
+ /// Manages a <see cref="IStatefulConnection"/> reconnect policy
/// </summary>
public class ClientRetryManager<T> : VnDisposeable where T: IStatefulConnection
{
@@ -62,6 +62,7 @@ namespace VNLib.Data.Caching
/// Raised before client is to be reconnected
/// </summary>
public event Action<T>? OnBeforeReconnect;
+
/// <summary>
/// Raised when the client fails to reconnect. Should return a value that instructs the
/// manager to reconnect
diff --git a/lib/VNLib.Data.Caching/src/JsonCacheObjectSerializer.cs b/lib/VNLib.Data.Caching/src/JsonCacheObjectSerializer.cs
index 85d1184..09fbd6c 100644
--- a/lib/VNLib.Data.Caching/src/JsonCacheObjectSerializer.cs
+++ b/lib/VNLib.Data.Caching/src/JsonCacheObjectSerializer.cs
@@ -23,12 +23,13 @@
*/
using System;
+
+using System.IO;
using System.Buffers;
+using System.Threading;
using System.Text.Json;
using System.Text.Json.Serialization;
-using VNLib.Utils.Memory.Caching;
-
namespace VNLib.Data.Caching
{
/// <summary>
@@ -37,8 +38,7 @@ namespace VNLib.Data.Caching
/// </summary>
public class JsonCacheObjectSerializer : ICacheObjectSerializer, ICacheObjectDeserializer
{
- //Create threadlocal writer for attempted lock-free writer reuse
- private static readonly ObjectRental<ReusableJsonWriter> JsonWriterPool = ObjectRental.CreateThreadLocal<ReusableJsonWriter>();
+ private static readonly ThreadLocal<Utf8JsonWriter> _writer = new(static () => new(Stream.Null));
private readonly JsonSerializerOptions? _options;
@@ -46,16 +46,13 @@ namespace VNLib.Data.Caching
/// Initializes a new <see cref="JsonCacheObjectSerializer"/>
/// </summary>
/// <param name="options">JSON serialization/deserialization options</param>
- public JsonCacheObjectSerializer(JsonSerializerOptions options)
- {
- _options = options;
- }
+ public JsonCacheObjectSerializer(JsonSerializerOptions options) => _options = options;
/// <summary>
/// Initializes a new <see cref="JsonCacheObjectSerializer"/> using
/// the default serialization rules
/// </summary>
- public JsonCacheObjectSerializer()
+ public JsonCacheObjectSerializer(int bufferSize)
{
//Configure default serialzation options
_options = new()
@@ -68,9 +65,7 @@ namespace VNLib.Data.Caching
IgnoreReadOnlyFields = true,
PropertyNameCaseInsensitive = true,
IncludeFields = false,
-
- //Use small buffers
- DefaultBufferSize = 128
+ DefaultBufferSize = bufferSize
};
}
@@ -80,17 +75,23 @@ namespace VNLib.Data.Caching
///<inheritdoc/>
public virtual void Serialize<T>(T obj, IBufferWriter<byte> finiteWriter)
{
- //Rent new json writer
- ReusableJsonWriter writer = JsonWriterPool.Rent();
+ //Read thread-local writer
+ Utf8JsonWriter localWriter = _writer.Value!;
+ //Init the writer with the new buffer writer
+ localWriter.Reset(finiteWriter);
try
{
- //Serialize the message
- writer.Serialize(finiteWriter, obj, _options);
+ //Serialize message
+ JsonSerializer.Serialize(localWriter, obj, _options);
+
+ //Flush writer to underlying buffer
+ localWriter.Flush();
}
finally
{
- JsonWriterPool.Return(writer);
+ //Unlink the writer
+ localWriter.Reset(Stream.Null);
}
}
}
diff --git a/lib/VNLib.Data.Caching/src/ReusableJsonWriter.cs b/lib/VNLib.Data.Caching/src/ReusableJsonWriter.cs
deleted file mode 100644
index c763f91..0000000
--- a/lib/VNLib.Data.Caching/src/ReusableJsonWriter.cs
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
-* Copyright (c) 2022 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Data.Caching
-* File: ReusableJsonWriter.cs
-*
-* ReusableJsonWriter.cs is part of VNLib.Data.Caching which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Data.Caching is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Data.Caching is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System.IO;
-using System.Buffers;
-using System.Text.Json;
-
-using VNLib.Utils;
-
-namespace VNLib.Data.Caching
-{
- internal sealed class ReusableJsonWriter : VnDisposeable
- {
- private readonly Utf8JsonWriter _writer;
-
- public ReusableJsonWriter()
- {
- _writer = new(Stream.Null);
- }
-
- /// <summary>
- /// Serializes the message and writes the serialzied data to the buffer writer
- /// </summary>
- /// <typeparam name="T"></typeparam>
- /// <param name="writer">The buffer writer to store data at</param>
- /// <param name="value">The object to serialize</param>
- /// <param name="options">Optional - serializer options</param>
- public void Serialize<T>(IBufferWriter<byte> writer, T value, JsonSerializerOptions? options = null)
- {
- //Init the writer with the new buffer writer
- _writer.Reset(writer);
- try
- {
- //Serialize message
- JsonSerializer.Serialize(_writer, value, options);
- //Flush writer to underlying buffer
- _writer.Flush();
- }
- finally
- {
- //Unlink the writer
- _writer.Reset(Stream.Null);
- }
- }
-
- protected override void Free() => _writer.Dispose();
- }
-}
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityCacheExtensions.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityCacheExtensions.cs
index bf2fa2a..562c220 100644
--- a/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityCacheExtensions.cs
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/DataModel/EntityCacheExtensions.cs
@@ -107,15 +107,88 @@ namespace VNLib.Plugins.Extensions.VNCache.DataModel
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="cache"></param>
+ /// <param name="bufferSize">The default serializer buffer size</param>
/// <returns>The new <see cref="IEntityCache{T}"/> wrapper using json serialization</returns>
/// <exception cref="ArgumentNullException"></exception>
- public static IEntityCache<T> CreateJsonEntityCache<T>(this IGlobalCacheProvider cache) where T: class
+ public static IEntityCache<T> CreateJsonEntityCache<T>(this IGlobalCacheProvider cache, int bufferSize) where T: class
{
_ = cache ?? throw new ArgumentNullException(nameof(cache));
- JsonCacheObjectSerializer json = new();
+ JsonCacheObjectSerializer json = new(bufferSize);
return CreateEntityCache<T>(cache, json, json);
}
+ /// <summary>
+ /// Attemts to recover an entity from cache if possible, if a miss occurs, the
+ /// factory function is called to produce a value from a backing store. If the store
+ /// returns a result it is writen back to the cache before this method returns
+ /// </summary>
+ /// <typeparam name="T"></typeparam>
+ /// <param name="cache"></param>
+ /// <param name="id">The id of the entity to get or laod</param>
+ /// <param name="factory">The factory callback function to produce a value when a cache miss occurs</param>
+ /// <param name="cancellation">A token to cancel the operation</param>
+ /// <returns>A task that completes by returning the entity</returns>
+ /// <exception cref="ArgumentNullException"></exception>
+ public static async Task<T?> GetOrLoadAsync<T>(this IEntityCache<T> cache, string id, Func<string, Task<T?>> factory, CancellationToken cancellation = default) where T : class
+ {
+ _ = cache ?? throw new ArgumentNullException(nameof(cache));
+ _ = id ?? throw new ArgumentNullException(nameof(id));
+ _ = factory ?? throw new ArgumentNullException(nameof(factory));
+
+ //try to load the value from cache
+ T? record = await cache.GetAsync(id, cancellation);
+
+ //If record was not found in cache, load it from the factory
+ if (record is null)
+ {
+ record = await factory(id);
+
+ //If new record found, write to cache
+ if (record is not null)
+ {
+ await cache.UpsertAsync(id, record, cancellation);
+ }
+ }
+
+ return record;
+ }
+
+ /// <summary>
+ /// Attemts to recover an entity from cache if possible, if a miss occurs, the
+ /// factory function is called to produce a value from a backing store. If the store
+ /// returns a result it is writen back to the cache before this method returns
+ /// </summary>
+ /// <typeparam name="T"></typeparam>
+ /// <param name="cache"></param>
+ /// <param name="id">The id of the entity to get or laod</param>
+ /// <param name="factory">The factory callback function to produce a value when a cache miss occurs</param>
+ /// <param name="cancellation">A token to cancel the operation</param>
+ /// <returns>A task that completes by returning the entity</returns>
+ /// <exception cref="ArgumentNullException"></exception>
+ public static async Task<T?> GetOrLoadAsync<T>(this IEntityCache<T> cache, string id, Func<string, CancellationToken, Task<T?>> factory, CancellationToken cancellation = default) where T : class
+ {
+ _ = cache ?? throw new ArgumentNullException(nameof(cache));
+ _ = id ?? throw new ArgumentNullException(nameof(id));
+ _ = factory ?? throw new ArgumentNullException(nameof(factory));
+
+ //try to load the value from cache
+ T? record = await cache.GetAsync(id, cancellation);
+
+ //If record was not found in cache, load it from the factory
+ if (record is null)
+ {
+ record = await factory(id, cancellation);
+
+ //If new record found, write to cache
+ if(record is not null)
+ {
+ await cache.UpsertAsync(id, record, cancellation);
+ }
+ }
+
+ return record;
+ }
+
private sealed class EntityCacheImpl<T> : IEntityCache<T> where T : class
{
private readonly IGlobalCacheProvider _cacheProvider;