diff options
Diffstat (limited to 'plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs')
-rw-r--r-- | plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs | 105 |
1 files changed, 27 insertions, 78 deletions
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs index 167a7e9..8352635 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs @@ -31,10 +31,8 @@ using System.Collections.Generic; using VNLib.Hashing; using VNLib.Net.Http; -using VNLib.Utils.Async; using VNLib.Utils.Memory; using VNLib.Utils.Logging; -using VNLib.Utils.Extensions; using VNLib.Data.Caching; using VNLib.Data.Caching.Extensions; using VNLib.Hashing.IdentityUtility; @@ -49,21 +47,21 @@ using VNLib.Plugins.Essentials.Extensions; using VNLib.Plugins.Extensions.Loading.Routing; using VNLib.Data.Caching.ObjectCache.Server.Distribution; -namespace VNLib.Data.Caching.ObjectCache.Server + +namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { [ConfigurationName("connect_endpoint")] - internal sealed class ConnectEndpoint : ResourceEndpointBase, IAsyncBackgroundWork + internal sealed class ConnectEndpoint : ResourceEndpointBase { private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30); - private readonly CacheNodeConfiguration NodeConfiguration; + private readonly NodeConfig NodeConfiguration; private readonly ICacheEventQueueManager PubSubManager; private readonly IPeerMonitor Peers; private readonly BlobCacheListener Store; - private readonly CacheAuthKeyStore KeyStore; private readonly bool VerifyIp; private readonly string AudienceLocalServerId; @@ -94,8 +92,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server InitPathAndLog(path, plugin.Log); - KeyStore = new(plugin); - //Check for ip-verification flag VerifyIp = config.TryGetValue("verify_ip", out JsonElement vIpEl) && vIpEl.GetBoolean(); @@ -103,7 +99,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server PubSubManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>(); //Get node configuration - NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>().Config; + NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>(); //Get peer monitor Peers = plugin.GetOrCreateSingleton<CachePeerMonitor>(); @@ -119,9 +115,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server * know client tokens belong to us when singed by the same key */ AudienceLocalServerId = Guid.NewGuid().ToString("N"); - - //Schedule the queue worker to be run - _ = plugin.ObserveWork(this, 100); } @@ -142,7 +135,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server * received the messages properly */ - protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity) + protected override VfReturnType Get(HttpEntity entity) { //Parse jwt from authoriation string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; @@ -161,22 +154,18 @@ namespace VNLib.Data.Caching.ObjectCache.Server { bool verified = false; - //Get the client public key certificate to verify the client's message - using(ReadOnlyJsonWebKey cert = await KeyStore.GetClientPublicKeyAsync()) + //verify signature for client + if (NodeConfiguration.KeyStore.VerifyJwt(jwt)) { - //verify signature for client - if (jwt.VerifyFromJwk(cert)) - { - verified = true; - } - //May be signed by a cache server - else - { - //Set peer and verified flag since the another cache server signed the request - isPeer = verified = NodeConfiguration.VerifyCache(jwt); - } + verified = true; + } + //May be signed by a cache server + else + { + //Set peer and verified flag since the another cache server signed the request + isPeer = verified = NodeConfiguration.KeyStore.VerifyCachePeer(jwt); } - + //Check flag if (!verified) { @@ -204,7 +193,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server //Verified, now we can create an auth message with a short expiration using JsonWebToken auth = new(); - auth.WriteHeader(NodeConfiguration.GetJwtHeader()); + auth.WriteHeader(NodeConfiguration.KeyStore.GetJwtHeader()); auth.InitPayloadClaim() .AddClaim("aud", AudienceLocalServerId) .AddClaim("exp", entity.RequestedTimeUtc.Add(AuthTokenExpiration).ToUnixTimeSeconds()) @@ -223,14 +212,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server .CommitClaims(); //Sign the auth message from our private key - NodeConfiguration.SignJwt(auth); + NodeConfiguration.KeyStore.SignJwt(auth); //Close response entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer); return VfReturnType.VirtualSkip; } - protected override async ValueTask<VfReturnType> WebsocketRequestedAsync(HttpEntity entity) + protected override VfReturnType WebsocketRequested(HttpEntity entity) { //Parse jwt from authorization string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; @@ -251,13 +240,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server } string? nodeId = null; - ICachePeerAdvertisment? discoveryAd = null; + ICacheNodeAdvertisment? discoveryAd = null; //Parse jwt using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth)) { //verify signature against the cache public key, since this server must have signed it - if (!NodeConfiguration.VerifyCache(jwt)) + if (!NodeConfiguration.KeyStore.VerifyCachePeer(jwt)) { entity.CloseResponse(HttpStatusCode.Unauthorized); return VfReturnType.VirtualSkip; @@ -313,7 +302,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server if (isPeer) { //Verify token signature against a fellow cache public key - if (!NodeConfiguration.VerifyUpgradeToken(clientSignature, jwtAuth)) + if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth)) { entity.CloseResponse(HttpStatusCode.Unauthorized); return VfReturnType.VirtualSkip; @@ -325,16 +314,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server //Verify the node advertisement header and publish it if (!string.IsNullOrWhiteSpace(discoveryHeader)) { - discoveryAd = NodeConfiguration.VerifyPeerAdvertisment(discoveryHeader); + discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(discoveryHeader); } } else { //Not a peer, so verify against the client's public key - using ReadOnlyJsonWebKey clientPub = await KeyStore.GetClientPublicKeyAsync(); - - //Verify token signature - if (!FBMDataCacheExtensions.VerifyUpgradeToken(clientSignature, jwtAuth, clientPub)) + if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth)) { entity.CloseResponse(HttpStatusCode.Unauthorized); return VfReturnType.VirtualSkip; @@ -426,7 +412,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server if (!string.IsNullOrWhiteSpace(state.NodeId)) { //Get the event queue for the current node - AsyncQueue<ChangeEvent> queue = PubSubManager.Subscribe(state); + IPeerEventQueue queue = PubSubManager.Subscribe(state); try { @@ -470,44 +456,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server Log.Debug("Server websocket exited"); } - - - //Background worker to process event queue items - async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) - { - const int accumulatorSize = 64; - - try - { - //Accumulator for events - ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize]; - int ptr = 0; - - //Listen for changes - while (true) - { - //Wait for next event - accumulator[ptr++] = await Store.EventQueue.DequeueAsync(exitToken); - - //try to accumulate more events until we can't anymore - while (Store.EventQueue.TryDequeue(out ChangeEvent? ev) && ptr < accumulatorSize) - { - accumulator[ptr++] = ev; - } - - //Publish all events to subscribers - PubSubManager.PublishMultiple(accumulator.AsSpan(0, ptr)); - - //Reset pointer - ptr = 0; - } - } - catch (OperationCanceledException) - { - //Normal exit - pluginLog.Debug("Change queue listener worker exited"); - } - } + private class WsUserState : ICachePeer { @@ -516,7 +465,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server public int MaxMessageSize { get; init; } public int MaxResponseBufferSize { get; init; } public string? NodeId { get; init; } - public ICachePeerAdvertisment? Advertisment { get; init; } + public ICacheNodeAdvertisment? Advertisment { get; init; } public override string ToString() { |