aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs')
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs105
1 files changed, 27 insertions, 78 deletions
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
index 167a7e9..8352635 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
@@ -31,10 +31,8 @@ using System.Collections.Generic;
using VNLib.Hashing;
using VNLib.Net.Http;
-using VNLib.Utils.Async;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
-using VNLib.Utils.Extensions;
using VNLib.Data.Caching;
using VNLib.Data.Caching.Extensions;
using VNLib.Hashing.IdentityUtility;
@@ -49,21 +47,21 @@ using VNLib.Plugins.Essentials.Extensions;
using VNLib.Plugins.Extensions.Loading.Routing;
using VNLib.Data.Caching.ObjectCache.Server.Distribution;
-namespace VNLib.Data.Caching.ObjectCache.Server
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
[ConfigurationName("connect_endpoint")]
- internal sealed class ConnectEndpoint : ResourceEndpointBase, IAsyncBackgroundWork
+ internal sealed class ConnectEndpoint : ResourceEndpointBase
{
private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
- private readonly CacheNodeConfiguration NodeConfiguration;
+ private readonly NodeConfig NodeConfiguration;
private readonly ICacheEventQueueManager PubSubManager;
private readonly IPeerMonitor Peers;
private readonly BlobCacheListener Store;
- private readonly CacheAuthKeyStore KeyStore;
private readonly bool VerifyIp;
private readonly string AudienceLocalServerId;
@@ -94,8 +92,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server
InitPathAndLog(path, plugin.Log);
- KeyStore = new(plugin);
-
//Check for ip-verification flag
VerifyIp = config.TryGetValue("verify_ip", out JsonElement vIpEl) && vIpEl.GetBoolean();
@@ -103,7 +99,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
PubSubManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>();
//Get node configuration
- NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>().Config;
+ NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>();
//Get peer monitor
Peers = plugin.GetOrCreateSingleton<CachePeerMonitor>();
@@ -119,9 +115,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server
* know client tokens belong to us when singed by the same key
*/
AudienceLocalServerId = Guid.NewGuid().ToString("N");
-
- //Schedule the queue worker to be run
- _ = plugin.ObserveWork(this, 100);
}
@@ -142,7 +135,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
* received the messages properly
*/
- protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity)
+ protected override VfReturnType Get(HttpEntity entity)
{
//Parse jwt from authoriation
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
@@ -161,22 +154,18 @@ namespace VNLib.Data.Caching.ObjectCache.Server
{
bool verified = false;
- //Get the client public key certificate to verify the client's message
- using(ReadOnlyJsonWebKey cert = await KeyStore.GetClientPublicKeyAsync())
+ //verify signature for client
+ if (NodeConfiguration.KeyStore.VerifyJwt(jwt))
{
- //verify signature for client
- if (jwt.VerifyFromJwk(cert))
- {
- verified = true;
- }
- //May be signed by a cache server
- else
- {
- //Set peer and verified flag since the another cache server signed the request
- isPeer = verified = NodeConfiguration.VerifyCache(jwt);
- }
+ verified = true;
+ }
+ //May be signed by a cache server
+ else
+ {
+ //Set peer and verified flag since the another cache server signed the request
+ isPeer = verified = NodeConfiguration.KeyStore.VerifyCachePeer(jwt);
}
-
+
//Check flag
if (!verified)
{
@@ -204,7 +193,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//Verified, now we can create an auth message with a short expiration
using JsonWebToken auth = new();
- auth.WriteHeader(NodeConfiguration.GetJwtHeader());
+ auth.WriteHeader(NodeConfiguration.KeyStore.GetJwtHeader());
auth.InitPayloadClaim()
.AddClaim("aud", AudienceLocalServerId)
.AddClaim("exp", entity.RequestedTimeUtc.Add(AuthTokenExpiration).ToUnixTimeSeconds())
@@ -223,14 +212,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server
.CommitClaims();
//Sign the auth message from our private key
- NodeConfiguration.SignJwt(auth);
+ NodeConfiguration.KeyStore.SignJwt(auth);
//Close response
entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer);
return VfReturnType.VirtualSkip;
}
- protected override async ValueTask<VfReturnType> WebsocketRequestedAsync(HttpEntity entity)
+ protected override VfReturnType WebsocketRequested(HttpEntity entity)
{
//Parse jwt from authorization
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
@@ -251,13 +240,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server
}
string? nodeId = null;
- ICachePeerAdvertisment? discoveryAd = null;
+ ICacheNodeAdvertisment? discoveryAd = null;
//Parse jwt
using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
{
//verify signature against the cache public key, since this server must have signed it
- if (!NodeConfiguration.VerifyCache(jwt))
+ if (!NodeConfiguration.KeyStore.VerifyCachePeer(jwt))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
@@ -313,7 +302,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
if (isPeer)
{
//Verify token signature against a fellow cache public key
- if (!NodeConfiguration.VerifyUpgradeToken(clientSignature, jwtAuth))
+ if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
@@ -325,16 +314,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//Verify the node advertisement header and publish it
if (!string.IsNullOrWhiteSpace(discoveryHeader))
{
- discoveryAd = NodeConfiguration.VerifyPeerAdvertisment(discoveryHeader);
+ discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(discoveryHeader);
}
}
else
{
//Not a peer, so verify against the client's public key
- using ReadOnlyJsonWebKey clientPub = await KeyStore.GetClientPublicKeyAsync();
-
- //Verify token signature
- if (!FBMDataCacheExtensions.VerifyUpgradeToken(clientSignature, jwtAuth, clientPub))
+ if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
@@ -426,7 +412,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
if (!string.IsNullOrWhiteSpace(state.NodeId))
{
//Get the event queue for the current node
- AsyncQueue<ChangeEvent> queue = PubSubManager.Subscribe(state);
+ IPeerEventQueue queue = PubSubManager.Subscribe(state);
try
{
@@ -470,44 +456,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
Log.Debug("Server websocket exited");
}
-
-
- //Background worker to process event queue items
- async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
- {
- const int accumulatorSize = 64;
-
- try
- {
- //Accumulator for events
- ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize];
- int ptr = 0;
-
- //Listen for changes
- while (true)
- {
- //Wait for next event
- accumulator[ptr++] = await Store.EventQueue.DequeueAsync(exitToken);
-
- //try to accumulate more events until we can't anymore
- while (Store.EventQueue.TryDequeue(out ChangeEvent? ev) && ptr < accumulatorSize)
- {
- accumulator[ptr++] = ev;
- }
-
- //Publish all events to subscribers
- PubSubManager.PublishMultiple(accumulator.AsSpan(0, ptr));
-
- //Reset pointer
- ptr = 0;
- }
- }
- catch (OperationCanceledException)
- {
- //Normal exit
- pluginLog.Debug("Change queue listener worker exited");
- }
- }
+
private class WsUserState : ICachePeer
{
@@ -516,7 +465,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
public int MaxMessageSize { get; init; }
public int MaxResponseBufferSize { get; init; }
public string? NodeId { get; init; }
- public ICachePeerAdvertisment? Advertisment { get; init; }
+ public ICacheNodeAdvertisment? Advertisment { get; init; }
public override string ToString()
{