From a0d5a8d40de9806e21e64475e3297a2a84effe22 Mon Sep 17 00:00:00 2001 From: vman Date: Wed, 30 Nov 2022 14:58:14 -0500 Subject: Project cleanup + analyzer updates --- .../Endpoints/ConnectEndpoint.cs | 195 ++++++++++++++++++--- 1 file changed, 168 insertions(+), 27 deletions(-) (limited to 'Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs') diff --git a/Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs b/Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs index dc94825..fee1ea7 100644 --- a/Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs +++ b/Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs @@ -33,7 +33,10 @@ using System.Collections.Generic; using System.Security.Cryptography; using System.Collections.Concurrent; +using VNLib.Net.Http; +using VNLib.Hashing; using VNLib.Utils.Async; +using VNLib.Utils.Memory; using VNLib.Utils.Logging; using VNLib.Hashing.IdentityUtility; using VNLib.Net.Messaging.FBM; @@ -50,7 +53,6 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints { class ConnectEndpoint : ResourceEndpointBase { - const int MAX_RECV_BUF_SIZE = 1000 * 1024; const int MIN_RECV_BUF_SIZE = 8 * 1024; const int MAX_HEAD_BUF_SIZE = 2048; @@ -60,8 +62,9 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints const int MAX_EVENT_QUEUE_SIZE = 10000; const int MAX_RESPONSE_BUFFER_SIZE = 10 * 1024; - private static readonly Encoding FBMHeaderEncoding = Helpers.DefaultEncoding; + private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30); + private readonly string AudienceLocalServerId; private readonly ObjectCacheStore Store; private readonly PluginBase Pbase; @@ -71,7 +74,13 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints public uint ConnectedClients => _connectedClients; - protected override ProtectionSettings EndpointProtectionSettings { get; } + //Loosen up protection settings + protected override ProtectionSettings EndpointProtectionSettings { get; } = new() + { + BrowsersOnly = false, + SessionsRequired = false, + CrossSiteDenied = false + }; public ConnectEndpoint(string path, ObjectCacheStore store, PluginBase pbase) { @@ -79,18 +88,107 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints Store = store;//Load client public key to verify signed messages Pbase = pbase; - StatefulEventQueue = new(StringComparer.OrdinalIgnoreCase); + //Start the queue worker - _ = ChangeWorkerAsync().ConfigureAwait(false); + _ = pbase.DeferTask(() => ChangeWorkerAsync(pbase.UnloadToken), 10); + + AudienceLocalServerId = Guid.NewGuid().ToString("N"); + } + + /* + * Used as a client negotiation and verification request + * + * The token created during this request will be verified by the client + * and is already verified by this server, will be passed back + * via the authorization header during the websocket upgrade. + * + * This server must verify the authenticity of the returned token + * + * The tokens are very short lived as requests are intended to be made + * directly after verification + */ + + protected override async ValueTask GetAsync(HttpEntity entity) + { + //Parse jwt from authoriation + string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; + if (string.IsNullOrWhiteSpace(jwtAuth)) + { + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; + } + + string? nodeId = null; + string? challenge = null; + + // Parse jwt + using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth)) + { + //Get the client public key + byte[] clientPub = await GetClientPubAsync(); + + //Init sig alg + using ECDsa verAlg = ECDsa.Create(FBMDataCacheExtensions.CacheCurve); + //Import client pub key + verAlg.ImportSubjectPublicKeyInfo(clientPub, out _); + + //verify signature for client + if (!jwt.Verify(verAlg, in FBMDataCacheExtensions.CacheJwtAlgorithm)) + { + Log.Information("Client signature verification failed"); + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; + } + + //Recover json body + using JsonDocument doc = jwt.GetPayload(); + if (doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl)) + { + nodeId = servIdEl.GetString(); + } + if (doc.RootElement.TryGetProperty("chl", out JsonElement challengeEl)) + { + challenge = challengeEl.GetString(); + } + } - //Loosen up protection settings - EndpointProtectionSettings = new() + Log.Debug("Received negotiation request from node {node}", nodeId); + + //Verified, now we can create an auth message with a short expiration + using JsonWebToken auth = new(); + auth.WriteHeader(FBMDataCacheExtensions.JwtMessageHeader.Span); + auth.InitPayloadClaim() + .AddClaim("aud", AudienceLocalServerId) + .AddClaim("exp", DateTimeOffset.UtcNow.Add(AuthTokenExpiration).ToUnixTimeSeconds()) + .AddClaim("nonce", RandomHash.GetRandomHex(8)) + .AddClaim("chl", challenge) + //Specify the server's node id if set + .AddClaim("sub", nodeId) + //Add negotiaion args + .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, MAX_HEAD_BUF_SIZE) + .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, MAX_RECV_BUF_SIZE) + .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, MAX_MESSAGE_SIZE) + .CommitClaims(); + + //Sign the auth message + ECDsa sigAlg = ECDsa.Create(FBMDataCacheExtensions.CacheCurve); + byte[] cachePrivate = await GetCachePrivateKeyAsync(); + try { - BrowsersOnly = false, - SessionsRequired = false, - CrossSiteDenied = false - }; + sigAlg.ImportPkcs8PrivateKey(cachePrivate, out _); + //sign jwt + auth.Sign(sigAlg, in FBMDataCacheExtensions.CacheJwtAlgorithm, 256); + } + finally + { + Memory.InitializeBlock(cachePrivate.AsSpan()); + sigAlg.Dispose(); + } + + //Close response + entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer); + return VfReturnType.VirtualSkip; } private async Task GetClientPubAsync() @@ -99,15 +197,31 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints return Convert.FromBase64String(brokerPubKey); } + private async Task GetCachePubAsync() + { + string? brokerPubKey = await Pbase.TryGetSecretAsync("cache_public_key") ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); + + return Convert.FromBase64String(brokerPubKey); + } + private async Task GetCachePrivateKeyAsync() + { + string? cachePrivate = await Pbase.TryGetSecretAsync("cache_private_key") ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); + + byte[] data = Convert.FromBase64String(cachePrivate); + + Memory.UnsafeZeroMemory(cachePrivate); + + return data; + } - private async Task ChangeWorkerAsync() + private async Task ChangeWorkerAsync(CancellationToken cancellation) { try { //Listen for changes while (true) { - ChangeEvent ev = await Store.EventQueue.DequeueAsync(Pbase.UnloadToken); + ChangeEvent ev = await Store.EventQueue.DequeueAsync(cancellation); //Add event to queues foreach (AsyncQueue queue in StatefulEventQueue.Values) { @@ -139,45 +253,70 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints { try { - //Parse jwt from authoriation + //Parse jwt from authorization string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; if (string.IsNullOrWhiteSpace(jwtAuth)) { entity.CloseResponse(HttpStatusCode.Unauthorized); return VfReturnType.VirtualSkip; } + string? nodeId = null; //Parse jwt using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth)) { - //Get the client public key - byte[] clientPub = await GetClientPubAsync(); - - //Init sig alg - using ECDsa sigAlg = ECDsa.Create(FBMDataCacheExtensions.CacheCurve); - //Import client pub key - sigAlg.ImportSubjectPublicKeyInfo(clientPub, out _); - //verify signature for client - if (!jwt.Verify(sigAlg, FBMDataCacheExtensions.CacheJwtAlgorithm)) + //Init sig alg, we will verify that the token was signed by this server + using (ECDsa sigAlg = ECDsa.Create(FBMDataCacheExtensions.CacheCurve)) { - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; + //Get the cache public key + byte[] cachePub = await GetCachePubAsync(); + + sigAlg.ImportSubjectPublicKeyInfo(cachePub, out _); + + //verify signature against the cache public key, since this server have signed it + if (!jwt.Verify(sigAlg, in FBMDataCacheExtensions.CacheJwtAlgorithm)) + { + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; + } } + //Recover json body using JsonDocument doc = jwt.GetPayload(); - if (doc.RootElement.TryGetProperty("server_id", out JsonElement servIdEl)) + + //Verify audience, expiration + + if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl) + || AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase)) + { + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; + } + + if (!doc.RootElement.TryGetProperty("exp", out JsonElement expEl) + || DateTimeOffset.FromUnixTimeSeconds(expEl.GetInt64()) < DateTimeOffset.UtcNow) + { + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; + } + + //The node id is optional and stored in the 'sub' field + if (doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl)) { nodeId = servIdEl.GetString(); } } + //Get query config suggestions from the client string recvBufCmd = entity.QueryArgs[FBMClient.REQ_RECV_BUF_QUERY_ARG]; string maxHeaderCharCmd = entity.QueryArgs[FBMClient.REQ_HEAD_BUF_QUERY_ARG]; string maxMessageSizeCmd = entity.QueryArgs[FBMClient.REQ_MAX_MESS_QUERY_ARG]; + //Parse recv buffer size int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : MIN_RECV_BUF_SIZE; int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : MIN_HEAD_BUF_SIZE; int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : MIN_MESSAGE_SIZE; + AsyncQueue? nodeQueue = null; //The connection may be a caching server node, so get its node-id if (!string.IsNullOrWhiteSpace(nodeId)) @@ -212,7 +351,9 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints MaxResponseBufferSize = Math.Min(maxMessageSize, MAX_RESPONSE_BUFFER_SIZE), SyncQueue = nodeQueue }; + Log.Debug("Client recv buffer suggestion {recv}, header buffer size {head}, response buffer size {r}", recvBufCmd, maxHeaderCharCmd, state.MaxResponseBufferSize); + //Accept socket and pass state object entity.AcceptWebSocket(WebsocketAcceptedAsync, state); return VfReturnType.VirtualSkip; @@ -239,7 +380,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints RecvBufferSize = state.RecvBufferSize, ResponseBufferSize = state.MaxResponseBufferSize, MaxHeaderBufferSize = state.MaxHeaderBufferSize, - HeaderEncoding = FBMHeaderEncoding, + HeaderEncoding = Helpers.DefaultEncoding, }; //Listen for requests -- cgit