diff options
author | vnugent <public@vaughnnugent.com> | 2023-01-12 17:47:40 -0500 |
---|---|---|
committer | vnugent <public@vaughnnugent.com> | 2023-01-12 17:47:40 -0500 |
commit | b75668b164d398b99ee942beced06aa27ef65a50 (patch) | |
tree | c1faf6df3caa78083dcc38eb1a7247e456bbe754 /plugins/ObjectCacheServer/src | |
parent | cea64e619e714f6dbe51d37ca8329b58d8c271fb (diff) |
Large project reorder and consolidation
Diffstat (limited to 'plugins/ObjectCacheServer/src')
4 files changed, 1147 insertions, 0 deletions
diff --git a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs b/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs new file mode 100644 index 0000000..bd1233e --- /dev/null +++ b/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs @@ -0,0 +1,130 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: BrokerHeartBeat.cs +* +* BrokerHeartBeat.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Net; +using System.Linq; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using System.Collections.Generic; + +using VNLib.Hashing.IdentityUtility; +using VNLib.Plugins.Essentials.Endpoints; +using VNLib.Plugins.Essentials.Extensions; +using VNLib.Plugins.Extensions.Loading; + +namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints +{ + internal sealed class BrokerHeartBeat : ResourceEndpointBase + { + public override string Path => "/heartbeat"; + + private readonly Func<string> Token; + private readonly ManualResetEvent KeepaliveSet; + private readonly Task<IPAddress[]> BrokerIpList; + private readonly PluginBase Pbase; + + protected override ProtectionSettings EndpointProtectionSettings { get; } = new() + { + DisableBrowsersOnly = true, + DisableSessionsRequired = true, + DisableVerifySessionCors = true + }; + + public BrokerHeartBeat(Func<string> token, ManualResetEvent keepaliveSet, Uri brokerUri, PluginBase pbase) + { + Token = token; + KeepaliveSet = keepaliveSet; + BrokerIpList = Dns.GetHostAddressesAsync(brokerUri.DnsSafeHost); + + this.Pbase = pbase; + } + + private async Task<ReadOnlyJsonWebKey> GetBrokerPubAsync() + { + return await Pbase.TryGetSecretAsync("broker_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : broker_public_key"); + } + + protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity) + { + //If-not loopback then verify server address + if (!entity.Server.IsLoopBack()) + { + //Load and verify the broker's ip address matches with an address we have stored + IPAddress[] addresses = await BrokerIpList; + if (!addresses.Contains(entity.TrustedRemoteIp)) + { + //Token invalid + entity.CloseResponse(HttpStatusCode.Forbidden); + return VfReturnType.VirtualSkip; + } + } + //Get the authorization jwt + string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; + + if (string.IsNullOrWhiteSpace(jwtAuth)) + { + //Token invalid + entity.CloseResponse(HttpStatusCode.Forbidden); + return VfReturnType.VirtualSkip; + } + + //Parse the jwt + using JsonWebToken jwt = JsonWebToken.Parse(jwtAuth); + + //Verify the jwt using the broker's public key certificate + using (ReadOnlyJsonWebKey cert = await GetBrokerPubAsync()) + { + //Verify the jwt + if (!jwt.VerifyFromJwk(cert)) + { + //Token invalid + entity.CloseResponse(HttpStatusCode.Forbidden); + return VfReturnType.VirtualSkip; + } + } + + string? auth; + //Recover the auth token from the jwt + using (JsonDocument doc = jwt.GetPayload()) + { + auth = doc.RootElement.GetProperty("token").GetString(); + } + + //Verify token + if(Token().Equals(auth, StringComparison.Ordinal)) + { + //Signal keepalive + KeepaliveSet.Set(); + entity.CloseResponse(HttpStatusCode.OK); + return VfReturnType.VirtualSkip; + } + + //Token invalid + entity.CloseResponse(HttpStatusCode.Forbidden); + return VfReturnType.VirtualSkip; + } + } +} diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs new file mode 100644 index 0000000..2fe0994 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs @@ -0,0 +1,399 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: ConnectEndpoint.cs +* +* ConnectEndpoint.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Net; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Channels; +using System.Collections.Generic; +using System.Collections.Concurrent; + +using VNLib.Net.Http; +using VNLib.Hashing; +using VNLib.Utils.Async; +using VNLib.Utils.Logging; +using VNLib.Hashing.IdentityUtility; +using VNLib.Net.Messaging.FBM; +using VNLib.Net.Messaging.FBM.Client; +using VNLib.Net.Messaging.FBM.Server; +using VNLib.Data.Caching.ObjectCache; +using VNLib.Plugins.Extensions.Loading; +using VNLib.Plugins.Essentials.Endpoints; +using VNLib.Plugins.Essentials.Extensions; + + +namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints +{ + internal sealed class ConnectEndpoint : ResourceEndpointBase + { + const int MAX_RECV_BUF_SIZE = 1000 * 1024; + const int MIN_RECV_BUF_SIZE = 8 * 1024; + const int MAX_HEAD_BUF_SIZE = 2048; + const int MIN_MESSAGE_SIZE = 10 * 1024; + const int MAX_MESSAGE_SIZE = 1000 * 1024; + const int MIN_HEAD_BUF_SIZE = 128; + const int MAX_EVENT_QUEUE_SIZE = 10000; + const int MAX_RESPONSE_BUFFER_SIZE = 10 * 1024; + + private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30); + + private readonly string AudienceLocalServerId; + private readonly ObjectCacheStore Store; + private readonly PluginBase Pbase; + + private readonly ConcurrentDictionary<string, AsyncQueue<ChangeEvent>> StatefulEventQueue; + + private uint _connectedClients; + + public uint ConnectedClients => _connectedClients; + + //Loosen up protection settings + protected override ProtectionSettings EndpointProtectionSettings { get; } = new() + { + DisableBrowsersOnly = true, + DisableSessionsRequired = true, + DisableCrossSiteDenied = true + }; + + public ConnectEndpoint(string path, ObjectCacheStore store, PluginBase pbase) + { + InitPathAndLog(path, pbase.Log); + Store = store;//Load client public key to verify signed messages + Pbase = pbase; + + StatefulEventQueue = new(StringComparer.OrdinalIgnoreCase); + + //Start the queue worker + _ = pbase.DeferTask(() => ChangeWorkerAsync(pbase.UnloadToken), 10); + + AudienceLocalServerId = Guid.NewGuid().ToString("N"); + } + + /* + * Used as a client negotiation and verification request + * + * The token created during this request will be verified by the client + * and is already verified by this server, will be passed back + * via the authorization header during the websocket upgrade. + * + * This server must verify the authenticity of the returned token + * + * The tokens are very short lived as requests are intended to be made + * directly after verification + */ + + protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity) + { + //Parse jwt from authoriation + string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; + if (string.IsNullOrWhiteSpace(jwtAuth)) + { + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; + } + + string? nodeId = null; + string? challenge = null; + bool isPeer = false; + + // Parse jwt + using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth)) + { + bool verified = false; + + //Get the client public key certificate to verify the client's message + using(ReadOnlyJsonWebKey cert = await GetClientPubAsync()) + { + //verify signature for client + if (jwt.VerifyFromJwk(cert)) + { + verified = true; + } + //May be signed by a cahce server + else + { + using ReadOnlyJsonWebKey cacheCert = await GetCachePubAsync(); + + //Set peer and verified flag since the another cache server signed the request + isPeer = verified = jwt.VerifyFromJwk(cacheCert); + } + } + + //Check flag + if (!verified) + { + Log.Information("Client signature verification failed"); + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; + } + + //Recover json body + using JsonDocument doc = jwt.GetPayload(); + if (doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl)) + { + nodeId = servIdEl.GetString(); + } + if (doc.RootElement.TryGetProperty("chl", out JsonElement challengeEl)) + { + challenge = challengeEl.GetString(); + } + } + + Log.Debug("Received negotiation request from node {node}", nodeId); + //Verified, now we can create an auth message with a short expiration + using JsonWebToken auth = new(); + //Sign the auth message from the cache certificate's private key + using (ReadOnlyJsonWebKey cert = await GetCachePrivateKeyAsync()) + { + auth.WriteHeader(cert.JwtHeader); + auth.InitPayloadClaim() + .AddClaim("aud", AudienceLocalServerId) + .AddClaim("exp", DateTimeOffset.UtcNow.Add(AuthTokenExpiration).ToUnixTimeSeconds()) + .AddClaim("nonce", RandomHash.GetRandomBase32(8)) + .AddClaim("chl", challenge!) + //Set the ispeer flag if the request was signed by a cache server + .AddClaim("isPeer", isPeer) + //Specify the server's node id if set + .AddClaim("sub", nodeId!) + //Add negotiaion args + .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, MAX_HEAD_BUF_SIZE) + .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, MAX_RECV_BUF_SIZE) + .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, MAX_MESSAGE_SIZE) + .CommitClaims(); + + auth.SignFromJwk(cert); + } + + //Close response + entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer); + return VfReturnType.VirtualSkip; + } + + private async Task<ReadOnlyJsonWebKey> GetClientPubAsync() + { + return await Pbase.TryGetSecretAsync("client_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); + } + private async Task<ReadOnlyJsonWebKey> GetCachePubAsync() + { + return await Pbase.TryGetSecretAsync("cache_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); + } + private async Task<ReadOnlyJsonWebKey> GetCachePrivateKeyAsync() + { + return await Pbase.TryGetSecretAsync("cache_private_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); + } + + private async Task ChangeWorkerAsync(CancellationToken cancellation) + { + try + { + //Listen for changes + while (true) + { + ChangeEvent ev = await Store.EventQueue.DequeueAsync(cancellation); + //Add event to queues + foreach (AsyncQueue<ChangeEvent> queue in StatefulEventQueue.Values) + { + if (!queue.TryEnque(ev)) + { + Log.Debug("Listener queue has exeeded capacity, change events will be lost"); + } + } + } + } + catch (OperationCanceledException) + { } + catch (Exception ex) + { + Log.Error(ex); + } + } + + private class WsUserState + { + public int RecvBufferSize { get; init; } + public int MaxHeaderBufferSize { get; init; } + public int MaxMessageSize { get; init; } + public int MaxResponseBufferSize { get; init; } + public AsyncQueue<ChangeEvent>? SyncQueue { get; init; } + } + + protected override async ValueTask<VfReturnType> WebsocketRequestedAsync(HttpEntity entity) + { + try + { + //Parse jwt from authorization + string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; + if (string.IsNullOrWhiteSpace(jwtAuth)) + { + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; + } + + string? nodeId = null; + //Parse jwt + using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth)) + { + //Get the client public key certificate to verify the client's message + using (ReadOnlyJsonWebKey cert = await GetCachePubAsync()) + { + //verify signature against the cache public key, since this server must have signed it + if (!jwt.VerifyFromJwk(cert)) + { + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; + } + } + + //Recover json body + using JsonDocument doc = jwt.GetPayload(); + + //Verify audience, expiration + + if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl) || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase)) + { + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; + } + + if (!doc.RootElement.TryGetProperty("exp", out JsonElement expEl) + || DateTimeOffset.FromUnixTimeSeconds(expEl.GetInt64()) < DateTimeOffset.UtcNow) + { + entity.CloseResponse(HttpStatusCode.Unauthorized); + return VfReturnType.VirtualSkip; + } + + //Check if the client is a peer + bool isPeer = doc.RootElement.TryGetProperty("isPeer", out JsonElement isPeerEl) && isPeerEl.GetBoolean(); + + //The node id is optional and stored in the 'sub' field, ignore if the client is not a peer + if (isPeer && doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl)) + { + nodeId = servIdEl.GetString(); + } + } + + //Get query config suggestions from the client + string recvBufCmd = entity.QueryArgs[FBMClient.REQ_RECV_BUF_QUERY_ARG]; + string maxHeaderCharCmd = entity.QueryArgs[FBMClient.REQ_HEAD_BUF_QUERY_ARG]; + string maxMessageSizeCmd = entity.QueryArgs[FBMClient.REQ_MAX_MESS_QUERY_ARG]; + + //Parse recv buffer size + int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : MIN_RECV_BUF_SIZE; + int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : MIN_HEAD_BUF_SIZE; + int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : MIN_MESSAGE_SIZE; + + AsyncQueue<ChangeEvent>? nodeQueue = null; + //The connection may be a caching server node, so get its node-id + if (!string.IsNullOrWhiteSpace(nodeId)) + { + /* + * Store a new async queue, or get an old queue for the current node + * + * We should use a bounded queue and disacard LRU items, we also know + * only a single writer is needed as the queue is processed on a single thread + * and change events may be processed on mutliple threads. + */ + + BoundedChannelOptions queueOptions = new(MAX_EVENT_QUEUE_SIZE) + { + AllowSynchronousContinuations = true, + SingleReader = false, + SingleWriter = true, + //Drop oldest item in queue if full + FullMode = BoundedChannelFullMode.DropOldest, + }; + + _ = StatefulEventQueue.TryAdd(nodeId, new(queueOptions)); + //Get the queue + nodeQueue = StatefulEventQueue[nodeId]; + } + + //Init new ws state object and clamp the suggested buffer sizes + WsUserState state = new() + { + RecvBufferSize = Math.Clamp(recvBufSize, MIN_RECV_BUF_SIZE, MAX_RECV_BUF_SIZE), + MaxHeaderBufferSize = Math.Clamp(maxHeadBufSize, MIN_HEAD_BUF_SIZE, MAX_HEAD_BUF_SIZE), + MaxMessageSize = Math.Clamp(maxMessageSize, MIN_MESSAGE_SIZE, MAX_MESSAGE_SIZE), + MaxResponseBufferSize = Math.Min(maxMessageSize, MAX_RESPONSE_BUFFER_SIZE), + SyncQueue = nodeQueue + }; + + Log.Debug("Client recv buffer suggestion {recv}, header buffer size {head}, response buffer size {r}", recvBufCmd, maxHeaderCharCmd, state.MaxResponseBufferSize); + + //Accept socket and pass state object + entity.AcceptWebSocket(WebsocketAcceptedAsync, state); + return VfReturnType.VirtualSkip; + } + catch (KeyNotFoundException) + { + return VfReturnType.BadRequest; + } + } + + private async Task WebsocketAcceptedAsync(WebSocketSession wss) + { + //Inc connected count + Interlocked.Increment(ref _connectedClients); + //Register plugin exit token to cancel the connected socket + CancellationTokenRegistration reg = Pbase.UnloadToken.Register(wss.CancelAll); + try + { + WsUserState state = (wss.UserState as WsUserState)!; + + //Init listener args from request + FBMListenerSessionParams args = new() + { + MaxMessageSize = state.MaxMessageSize, + RecvBufferSize = state.RecvBufferSize, + ResponseBufferSize = state.MaxResponseBufferSize, + MaxHeaderBufferSize = state.MaxHeaderBufferSize, + HeaderEncoding = Helpers.DefaultEncoding, + }; + + //Listen for requests + await Store.ListenAsync(wss, args, state.SyncQueue); + } + catch (OperationCanceledException) + { + Log.Debug("Websocket connection was canceled"); + //Disconnect the socket + await wss.CloseSocketOutputAsync(System.Net.WebSockets.WebSocketCloseStatus.NormalClosure, "unload", CancellationToken.None); + } + catch (Exception ex) + { + Log.Debug(ex); + } + finally + { + //Dec connected count + Interlocked.Decrement(ref _connectedClients); + //Unregister the + reg.Unregister(); + } + Log.Debug("Server websocket exited"); + } + } +} diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj new file mode 100644 index 0000000..672597b --- /dev/null +++ b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj @@ -0,0 +1,51 @@ +<Project Sdk="Microsoft.NET.Sdk"> + <PropertyGroup> + <TargetFramework>net6.0</TargetFramework> + <Nullable>enable</Nullable> + <Authors>Vaughn Nugent</Authors> + <Version>1.0.1.1</Version> + <RootNamespace>VNLib.Plugins.Essentials.Sessions.Server</RootNamespace> + <Copyright>Copyright © 2023 Vaughn Nugent</Copyright> + <SignAssembly>True</SignAssembly> + <AssemblyOriginatorKeyFile>\\vaughnnugent.com\Internal\Folder Redirection\vman\Documents\Programming\Software\StrongNameingKey.snk</AssemblyOriginatorKeyFile> + </PropertyGroup> + + <!-- Resolve nuget dll files and store them in the output dir --> + <PropertyGroup> + <EnableDynamicLoading>true</EnableDynamicLoading> + <GenerateDocumentationFile>False</GenerateDocumentationFile> + <PackageProjectUrl>https://www.vaughnnugent.com/resources</PackageProjectUrl> + <AnalysisLevel>latest-all</AnalysisLevel> + <ProduceReferenceAssembly>True</ProduceReferenceAssembly> + + </PropertyGroup> + <ItemGroup> + <Compile Remove="liveplugin2\**" /> + <Compile Remove="liveplugin\**" /> + <EmbeddedResource Remove="liveplugin2\**" /> + <EmbeddedResource Remove="liveplugin\**" /> + <None Remove="liveplugin2\**" /> + <None Remove="liveplugin\**" /> + </ItemGroup> + <ItemGroup> + <PackageReference Include="ErrorProne.NET.CoreAnalyzers" Version="0.1.2"> + <PrivateAssets>all</PrivateAssets> + <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> + </PackageReference> + <PackageReference Include="ErrorProne.NET.Structs" Version="0.1.2"> + <PrivateAssets>all</PrivateAssets> + <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> + </PackageReference> + </ItemGroup> + <ItemGroup> + <ProjectReference Include="..\..\..\lib\VNLib.Data.Caching.Extensions\src\VNLib.Data.Caching.Extensions.csproj" /> + <ProjectReference Include="..\..\..\lib\VNLib.Data.Caching.ObjectCache\src\VNLib.Data.Caching.ObjectCache.csproj" /> + <ProjectReference Include="..\..\CacheBroker\src\CacheBroker.csproj" /> + </ItemGroup> + <ItemGroup> + <None Update="ObjectCacheServer.json"> + <CopyToOutputDirectory>Always</CopyToOutputDirectory> + </None> + </ItemGroup> + +</Project> diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs new file mode 100644 index 0000000..85a7996 --- /dev/null +++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs @@ -0,0 +1,567 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: ObjectCacheServerEntry.cs +* +* ObjectCacheServerEntry.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.IO; +using System.Net; +using System.Linq; +using System.Net.Http; +using System.Text.Json; +using System.Threading; +using System.Net.Sockets; +using System.Threading.Tasks; +using System.Collections.Generic; +using System.Security.Cryptography; + +using VNLib.Utils.Memory; +using VNLib.Utils.Logging; +using VNLib.Utils.Extensions; +using VNLib.Hashing; +using VNLib.Hashing.IdentityUtility; +using VNLib.Data.Caching; +using VNLib.Data.Caching.Extensions; +using VNLib.Data.Caching.ObjectCache; +using static VNLib.Data.Caching.Constants; +using VNLib.Net.Messaging.FBM; +using VNLib.Net.Messaging.FBM.Client; +using VNLib.Plugins.Cache.Broker.Endpoints; +using VNLib.Plugins.Extensions.Loading; +using VNLib.Plugins.Extensions.Loading.Routing; +using VNLib.Plugins.Essentials.Sessions.Server.Endpoints; + + +namespace VNLib.Plugins.Essentials.Sessions.Server +{ + public sealed class ObjectCacheServerEntry : PluginBase + { + public override string PluginName => "ObjectCache.Service"; + + private string? BrokerHeartBeatToken; + + private readonly object ServerLock = new(); + private readonly HashSet<ActiveServer> ListeningServers = new(); + + + private void RemoveServer(ActiveServer server) + { + lock (ServerLock) + { + ListeningServers.Remove(server); + } + } + + protected override void OnLoad() + { + //Create default heap + IUnmangedHeap CacheHeap = Memory.InitializeNewHeapForProcess(); + try + { + IReadOnlyDictionary<string, JsonElement> clusterConf = this.GetConfig("cluster"); + + string brokerAddress = clusterConf["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'"); + + string swapDir = PluginConfig.GetProperty("swap_dir").GetString() ?? throw new KeyNotFoundException("Missing required key 'swap_dir' for config"); + int cacheSize = PluginConfig.GetProperty("max_cache").GetInt32(); + string connectPath = PluginConfig.GetProperty("connect_path").GetString() ?? throw new KeyNotFoundException("Missing required element 'connect_path' for config 'cluster'"); + //TimeSpan cleanupInterval = PluginConfig.GetProperty("cleanup_interval_sec").GetTimeSpan(TimeParseType.Seconds); + //TimeSpan validFor = PluginConfig.GetProperty("valid_for_sec").GetTimeSpan(TimeParseType.Seconds); + int maxMessageSize = PluginConfig.GetProperty("max_blob_size").GetInt32(); + + //Init dir + DirectoryInfo dir = new(swapDir); + dir.Create(); + //Init cache listener, single threaded reader + ObjectCacheStore CacheListener = new(dir, cacheSize, Log, CacheHeap, true); + + //Init connect endpoint + { + //Init connect endpoint + ConnectEndpoint endpoint = new(connectPath, CacheListener, this); + Route(endpoint); + } + + //Setup broker and regitration + { + //init mre to pass the broker heartbeat signal to the registration worker + ManualResetEvent mre = new(false); + //Route the broker endpoint + BrokerHeartBeat brokerEp = new(() => BrokerHeartBeatToken!, mre, new Uri(brokerAddress), this); + Route(brokerEp); + + //start registration + _ = this.DeferTask(() => RegisterServerAsync(mre), 200); + } + + //Setup cluster worker + { + //Get pre-configured fbm client config for caching + FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(CacheHeap, maxMessageSize, this.IsDebug() ? Log : null); + + //Start Client runner + _ = this.DeferTask(() => RunClientAsync(CacheListener, new Uri(brokerAddress), conf), 300); + } + + //Load a cache broker to the current server if the config is defined + { + if(this.HasConfigForType<BrokerRegistrationEndpoint>()) + { + this.Route<BrokerRegistrationEndpoint>(); + } + } + + void Cleanup() + { + CacheHeap.Dispose(); + CacheListener.Dispose(); + } + + //Regsiter cleanup + _ = UnloadToken.RegisterUnobserved(Cleanup); + + Log.Information("Plugin loaded"); + } + catch (KeyNotFoundException kne) + { + CacheHeap.Dispose(); + Log.Error("Missing required configuration variables {m}", kne.Message); + } + catch + { + CacheHeap.Dispose(); + throw; + } + } + + protected override void OnUnLoad() + { + Log.Information("Plugin unloaded"); + } + + #region Registration + + private async Task RegisterServerAsync(ManualResetEvent keepaliveWait) + { + try + { + //Get the broker config element + IReadOnlyDictionary<string, JsonElement> clusterConfig = this.GetConfig("cluster"); + + //Server id is just dns name for now + string serverId = Dns.GetHostName(); + int heartBeatDelayMs = clusterConfig["heartbeat_timeout_sec"].GetInt32() * 1000; + + string? connectPath = PluginConfig.GetProperty("connect_path").GetString(); + + //Get the port of the primary webserver + int port; + bool usingTls; + { + JsonElement firstHost = HostConfig.GetProperty("virtual_hosts").EnumerateArray().First(); + + port = firstHost.GetProperty("interface") + .GetProperty("port") + .GetInt32(); + + //If a certificate is specified, tls is enabled on the port + usingTls = firstHost.TryGetProperty("cert", out _); + } + + using BrokerRegistrationRequest request = new(); + { + string addr = clusterConfig["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'"); + + //Recover the certificate + ReadOnlyJsonWebKey cacheCert = await GetCachePrivate(); + + //Init url builder for payload, see if tls is enabled + Uri connectAddress = new UriBuilder(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, Dns.GetHostName(), port, connectPath).Uri; + + request.WithBroker(new(addr)) + .WithRegistrationAddress(connectAddress.ToString()) + .WithNodeId(serverId) + .WithSigningKey(cacheCert, true); + } + + while (true) + { + try + { + //Gen a random reg token before registering + BrokerHeartBeatToken = RandomHash.GetRandomHex(32); + //Assign new hb token + request.WithHeartbeatToken(BrokerHeartBeatToken); + + Log.Information("Registering with cache broker {addr}, with node-id {id}", request.BrokerAddress, serverId); + + //Register with the broker + await FBMDataCacheExtensions.ResgisterWithBrokerAsync(request); + + Log.Debug("Successfully registered with cache broker"); + + /* + * Wait in a loop for the broker to send a keepalive + * request with the specified token. When the event + * is signaled the task will be completed + */ + while (true) + { + await Task.Delay(heartBeatDelayMs, UnloadToken); + //Set the timeout to 0 to it will just check the status without blocking + if (!keepaliveWait.WaitOne(0)) + { + //server miseed a keepalive event, time to break the loop and retry + Log.Debug("Broker missed a heartbeat request, attempting to re-register"); + break; + } + //Reset the msr + keepaliveWait.Reset(); + } + } + catch (TaskCanceledException) + { + throw; + } + catch (TimeoutException) + { + Log.Warn("Failed to connect to cache broker server within the specified timeout period"); + } + catch (HttpRequestException re) when (re.InnerException is SocketException) + { + Log.Warn("Cache broker is unavailable or network is unavailable"); + } + catch (Exception ex) + { + Log.Warn(ex, "Failed to update broker registration"); + } + + //Gen random ms delay + int randomMsDelay = RandomNumberGenerator.GetInt32(500, 2000); + //Delay + await Task.Delay(randomMsDelay, UnloadToken); + } + } + catch (KeyNotFoundException kne) + { + Log.Error("Missing required broker configuration variables {ke}", kne.Message); + } + catch (TaskCanceledException) + { + //Normal unload/exit + } + catch (Exception ex) + { + Log.Error(ex); + } + finally + { + keepaliveWait.Dispose(); + BrokerHeartBeatToken = null; + } + Log.Debug("Registration worker exited"); + } + + #endregion + + #region Cluster + + private async Task<ReadOnlyJsonWebKey> GetCachePrivate() + { + return await this.TryGetSecretAsync("cache_private_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Failed to load the cache private key"); + } + + private async Task<ReadOnlyJsonWebKey> GetBrokerPublic() + { + return await this.TryGetSecretAsync("broker_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Failed to load the broker's public key"); + } + + + /// <summary> + /// Starts a self-contained process-long task to discover other cache servers + /// from a shared broker server + /// </summary> + /// <param name="cacheStore">The cache store to synchronize</param> + /// <param name="brokerAddress">The broker server's address</param> + /// <param name="serverId">The node-id of the current server</param> + /// <param name="clientConf">The configuration to use when initializing synchronization clients</param> + /// <returns>A task that resolves when the plugin unloads</returns> + private async Task RunClientAsync(ObjectCacheStore cacheStore, Uri brokerAddress, FBMClientConfig clientConf) + { + TimeSpan noServerDelay = TimeSpan.FromSeconds(10); + string nodeId = Dns.GetHostName(); + ListServerRequest listRequest = new(brokerAddress); + try + { + //Get the broker config element + IReadOnlyDictionary<string, JsonElement> clusterConf = this.GetConfig("cluster"); + int serverCheckMs = clusterConf["update_interval_sec"].GetInt32() * 1000; + + //Setup signing and verification certificates + ReadOnlyJsonWebKey cacheSig = await GetCachePrivate(); + ReadOnlyJsonWebKey brokerPub = await GetBrokerPublic(); + //Import certificates + listRequest.WithVerificationKey(brokerPub) + .WithSigningKey(cacheSig); + + //Main event loop + Log.Information("Begining cluster node discovery"); + + ILogProvider? debugLog = this.IsDebug() ? Log : null; + + while (true) + { + //Load the server list + ActiveServer[]? servers; + while (true) + { + try + { + debugLog?.Information("[CACHE] Requesting server list from broker"); + + //Get server list + servers = await FBMDataCacheExtensions.ListServersAsync(listRequest, UnloadToken); + //Servers are loaded, so continue + break; + } + catch(HttpRequestException he) when(he.InnerException is SocketException) + { + Log.Warn("Failed to connect to cache broker, trying again"); + } + catch (TimeoutException) + { + Log.Warn("Failed to connect to cache broker server within the specified timeout period"); + } + catch (Exception ex) + { + Log.Warn(ex, "Failed to get server list from broker"); + } + //Gen random ms delay + int randomMsDelay = RandomNumberGenerator.GetInt32(1000, 2000); + //Delay + await Task.Delay(randomMsDelay, UnloadToken); + } + + if(servers == null || servers.Length == 0) + { + Log.Information("No cluster nodes found, retrying"); + //Delay + await Task.Delay(noServerDelay, UnloadToken); + continue; + } + + + //Lock on sever set while enumerating + lock (ServerLock) + { + //Select servers that are not the current server and are not already being monitored + IEnumerable<ActiveServer> serversToConnectTo = servers.Where(s => !nodeId.Equals(s.ServerId, StringComparison.OrdinalIgnoreCase)); + + //Connect to servers + foreach (ActiveServer server in serversToConnectTo) + { + //Make sure were not currently connected to the server + if (!ListeningServers.Contains(server)) + { + //Add the server to the set + ListeningServers.Add(server); + + //Run listener background task + _ = this.DeferTask(() => RunSyncTaskAsync(server, cacheStore, clientConf, nodeId)); + } + } + } + + //Delay until next check cycle + await Task.Delay(serverCheckMs, UnloadToken); + } + } + catch (FileNotFoundException) + { + Log.Error("Client/cluster private cluster key file was not found or could not be read"); + } + catch (KeyNotFoundException) + { + Log.Error("Missing required cluster configuration varables"); + } + catch (TaskCanceledException) + { + } + catch (Exception ex) + { + Log.Error(ex); + } + finally + { + listRequest.Dispose(); + } + Log.Debug("Cluster sync worker exited"); + } + + private async Task RunSyncTaskAsync(ActiveServer server, ObjectCacheStore cacheStore, FBMClientConfig conf, string nodeId) + { + //Setup client + FBMClient client = new(conf); + try + { + async Task UpdateRecordAsync(string objectId, string newId) + { + //Get request message + FBMRequest modRequest = client.RentRequest(); + try + { + //Set action as get/create + modRequest.WriteHeader(HeaderCommand.Action, Actions.Get); + //Set session-id header + modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId); + + //Make request + using FBMResponse response = await client.SendAsync(modRequest, UnloadToken); + + response.ThrowIfNotSet(); + + //Check response code + string status = response.Headers.First(static s => s.Key == HeaderCommand.Status).Value.ToString(); + if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal)) + { + //Update the record + await cacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response); + Log.Debug("Updated object {id}", objectId); + } + else + { + Log.Warn("Object {id} was missing on the remote server", objectId); + } + } + finally + { + client.ReturnRequest(modRequest); + } + } + + { + //Sign and verify requests with the cache private key since we are a peer + ReadOnlyJsonWebKey cachePriv = await GetCachePrivate(); + + //Configure cache + client.GetCacheConfiguration() + .WithVerificationKey(cachePriv) + .WithSigningCertificate(cachePriv) + .WithNodeId(nodeId) //set nodeid since were listening for changes + .WithTls(false); + } + + Log.Information("Connecting to {server}...", server.ServerId); + + //Connect to the server + await client.ConnectToCacheAsync(server, UnloadToken); + + //Wroker task callback method + async Task BgWorkerAsync() + { + //Listen for changes + while (true) + { + //Wait for changes + WaitForChangeResult changedObject = await client.WaitForChangeAsync(UnloadToken); + + Log.Debug("Object changed {typ} {obj}", changedObject.Status, changedObject.CurrentId); + + switch (changedObject.Status) + { + case ResponseCodes.NotFound: + Log.Warn("Server cache not properly configured, worker exiting"); + return; + case "deleted": + //Delete the object from the store + _ = cacheStore.DeleteItemAsync(changedObject.CurrentId).ConfigureAwait(false); + break; + case "modified": + //Reload the record from the store + await UpdateRecordAsync(changedObject.CurrentId, changedObject.NewId); + break; + } + } + } + + Log.Information("Connected to {server}, starting queue listeners", server.ServerId); + + //Start worker tasks + List<Task> workerTasks = new(); + for(int i = 0; i < Environment.ProcessorCount; i++) + { + workerTasks.Add(Task.Run(BgWorkerAsync)); + } + + //Wait for sync workers to exit + await Task.WhenAll(workerTasks); + } + catch (InvalidResponseException ie) + { + //See if the plugin is unloading + if (!UnloadToken.IsCancellationRequested) + { + Log.Debug("Server responded with invalid response packet, disconnected. reason {reason}", ie); + } + //Disconnect client gracefully + try + { + await client.DisconnectAsync(); + } + catch (Exception ex) + { + Log.Error(ex); + } + } + catch (OperationCanceledException) + { + //Plugin unloading, Try to disconnect + try + { + await client.DisconnectAsync(); + } + catch(Exception ex) + { + Log.Error(ex); + } + } + catch(Exception ex) + { + Log.Warn("Lost connection to server {h}, {m}", server.ServerId, ex); + } + finally + { + //Remove server from active list, since its been disconnected + RemoveServer(server); + client.Dispose(); + } + } + + protected override void ProcessHostCommand(string cmd) + { + Log.Debug(cmd); + } + + + #endregion + } +} |