aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src/Endpoints
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/ObjectCacheServer/src/Endpoints')
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs149
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs91
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/CacheSystemUtil.cs242
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs105
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs14
5 files changed, 33 insertions, 568 deletions
diff --git a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs
deleted file mode 100644
index b9c00e6..0000000
--- a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
-* Copyright (c) 2023 Vaughn Nugent
-*
-* Library: VNLib
-* Package: ObjectCacheServer
-* File: BrokerHeartBeatEndpoint.cs
-*
-* BrokerHeartBeatEndpoint.cs is part of ObjectCacheServer which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* ObjectCacheServer is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* ObjectCacheServer is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Net;
-using System.Linq;
-using System.Text.Json;
-using System.Threading.Tasks;
-
-
-using VNLib.Plugins;
-using VNLib.Utils.Logging;
-using VNLib.Plugins.Essentials;
-using VNLib.Hashing.IdentityUtility;
-using VNLib.Plugins.Essentials.Endpoints;
-using VNLib.Plugins.Essentials.Extensions;
-using VNLib.Plugins.Extensions.Loading;
-
-namespace VNLib.Data.Caching.ObjectCache.Server
-{
- internal sealed class BrokerHeartBeatEndpoint : ResourceEndpointBase
- {
- private readonly IBrokerHeartbeatNotifier _heartBeat;
- private readonly Task<IPAddress[]> BrokerIpList;
- private readonly bool DebugMode;
-
- ///<inheritdoc/>
- protected override ProtectionSettings EndpointProtectionSettings { get; } = new()
- {
- DisableBrowsersOnly = true,
- DisableSessionsRequired = true
- };
-
- public BrokerHeartBeatEndpoint(PluginBase plugin)
- {
- //Get debug flag
- DebugMode = plugin.IsDebug();
-
- //Get or create the current node config
- _heartBeat = plugin.GetOrCreateSingleton<NodeConfig>();
-
- /*
- * Resolve the ip address of the broker and store it to verify connections
- * later
- */
- BrokerIpList = Dns.GetHostAddressesAsync(_heartBeat.GetBrokerAddress().DnsSafeHost);
-
- //Setup endpoint
- InitPathAndLog("/heartbeat", plugin.Log);
- }
-
-
- protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity)
- {
- //If-not loopback then verify server address
- if (!entity.Server.IsLoopBack())
- {
- //Load and verify the broker's ip address matches with an address we have stored
- IPAddress[] addresses = await BrokerIpList;
-
- if (!addresses.Contains(entity.TrustedRemoteIp))
- {
- if (DebugMode)
- {
- Log.Debug("Received connection {ip} that was not a DNS safe address for the broker server, access denied");
- }
-
- //Token invalid
- entity.CloseResponse(HttpStatusCode.Forbidden);
- return VfReturnType.VirtualSkip;
- }
- }
-
- //Get the authorization jwt
- string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
-
- if (string.IsNullOrWhiteSpace(jwtAuth))
- {
- //Token invalid
- entity.CloseResponse(HttpStatusCode.Forbidden);
- return VfReturnType.VirtualSkip;
- }
-
- //Parse the jwt
- using JsonWebToken jwt = JsonWebToken.Parse(jwtAuth);
-
- //Verify the jwt using the broker's public key certificate
- using (ReadOnlyJsonWebKey cert = _heartBeat.GetBrokerPublicKey())
- {
- //Verify the jwt
- if (!jwt.VerifyFromJwk(cert))
- {
- //Token invalid
- entity.CloseResponse(HttpStatusCode.Forbidden);
- return VfReturnType.VirtualSkip;
- }
- }
-
- string? auth;
- //Recover the auth token from the jwt
- using (JsonDocument doc = jwt.GetPayload())
- {
- auth = doc.RootElement.GetProperty("token").GetString();
- }
-
- //Get our stored token used for registration
- string? selfToken = _heartBeat.GetAuthToken();
-
- //Verify token
- if (selfToken != null && selfToken.Equals(auth, StringComparison.Ordinal))
- {
- //Signal keepalive
- _heartBeat.HearbeatReceived();
- entity.CloseResponse(HttpStatusCode.OK);
- return VfReturnType.VirtualSkip;
- }
-
- if (DebugMode)
- {
- Log.Debug("Invalid auth token recieved from broker sever, access denied");
- }
-
- //Token invalid
- entity.CloseResponse(HttpStatusCode.Forbidden);
- return VfReturnType.VirtualSkip;
- }
- }
-}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs
deleted file mode 100644
index 67db433..0000000
--- a/plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
-* Copyright (c) 2023 Vaughn Nugent
-*
-* Library: VNLib
-* Package: ObjectCacheServer
-* File: ConnectEndpoint.cs
-*
-* ConnectEndpoint.cs is part of ObjectCacheServer which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* ObjectCacheServer is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* ObjectCacheServer is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-using VNLib.Utils.Logging;
-using VNLib.Plugins;
-using VNLib.Plugins.Extensions.Loading;
-
-namespace VNLib.Data.Caching.ObjectCache.Server
-{
- [ConfigurationName("cache")]
- sealed class CacheStore : ICacheStore, IDisposable
- {
- public BlobCacheListener Listener { get; }
-
-
- public CacheStore(PluginBase plugin, IConfigScope config)
- {
- //Init cache
- Listener = InitializeCache((ObjectCacheServerEntry)plugin, config);
- }
-
- ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token)
- {
- return Listener.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token);
- }
-
- void ICacheStore.Clear()
- {
- throw new NotImplementedException();
- }
-
- ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token)
- {
- return Listener.Cache.DeleteObjectAsync(id, token);
- }
-
- private static BlobCacheListener InitializeCache(ObjectCacheServerEntry plugin, IConfigScope config)
- {
- //Deserialize the cache config
- CacheConfiguration cacheConf = config.Deserialze<CacheConfiguration>();
-
- if (cacheConf.MaxCacheEntries < 2)
- {
- throw new ArgumentException("You must configure a 'max_cache' size larger than 1 item");
- }
-
- //Suggestion
- if (cacheConf.MaxCacheEntries < 200)
- {
- plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache");
- }
-
- plugin.Log.Verbose("Creating cache store with {bc} buckets, with {mc} items/bucket", cacheConf.BucketCount, cacheConf.MaxCacheEntries);
-
- //Load the blob cache table system
- IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, plugin.CacheHeap, cacheConf);
-
- //Endpoint only allows for a single reader
- return new(bc, plugin.Log, plugin.CacheHeap, true);
- }
-
- public void Dispose()
- {
- Listener.Dispose();
- }
- }
-}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheSystemUtil.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheSystemUtil.cs
deleted file mode 100644
index 669b84f..0000000
--- a/plugins/ObjectCacheServer/src/Endpoints/CacheSystemUtil.cs
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
-* Copyright (c) 2023 Vaughn Nugent
-*
-* Library: VNLib
-* Package: ObjectCacheServer
-* File: CacheSystemUtil.cs
-*
-* CacheSystemUtil.cs is part of ObjectCacheServer which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* ObjectCacheServer is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* ObjectCacheServer is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.IO;
-using System.Text.Json;
-using System.Collections;
-using System.Collections.Generic;
-using System.Runtime.CompilerServices;
-
-using VNLib.Plugins;
-using VNLib.Utils.Memory;
-using VNLib.Plugins.Extensions.Loading;
-
-namespace VNLib.Data.Caching.ObjectCache.Server
-{
- internal static class CacheSystemUtil
- {
- const string PERSISTANT_ASM_CONFIF_KEY = "persistant_cache_asm";
- const string USER_CACHE_ASM_CONFIG_KEY = "custom_cache_impl_asm";
- const string LOAD_METHOD_NAME = "OnRuntimeLoad";
- const string TEARDOWN_METHOD_NAME = "OnSystemDetach";
-
- /// <summary>
- /// Loads the <see cref="IBlobCacheTable"/> implementation (dynamic or default) into the process
- /// and initializes it and it's backing store.
- /// </summary>
- /// <param name="plugin"></param>
- /// <param name="config">The configuration object that contains loading variables</param>
- /// <param name="heap">The heap for memory cache table to allocate buffers from</param>
- /// <param name="cacheConf">The cache configuration object</param>
- /// <returns>The loaded <see cref="IBlobCacheTable"/> implementation</returns>
- /// <exception cref="FileNotFoundException"></exception>
- public static IBlobCacheTable LoadMemoryCacheSystem(this PluginBase plugin, IConfigScope config, IUnmangedHeap heap, CacheConfiguration cacheConf)
- {
- //First, try to load persitant cache store
- PersistantCacheManager? pCManager = GetPersistantStore(plugin, config);
-
- IBlobCacheTable table;
-
- //See if the user defined a custom cache table implementation
- if (config.TryGetValue(USER_CACHE_ASM_CONFIG_KEY, out JsonElement customEl))
- {
- string asmName = customEl.GetString() ?? throw new FileNotFoundException("User defined a custom blob cache assembly but the file name was null");
-
- //Return the runtime loaded table
- table = LoadCustomMemCacheTable(plugin, asmName, pCManager);
- }
- else
- {
- //Default type
- table = GetInternalBlobCache(heap, cacheConf, pCManager);
- }
-
- //Initialize the subsystem from the cache table
- pCManager?.InitializeSubsystem(table);
-
- return table;
- }
-
- private static IBlobCacheTable GetInternalBlobCache(IUnmangedHeap heap, CacheConfiguration config, IPersistantCacheStore? store)
- {
- return new BlobCacheTable(config.BucketCount, config.MaxCacheEntries, heap, store);
- }
-
- private static IBlobCacheTable LoadCustomMemCacheTable(PluginBase plugin, string asmName, IPersistantCacheStore? store)
- {
- //Load the custom assembly
- AssemblyLoader<IBlobCacheTable> customTable = plugin.LoadAssembly<IBlobCacheTable>(asmName);
-
- try
- {
- //Try get onload method and pass the persistant cache instance
- Action<PluginBase, IPersistantCacheStore?>? onLoad = customTable.TryGetMethod<Action<PluginBase, IPersistantCacheStore?>>(LOAD_METHOD_NAME);
- onLoad?.Invoke(plugin, store);
- }
- catch
- {
- customTable.Dispose();
- throw;
- }
-
- return new RuntimeBlobCacheTable(customTable);
- }
-
- private static PersistantCacheManager? GetPersistantStore(PluginBase plugin, IConfigScope config)
- {
- //Get the persistant assembly
- if (!config.TryGetValue(PERSISTANT_ASM_CONFIF_KEY, out JsonElement asmEl))
- {
- return null;
- }
-
- string? asmName = asmEl.GetString();
- if (asmName == null)
- {
- return null;
- }
-
- //Load the dynamic assembly into the alc
- AssemblyLoader<IPersistantCacheStore> loader = plugin.LoadAssembly<IPersistantCacheStore>(asmName);
- try
- {
- //Call the OnLoad method
- Action<PluginBase, IConfigScope>? loadMethod = loader.TryGetMethod<Action<PluginBase, IConfigScope>>(LOAD_METHOD_NAME);
-
- loadMethod?.Invoke(plugin, config);
- }
- catch
- {
- loader.Dispose();
- throw;
- }
-
- //Return the
- return new(loader);
- }
-
-
- private sealed class RuntimeBlobCacheTable : IBlobCacheTable
- {
-
- private readonly IBlobCacheTable _table;
- private readonly Action? OnDetatch;
-
- public RuntimeBlobCacheTable(AssemblyLoader<IBlobCacheTable> loader)
- {
- OnDetatch = loader.TryGetMethod<Action>(TEARDOWN_METHOD_NAME);
- _table = loader.Resource;
- }
-
- public void Dispose()
- {
- //We can let the loader dispose the cache table, but we can notify of detatch
- OnDetatch?.Invoke();
- }
-
-
- ///<inheritdoc/>
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- IBlobCacheBucket IBlobCacheTable.GetBucket(ReadOnlySpan<char> objectId) => _table.GetBucket(objectId);
-
- ///<inheritdoc/>
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- public IEnumerator<IBlobCacheBucket> GetEnumerator() => _table.GetEnumerator();
-
- ///<inheritdoc/>
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- IEnumerator IEnumerable.GetEnumerator() => ((IEnumerable)_table).GetEnumerator();
- }
-
- internal sealed class PersistantCacheManager : IPersistantCacheStore
- {
- const string INITIALIZE_METHOD_NAME = "OnInitializeForBucket";
-
-
- /*
- * Our referrence can be technically unloaded, but so will
- * this instance, since its loaded into the current ALC, so
- * this referrence may exist for the lifetime of this instance.
- *
- * It also implements IDisposable, which the assembly loader class
- * will call when this plugin is unloaded, we dont need to call
- * it here, but we can signal a detach.
- *
- * Since the store implements IDisposable, its likely going to
- * check for dispose on each call, so we don't need to add
- * and additional disposed check since the method calls must be fast.
- */
-
- private readonly IPersistantCacheStore store;
-
- private readonly Action<uint>? InitMethod;
- private readonly Action? OnServiceDetatch;
-
- public PersistantCacheManager(AssemblyLoader<IPersistantCacheStore> loader)
- {
- //Try to get the Initialize method
- InitMethod = loader.TryGetMethod<Action<uint>>(INITIALIZE_METHOD_NAME);
-
- //Get the optional detatch method
- OnServiceDetatch = loader.TryGetMethod<Action>(TEARDOWN_METHOD_NAME);
-
- store = loader.Resource;
- }
-
- /// <summary>
- /// Optionally initializes the backing store by publishing the table's bucket
- /// id's so it's made aware of the memory cache bucket system.
- /// </summary>
- /// <param name="table">The table containing buckets to publish</param>
- public void InitializeSubsystem(IBlobCacheTable table)
- {
- //Itterate all buckets
- foreach (IBlobCacheBucket bucket in table)
- {
- InitMethod?.Invoke(bucket.Id);
- }
- }
-
- void IDisposable.Dispose()
- {
- //Assembly loader will dispose the type, we can just signal a detach
-
- OnServiceDetatch?.Invoke();
- }
-
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- bool IPersistantCacheStore.OnCacheMiss(uint bucketId, string key, IMemoryCacheEntryFactory factory, out CacheEntry entry)
- {
- return store.OnCacheMiss(bucketId, key, factory, out entry);
- }
-
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- void IPersistantCacheStore.OnEntryDeleted(uint bucketId, string key) => store.OnEntryDeleted(bucketId, key);
-
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- void IPersistantCacheStore.OnEntryEvicted(uint bucketId, string key, in CacheEntry entry) => store.OnEntryEvicted(bucketId, key, in entry);
- }
- }
-}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
index 167a7e9..8352635 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
@@ -31,10 +31,8 @@ using System.Collections.Generic;
using VNLib.Hashing;
using VNLib.Net.Http;
-using VNLib.Utils.Async;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
-using VNLib.Utils.Extensions;
using VNLib.Data.Caching;
using VNLib.Data.Caching.Extensions;
using VNLib.Hashing.IdentityUtility;
@@ -49,21 +47,21 @@ using VNLib.Plugins.Essentials.Extensions;
using VNLib.Plugins.Extensions.Loading.Routing;
using VNLib.Data.Caching.ObjectCache.Server.Distribution;
-namespace VNLib.Data.Caching.ObjectCache.Server
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
[ConfigurationName("connect_endpoint")]
- internal sealed class ConnectEndpoint : ResourceEndpointBase, IAsyncBackgroundWork
+ internal sealed class ConnectEndpoint : ResourceEndpointBase
{
private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
- private readonly CacheNodeConfiguration NodeConfiguration;
+ private readonly NodeConfig NodeConfiguration;
private readonly ICacheEventQueueManager PubSubManager;
private readonly IPeerMonitor Peers;
private readonly BlobCacheListener Store;
- private readonly CacheAuthKeyStore KeyStore;
private readonly bool VerifyIp;
private readonly string AudienceLocalServerId;
@@ -94,8 +92,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server
InitPathAndLog(path, plugin.Log);
- KeyStore = new(plugin);
-
//Check for ip-verification flag
VerifyIp = config.TryGetValue("verify_ip", out JsonElement vIpEl) && vIpEl.GetBoolean();
@@ -103,7 +99,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
PubSubManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>();
//Get node configuration
- NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>().Config;
+ NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>();
//Get peer monitor
Peers = plugin.GetOrCreateSingleton<CachePeerMonitor>();
@@ -119,9 +115,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server
* know client tokens belong to us when singed by the same key
*/
AudienceLocalServerId = Guid.NewGuid().ToString("N");
-
- //Schedule the queue worker to be run
- _ = plugin.ObserveWork(this, 100);
}
@@ -142,7 +135,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
* received the messages properly
*/
- protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity)
+ protected override VfReturnType Get(HttpEntity entity)
{
//Parse jwt from authoriation
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
@@ -161,22 +154,18 @@ namespace VNLib.Data.Caching.ObjectCache.Server
{
bool verified = false;
- //Get the client public key certificate to verify the client's message
- using(ReadOnlyJsonWebKey cert = await KeyStore.GetClientPublicKeyAsync())
+ //verify signature for client
+ if (NodeConfiguration.KeyStore.VerifyJwt(jwt))
{
- //verify signature for client
- if (jwt.VerifyFromJwk(cert))
- {
- verified = true;
- }
- //May be signed by a cache server
- else
- {
- //Set peer and verified flag since the another cache server signed the request
- isPeer = verified = NodeConfiguration.VerifyCache(jwt);
- }
+ verified = true;
+ }
+ //May be signed by a cache server
+ else
+ {
+ //Set peer and verified flag since the another cache server signed the request
+ isPeer = verified = NodeConfiguration.KeyStore.VerifyCachePeer(jwt);
}
-
+
//Check flag
if (!verified)
{
@@ -204,7 +193,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//Verified, now we can create an auth message with a short expiration
using JsonWebToken auth = new();
- auth.WriteHeader(NodeConfiguration.GetJwtHeader());
+ auth.WriteHeader(NodeConfiguration.KeyStore.GetJwtHeader());
auth.InitPayloadClaim()
.AddClaim("aud", AudienceLocalServerId)
.AddClaim("exp", entity.RequestedTimeUtc.Add(AuthTokenExpiration).ToUnixTimeSeconds())
@@ -223,14 +212,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server
.CommitClaims();
//Sign the auth message from our private key
- NodeConfiguration.SignJwt(auth);
+ NodeConfiguration.KeyStore.SignJwt(auth);
//Close response
entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer);
return VfReturnType.VirtualSkip;
}
- protected override async ValueTask<VfReturnType> WebsocketRequestedAsync(HttpEntity entity)
+ protected override VfReturnType WebsocketRequested(HttpEntity entity)
{
//Parse jwt from authorization
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
@@ -251,13 +240,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server
}
string? nodeId = null;
- ICachePeerAdvertisment? discoveryAd = null;
+ ICacheNodeAdvertisment? discoveryAd = null;
//Parse jwt
using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
{
//verify signature against the cache public key, since this server must have signed it
- if (!NodeConfiguration.VerifyCache(jwt))
+ if (!NodeConfiguration.KeyStore.VerifyCachePeer(jwt))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
@@ -313,7 +302,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
if (isPeer)
{
//Verify token signature against a fellow cache public key
- if (!NodeConfiguration.VerifyUpgradeToken(clientSignature, jwtAuth))
+ if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
@@ -325,16 +314,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//Verify the node advertisement header and publish it
if (!string.IsNullOrWhiteSpace(discoveryHeader))
{
- discoveryAd = NodeConfiguration.VerifyPeerAdvertisment(discoveryHeader);
+ discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(discoveryHeader);
}
}
else
{
//Not a peer, so verify against the client's public key
- using ReadOnlyJsonWebKey clientPub = await KeyStore.GetClientPublicKeyAsync();
-
- //Verify token signature
- if (!FBMDataCacheExtensions.VerifyUpgradeToken(clientSignature, jwtAuth, clientPub))
+ if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
@@ -426,7 +412,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
if (!string.IsNullOrWhiteSpace(state.NodeId))
{
//Get the event queue for the current node
- AsyncQueue<ChangeEvent> queue = PubSubManager.Subscribe(state);
+ IPeerEventQueue queue = PubSubManager.Subscribe(state);
try
{
@@ -470,44 +456,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
Log.Debug("Server websocket exited");
}
-
-
- //Background worker to process event queue items
- async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
- {
- const int accumulatorSize = 64;
-
- try
- {
- //Accumulator for events
- ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize];
- int ptr = 0;
-
- //Listen for changes
- while (true)
- {
- //Wait for next event
- accumulator[ptr++] = await Store.EventQueue.DequeueAsync(exitToken);
-
- //try to accumulate more events until we can't anymore
- while (Store.EventQueue.TryDequeue(out ChangeEvent? ev) && ptr < accumulatorSize)
- {
- accumulator[ptr++] = ev;
- }
-
- //Publish all events to subscribers
- PubSubManager.PublishMultiple(accumulator.AsSpan(0, ptr));
-
- //Reset pointer
- ptr = 0;
- }
- }
- catch (OperationCanceledException)
- {
- //Normal exit
- pluginLog.Debug("Change queue listener worker exited");
- }
- }
+
private class WsUserState : ICachePeer
{
@@ -516,7 +465,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
public int MaxMessageSize { get; init; }
public int MaxResponseBufferSize { get; init; }
public string? NodeId { get; init; }
- public ICachePeerAdvertisment? Advertisment { get; init; }
+ public ICacheNodeAdvertisment? Advertisment { get; init; }
public override string ToString()
{
diff --git a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
index 670d624..90ffca0 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
@@ -23,8 +23,8 @@
*/
using System;
-using System.Linq;
using System.Net;
+using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
@@ -76,14 +76,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
using(JsonWebToken jwt = JsonWebToken.Parse(authToken))
{
//try to verify against cache node first
- if (!Config.Config.VerifyCache(jwt))
+ if (!Config.KeyStore.VerifyCachePeer(jwt))
{
//failed...
//try to verify against client key
- using ReadOnlyJsonWebKey clientPub = await Config.KeyStore.GetClientPublicKeyAsync();
-
- if (!jwt.VerifyFromJwk(clientPub))
+ if (!Config.KeyStore.VerifyJwt(jwt))
{
//invalid token
entity.CloseResponse(HttpStatusCode.Unauthorized);
@@ -97,7 +95,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
//Valid key, get peer list to send to client
- ICachePeerAdvertisment[] peers = PeerMonitor.GetAllPeers()
+ ICacheNodeAdvertisment[] peers = PeerMonitor.GetAllPeers()
.Where(static p => p.Advertisment != null)
.Select(static p => p.Advertisment!)
.ToArray();
@@ -106,7 +104,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
using JsonWebToken response = new();
//set header from cache config
- response.WriteHeader(Config.Config.GetJwtHeader());
+ response.WriteHeader(Config.KeyStore.GetJwtHeader());
response.InitPayloadClaim()
.AddClaim("iss", Config.Config.NodeId)
@@ -119,7 +117,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
.CommitClaims();
//Sign the response
- Config.Config.SignJwt(response);
+ Config.KeyStore.SignJwt(response);
//Send response to client
entity.CloseResponse(HttpStatusCode.OK, Net.Http.ContentType.Text, response.DataBuffer);