From dc0fc53fd3c3f6c32c8b0d063922c7018fa2c48f Mon Sep 17 00:00:00 2001 From: vnugent Date: Mon, 12 Jun 2023 19:04:15 -0400 Subject: Baby steps for autonomous nodes --- .../src/Endpoints/BrokerHeartBeat.cs | 132 ------- .../src/Endpoints/BrokerHeartBeatEndpoint.cs | 149 +++++++ .../src/Endpoints/CacheConfiguration.cs | 57 --- .../ObjectCacheServer/src/Endpoints/CacheStore.cs | 91 +++++ .../src/Endpoints/ConnectEndpoint.cs | 437 +++++++++++---------- .../ObjectCacheServer/src/Endpoints/ICacheStore.cs | 56 --- .../src/Endpoints/PeerDiscoveryEndpoint.cs | 129 ++++++ 7 files changed, 589 insertions(+), 462 deletions(-) delete mode 100644 plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs create mode 100644 plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs delete mode 100644 plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs create mode 100644 plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs delete mode 100644 plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs create mode 100644 plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs (limited to 'plugins/ObjectCacheServer/src/Endpoints') diff --git a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs b/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs deleted file mode 100644 index 97061b3..0000000 --- a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs +++ /dev/null @@ -1,132 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: ObjectCacheServer -* File: BrokerHeartBeat.cs -* -* BrokerHeartBeat.cs is part of ObjectCacheServer which is part of the larger -* VNLib collection of libraries and utilities. -* -* ObjectCacheServer is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* ObjectCacheServer is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Net; -using System.Linq; -using System.Text.Json; -using System.Threading; -using System.Threading.Tasks; -using System.Collections.Generic; - -using VNLib.Plugins; -using VNLib.Plugins.Essentials; -using VNLib.Hashing.IdentityUtility; -using VNLib.Plugins.Essentials.Endpoints; -using VNLib.Plugins.Essentials.Extensions; -using VNLib.Plugins.Extensions.Loading; - -namespace VNLib.Data.Caching.ObjectCache.Server -{ - internal sealed class BrokerHeartBeat : ResourceEndpointBase - { - public override string Path => "/heartbeat"; - - private readonly Func Token; - private readonly ManualResetEvent KeepaliveSet; - private readonly Task BrokerIpList; - private readonly PluginBase Pbase; - - /// - protected override ProtectionSettings EndpointProtectionSettings { get; } = new() - { - DisableBrowsersOnly = true, - DisableSessionsRequired = true - }; - - public BrokerHeartBeat(Func token, ManualResetEvent keepaliveSet, Uri brokerUri, PluginBase pbase) - { - Token = token; - KeepaliveSet = keepaliveSet; - BrokerIpList = Dns.GetHostAddressesAsync(brokerUri.DnsSafeHost); - - this.Pbase = pbase; - } - - private async Task GetBrokerPubAsync() - { - return await Pbase.TryGetSecretAsync("broker_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : broker_public_key"); - } - - protected override async ValueTask GetAsync(HttpEntity entity) - { - //If-not loopback then verify server address - if (!entity.Server.IsLoopBack()) - { - //Load and verify the broker's ip address matches with an address we have stored - IPAddress[] addresses = await BrokerIpList; - if (!addresses.Contains(entity.TrustedRemoteIp)) - { - //Token invalid - entity.CloseResponse(HttpStatusCode.Forbidden); - return VfReturnType.VirtualSkip; - } - } - //Get the authorization jwt - string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; - - if (string.IsNullOrWhiteSpace(jwtAuth)) - { - //Token invalid - entity.CloseResponse(HttpStatusCode.Forbidden); - return VfReturnType.VirtualSkip; - } - - //Parse the jwt - using JsonWebToken jwt = JsonWebToken.Parse(jwtAuth); - - //Verify the jwt using the broker's public key certificate - using (ReadOnlyJsonWebKey cert = await GetBrokerPubAsync()) - { - //Verify the jwt - if (!jwt.VerifyFromJwk(cert)) - { - //Token invalid - entity.CloseResponse(HttpStatusCode.Forbidden); - return VfReturnType.VirtualSkip; - } - } - - string? auth; - //Recover the auth token from the jwt - using (JsonDocument doc = jwt.GetPayload()) - { - auth = doc.RootElement.GetProperty("token").GetString(); - } - - //Verify token - if(Token().Equals(auth, StringComparison.Ordinal)) - { - //Signal keepalive - KeepaliveSet.Set(); - entity.CloseResponse(HttpStatusCode.OK); - return VfReturnType.VirtualSkip; - } - - //Token invalid - entity.CloseResponse(HttpStatusCode.Forbidden); - return VfReturnType.VirtualSkip; - } - } -} diff --git a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs new file mode 100644 index 0000000..b9c00e6 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs @@ -0,0 +1,149 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: BrokerHeartBeatEndpoint.cs +* +* BrokerHeartBeatEndpoint.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Net; +using System.Linq; +using System.Text.Json; +using System.Threading.Tasks; + + +using VNLib.Plugins; +using VNLib.Utils.Logging; +using VNLib.Plugins.Essentials; +using VNLib.Hashing.IdentityUtility; +using VNLib.Plugins.Essentials.Endpoints; +using VNLib.Plugins.Essentials.Extensions; +using VNLib.Plugins.Extensions.Loading; + +namespace VNLib.Data.Caching.ObjectCache.Server +{ + internal sealed class BrokerHeartBeatEndpoint : ResourceEndpointBase + { + private readonly IBrokerHeartbeatNotifier _heartBeat; + private readonly Task BrokerIpList; + private readonly bool DebugMode; + + /// + protected override ProtectionSettings EndpointProtectionSettings { get; } = new() + { + DisableBrowsersOnly = true, + DisableSessionsRequired = true + }; + + public BrokerHeartBeatEndpoint(PluginBase plugin) + { + //Get debug flag + DebugMode = plugin.IsDebug(); + + //Get or create the current node config + _heartBeat = plugin.GetOrCreateSingleton(); + + /* + * Resolve the ip address of the broker and store it to verify connections + * later + */ + BrokerIpList = Dns.GetHostAddressesAsync(_heartBeat.GetBrokerAddress().DnsSafeHost); + + //Setup endpoint + InitPathAndLog("/heartbeat", plugin.Log); + } + + + protected override async ValueTask GetAsync(HttpEntity entity) + { + //If-not loopback then verify server address + if (!entity.Server.IsLoopBack()) + { + //Load and verify the broker's ip address matches with an address we have stored + IPAddress[] addresses = await BrokerIpList; + + if (!addresses.Contains(entity.TrustedRemoteIp)) + { + if (DebugMode) + { + Log.Debug("Received connection {ip} that was not a DNS safe address for the broker server, access denied"); + } + + //Token invalid + entity.CloseResponse(HttpStatusCode.Forbidden); + return VfReturnType.VirtualSkip; + } + } + + //Get the authorization jwt + string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; + + if (string.IsNullOrWhiteSpace(jwtAuth)) + { + //Token invalid + entity.CloseResponse(HttpStatusCode.Forbidden); + return VfReturnType.VirtualSkip; + } + + //Parse the jwt + using JsonWebToken jwt = JsonWebToken.Parse(jwtAuth); + + //Verify the jwt using the broker's public key certificate + using (ReadOnlyJsonWebKey cert = _heartBeat.GetBrokerPublicKey()) + { + //Verify the jwt + if (!jwt.VerifyFromJwk(cert)) + { + //Token invalid + entity.CloseResponse(HttpStatusCode.Forbidden); + return VfReturnType.VirtualSkip; + } + } + + string? auth; + //Recover the auth token from the jwt + using (JsonDocument doc = jwt.GetPayload()) + { + auth = doc.RootElement.GetProperty("token").GetString(); + } + + //Get our stored token used for registration + string? selfToken = _heartBeat.GetAuthToken(); + + //Verify token + if (selfToken != null && selfToken.Equals(auth, StringComparison.Ordinal)) + { + //Signal keepalive + _heartBeat.HearbeatReceived(); + entity.CloseResponse(HttpStatusCode.OK); + return VfReturnType.VirtualSkip; + } + + if (DebugMode) + { + Log.Debug("Invalid auth token recieved from broker sever, access denied"); + } + + //Token invalid + entity.CloseResponse(HttpStatusCode.Forbidden); + return VfReturnType.VirtualSkip; + } + } +} diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs deleted file mode 100644 index f7adeb3..0000000 --- a/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs +++ /dev/null @@ -1,57 +0,0 @@ -/* -* Copyright (c) 2023 Vaughn Nugent -* -* Library: VNLib -* Package: ObjectCacheServer -* File: CacheConfiguration.cs -* -* CacheConfiguration.cs is part of ObjectCacheServer which is part of the larger -* VNLib collection of libraries and utilities. -* -* ObjectCacheServer is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* ObjectCacheServer is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System.Text.Json.Serialization; - -namespace VNLib.Data.Caching.ObjectCache.Server -{ - internal sealed class CacheConfiguration - { - [JsonPropertyName("buffer_recv_max")] - public int MaxRecvBufferSize { get; set; } = 1000 * 1024; - [JsonPropertyName("buffer_recv_min")] - public int MinRecvBufferSize { get; set; } = 8 * 1024; - - - [JsonPropertyName("buffer_header_max")] - public int MaxHeaderBufferSize { get; set; } = 2 * 1024; - [JsonPropertyName("buffer_header_min")] - public int MinHeaderBufferSize { get; set; } = 128; - - - [JsonPropertyName("max_message_size")] - public int MaxMessageSize { get; set; } = 1000 * 1024; - - - [JsonPropertyName("change_queue_max_depth")] - public int MaxEventQueueDepth { get; set; } = 10 * 1000; - - - [JsonPropertyName("max_cache")] - public uint MaxCacheEntries { get; set; } = 10000; - - [JsonPropertyName("buckets")] - public uint BucketCount { get; set; } = 10; - } -} diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs new file mode 100644 index 0000000..67db433 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs @@ -0,0 +1,91 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: ConnectEndpoint.cs +* +* ConnectEndpoint.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Threading; +using System.Threading.Tasks; +using VNLib.Utils.Logging; +using VNLib.Plugins; +using VNLib.Plugins.Extensions.Loading; + +namespace VNLib.Data.Caching.ObjectCache.Server +{ + [ConfigurationName("cache")] + sealed class CacheStore : ICacheStore, IDisposable + { + public BlobCacheListener Listener { get; } + + + public CacheStore(PluginBase plugin, IConfigScope config) + { + //Init cache + Listener = InitializeCache((ObjectCacheServerEntry)plugin, config); + } + + ValueTask ICacheStore.AddOrUpdateBlobAsync(string objectId, string? alternateId, GetBodyDataCallback bodyData, T state, CancellationToken token) + { + return Listener.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token); + } + + void ICacheStore.Clear() + { + throw new NotImplementedException(); + } + + ValueTask ICacheStore.DeleteItemAsync(string id, CancellationToken token) + { + return Listener.Cache.DeleteObjectAsync(id, token); + } + + private static BlobCacheListener InitializeCache(ObjectCacheServerEntry plugin, IConfigScope config) + { + //Deserialize the cache config + CacheConfiguration cacheConf = config.Deserialze(); + + if (cacheConf.MaxCacheEntries < 2) + { + throw new ArgumentException("You must configure a 'max_cache' size larger than 1 item"); + } + + //Suggestion + if (cacheConf.MaxCacheEntries < 200) + { + plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache"); + } + + plugin.Log.Verbose("Creating cache store with {bc} buckets, with {mc} items/bucket", cacheConf.BucketCount, cacheConf.MaxCacheEntries); + + //Load the blob cache table system + IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, plugin.CacheHeap, cacheConf); + + //Endpoint only allows for a single reader + return new(bc, plugin.Log, plugin.CacheHeap, true); + } + + public void Dispose() + { + Listener.Dispose(); + } + } +} diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs index 2f896bc..167a7e9 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs @@ -27,40 +27,46 @@ using System.Net; using System.Text.Json; using System.Threading; using System.Threading.Tasks; -using System.Threading.Channels; using System.Collections.Generic; -using System.Collections.Concurrent; -using VNLib.Plugins; using VNLib.Hashing; using VNLib.Net.Http; using VNLib.Utils.Async; using VNLib.Utils.Memory; using VNLib.Utils.Logging; +using VNLib.Utils.Extensions; using VNLib.Data.Caching; +using VNLib.Data.Caching.Extensions; using VNLib.Hashing.IdentityUtility; using VNLib.Net.Messaging.FBM; using VNLib.Net.Messaging.FBM.Client; using VNLib.Net.Messaging.FBM.Server; +using VNLib.Plugins; using VNLib.Plugins.Essentials; using VNLib.Plugins.Extensions.Loading; using VNLib.Plugins.Essentials.Endpoints; using VNLib.Plugins.Essentials.Extensions; - +using VNLib.Plugins.Extensions.Loading.Routing; +using VNLib.Data.Caching.ObjectCache.Server.Distribution; namespace VNLib.Data.Caching.ObjectCache.Server { - [ConfigurationName("store")] - internal sealed class ConnectEndpoint : ResourceEndpointBase, IDisposable, IAsyncBackgroundWork + [ConfigurationName("connect_endpoint")] + internal sealed class ConnectEndpoint : ResourceEndpointBase, IAsyncBackgroundWork { - private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30); + private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30); + + + private readonly CacheNodeConfiguration NodeConfiguration; + private readonly ICacheEventQueueManager PubSubManager; + private readonly IPeerMonitor Peers; - private readonly string AudienceLocalServerId; private readonly BlobCacheListener Store; - private readonly PluginBase Pbase; + private readonly CacheAuthKeyStore KeyStore; - private readonly ConcurrentDictionary> StatefulEventQueue; + private readonly bool VerifyIp; + private readonly string AudienceLocalServerId; private uint _connectedClients; @@ -87,27 +93,26 @@ namespace VNLib.Data.Caching.ObjectCache.Server string? path = config["path"].GetString(); InitPathAndLog(path, plugin.Log); - - Pbase = plugin; - //Parse cache config or use default - if(config.TryGetValue("cache", out JsonElement confEl)) - { - CacheConfig = confEl.Deserialize()!; - } - else - { - //Init default config if not fount - CacheConfig = new(); + KeyStore = new(plugin); - Log.Verbose("Loading default cache buffer configuration"); - } + //Check for ip-verification flag + VerifyIp = config.TryGetValue("verify_ip", out JsonElement vIpEl) && vIpEl.GetBoolean(); - //Create event queue client lookup table - StatefulEventQueue = new(StringComparer.OrdinalIgnoreCase); + //Setup pub/sub manager + PubSubManager = plugin.GetOrCreateSingleton(); + + //Get node configuration + NodeConfiguration = plugin.GetOrCreateSingleton().Config; + + //Get peer monitor + Peers = plugin.GetOrCreateSingleton(); //Init the cache store - Store = InitializeCache((ObjectCacheServerEntry)plugin, CacheConfig, config); + Store = plugin.GetOrCreateSingleton().Listener; + + //Get the cache store configuration + CacheConfig = plugin.GetConfigForType().Deserialze(); /* * Generate a random guid for the current server when created so we @@ -118,60 +123,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server //Schedule the queue worker to be run _ = plugin.ObserveWork(this, 100); } - - - private static BlobCacheListener InitializeCache(ObjectCacheServerEntry plugin, CacheConfiguration cacheConf, IConfigScope config) - { - if(cacheConf.MaxCacheEntries < 2) - { - throw new ArgumentException("You must configure a 'max_cache' size larger than 1 item"); - } - - //Suggestion - if(cacheConf.MaxCacheEntries < 200) - { - plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache"); - } - - plugin.Log.Verbose("Creating cache store with {bc} buckets, with {mc} items/bucket", cacheConf.BucketCount, cacheConf.MaxCacheEntries); - - //Load the blob cache table system - IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, plugin.CacheHeap, cacheConf); - - //Endpoint only allows for a single reader - return new (bc, plugin.Log, plugin.CacheHeap, true); - } - - - /// - /// Gets the configured cache store - /// - /// - public ICacheStore GetCacheStore() => new CacheStore(Store); - //Dispose will be called by the host plugin on unload - void IDisposable.Dispose() - { - //Dispose the store on cleanup - Store.Dispose(); - } - - - private async Task GetClientPubAsync() - { - return await Pbase.TryGetSecretAsync("client_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); - } - private async Task GetCachePubAsync() - { - return await Pbase.TryGetSecretAsync("cache_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); - } - private async Task GetCachePrivateKeyAsync() - { - return await Pbase.TryGetSecretAsync("cache_private_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); - } - - /* * Used as a client negotiation and verification request * @@ -183,6 +136,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server * * The tokens are very short lived as requests are intended to be made * directly after verification + * + * Clients must also sign the entire token with their private key and + * set the signature in the x-upgrade-sig header so we can verify they + * received the messages properly */ protected override async ValueTask GetAsync(HttpEntity entity) @@ -205,7 +162,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server bool verified = false; //Get the client public key certificate to verify the client's message - using(ReadOnlyJsonWebKey cert = await GetClientPubAsync()) + using(ReadOnlyJsonWebKey cert = await KeyStore.GetClientPublicKeyAsync()) { //verify signature for client if (jwt.VerifyFromJwk(cert)) @@ -215,10 +172,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server //May be signed by a cache server else { - using ReadOnlyJsonWebKey cacheCert = await GetCachePubAsync(); - //Set peer and verified flag since the another cache server signed the request - isPeer = verified = jwt.VerifyFromJwk(cacheCert); + isPeer = verified = NodeConfiguration.VerifyCache(jwt); } } @@ -232,10 +187,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server //Recover json body using JsonDocument doc = jwt.GetPayload(); + if (doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl)) { nodeId = servIdEl.GetString(); } + if (doc.RootElement.TryGetProperty("chl", out JsonElement challengeEl)) { challenge = challengeEl.GetString(); @@ -246,133 +203,147 @@ namespace VNLib.Data.Caching.ObjectCache.Server //Verified, now we can create an auth message with a short expiration using JsonWebToken auth = new(); + + auth.WriteHeader(NodeConfiguration.GetJwtHeader()); + auth.InitPayloadClaim() + .AddClaim("aud", AudienceLocalServerId) + .AddClaim("exp", entity.RequestedTimeUtc.Add(AuthTokenExpiration).ToUnixTimeSeconds()) + .AddClaim("nonce", RandomHash.GetRandomBase32(8)) + .AddClaim("chl", challenge!) + //Set the ispeer flag if the request was signed by a cache server + .AddClaim("isPeer", isPeer) + //Specify the server's node id if set + .AddClaim("sub", nodeId!) + //Set ip address + .AddClaim("ip", entity.TrustedRemoteIp.ToString()) + //Add negotiaion args + .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, CacheConfig.MaxHeaderBufferSize) + .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, CacheConfig.MaxRecvBufferSize) + .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, CacheConfig.MaxMessageSize) + .CommitClaims(); + + //Sign the auth message from our private key + NodeConfiguration.SignJwt(auth); - //Sign the auth message from the cache certificate's private key - using (ReadOnlyJsonWebKey cert = await GetCachePrivateKeyAsync()) - { - auth.WriteHeader(cert.JwtHeader); - auth.InitPayloadClaim() - .AddClaim("aud", AudienceLocalServerId) - .AddClaim("exp", entity.RequestedTimeUtc.Add(AuthTokenExpiration).ToUnixTimeSeconds()) - .AddClaim("nonce", RandomHash.GetRandomBase32(8)) - .AddClaim("chl", challenge!) - //Set the ispeer flag if the request was signed by a cache server - .AddClaim("isPeer", isPeer) - //Specify the server's node id if set - .AddClaim("sub", nodeId!) - //Add negotiaion args - .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, CacheConfig.MaxHeaderBufferSize) - .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, CacheConfig.MaxRecvBufferSize) - .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, CacheConfig.MaxMessageSize) - .CommitClaims(); - - auth.SignFromJwk(cert); - } - //Close response entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer); return VfReturnType.VirtualSkip; } - - //Background worker to process event queue items - async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) + protected override async ValueTask WebsocketRequestedAsync(HttpEntity entity) { - try - { - //Listen for changes - while (true) - { - ChangeEvent ev = await Store.EventQueue.DequeueAsync(exitToken); + //Parse jwt from authorization + string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; - //Add event to queues - foreach (AsyncQueue queue in StatefulEventQueue.Values) - { - if (!queue.TryEnque(ev)) - { - Log.Debug("Listener queue has exeeded capacity, change events will be lost"); - } - } - } - } - catch (OperationCanceledException) + if (string.IsNullOrWhiteSpace(jwtAuth)) { - //Normal exit + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; } - } - private class WsUserState - { - public int RecvBufferSize { get; init; } - public int MaxHeaderBufferSize { get; init; } - public int MaxMessageSize { get; init; } - public int MaxResponseBufferSize { get; init; } - public AsyncQueue? SyncQueue { get; init; } + //Get the upgrade signature header + string? clientSignature = entity.Server.Headers[FBMDataCacheExtensions.X_UPGRADE_SIG_HEADER]; - public override string ToString() + if (string.IsNullOrWhiteSpace(clientSignature)) { - return - $"{nameof(RecvBufferSize)}:{RecvBufferSize}, {nameof(MaxHeaderBufferSize)}: {MaxHeaderBufferSize}, {nameof(MaxMessageSize)}:{MaxMessageSize}, {nameof(MaxResponseBufferSize)}:{MaxResponseBufferSize}"; + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; } - } - protected override async ValueTask WebsocketRequestedAsync(HttpEntity entity) - { - try + string? nodeId = null; + ICachePeerAdvertisment? discoveryAd = null; + + //Parse jwt + using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth)) { - //Parse jwt from authorization - string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; + //verify signature against the cache public key, since this server must have signed it + if (!NodeConfiguration.VerifyCache(jwt)) + { + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; + } + + //Recover json body + using JsonDocument doc = jwt.GetPayload(); + + //Verify audience, expiration - if (string.IsNullOrWhiteSpace(jwtAuth)) + if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl) || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase)) + { + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; + } + + if (!doc.RootElement.TryGetProperty("exp", out JsonElement expEl) + || DateTimeOffset.FromUnixTimeSeconds(expEl.GetInt64()) < entity.RequestedTimeUtc) { entity.CloseResponse(HttpStatusCode.Unauthorized); return VfReturnType.VirtualSkip; } - - string? nodeId = null; - //Parse jwt - using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth)) + //Check node ip address matches if required + if (VerifyIp) { - //Get the client public key certificate to verify the client's message - using (ReadOnlyJsonWebKey cert = await GetCachePubAsync()) + if (!doc.RootElement.TryGetProperty("ip", out JsonElement ipEl)) { - //verify signature against the cache public key, since this server must have signed it - if (!jwt.VerifyFromJwk(cert)) - { - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; - } + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; } - - //Recover json body - using JsonDocument doc = jwt.GetPayload(); - - //Verify audience, expiration - if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl) || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase)) + string? clientIp = ipEl.GetString(); + //Verify the client ip address matches the one in the token + if (clientIp == null || !IPAddress.TryParse(clientIp, out IPAddress? clientIpAddr) || !clientIpAddr.Equals(entity.TrustedRemoteIp)) { entity.CloseResponse(HttpStatusCode.Unauthorized); return VfReturnType.VirtualSkip; } + } + + //Check if the client is a peer + bool isPeer = doc.RootElement.TryGetProperty("isPeer", out JsonElement isPeerEl) && isPeerEl.GetBoolean(); - if (!doc.RootElement.TryGetProperty("exp", out JsonElement expEl) - || DateTimeOffset.FromUnixTimeSeconds(expEl.GetInt64()) < entity.RequestedTimeUtc) + //The node id is optional and stored in the 'sub' field, ignore if the client is not a peer + if (isPeer && doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl)) + { + nodeId = servIdEl.GetString(); + } + + //Verify the signature the client included of the auth token + + if (isPeer) + { + //Verify token signature against a fellow cache public key + if (!NodeConfiguration.VerifyUpgradeToken(clientSignature, jwtAuth)) { entity.CloseResponse(HttpStatusCode.Unauthorized); return VfReturnType.VirtualSkip; } - //Check if the client is a peer - bool isPeer = doc.RootElement.TryGetProperty("isPeer", out JsonElement isPeerEl) && isPeerEl.GetBoolean(); + //Try to get the node advertisement header + string? discoveryHeader = entity.Server.Headers[FBMDataCacheExtensions.X_NODE_DISCOVERY_HEADER]; - //The node id is optional and stored in the 'sub' field, ignore if the client is not a peer - if (isPeer && doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl)) + //Verify the node advertisement header and publish it + if (!string.IsNullOrWhiteSpace(discoveryHeader)) { - nodeId = servIdEl.GetString(); + discoveryAd = NodeConfiguration.VerifyPeerAdvertisment(discoveryHeader); } } - + else + { + //Not a peer, so verify against the client's public key + using ReadOnlyJsonWebKey clientPub = await KeyStore.GetClientPublicKeyAsync(); + + //Verify token signature + if (!FBMDataCacheExtensions.VerifyUpgradeToken(clientSignature, jwtAuth, clientPub)) + { + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; + } + } + } + + try + { //Get query config suggestions from the client string recvBufCmd = entity.QueryArgs[FBMClient.REQ_RECV_BUF_QUERY_ARG]; string maxHeaderCharCmd = entity.QueryArgs[FBMClient.REQ_HEAD_BUF_QUERY_ARG]; @@ -382,34 +353,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : CacheConfig.MinRecvBufferSize; int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : CacheConfig.MinHeaderBufferSize; int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : CacheConfig.MaxMessageSize; - - AsyncQueue? nodeQueue = null; - - //The connection may be a caching server node, so get its node-id - if (!string.IsNullOrWhiteSpace(nodeId)) - { - /* - * Store a new async queue, or get an old queue for the current node - * - * We should use a bounded queue and disacard LRU items, we also know - * only a single writer is needed as the queue is processed on a single thread - * and change events may be processed on mutliple threads. - */ - - BoundedChannelOptions queueOptions = new(CacheConfig.MaxEventQueueDepth) - { - AllowSynchronousContinuations = true, - SingleReader = false, - SingleWriter = true, - //Drop oldest item in queue if full - FullMode = BoundedChannelFullMode.DropOldest, - }; - - _ = StatefulEventQueue.TryAdd(nodeId, new(queueOptions)); - - //Get the queue - nodeQueue = StatefulEventQueue[nodeId]; - } /* * Buffer sizing can get messy as the response/resquest sizes can vary @@ -434,7 +377,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server */ MaxResponseBufferSize = (int)MemoryUtil.NearestPage(maxMessageSizeClamp), - SyncQueue = nodeQueue + NodeId = nodeId, + Advertisment = discoveryAd }; Log.Debug("Client recv buffer suggestion {recv}, header buffer size {head}, response buffer size {r}", recvBufCmd, maxHeaderCharCmd, state.MaxResponseBufferSize); @@ -454,14 +398,19 @@ namespace VNLib.Data.Caching.ObjectCache.Server private async Task WebsocketAcceptedAsync(WebSocketSession wss) { + WsUserState state = (WsUserState)wss.UserState!; + + //Notify peers of new connection + Peers.OnPeerConnected(state); + //Inc connected count Interlocked.Increment(ref _connectedClients); + //Register plugin exit token to cancel the connected socket - CancellationTokenRegistration reg = Pbase.UnloadToken.Register(wss.CancelAll); + CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll); + try { - WsUserState state = (wss.UserState as WsUserState)!; - //Init listener args from request FBMListenerSessionParams args = new() { @@ -473,12 +422,33 @@ namespace VNLib.Data.Caching.ObjectCache.Server HeaderEncoding = Helpers.DefaultEncoding, }; - //Listen for requests - await Store.ListenAsync(wss, args, state.SyncQueue); + //Check if the client is a peer node, if it is, subscribe to change events + if (!string.IsNullOrWhiteSpace(state.NodeId)) + { + //Get the event queue for the current node + AsyncQueue queue = PubSubManager.Subscribe(state); + + try + { + //Begin listening for messages with a queue + await Store.ListenAsync(wss, args, queue); + } + finally + { + //ALAWYS Detatch listener + PubSubManager.Unsubscribe(state); + } + } + else + { + //Begin listening for messages without a queue + await Store.ListenAsync(wss, args, null); + } } catch (OperationCanceledException) { Log.Debug("Websocket connection was canceled"); + //Disconnect the socket await wss.CloseSocketOutputAsync(System.Net.WebSockets.WebSocketCloseStatus.NormalClosure, "unload", CancellationToken.None); } @@ -490,35 +460,68 @@ namespace VNLib.Data.Caching.ObjectCache.Server { //Dec connected count Interlocked.Decrement(ref _connectedClients); + //Unregister the reg.Unregister(); } + + //Notify monitor of disconnect + Peers.OnPeerDisconnected(state); + Log.Debug("Server websocket exited"); } - - private sealed class CacheStore : ICacheStore + + //Background worker to process event queue items + async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) { - private readonly BlobCacheListener _cache; + const int accumulatorSize = 64; - public CacheStore(BlobCacheListener cache) + try { - _cache = cache; - } + //Accumulator for events + ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize]; + int ptr = 0; - ValueTask ICacheStore.AddOrUpdateBlobAsync(string objectId, string? alternateId, GetBodyDataCallback bodyData, T state, CancellationToken token) - { - return _cache.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token); - } + //Listen for changes + while (true) + { + //Wait for next event + accumulator[ptr++] = await Store.EventQueue.DequeueAsync(exitToken); - void ICacheStore.Clear() + //try to accumulate more events until we can't anymore + while (Store.EventQueue.TryDequeue(out ChangeEvent? ev) && ptr < accumulatorSize) + { + accumulator[ptr++] = ev; + } + + //Publish all events to subscribers + PubSubManager.PublishMultiple(accumulator.AsSpan(0, ptr)); + + //Reset pointer + ptr = 0; + } + } + catch (OperationCanceledException) { - throw new NotImplementedException(); + //Normal exit + pluginLog.Debug("Change queue listener worker exited"); } + } + + private class WsUserState : ICachePeer + { + public int RecvBufferSize { get; init; } + public int MaxHeaderBufferSize { get; init; } + public int MaxMessageSize { get; init; } + public int MaxResponseBufferSize { get; init; } + public string? NodeId { get; init; } + public ICachePeerAdvertisment? Advertisment { get; init; } - ValueTask ICacheStore.DeleteItemAsync(string id, CancellationToken token) + public override string ToString() { - return _cache.Cache.DeleteObjectAsync(id, token); + return + $"{nameof(RecvBufferSize)}:{RecvBufferSize}, {nameof(MaxHeaderBufferSize)}: {MaxHeaderBufferSize}, {nameof(MaxMessageSize)}:{MaxMessageSize}, {nameof(MaxResponseBufferSize)}:{MaxResponseBufferSize}"; } } } diff --git a/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs b/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs deleted file mode 100644 index f911af9..0000000 --- a/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs +++ /dev/null @@ -1,56 +0,0 @@ -/* -* Copyright (c) 2023 Vaughn Nugent -* -* Library: VNLib -* Package: ObjectCacheServer -* File: ICacheStore.cs -* -* ICacheStore.cs is part of ObjectCacheServer which is part of the larger -* VNLib collection of libraries and utilities. -* -* ObjectCacheServer is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* ObjectCacheServer is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System.Threading; -using System.Threading.Tasks; - -namespace VNLib.Data.Caching.ObjectCache.Server -{ - internal interface ICacheStore - { - /// - /// Asynchronously adds or updates an object in the store and optionally update's its id - /// - /// The current (or old) id of the object - /// An optional id to update the blob to - /// A callback that returns the data for the blob - /// The state parameter to pass to the data callback - /// A token to cancel the async operation - /// A value task that represents the async operation - ValueTask AddOrUpdateBlobAsync(string objectId, string? alternateId, GetBodyDataCallback bodyData, T state, CancellationToken token = default); - - /// - /// Clears all items from the store - /// - void Clear(); - - /// - /// Asynchronously deletes a previously stored item - /// - /// The id of the object to delete - /// A token to cancel the async lock await - /// A task that completes when the item has been deleted - ValueTask DeleteItemAsync(string id, CancellationToken token = default); - } -} diff --git a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs new file mode 100644 index 0000000..670d624 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs @@ -0,0 +1,129 @@ +/* +* Copyright (c) 2023 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: PeerDiscoveryEndpoint.cs +* +* PeerDiscoveryEndpoint.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Net; +using System.Text.Json; +using System.Threading.Tasks; + +using VNLib.Hashing; +using VNLib.Hashing.IdentityUtility; +using VNLib.Plugins; +using VNLib.Plugins.Essentials; +using VNLib.Plugins.Essentials.Endpoints; +using VNLib.Plugins.Essentials.Extensions; +using VNLib.Plugins.Extensions.Loading; +using VNLib.Data.Caching.Extensions; +using VNLib.Data.Caching.ObjectCache.Server.Distribution; + +namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints +{ + [ConfigurationName("discovery_endpoint")] + internal sealed class PeerDiscoveryEndpoint : UnprotectedWebEndpoint + { + private readonly IPeerMonitor PeerMonitor; + private readonly NodeConfig Config; + + public PeerDiscoveryEndpoint(PluginBase plugin, IConfigScope config) + { + string? path = config["path"].GetString(); + + InitPathAndLog(path, plugin.Log); + + //Get the peer monitor + PeerMonitor = plugin.GetOrCreateSingleton(); + + //Get the node config + Config = plugin.GetOrCreateSingleton(); + } + + protected override async ValueTask GetAsync(HttpEntity entity) + { + //Get auth token + string? authToken = entity.Server.Headers[HttpRequestHeader.Authorization]; + + if(string.IsNullOrWhiteSpace(authToken)) + { + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; + } + + string subject = string.Empty; + + //Parse auth token + using(JsonWebToken jwt = JsonWebToken.Parse(authToken)) + { + //try to verify against cache node first + if (!Config.Config.VerifyCache(jwt)) + { + //failed... + + //try to verify against client key + using ReadOnlyJsonWebKey clientPub = await Config.KeyStore.GetClientPublicKeyAsync(); + + if (!jwt.VerifyFromJwk(clientPub)) + { + //invalid token + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; + } + } + + using JsonDocument payload = jwt.GetPayload(); + + subject = payload.RootElement.GetProperty("sub").GetString() ?? string.Empty; + } + + //Valid key, get peer list to send to client + ICachePeerAdvertisment[] peers = PeerMonitor.GetAllPeers() + .Where(static p => p.Advertisment != null) + .Select(static p => p.Advertisment!) + .ToArray(); + + //Build response jwt + using JsonWebToken response = new(); + + //set header from cache config + response.WriteHeader(Config.Config.GetJwtHeader()); + + response.InitPayloadClaim() + .AddClaim("iss", Config.Config.NodeId) + //Audience is the requestor id + .AddClaim("sub", subject) + .AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds()) + //Send all peers as a json array + .AddClaim("peers", peers) + .AddClaim("nonce", RandomHash.GetRandomBase32(24)) + .CommitClaims(); + + //Sign the response + Config.Config.SignJwt(response); + + //Send response to client + entity.CloseResponse(HttpStatusCode.OK, Net.Http.ContentType.Text, response.DataBuffer); + return VfReturnType.VirtualSkip; + } + } +} -- cgit