aboutsummaryrefslogtreecommitdiff
path: root/Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs
diff options
context:
space:
mode:
authorLibravatar vman <public@vaughnnugent.com>2022-11-30 14:58:14 -0500
committerLibravatar vman <public@vaughnnugent.com>2022-11-30 14:58:14 -0500
commita0d5a8d40de9806e21e64475e3297a2a84effe22 (patch)
tree510ffabe5a8617e7a9388641bf5aefb2fd51742d /Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs
parent71ad09dda9ac67ef481d115fb9544dcd56834f22 (diff)
Project cleanup + analyzer updates
Diffstat (limited to 'Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs')
-rw-r--r--Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs195
1 files changed, 168 insertions, 27 deletions
diff --git a/Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs b/Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs
index dc94825..fee1ea7 100644
--- a/Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs
+++ b/Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs
@@ -33,7 +33,10 @@ using System.Collections.Generic;
using System.Security.Cryptography;
using System.Collections.Concurrent;
+using VNLib.Net.Http;
+using VNLib.Hashing;
using VNLib.Utils.Async;
+using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
using VNLib.Hashing.IdentityUtility;
using VNLib.Net.Messaging.FBM;
@@ -50,7 +53,6 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
{
class ConnectEndpoint : ResourceEndpointBase
{
-
const int MAX_RECV_BUF_SIZE = 1000 * 1024;
const int MIN_RECV_BUF_SIZE = 8 * 1024;
const int MAX_HEAD_BUF_SIZE = 2048;
@@ -60,8 +62,9 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
const int MAX_EVENT_QUEUE_SIZE = 10000;
const int MAX_RESPONSE_BUFFER_SIZE = 10 * 1024;
- private static readonly Encoding FBMHeaderEncoding = Helpers.DefaultEncoding;
+ private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
+ private readonly string AudienceLocalServerId;
private readonly ObjectCacheStore Store;
private readonly PluginBase Pbase;
@@ -71,7 +74,13 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
public uint ConnectedClients => _connectedClients;
- protected override ProtectionSettings EndpointProtectionSettings { get; }
+ //Loosen up protection settings
+ protected override ProtectionSettings EndpointProtectionSettings { get; } = new()
+ {
+ BrowsersOnly = false,
+ SessionsRequired = false,
+ CrossSiteDenied = false
+ };
public ConnectEndpoint(string path, ObjectCacheStore store, PluginBase pbase)
{
@@ -79,18 +88,107 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
Store = store;//Load client public key to verify signed messages
Pbase = pbase;
-
StatefulEventQueue = new(StringComparer.OrdinalIgnoreCase);
+
//Start the queue worker
- _ = ChangeWorkerAsync().ConfigureAwait(false);
+ _ = pbase.DeferTask(() => ChangeWorkerAsync(pbase.UnloadToken), 10);
+
+ AudienceLocalServerId = Guid.NewGuid().ToString("N");
+ }
+
+ /*
+ * Used as a client negotiation and verification request
+ *
+ * The token created during this request will be verified by the client
+ * and is already verified by this server, will be passed back
+ * via the authorization header during the websocket upgrade.
+ *
+ * This server must verify the authenticity of the returned token
+ *
+ * The tokens are very short lived as requests are intended to be made
+ * directly after verification
+ */
+
+ protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity)
+ {
+ //Parse jwt from authoriation
+ string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
+ if (string.IsNullOrWhiteSpace(jwtAuth))
+ {
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
+ }
+
+ string? nodeId = null;
+ string? challenge = null;
+
+ // Parse jwt
+ using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
+ {
+ //Get the client public key
+ byte[] clientPub = await GetClientPubAsync();
+
+ //Init sig alg
+ using ECDsa verAlg = ECDsa.Create(FBMDataCacheExtensions.CacheCurve);
+ //Import client pub key
+ verAlg.ImportSubjectPublicKeyInfo(clientPub, out _);
+
+ //verify signature for client
+ if (!jwt.Verify(verAlg, in FBMDataCacheExtensions.CacheJwtAlgorithm))
+ {
+ Log.Information("Client signature verification failed");
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
+ }
+
+ //Recover json body
+ using JsonDocument doc = jwt.GetPayload();
+ if (doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl))
+ {
+ nodeId = servIdEl.GetString();
+ }
+ if (doc.RootElement.TryGetProperty("chl", out JsonElement challengeEl))
+ {
+ challenge = challengeEl.GetString();
+ }
+ }
- //Loosen up protection settings
- EndpointProtectionSettings = new()
+ Log.Debug("Received negotiation request from node {node}", nodeId);
+
+ //Verified, now we can create an auth message with a short expiration
+ using JsonWebToken auth = new();
+ auth.WriteHeader(FBMDataCacheExtensions.JwtMessageHeader.Span);
+ auth.InitPayloadClaim()
+ .AddClaim("aud", AudienceLocalServerId)
+ .AddClaim("exp", DateTimeOffset.UtcNow.Add(AuthTokenExpiration).ToUnixTimeSeconds())
+ .AddClaim("nonce", RandomHash.GetRandomHex(8))
+ .AddClaim("chl", challenge)
+ //Specify the server's node id if set
+ .AddClaim("sub", nodeId)
+ //Add negotiaion args
+ .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, MAX_HEAD_BUF_SIZE)
+ .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, MAX_RECV_BUF_SIZE)
+ .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, MAX_MESSAGE_SIZE)
+ .CommitClaims();
+
+ //Sign the auth message
+ ECDsa sigAlg = ECDsa.Create(FBMDataCacheExtensions.CacheCurve);
+ byte[] cachePrivate = await GetCachePrivateKeyAsync();
+ try
{
- BrowsersOnly = false,
- SessionsRequired = false,
- CrossSiteDenied = false
- };
+ sigAlg.ImportPkcs8PrivateKey(cachePrivate, out _);
+ //sign jwt
+ auth.Sign(sigAlg, in FBMDataCacheExtensions.CacheJwtAlgorithm, 256);
+ }
+ finally
+ {
+ Memory.InitializeBlock(cachePrivate.AsSpan());
+ sigAlg.Dispose();
+ }
+
+ //Close response
+ entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer);
+ return VfReturnType.VirtualSkip;
}
private async Task<byte[]> GetClientPubAsync()
@@ -99,15 +197,31 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
return Convert.FromBase64String(brokerPubKey);
}
+ private async Task<byte[]> GetCachePubAsync()
+ {
+ string? brokerPubKey = await Pbase.TryGetSecretAsync("cache_public_key") ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
+
+ return Convert.FromBase64String(brokerPubKey);
+ }
+ private async Task<byte[]> GetCachePrivateKeyAsync()
+ {
+ string? cachePrivate = await Pbase.TryGetSecretAsync("cache_private_key") ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
+
+ byte[] data = Convert.FromBase64String(cachePrivate);
+
+ Memory.UnsafeZeroMemory<char>(cachePrivate);
+
+ return data;
+ }
- private async Task ChangeWorkerAsync()
+ private async Task ChangeWorkerAsync(CancellationToken cancellation)
{
try
{
//Listen for changes
while (true)
{
- ChangeEvent ev = await Store.EventQueue.DequeueAsync(Pbase.UnloadToken);
+ ChangeEvent ev = await Store.EventQueue.DequeueAsync(cancellation);
//Add event to queues
foreach (AsyncQueue<ChangeEvent> queue in StatefulEventQueue.Values)
{
@@ -139,45 +253,70 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
{
try
{
- //Parse jwt from authoriation
+ //Parse jwt from authorization
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
if (string.IsNullOrWhiteSpace(jwtAuth))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
}
+
string? nodeId = null;
//Parse jwt
using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
{
- //Get the client public key
- byte[] clientPub = await GetClientPubAsync();
-
- //Init sig alg
- using ECDsa sigAlg = ECDsa.Create(FBMDataCacheExtensions.CacheCurve);
- //Import client pub key
- sigAlg.ImportSubjectPublicKeyInfo(clientPub, out _);
- //verify signature for client
- if (!jwt.Verify(sigAlg, FBMDataCacheExtensions.CacheJwtAlgorithm))
+ //Init sig alg, we will verify that the token was signed by this server
+ using (ECDsa sigAlg = ECDsa.Create(FBMDataCacheExtensions.CacheCurve))
{
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
+ //Get the cache public key
+ byte[] cachePub = await GetCachePubAsync();
+
+ sigAlg.ImportSubjectPublicKeyInfo(cachePub, out _);
+
+ //verify signature against the cache public key, since this server have signed it
+ if (!jwt.Verify(sigAlg, in FBMDataCacheExtensions.CacheJwtAlgorithm))
+ {
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
+ }
}
+
//Recover json body
using JsonDocument doc = jwt.GetPayload();
- if (doc.RootElement.TryGetProperty("server_id", out JsonElement servIdEl))
+
+ //Verify audience, expiration
+
+ if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl)
+ || AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase))
+ {
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
+ }
+
+ if (!doc.RootElement.TryGetProperty("exp", out JsonElement expEl)
+ || DateTimeOffset.FromUnixTimeSeconds(expEl.GetInt64()) < DateTimeOffset.UtcNow)
+ {
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
+ }
+
+ //The node id is optional and stored in the 'sub' field
+ if (doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl))
{
nodeId = servIdEl.GetString();
}
}
+
//Get query config suggestions from the client
string recvBufCmd = entity.QueryArgs[FBMClient.REQ_RECV_BUF_QUERY_ARG];
string maxHeaderCharCmd = entity.QueryArgs[FBMClient.REQ_HEAD_BUF_QUERY_ARG];
string maxMessageSizeCmd = entity.QueryArgs[FBMClient.REQ_MAX_MESS_QUERY_ARG];
+
//Parse recv buffer size
int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : MIN_RECV_BUF_SIZE;
int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : MIN_HEAD_BUF_SIZE;
int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : MIN_MESSAGE_SIZE;
+
AsyncQueue<ChangeEvent>? nodeQueue = null;
//The connection may be a caching server node, so get its node-id
if (!string.IsNullOrWhiteSpace(nodeId))
@@ -212,7 +351,9 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
MaxResponseBufferSize = Math.Min(maxMessageSize, MAX_RESPONSE_BUFFER_SIZE),
SyncQueue = nodeQueue
};
+
Log.Debug("Client recv buffer suggestion {recv}, header buffer size {head}, response buffer size {r}", recvBufCmd, maxHeaderCharCmd, state.MaxResponseBufferSize);
+
//Accept socket and pass state object
entity.AcceptWebSocket(WebsocketAcceptedAsync, state);
return VfReturnType.VirtualSkip;
@@ -239,7 +380,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
RecvBufferSize = state.RecvBufferSize,
ResponseBufferSize = state.MaxResponseBufferSize,
MaxHeaderBufferSize = state.MaxHeaderBufferSize,
- HeaderEncoding = FBMHeaderEncoding,
+ HeaderEncoding = Helpers.DefaultEncoding,
};
//Listen for requests