diff options
Diffstat (limited to 'plugins')
9 files changed, 108 insertions, 33 deletions
diff --git a/plugins/ObjectCacheServer/server/container/Dockerfile b/plugins/ObjectCacheServer/server/container/Dockerfile index 6c466d4..725b9d1 100644 --- a/plugins/ObjectCacheServer/server/container/Dockerfile +++ b/plugins/ObjectCacheServer/server/container/Dockerfile @@ -55,6 +55,7 @@ ENV MAX_ENTRIES=10000 ENV CACHE_BUCKETS=100 ENV CACHE_MAX_MESSAGE=20480 ENV MAX_CONCURRENT_CONNECTIONS=1000 +ENV ENABLE_CHECKSUMS=true ENV VERIFY_IP=true ENV MAX_PEER_NODES=10 diff --git a/plugins/ObjectCacheServer/server/container/config-templates/ObjectCacheServer-template.json b/plugins/ObjectCacheServer/server/container/config-templates/ObjectCacheServer-template.json index 765c3d7..564039a 100644 --- a/plugins/ObjectCacheServer/server/container/config-templates/ObjectCacheServer-template.json +++ b/plugins/ObjectCacheServer/server/container/config-templates/ObjectCacheServer-template.json @@ -45,7 +45,8 @@ "buffer_recv_min": 8192, //min of 8k transfer buffer "buffer_header_max": 2048, //2k max header buffer size "buffer_header_min": 128, //128 byte min request header buffer size - "max_message_size": ${CACHE_MAX_MESSAGE} //Absolute maxium message size allowed, also the maxium size of cache entires + "max_message_size": ${CACHE_MAX_MESSAGE}, //Absolute maxium message size allowed, also the maxium size of cache entires + "enable_checksums": ${ENABLE_CHECKSUMS} //Enable checksums for cache entries }, //Known peers array, must point to well-known endpoint for discovery diff --git a/plugins/ObjectCacheServer/server/container/docker-compose.yaml b/plugins/ObjectCacheServer/server/container/docker-compose.yaml index c1b61fa..647c8c2 100644 --- a/plugins/ObjectCacheServer/server/container/docker-compose.yaml +++ b/plugins/ObjectCacheServer/server/container/docker-compose.yaml @@ -25,7 +25,8 @@ services: VERIFY_IP: "true" #verfies the IP address of clients during negotiation (recommended) MAX_PEER_NODES: "10" #max number of other peer nodes this node shoud connect to DISCOVERY_INTERVAL: "360" #time (in seconds) between peer node discovery - KNOWN_PEERS: '[]' #array of known peer nodes in the cluster + KNOWN_PEERS: '[]' #array of known peer nodes in the cluster + ENABLE_CHECKSUMS: "true" #enables checksums for messages #SECRETS (must be JWK formatted keys) CACHE_PRIV_KEY: "" #REQUIRED local private key used to identify and sign messages to clients and other nodes diff --git a/plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs b/plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs index c404cc5..0b81447 100644 --- a/plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs +++ b/plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs @@ -54,5 +54,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache [JsonPropertyName("memory_lib_path")] public string? ExternLibPath { get; set; } + + [JsonPropertyName("enable_checksums")] + public bool EnableChecksums { get; set; } = true; } } diff --git a/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs index cd5bf1b..f8ce8a9 100644 --- a/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs +++ b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs @@ -187,15 +187,21 @@ namespace VNLib.Data.Caching.ObjectCache.Server CacheListenerPubQueue queue = new(plugin, PeerEventQueue); - //Must register background worker to listen for changes + //Must register the queue background worker to listen for changes _ = plugin.ObserveWork(queue, 150); + BlobCacheListenerConfig conf = new() + { + Log = plugin.Log.CreateScope(CacheConstants.LogScopes.BlobCacheListener), + MemoryManager = new SharedHeapFBMMemoryManager(SharedCacheHeap), + EnableMessageChecksums = MemoryConfiguration.EnableChecksums, + }; + //Endpoint only allows for a single reader Listener = new( plugin.LoadMemoryCacheSystem(config, manager, MemoryConfiguration), - queue, - plugin.Log.CreateScope(CacheConstants.LogScopes.BlobCacheListener), - new SharedHeapFBMMemoryManager(SharedCacheHeap) + conf, + queue ); InternalStore = new CacheStore(Listener.Cache); diff --git a/plugins/VNLib.Data.Caching.Providers.Redis/src/RedisClientCacheEntry.cs b/plugins/VNLib.Data.Caching.Providers.Redis/src/RedisClientCacheEntry.cs index 360be58..7e91fe7 100644 --- a/plugins/VNLib.Data.Caching.Providers.Redis/src/RedisClientCacheEntry.cs +++ b/plugins/VNLib.Data.Caching.Providers.Redis/src/RedisClientCacheEntry.cs @@ -69,8 +69,6 @@ namespace VNLib.Data.Caching.Providers.Redis public RedisClientCacheEntry(PluginBase plugin, IConfigScope config) { _defaultHeap = MemoryUtil.Shared; - DefaultDeserializer = new JsonCacheObjectSerializer(256); - DefaultSerializer = new JsonCacheObjectSerializer(256); ILogProvider redisLog = plugin.Log.CreateScope("REDIS"); @@ -121,6 +119,31 @@ namespace VNLib.Data.Caching.Providers.Redis redisLog.Information("Successfully connected to Redis server"); }); } + + string? serialzerDllPath = config.GetPropString("serializer_assebly_name"); + + //See if user has specified a custom serializer assembly + if (!string.IsNullOrWhiteSpace(serialzerDllPath)) + { + //Load the custom serializer assembly and get the serializer and deserializer instances + DefaultSerializer = plugin.CreateServiceExternal<ICacheObjectSerializer>(serialzerDllPath); + + //Avoid creating another instance if the deserializer is the same as the serializer + if (DefaultSerializer is ICacheObjectDeserializer cod) + { + DefaultDeserializer = cod; + } + else + { + DefaultDeserializer = plugin.CreateServiceExternal<ICacheObjectDeserializer>(serialzerDllPath); + } + } + else + { + //If no default serializer is set, use the default JSON serializer + DefaultDeserializer = new JsonCacheObjectSerializer(256); + DefaultSerializer = new JsonCacheObjectSerializer(256); + } } private static ConfigurationOptions GetOptionsFromConfig(IConfigScope config) @@ -311,7 +334,7 @@ namespace VNLib.Data.Caching.Providers.Redis ///<inheritdoc/> public object GetUnderlyingStore() { - return _database == null ? throw new InvalidOperationException("The cache store is not available") : _database; + return _database is null ? throw new InvalidOperationException("The cache store is not available") : _database; } private sealed class AddOrUpdateBuffer: VnDisposeable, IBufferWriter<byte> diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs index 07fc9ee..e84a077 100644 --- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/FBMCacheClient.cs @@ -78,7 +78,8 @@ namespace VNLib.Data.Caching.Providers.VNCache public FBMCacheClient(PluginBase plugin, IConfigScope config) : this( config.Deserialze<VnCacheClientConfig>(), - plugin.IsDebug() ? plugin.Log : null + plugin.IsDebug() ? plugin.Log : null, + plugin ) { ILogProvider scoped = plugin.Log.CreateScope(LOG_NAME); @@ -103,7 +104,11 @@ namespace VNLib.Data.Caching.Providers.VNCache } } - public FBMCacheClient(VnCacheClientConfig config, ILogProvider? debugLog):base(config) + public FBMCacheClient(VnCacheClientConfig config, ILogProvider? debugLog) : this(config, debugLog, null) + { } + + + private FBMCacheClient(VnCacheClientConfig config, ILogProvider? debugLog, PluginBase? plugin) : base(config) { //Validate config (config as IOnConfigValidation).Validate(); @@ -115,13 +120,13 @@ namespace VNLib.Data.Caching.Providers.VNCache //Init the client with default settings FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(BufferHeap, (int)config.MaxBlobSize, config.RequestTimeout, debugLog); - + FBMClientFactory clientFactory = new( - in conf, - new FBMFallbackClientWsFactory(), + in conf, + new FBMFallbackClientWsFactory(), 10 ); - + _cluster = (new CacheClientConfiguration()) .WithTls(config.UseTls) .WithInitialPeers(config.GetInitialNodeUris()) @@ -129,6 +134,9 @@ namespace VNLib.Data.Caching.Providers.VNCache //Init index _index = ClusterNodeIndex.CreateIndex(_cluster); + + //Init serializers + InitSerializers(config, plugin); } /* @@ -296,7 +304,7 @@ namespace VNLib.Data.Caching.Providers.VNCache public override Task<bool> DeleteAsync(string key, CancellationToken cancellation) { return !IsConnected - ? throw new InvalidOperationException("The underlying client is not connected to a cache node") + ? Task.FromException<bool>(new InvalidOperationException("The underlying client is not connected to a cache node")) : _client!.DeleteObjectAsync(key, cancellation); } @@ -304,7 +312,7 @@ namespace VNLib.Data.Caching.Providers.VNCache public override Task<T> GetAsync<T>(string key, ICacheObjectDeserializer deserializer, CancellationToken cancellation) { return !IsConnected - ? throw new InvalidOperationException("The underlying client is not connected to a cache node") + ? Task.FromException<T>(new InvalidOperationException("The underlying client is not connected to a cache node")) : _client!.GetObjectAsync<T>(key, deserializer, cancellation); } @@ -312,7 +320,7 @@ namespace VNLib.Data.Caching.Providers.VNCache public override Task AddOrUpdateAsync<T>(string key, string? newKey, T value, ICacheObjectSerializer serialzer, CancellationToken cancellation) { return !IsConnected - ? throw new InvalidOperationException("The underlying client is not connected to a cache node") + ? Task.FromException(new InvalidOperationException("The underlying client is not connected to a cache node")) : _client!.AddOrUpdateObjectAsync(key, newKey, value, serialzer, cancellation); } @@ -320,7 +328,7 @@ namespace VNLib.Data.Caching.Providers.VNCache public override Task GetAsync<T>(string key, ObjectDataSet<T> callback, T state, CancellationToken cancellation) { return !IsConnected - ? throw new InvalidOperationException("The underlying client is not connected to a cache node") + ? Task.FromException(new InvalidOperationException("The underlying client is not connected to a cache node")) : _client!.GetObjectAsync(key, callback, state, cancellation); } @@ -328,7 +336,7 @@ namespace VNLib.Data.Caching.Providers.VNCache public override Task AddOrUpdateAsync<T>(string key, string? newKey, ObjectDataGet<T> callback, T state, CancellationToken cancellation) { return !IsConnected - ? throw new InvalidOperationException("The underlying client is not connected to a cache node") + ? Task.FromException(new InvalidOperationException("The underlying client is not connected to a cache node")) : _client!.AddOrUpdateObjectAsync(key, newKey, callback, state, cancellation); } diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheBase.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheBase.cs index c337ef4..dc1ab8f 100644 --- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheBase.cs +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheBase.cs @@ -25,26 +25,22 @@ using System.Threading; using System.Threading.Tasks; +using VNLib.Plugins; +using VNLib.Plugins.Extensions.Loading; + namespace VNLib.Data.Caching.Providers.VNCache { - internal abstract class VNCacheBase : IGlobalCacheProvider + internal abstract class VNCacheBase(VNCacheConfig conf) : IGlobalCacheProvider { ///<inheritdoc/> public abstract bool IsConnected { get; } ///<inheritdoc/> - public virtual ICacheObjectDeserializer DefaultDeserializer { get; } + public virtual ICacheObjectDeserializer DefaultDeserializer => conf.CacheObjectDeserializer!; ///<inheritdoc/> - public virtual ICacheObjectSerializer DefaultSerializer { get; } - - protected VNCacheBase(VNCacheConfig config) - { - //Set default serializers - DefaultDeserializer = config.CacheObjectDeserializer ?? new JsonCacheObjectSerializer(256); - DefaultSerializer = config.CacheObjectSerializer ?? new JsonCacheObjectSerializer(256); - } - + public virtual ICacheObjectSerializer DefaultSerializer => conf.CacheObjectSerializer!; + ///<inheritdoc/> public abstract Task AddOrUpdateAsync<T>(string key, string? newKey, T value, ICacheObjectSerializer serialzer, CancellationToken cancellation); @@ -62,5 +58,36 @@ namespace VNLib.Data.Caching.Providers.VNCache ///<inheritdoc/> public abstract object GetUnderlyingStore(); + + /// <summary> + /// Initializes a set of cache object serializers and deserializers + /// for the configuration instance and loads external serializers if specified + /// by the user. + /// </summary> + /// <param name="config">The configuration instance to initialzie</param> + /// <param name="plugin">Optional plugin for loading external serializers</param> + protected static void InitSerializers(VNCacheConfig config, PluginBase? plugin) + { + //See if user has specified a custom serializer assembly + if (!string.IsNullOrWhiteSpace(config.SerializerDllPath)) + { + //Load the custom serializer assembly and get the serializer and deserializer instances + config.CacheObjectSerializer = plugin.CreateServiceExternal<ICacheObjectSerializer>(config.SerializerDllPath); + + //Avoid creating another instance if the deserializer is the same as the serializer + if(config.CacheObjectSerializer is ICacheObjectDeserializer cod) + { + config.CacheObjectDeserializer = cod; + } + else + { + config.CacheObjectDeserializer = plugin.CreateServiceExternal<ICacheObjectDeserializer>(config.SerializerDllPath); + } + } + + //If no default serializer is set, use the default JSON serializer + config.CacheObjectSerializer ??= new JsonCacheObjectSerializer(256); + config.CacheObjectDeserializer ??= new JsonCacheObjectSerializer(256); + } } }
\ No newline at end of file diff --git a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheConfig.cs b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheConfig.cs index 8311519..24008b3 100644 --- a/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheConfig.cs +++ b/plugins/VNLib.Data.Caching.Providers.VNCache/src/VNCacheConfig.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: VNLib.Data.Caching.Providers.VNCache @@ -98,6 +98,11 @@ namespace VNLib.Data.Caching.Providers.VNCache throw new ArgumentException("You must configure a maximum object size", "max_object_size"); } } - + + /// <summary> + /// Optional external cache serializer library to load + /// </summary> + [JsonPropertyName("serializer_assebly_name")] + public string? SerializerDllPath { get; set; } } }
\ No newline at end of file |