aboutsummaryrefslogtreecommitdiff
path: root/Plugins/SessionCacheServer/Endpoints
diff options
context:
space:
mode:
Diffstat (limited to 'Plugins/SessionCacheServer/Endpoints')
-rw-r--r--Plugins/SessionCacheServer/Endpoints/BrokerHeartBeat.cs130
-rw-r--r--Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs399
2 files changed, 0 insertions, 529 deletions
diff --git a/Plugins/SessionCacheServer/Endpoints/BrokerHeartBeat.cs b/Plugins/SessionCacheServer/Endpoints/BrokerHeartBeat.cs
deleted file mode 100644
index bd1233e..0000000
--- a/Plugins/SessionCacheServer/Endpoints/BrokerHeartBeat.cs
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
-* Copyright (c) 2022 Vaughn Nugent
-*
-* Library: VNLib
-* Package: ObjectCacheServer
-* File: BrokerHeartBeat.cs
-*
-* BrokerHeartBeat.cs is part of ObjectCacheServer which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* ObjectCacheServer is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* ObjectCacheServer is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Net;
-using System.Linq;
-using System.Text.Json;
-using System.Threading;
-using System.Threading.Tasks;
-using System.Collections.Generic;
-
-using VNLib.Hashing.IdentityUtility;
-using VNLib.Plugins.Essentials.Endpoints;
-using VNLib.Plugins.Essentials.Extensions;
-using VNLib.Plugins.Extensions.Loading;
-
-namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
-{
- internal sealed class BrokerHeartBeat : ResourceEndpointBase
- {
- public override string Path => "/heartbeat";
-
- private readonly Func<string> Token;
- private readonly ManualResetEvent KeepaliveSet;
- private readonly Task<IPAddress[]> BrokerIpList;
- private readonly PluginBase Pbase;
-
- protected override ProtectionSettings EndpointProtectionSettings { get; } = new()
- {
- DisableBrowsersOnly = true,
- DisableSessionsRequired = true,
- DisableVerifySessionCors = true
- };
-
- public BrokerHeartBeat(Func<string> token, ManualResetEvent keepaliveSet, Uri brokerUri, PluginBase pbase)
- {
- Token = token;
- KeepaliveSet = keepaliveSet;
- BrokerIpList = Dns.GetHostAddressesAsync(brokerUri.DnsSafeHost);
-
- this.Pbase = pbase;
- }
-
- private async Task<ReadOnlyJsonWebKey> GetBrokerPubAsync()
- {
- return await Pbase.TryGetSecretAsync("broker_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : broker_public_key");
- }
-
- protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity)
- {
- //If-not loopback then verify server address
- if (!entity.Server.IsLoopBack())
- {
- //Load and verify the broker's ip address matches with an address we have stored
- IPAddress[] addresses = await BrokerIpList;
- if (!addresses.Contains(entity.TrustedRemoteIp))
- {
- //Token invalid
- entity.CloseResponse(HttpStatusCode.Forbidden);
- return VfReturnType.VirtualSkip;
- }
- }
- //Get the authorization jwt
- string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
-
- if (string.IsNullOrWhiteSpace(jwtAuth))
- {
- //Token invalid
- entity.CloseResponse(HttpStatusCode.Forbidden);
- return VfReturnType.VirtualSkip;
- }
-
- //Parse the jwt
- using JsonWebToken jwt = JsonWebToken.Parse(jwtAuth);
-
- //Verify the jwt using the broker's public key certificate
- using (ReadOnlyJsonWebKey cert = await GetBrokerPubAsync())
- {
- //Verify the jwt
- if (!jwt.VerifyFromJwk(cert))
- {
- //Token invalid
- entity.CloseResponse(HttpStatusCode.Forbidden);
- return VfReturnType.VirtualSkip;
- }
- }
-
- string? auth;
- //Recover the auth token from the jwt
- using (JsonDocument doc = jwt.GetPayload())
- {
- auth = doc.RootElement.GetProperty("token").GetString();
- }
-
- //Verify token
- if(Token().Equals(auth, StringComparison.Ordinal))
- {
- //Signal keepalive
- KeepaliveSet.Set();
- entity.CloseResponse(HttpStatusCode.OK);
- return VfReturnType.VirtualSkip;
- }
-
- //Token invalid
- entity.CloseResponse(HttpStatusCode.Forbidden);
- return VfReturnType.VirtualSkip;
- }
- }
-}
diff --git a/Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs b/Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs
deleted file mode 100644
index 2fe0994..0000000
--- a/Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs
+++ /dev/null
@@ -1,399 +0,0 @@
-/*
-* Copyright (c) 2022 Vaughn Nugent
-*
-* Library: VNLib
-* Package: ObjectCacheServer
-* File: ConnectEndpoint.cs
-*
-* ConnectEndpoint.cs is part of ObjectCacheServer which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* ObjectCacheServer is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* ObjectCacheServer is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Net;
-using System.Text.Json;
-using System.Threading;
-using System.Threading.Tasks;
-using System.Threading.Channels;
-using System.Collections.Generic;
-using System.Collections.Concurrent;
-
-using VNLib.Net.Http;
-using VNLib.Hashing;
-using VNLib.Utils.Async;
-using VNLib.Utils.Logging;
-using VNLib.Hashing.IdentityUtility;
-using VNLib.Net.Messaging.FBM;
-using VNLib.Net.Messaging.FBM.Client;
-using VNLib.Net.Messaging.FBM.Server;
-using VNLib.Data.Caching.ObjectCache;
-using VNLib.Plugins.Extensions.Loading;
-using VNLib.Plugins.Essentials.Endpoints;
-using VNLib.Plugins.Essentials.Extensions;
-
-
-namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
-{
- internal sealed class ConnectEndpoint : ResourceEndpointBase
- {
- const int MAX_RECV_BUF_SIZE = 1000 * 1024;
- const int MIN_RECV_BUF_SIZE = 8 * 1024;
- const int MAX_HEAD_BUF_SIZE = 2048;
- const int MIN_MESSAGE_SIZE = 10 * 1024;
- const int MAX_MESSAGE_SIZE = 1000 * 1024;
- const int MIN_HEAD_BUF_SIZE = 128;
- const int MAX_EVENT_QUEUE_SIZE = 10000;
- const int MAX_RESPONSE_BUFFER_SIZE = 10 * 1024;
-
- private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
-
- private readonly string AudienceLocalServerId;
- private readonly ObjectCacheStore Store;
- private readonly PluginBase Pbase;
-
- private readonly ConcurrentDictionary<string, AsyncQueue<ChangeEvent>> StatefulEventQueue;
-
- private uint _connectedClients;
-
- public uint ConnectedClients => _connectedClients;
-
- //Loosen up protection settings
- protected override ProtectionSettings EndpointProtectionSettings { get; } = new()
- {
- DisableBrowsersOnly = true,
- DisableSessionsRequired = true,
- DisableCrossSiteDenied = true
- };
-
- public ConnectEndpoint(string path, ObjectCacheStore store, PluginBase pbase)
- {
- InitPathAndLog(path, pbase.Log);
- Store = store;//Load client public key to verify signed messages
- Pbase = pbase;
-
- StatefulEventQueue = new(StringComparer.OrdinalIgnoreCase);
-
- //Start the queue worker
- _ = pbase.DeferTask(() => ChangeWorkerAsync(pbase.UnloadToken), 10);
-
- AudienceLocalServerId = Guid.NewGuid().ToString("N");
- }
-
- /*
- * Used as a client negotiation and verification request
- *
- * The token created during this request will be verified by the client
- * and is already verified by this server, will be passed back
- * via the authorization header during the websocket upgrade.
- *
- * This server must verify the authenticity of the returned token
- *
- * The tokens are very short lived as requests are intended to be made
- * directly after verification
- */
-
- protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity)
- {
- //Parse jwt from authoriation
- string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
- if (string.IsNullOrWhiteSpace(jwtAuth))
- {
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
-
- string? nodeId = null;
- string? challenge = null;
- bool isPeer = false;
-
- // Parse jwt
- using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
- {
- bool verified = false;
-
- //Get the client public key certificate to verify the client's message
- using(ReadOnlyJsonWebKey cert = await GetClientPubAsync())
- {
- //verify signature for client
- if (jwt.VerifyFromJwk(cert))
- {
- verified = true;
- }
- //May be signed by a cahce server
- else
- {
- using ReadOnlyJsonWebKey cacheCert = await GetCachePubAsync();
-
- //Set peer and verified flag since the another cache server signed the request
- isPeer = verified = jwt.VerifyFromJwk(cacheCert);
- }
- }
-
- //Check flag
- if (!verified)
- {
- Log.Information("Client signature verification failed");
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
-
- //Recover json body
- using JsonDocument doc = jwt.GetPayload();
- if (doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl))
- {
- nodeId = servIdEl.GetString();
- }
- if (doc.RootElement.TryGetProperty("chl", out JsonElement challengeEl))
- {
- challenge = challengeEl.GetString();
- }
- }
-
- Log.Debug("Received negotiation request from node {node}", nodeId);
- //Verified, now we can create an auth message with a short expiration
- using JsonWebToken auth = new();
- //Sign the auth message from the cache certificate's private key
- using (ReadOnlyJsonWebKey cert = await GetCachePrivateKeyAsync())
- {
- auth.WriteHeader(cert.JwtHeader);
- auth.InitPayloadClaim()
- .AddClaim("aud", AudienceLocalServerId)
- .AddClaim("exp", DateTimeOffset.UtcNow.Add(AuthTokenExpiration).ToUnixTimeSeconds())
- .AddClaim("nonce", RandomHash.GetRandomBase32(8))
- .AddClaim("chl", challenge!)
- //Set the ispeer flag if the request was signed by a cache server
- .AddClaim("isPeer", isPeer)
- //Specify the server's node id if set
- .AddClaim("sub", nodeId!)
- //Add negotiaion args
- .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, MAX_HEAD_BUF_SIZE)
- .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, MAX_RECV_BUF_SIZE)
- .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, MAX_MESSAGE_SIZE)
- .CommitClaims();
-
- auth.SignFromJwk(cert);
- }
-
- //Close response
- entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer);
- return VfReturnType.VirtualSkip;
- }
-
- private async Task<ReadOnlyJsonWebKey> GetClientPubAsync()
- {
- return await Pbase.TryGetSecretAsync("client_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
- }
- private async Task<ReadOnlyJsonWebKey> GetCachePubAsync()
- {
- return await Pbase.TryGetSecretAsync("cache_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
- }
- private async Task<ReadOnlyJsonWebKey> GetCachePrivateKeyAsync()
- {
- return await Pbase.TryGetSecretAsync("cache_private_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
- }
-
- private async Task ChangeWorkerAsync(CancellationToken cancellation)
- {
- try
- {
- //Listen for changes
- while (true)
- {
- ChangeEvent ev = await Store.EventQueue.DequeueAsync(cancellation);
- //Add event to queues
- foreach (AsyncQueue<ChangeEvent> queue in StatefulEventQueue.Values)
- {
- if (!queue.TryEnque(ev))
- {
- Log.Debug("Listener queue has exeeded capacity, change events will be lost");
- }
- }
- }
- }
- catch (OperationCanceledException)
- { }
- catch (Exception ex)
- {
- Log.Error(ex);
- }
- }
-
- private class WsUserState
- {
- public int RecvBufferSize { get; init; }
- public int MaxHeaderBufferSize { get; init; }
- public int MaxMessageSize { get; init; }
- public int MaxResponseBufferSize { get; init; }
- public AsyncQueue<ChangeEvent>? SyncQueue { get; init; }
- }
-
- protected override async ValueTask<VfReturnType> WebsocketRequestedAsync(HttpEntity entity)
- {
- try
- {
- //Parse jwt from authorization
- string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
- if (string.IsNullOrWhiteSpace(jwtAuth))
- {
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
-
- string? nodeId = null;
- //Parse jwt
- using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
- {
- //Get the client public key certificate to verify the client's message
- using (ReadOnlyJsonWebKey cert = await GetCachePubAsync())
- {
- //verify signature against the cache public key, since this server must have signed it
- if (!jwt.VerifyFromJwk(cert))
- {
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
- }
-
- //Recover json body
- using JsonDocument doc = jwt.GetPayload();
-
- //Verify audience, expiration
-
- if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl) || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase))
- {
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
-
- if (!doc.RootElement.TryGetProperty("exp", out JsonElement expEl)
- || DateTimeOffset.FromUnixTimeSeconds(expEl.GetInt64()) < DateTimeOffset.UtcNow)
- {
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
-
- //Check if the client is a peer
- bool isPeer = doc.RootElement.TryGetProperty("isPeer", out JsonElement isPeerEl) && isPeerEl.GetBoolean();
-
- //The node id is optional and stored in the 'sub' field, ignore if the client is not a peer
- if (isPeer && doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl))
- {
- nodeId = servIdEl.GetString();
- }
- }
-
- //Get query config suggestions from the client
- string recvBufCmd = entity.QueryArgs[FBMClient.REQ_RECV_BUF_QUERY_ARG];
- string maxHeaderCharCmd = entity.QueryArgs[FBMClient.REQ_HEAD_BUF_QUERY_ARG];
- string maxMessageSizeCmd = entity.QueryArgs[FBMClient.REQ_MAX_MESS_QUERY_ARG];
-
- //Parse recv buffer size
- int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : MIN_RECV_BUF_SIZE;
- int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : MIN_HEAD_BUF_SIZE;
- int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : MIN_MESSAGE_SIZE;
-
- AsyncQueue<ChangeEvent>? nodeQueue = null;
- //The connection may be a caching server node, so get its node-id
- if (!string.IsNullOrWhiteSpace(nodeId))
- {
- /*
- * Store a new async queue, or get an old queue for the current node
- *
- * We should use a bounded queue and disacard LRU items, we also know
- * only a single writer is needed as the queue is processed on a single thread
- * and change events may be processed on mutliple threads.
- */
-
- BoundedChannelOptions queueOptions = new(MAX_EVENT_QUEUE_SIZE)
- {
- AllowSynchronousContinuations = true,
- SingleReader = false,
- SingleWriter = true,
- //Drop oldest item in queue if full
- FullMode = BoundedChannelFullMode.DropOldest,
- };
-
- _ = StatefulEventQueue.TryAdd(nodeId, new(queueOptions));
- //Get the queue
- nodeQueue = StatefulEventQueue[nodeId];
- }
-
- //Init new ws state object and clamp the suggested buffer sizes
- WsUserState state = new()
- {
- RecvBufferSize = Math.Clamp(recvBufSize, MIN_RECV_BUF_SIZE, MAX_RECV_BUF_SIZE),
- MaxHeaderBufferSize = Math.Clamp(maxHeadBufSize, MIN_HEAD_BUF_SIZE, MAX_HEAD_BUF_SIZE),
- MaxMessageSize = Math.Clamp(maxMessageSize, MIN_MESSAGE_SIZE, MAX_MESSAGE_SIZE),
- MaxResponseBufferSize = Math.Min(maxMessageSize, MAX_RESPONSE_BUFFER_SIZE),
- SyncQueue = nodeQueue
- };
-
- Log.Debug("Client recv buffer suggestion {recv}, header buffer size {head}, response buffer size {r}", recvBufCmd, maxHeaderCharCmd, state.MaxResponseBufferSize);
-
- //Accept socket and pass state object
- entity.AcceptWebSocket(WebsocketAcceptedAsync, state);
- return VfReturnType.VirtualSkip;
- }
- catch (KeyNotFoundException)
- {
- return VfReturnType.BadRequest;
- }
- }
-
- private async Task WebsocketAcceptedAsync(WebSocketSession wss)
- {
- //Inc connected count
- Interlocked.Increment(ref _connectedClients);
- //Register plugin exit token to cancel the connected socket
- CancellationTokenRegistration reg = Pbase.UnloadToken.Register(wss.CancelAll);
- try
- {
- WsUserState state = (wss.UserState as WsUserState)!;
-
- //Init listener args from request
- FBMListenerSessionParams args = new()
- {
- MaxMessageSize = state.MaxMessageSize,
- RecvBufferSize = state.RecvBufferSize,
- ResponseBufferSize = state.MaxResponseBufferSize,
- MaxHeaderBufferSize = state.MaxHeaderBufferSize,
- HeaderEncoding = Helpers.DefaultEncoding,
- };
-
- //Listen for requests
- await Store.ListenAsync(wss, args, state.SyncQueue);
- }
- catch (OperationCanceledException)
- {
- Log.Debug("Websocket connection was canceled");
- //Disconnect the socket
- await wss.CloseSocketOutputAsync(System.Net.WebSockets.WebSocketCloseStatus.NormalClosure, "unload", CancellationToken.None);
- }
- catch (Exception ex)
- {
- Log.Debug(ex);
- }
- finally
- {
- //Dec connected count
- Interlocked.Decrement(ref _connectedClients);
- //Unregister the
- reg.Unregister();
- }
- Log.Debug("Server websocket exited");
- }
- }
-}