diff options
Diffstat (limited to 'plugins/ObjectCacheServer/src/Endpoints')
5 files changed, 33 insertions, 568 deletions
diff --git a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs deleted file mode 100644 index b9c00e6..0000000 --- a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs +++ /dev/null @@ -1,149 +0,0 @@ -/* -* Copyright (c) 2023 Vaughn Nugent -* -* Library: VNLib -* Package: ObjectCacheServer -* File: BrokerHeartBeatEndpoint.cs -* -* BrokerHeartBeatEndpoint.cs is part of ObjectCacheServer which is part of the larger -* VNLib collection of libraries and utilities. -* -* ObjectCacheServer is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* ObjectCacheServer is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Net; -using System.Linq; -using System.Text.Json; -using System.Threading.Tasks; - - -using VNLib.Plugins; -using VNLib.Utils.Logging; -using VNLib.Plugins.Essentials; -using VNLib.Hashing.IdentityUtility; -using VNLib.Plugins.Essentials.Endpoints; -using VNLib.Plugins.Essentials.Extensions; -using VNLib.Plugins.Extensions.Loading; - -namespace VNLib.Data.Caching.ObjectCache.Server -{ - internal sealed class BrokerHeartBeatEndpoint : ResourceEndpointBase - { - private readonly IBrokerHeartbeatNotifier _heartBeat; - private readonly Task<IPAddress[]> BrokerIpList; - private readonly bool DebugMode; - - ///<inheritdoc/> - protected override ProtectionSettings EndpointProtectionSettings { get; } = new() - { - DisableBrowsersOnly = true, - DisableSessionsRequired = true - }; - - public BrokerHeartBeatEndpoint(PluginBase plugin) - { - //Get debug flag - DebugMode = plugin.IsDebug(); - - //Get or create the current node config - _heartBeat = plugin.GetOrCreateSingleton<NodeConfig>(); - - /* - * Resolve the ip address of the broker and store it to verify connections - * later - */ - BrokerIpList = Dns.GetHostAddressesAsync(_heartBeat.GetBrokerAddress().DnsSafeHost); - - //Setup endpoint - InitPathAndLog("/heartbeat", plugin.Log); - } - - - protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity) - { - //If-not loopback then verify server address - if (!entity.Server.IsLoopBack()) - { - //Load and verify the broker's ip address matches with an address we have stored - IPAddress[] addresses = await BrokerIpList; - - if (!addresses.Contains(entity.TrustedRemoteIp)) - { - if (DebugMode) - { - Log.Debug("Received connection {ip} that was not a DNS safe address for the broker server, access denied"); - } - - //Token invalid - entity.CloseResponse(HttpStatusCode.Forbidden); - return VfReturnType.VirtualSkip; - } - } - - //Get the authorization jwt - string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; - - if (string.IsNullOrWhiteSpace(jwtAuth)) - { - //Token invalid - entity.CloseResponse(HttpStatusCode.Forbidden); - return VfReturnType.VirtualSkip; - } - - //Parse the jwt - using JsonWebToken jwt = JsonWebToken.Parse(jwtAuth); - - //Verify the jwt using the broker's public key certificate - using (ReadOnlyJsonWebKey cert = _heartBeat.GetBrokerPublicKey()) - { - //Verify the jwt - if (!jwt.VerifyFromJwk(cert)) - { - //Token invalid - entity.CloseResponse(HttpStatusCode.Forbidden); - return VfReturnType.VirtualSkip; - } - } - - string? auth; - //Recover the auth token from the jwt - using (JsonDocument doc = jwt.GetPayload()) - { - auth = doc.RootElement.GetProperty("token").GetString(); - } - - //Get our stored token used for registration - string? selfToken = _heartBeat.GetAuthToken(); - - //Verify token - if (selfToken != null && selfToken.Equals(auth, StringComparison.Ordinal)) - { - //Signal keepalive - _heartBeat.HearbeatReceived(); - entity.CloseResponse(HttpStatusCode.OK); - return VfReturnType.VirtualSkip; - } - - if (DebugMode) - { - Log.Debug("Invalid auth token recieved from broker sever, access denied"); - } - - //Token invalid - entity.CloseResponse(HttpStatusCode.Forbidden); - return VfReturnType.VirtualSkip; - } - } -} diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs deleted file mode 100644 index 67db433..0000000 --- a/plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs +++ /dev/null @@ -1,91 +0,0 @@ -/* -* Copyright (c) 2023 Vaughn Nugent -* -* Library: VNLib -* Package: ObjectCacheServer -* File: ConnectEndpoint.cs -* -* ConnectEndpoint.cs is part of ObjectCacheServer which is part of the larger -* VNLib collection of libraries and utilities. -* -* ObjectCacheServer is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* ObjectCacheServer is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Threading; -using System.Threading.Tasks; -using VNLib.Utils.Logging; -using VNLib.Plugins; -using VNLib.Plugins.Extensions.Loading; - -namespace VNLib.Data.Caching.ObjectCache.Server -{ - [ConfigurationName("cache")] - sealed class CacheStore : ICacheStore, IDisposable - { - public BlobCacheListener Listener { get; } - - - public CacheStore(PluginBase plugin, IConfigScope config) - { - //Init cache - Listener = InitializeCache((ObjectCacheServerEntry)plugin, config); - } - - ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token) - { - return Listener.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token); - } - - void ICacheStore.Clear() - { - throw new NotImplementedException(); - } - - ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token) - { - return Listener.Cache.DeleteObjectAsync(id, token); - } - - private static BlobCacheListener InitializeCache(ObjectCacheServerEntry plugin, IConfigScope config) - { - //Deserialize the cache config - CacheConfiguration cacheConf = config.Deserialze<CacheConfiguration>(); - - if (cacheConf.MaxCacheEntries < 2) - { - throw new ArgumentException("You must configure a 'max_cache' size larger than 1 item"); - } - - //Suggestion - if (cacheConf.MaxCacheEntries < 200) - { - plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache"); - } - - plugin.Log.Verbose("Creating cache store with {bc} buckets, with {mc} items/bucket", cacheConf.BucketCount, cacheConf.MaxCacheEntries); - - //Load the blob cache table system - IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, plugin.CacheHeap, cacheConf); - - //Endpoint only allows for a single reader - return new(bc, plugin.Log, plugin.CacheHeap, true); - } - - public void Dispose() - { - Listener.Dispose(); - } - } -} diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheSystemUtil.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheSystemUtil.cs deleted file mode 100644 index 669b84f..0000000 --- a/plugins/ObjectCacheServer/src/Endpoints/CacheSystemUtil.cs +++ /dev/null @@ -1,242 +0,0 @@ -/* -* Copyright (c) 2023 Vaughn Nugent -* -* Library: VNLib -* Package: ObjectCacheServer -* File: CacheSystemUtil.cs -* -* CacheSystemUtil.cs is part of ObjectCacheServer which is part of the larger -* VNLib collection of libraries and utilities. -* -* ObjectCacheServer is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* ObjectCacheServer is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.IO; -using System.Text.Json; -using System.Collections; -using System.Collections.Generic; -using System.Runtime.CompilerServices; - -using VNLib.Plugins; -using VNLib.Utils.Memory; -using VNLib.Plugins.Extensions.Loading; - -namespace VNLib.Data.Caching.ObjectCache.Server -{ - internal static class CacheSystemUtil - { - const string PERSISTANT_ASM_CONFIF_KEY = "persistant_cache_asm"; - const string USER_CACHE_ASM_CONFIG_KEY = "custom_cache_impl_asm"; - const string LOAD_METHOD_NAME = "OnRuntimeLoad"; - const string TEARDOWN_METHOD_NAME = "OnSystemDetach"; - - /// <summary> - /// Loads the <see cref="IBlobCacheTable"/> implementation (dynamic or default) into the process - /// and initializes it and it's backing store. - /// </summary> - /// <param name="plugin"></param> - /// <param name="config">The configuration object that contains loading variables</param> - /// <param name="heap">The heap for memory cache table to allocate buffers from</param> - /// <param name="cacheConf">The cache configuration object</param> - /// <returns>The loaded <see cref="IBlobCacheTable"/> implementation</returns> - /// <exception cref="FileNotFoundException"></exception> - public static IBlobCacheTable LoadMemoryCacheSystem(this PluginBase plugin, IConfigScope config, IUnmangedHeap heap, CacheConfiguration cacheConf) - { - //First, try to load persitant cache store - PersistantCacheManager? pCManager = GetPersistantStore(plugin, config); - - IBlobCacheTable table; - - //See if the user defined a custom cache table implementation - if (config.TryGetValue(USER_CACHE_ASM_CONFIG_KEY, out JsonElement customEl)) - { - string asmName = customEl.GetString() ?? throw new FileNotFoundException("User defined a custom blob cache assembly but the file name was null"); - - //Return the runtime loaded table - table = LoadCustomMemCacheTable(plugin, asmName, pCManager); - } - else - { - //Default type - table = GetInternalBlobCache(heap, cacheConf, pCManager); - } - - //Initialize the subsystem from the cache table - pCManager?.InitializeSubsystem(table); - - return table; - } - - private static IBlobCacheTable GetInternalBlobCache(IUnmangedHeap heap, CacheConfiguration config, IPersistantCacheStore? store) - { - return new BlobCacheTable(config.BucketCount, config.MaxCacheEntries, heap, store); - } - - private static IBlobCacheTable LoadCustomMemCacheTable(PluginBase plugin, string asmName, IPersistantCacheStore? store) - { - //Load the custom assembly - AssemblyLoader<IBlobCacheTable> customTable = plugin.LoadAssembly<IBlobCacheTable>(asmName); - - try - { - //Try get onload method and pass the persistant cache instance - Action<PluginBase, IPersistantCacheStore?>? onLoad = customTable.TryGetMethod<Action<PluginBase, IPersistantCacheStore?>>(LOAD_METHOD_NAME); - onLoad?.Invoke(plugin, store); - } - catch - { - customTable.Dispose(); - throw; - } - - return new RuntimeBlobCacheTable(customTable); - } - - private static PersistantCacheManager? GetPersistantStore(PluginBase plugin, IConfigScope config) - { - //Get the persistant assembly - if (!config.TryGetValue(PERSISTANT_ASM_CONFIF_KEY, out JsonElement asmEl)) - { - return null; - } - - string? asmName = asmEl.GetString(); - if (asmName == null) - { - return null; - } - - //Load the dynamic assembly into the alc - AssemblyLoader<IPersistantCacheStore> loader = plugin.LoadAssembly<IPersistantCacheStore>(asmName); - try - { - //Call the OnLoad method - Action<PluginBase, IConfigScope>? loadMethod = loader.TryGetMethod<Action<PluginBase, IConfigScope>>(LOAD_METHOD_NAME); - - loadMethod?.Invoke(plugin, config); - } - catch - { - loader.Dispose(); - throw; - } - - //Return the - return new(loader); - } - - - private sealed class RuntimeBlobCacheTable : IBlobCacheTable - { - - private readonly IBlobCacheTable _table; - private readonly Action? OnDetatch; - - public RuntimeBlobCacheTable(AssemblyLoader<IBlobCacheTable> loader) - { - OnDetatch = loader.TryGetMethod<Action>(TEARDOWN_METHOD_NAME); - _table = loader.Resource; - } - - public void Dispose() - { - //We can let the loader dispose the cache table, but we can notify of detatch - OnDetatch?.Invoke(); - } - - - ///<inheritdoc/> - [MethodImpl(MethodImplOptions.AggressiveInlining)] - IBlobCacheBucket IBlobCacheTable.GetBucket(ReadOnlySpan<char> objectId) => _table.GetBucket(objectId); - - ///<inheritdoc/> - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public IEnumerator<IBlobCacheBucket> GetEnumerator() => _table.GetEnumerator(); - - ///<inheritdoc/> - [MethodImpl(MethodImplOptions.AggressiveInlining)] - IEnumerator IEnumerable.GetEnumerator() => ((IEnumerable)_table).GetEnumerator(); - } - - internal sealed class PersistantCacheManager : IPersistantCacheStore - { - const string INITIALIZE_METHOD_NAME = "OnInitializeForBucket"; - - - /* - * Our referrence can be technically unloaded, but so will - * this instance, since its loaded into the current ALC, so - * this referrence may exist for the lifetime of this instance. - * - * It also implements IDisposable, which the assembly loader class - * will call when this plugin is unloaded, we dont need to call - * it here, but we can signal a detach. - * - * Since the store implements IDisposable, its likely going to - * check for dispose on each call, so we don't need to add - * and additional disposed check since the method calls must be fast. - */ - - private readonly IPersistantCacheStore store; - - private readonly Action<uint>? InitMethod; - private readonly Action? OnServiceDetatch; - - public PersistantCacheManager(AssemblyLoader<IPersistantCacheStore> loader) - { - //Try to get the Initialize method - InitMethod = loader.TryGetMethod<Action<uint>>(INITIALIZE_METHOD_NAME); - - //Get the optional detatch method - OnServiceDetatch = loader.TryGetMethod<Action>(TEARDOWN_METHOD_NAME); - - store = loader.Resource; - } - - /// <summary> - /// Optionally initializes the backing store by publishing the table's bucket - /// id's so it's made aware of the memory cache bucket system. - /// </summary> - /// <param name="table">The table containing buckets to publish</param> - public void InitializeSubsystem(IBlobCacheTable table) - { - //Itterate all buckets - foreach (IBlobCacheBucket bucket in table) - { - InitMethod?.Invoke(bucket.Id); - } - } - - void IDisposable.Dispose() - { - //Assembly loader will dispose the type, we can just signal a detach - - OnServiceDetatch?.Invoke(); - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - bool IPersistantCacheStore.OnCacheMiss(uint bucketId, string key, IMemoryCacheEntryFactory factory, out CacheEntry entry) - { - return store.OnCacheMiss(bucketId, key, factory, out entry); - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - void IPersistantCacheStore.OnEntryDeleted(uint bucketId, string key) => store.OnEntryDeleted(bucketId, key); - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - void IPersistantCacheStore.OnEntryEvicted(uint bucketId, string key, in CacheEntry entry) => store.OnEntryEvicted(bucketId, key, in entry); - } - } -} diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs index 167a7e9..8352635 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs @@ -31,10 +31,8 @@ using System.Collections.Generic; using VNLib.Hashing; using VNLib.Net.Http; -using VNLib.Utils.Async; using VNLib.Utils.Memory; using VNLib.Utils.Logging; -using VNLib.Utils.Extensions; using VNLib.Data.Caching; using VNLib.Data.Caching.Extensions; using VNLib.Hashing.IdentityUtility; @@ -49,21 +47,21 @@ using VNLib.Plugins.Essentials.Extensions; using VNLib.Plugins.Extensions.Loading.Routing; using VNLib.Data.Caching.ObjectCache.Server.Distribution; -namespace VNLib.Data.Caching.ObjectCache.Server + +namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { [ConfigurationName("connect_endpoint")] - internal sealed class ConnectEndpoint : ResourceEndpointBase, IAsyncBackgroundWork + internal sealed class ConnectEndpoint : ResourceEndpointBase { private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30); - private readonly CacheNodeConfiguration NodeConfiguration; + private readonly NodeConfig NodeConfiguration; private readonly ICacheEventQueueManager PubSubManager; private readonly IPeerMonitor Peers; private readonly BlobCacheListener Store; - private readonly CacheAuthKeyStore KeyStore; private readonly bool VerifyIp; private readonly string AudienceLocalServerId; @@ -94,8 +92,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server InitPathAndLog(path, plugin.Log); - KeyStore = new(plugin); - //Check for ip-verification flag VerifyIp = config.TryGetValue("verify_ip", out JsonElement vIpEl) && vIpEl.GetBoolean(); @@ -103,7 +99,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server PubSubManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>(); //Get node configuration - NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>().Config; + NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>(); //Get peer monitor Peers = plugin.GetOrCreateSingleton<CachePeerMonitor>(); @@ -119,9 +115,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server * know client tokens belong to us when singed by the same key */ AudienceLocalServerId = Guid.NewGuid().ToString("N"); - - //Schedule the queue worker to be run - _ = plugin.ObserveWork(this, 100); } @@ -142,7 +135,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server * received the messages properly */ - protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity) + protected override VfReturnType Get(HttpEntity entity) { //Parse jwt from authoriation string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; @@ -161,22 +154,18 @@ namespace VNLib.Data.Caching.ObjectCache.Server { bool verified = false; - //Get the client public key certificate to verify the client's message - using(ReadOnlyJsonWebKey cert = await KeyStore.GetClientPublicKeyAsync()) + //verify signature for client + if (NodeConfiguration.KeyStore.VerifyJwt(jwt)) { - //verify signature for client - if (jwt.VerifyFromJwk(cert)) - { - verified = true; - } - //May be signed by a cache server - else - { - //Set peer and verified flag since the another cache server signed the request - isPeer = verified = NodeConfiguration.VerifyCache(jwt); - } + verified = true; + } + //May be signed by a cache server + else + { + //Set peer and verified flag since the another cache server signed the request + isPeer = verified = NodeConfiguration.KeyStore.VerifyCachePeer(jwt); } - + //Check flag if (!verified) { @@ -204,7 +193,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server //Verified, now we can create an auth message with a short expiration using JsonWebToken auth = new(); - auth.WriteHeader(NodeConfiguration.GetJwtHeader()); + auth.WriteHeader(NodeConfiguration.KeyStore.GetJwtHeader()); auth.InitPayloadClaim() .AddClaim("aud", AudienceLocalServerId) .AddClaim("exp", entity.RequestedTimeUtc.Add(AuthTokenExpiration).ToUnixTimeSeconds()) @@ -223,14 +212,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server .CommitClaims(); //Sign the auth message from our private key - NodeConfiguration.SignJwt(auth); + NodeConfiguration.KeyStore.SignJwt(auth); //Close response entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer); return VfReturnType.VirtualSkip; } - protected override async ValueTask<VfReturnType> WebsocketRequestedAsync(HttpEntity entity) + protected override VfReturnType WebsocketRequested(HttpEntity entity) { //Parse jwt from authorization string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; @@ -251,13 +240,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server } string? nodeId = null; - ICachePeerAdvertisment? discoveryAd = null; + ICacheNodeAdvertisment? discoveryAd = null; //Parse jwt using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth)) { //verify signature against the cache public key, since this server must have signed it - if (!NodeConfiguration.VerifyCache(jwt)) + if (!NodeConfiguration.KeyStore.VerifyCachePeer(jwt)) { entity.CloseResponse(HttpStatusCode.Unauthorized); return VfReturnType.VirtualSkip; @@ -313,7 +302,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server if (isPeer) { //Verify token signature against a fellow cache public key - if (!NodeConfiguration.VerifyUpgradeToken(clientSignature, jwtAuth)) + if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth)) { entity.CloseResponse(HttpStatusCode.Unauthorized); return VfReturnType.VirtualSkip; @@ -325,16 +314,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server //Verify the node advertisement header and publish it if (!string.IsNullOrWhiteSpace(discoveryHeader)) { - discoveryAd = NodeConfiguration.VerifyPeerAdvertisment(discoveryHeader); + discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(discoveryHeader); } } else { //Not a peer, so verify against the client's public key - using ReadOnlyJsonWebKey clientPub = await KeyStore.GetClientPublicKeyAsync(); - - //Verify token signature - if (!FBMDataCacheExtensions.VerifyUpgradeToken(clientSignature, jwtAuth, clientPub)) + if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth)) { entity.CloseResponse(HttpStatusCode.Unauthorized); return VfReturnType.VirtualSkip; @@ -426,7 +412,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server if (!string.IsNullOrWhiteSpace(state.NodeId)) { //Get the event queue for the current node - AsyncQueue<ChangeEvent> queue = PubSubManager.Subscribe(state); + IPeerEventQueue queue = PubSubManager.Subscribe(state); try { @@ -470,44 +456,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server Log.Debug("Server websocket exited"); } - - - //Background worker to process event queue items - async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) - { - const int accumulatorSize = 64; - - try - { - //Accumulator for events - ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize]; - int ptr = 0; - - //Listen for changes - while (true) - { - //Wait for next event - accumulator[ptr++] = await Store.EventQueue.DequeueAsync(exitToken); - - //try to accumulate more events until we can't anymore - while (Store.EventQueue.TryDequeue(out ChangeEvent? ev) && ptr < accumulatorSize) - { - accumulator[ptr++] = ev; - } - - //Publish all events to subscribers - PubSubManager.PublishMultiple(accumulator.AsSpan(0, ptr)); - - //Reset pointer - ptr = 0; - } - } - catch (OperationCanceledException) - { - //Normal exit - pluginLog.Debug("Change queue listener worker exited"); - } - } + private class WsUserState : ICachePeer { @@ -516,7 +465,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server public int MaxMessageSize { get; init; } public int MaxResponseBufferSize { get; init; } public string? NodeId { get; init; } - public ICachePeerAdvertisment? Advertisment { get; init; } + public ICacheNodeAdvertisment? Advertisment { get; init; } public override string ToString() { diff --git a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs index 670d624..90ffca0 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs @@ -23,8 +23,8 @@ */ using System; -using System.Linq; using System.Net; +using System.Linq; using System.Text.Json; using System.Threading.Tasks; @@ -76,14 +76,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints using(JsonWebToken jwt = JsonWebToken.Parse(authToken)) { //try to verify against cache node first - if (!Config.Config.VerifyCache(jwt)) + if (!Config.KeyStore.VerifyCachePeer(jwt)) { //failed... //try to verify against client key - using ReadOnlyJsonWebKey clientPub = await Config.KeyStore.GetClientPublicKeyAsync(); - - if (!jwt.VerifyFromJwk(clientPub)) + if (!Config.KeyStore.VerifyJwt(jwt)) { //invalid token entity.CloseResponse(HttpStatusCode.Unauthorized); @@ -97,7 +95,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } //Valid key, get peer list to send to client - ICachePeerAdvertisment[] peers = PeerMonitor.GetAllPeers() + ICacheNodeAdvertisment[] peers = PeerMonitor.GetAllPeers() .Where(static p => p.Advertisment != null) .Select(static p => p.Advertisment!) .ToArray(); @@ -106,7 +104,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints using JsonWebToken response = new(); //set header from cache config - response.WriteHeader(Config.Config.GetJwtHeader()); + response.WriteHeader(Config.KeyStore.GetJwtHeader()); response.InitPayloadClaim() .AddClaim("iss", Config.Config.NodeId) @@ -119,7 +117,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints .CommitClaims(); //Sign the response - Config.Config.SignJwt(response); + Config.KeyStore.SignJwt(response); //Send response to client entity.CloseResponse(HttpStatusCode.OK, Net.Http.ContentType.Text, response.DataBuffer); |