diff options
author | vnugent <public@vaughnnugent.com> | 2023-01-12 17:47:41 -0500 |
---|---|---|
committer | vnugent <public@vaughnnugent.com> | 2023-01-12 17:47:41 -0500 |
commit | 751e1a107195f0c9c98c866e8267a5a760545982 (patch) | |
tree | 71a775c91bfd9d455b727c72d2fb628c530f64cc /Plugins/SessionCacheServer | |
parent | 9bb5ddd8f19c0ecabd7af4ee58d80c16826bc183 (diff) |
Large project reorder and consolidation
Diffstat (limited to 'Plugins/SessionCacheServer')
-rw-r--r-- | Plugins/SessionCacheServer/Endpoints/BrokerHeartBeat.cs | 130 | ||||
-rw-r--r-- | Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs | 399 | ||||
-rw-r--r-- | Plugins/SessionCacheServer/ObjectCacheServer.csproj | 63 | ||||
-rw-r--r-- | Plugins/SessionCacheServer/ObjectCacheServerEntry.cs | 567 |
4 files changed, 0 insertions, 1159 deletions
diff --git a/Plugins/SessionCacheServer/Endpoints/BrokerHeartBeat.cs b/Plugins/SessionCacheServer/Endpoints/BrokerHeartBeat.cs deleted file mode 100644 index bd1233e..0000000 --- a/Plugins/SessionCacheServer/Endpoints/BrokerHeartBeat.cs +++ /dev/null @@ -1,130 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: ObjectCacheServer -* File: BrokerHeartBeat.cs -* -* BrokerHeartBeat.cs is part of ObjectCacheServer which is part of the larger -* VNLib collection of libraries and utilities. -* -* ObjectCacheServer is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* ObjectCacheServer is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Net; -using System.Linq; -using System.Text.Json; -using System.Threading; -using System.Threading.Tasks; -using System.Collections.Generic; - -using VNLib.Hashing.IdentityUtility; -using VNLib.Plugins.Essentials.Endpoints; -using VNLib.Plugins.Essentials.Extensions; -using VNLib.Plugins.Extensions.Loading; - -namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints -{ - internal sealed class BrokerHeartBeat : ResourceEndpointBase - { - public override string Path => "/heartbeat"; - - private readonly Func<string> Token; - private readonly ManualResetEvent KeepaliveSet; - private readonly Task<IPAddress[]> BrokerIpList; - private readonly PluginBase Pbase; - - protected override ProtectionSettings EndpointProtectionSettings { get; } = new() - { - DisableBrowsersOnly = true, - DisableSessionsRequired = true, - DisableVerifySessionCors = true - }; - - public BrokerHeartBeat(Func<string> token, ManualResetEvent keepaliveSet, Uri brokerUri, PluginBase pbase) - { - Token = token; - KeepaliveSet = keepaliveSet; - BrokerIpList = Dns.GetHostAddressesAsync(brokerUri.DnsSafeHost); - - this.Pbase = pbase; - } - - private async Task<ReadOnlyJsonWebKey> GetBrokerPubAsync() - { - return await Pbase.TryGetSecretAsync("broker_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : broker_public_key"); - } - - protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity) - { - //If-not loopback then verify server address - if (!entity.Server.IsLoopBack()) - { - //Load and verify the broker's ip address matches with an address we have stored - IPAddress[] addresses = await BrokerIpList; - if (!addresses.Contains(entity.TrustedRemoteIp)) - { - //Token invalid - entity.CloseResponse(HttpStatusCode.Forbidden); - return VfReturnType.VirtualSkip; - } - } - //Get the authorization jwt - string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; - - if (string.IsNullOrWhiteSpace(jwtAuth)) - { - //Token invalid - entity.CloseResponse(HttpStatusCode.Forbidden); - return VfReturnType.VirtualSkip; - } - - //Parse the jwt - using JsonWebToken jwt = JsonWebToken.Parse(jwtAuth); - - //Verify the jwt using the broker's public key certificate - using (ReadOnlyJsonWebKey cert = await GetBrokerPubAsync()) - { - //Verify the jwt - if (!jwt.VerifyFromJwk(cert)) - { - //Token invalid - entity.CloseResponse(HttpStatusCode.Forbidden); - return VfReturnType.VirtualSkip; - } - } - - string? auth; - //Recover the auth token from the jwt - using (JsonDocument doc = jwt.GetPayload()) - { - auth = doc.RootElement.GetProperty("token").GetString(); - } - - //Verify token - if(Token().Equals(auth, StringComparison.Ordinal)) - { - //Signal keepalive - KeepaliveSet.Set(); - entity.CloseResponse(HttpStatusCode.OK); - return VfReturnType.VirtualSkip; - } - - //Token invalid - entity.CloseResponse(HttpStatusCode.Forbidden); - return VfReturnType.VirtualSkip; - } - } -} diff --git a/Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs b/Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs deleted file mode 100644 index 2fe0994..0000000 --- a/Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs +++ /dev/null @@ -1,399 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: ObjectCacheServer -* File: ConnectEndpoint.cs -* -* ConnectEndpoint.cs is part of ObjectCacheServer which is part of the larger -* VNLib collection of libraries and utilities. -* -* ObjectCacheServer is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* ObjectCacheServer is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Net; -using System.Text.Json; -using System.Threading; -using System.Threading.Tasks; -using System.Threading.Channels; -using System.Collections.Generic; -using System.Collections.Concurrent; - -using VNLib.Net.Http; -using VNLib.Hashing; -using VNLib.Utils.Async; -using VNLib.Utils.Logging; -using VNLib.Hashing.IdentityUtility; -using VNLib.Net.Messaging.FBM; -using VNLib.Net.Messaging.FBM.Client; -using VNLib.Net.Messaging.FBM.Server; -using VNLib.Data.Caching.ObjectCache; -using VNLib.Plugins.Extensions.Loading; -using VNLib.Plugins.Essentials.Endpoints; -using VNLib.Plugins.Essentials.Extensions; - - -namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints -{ - internal sealed class ConnectEndpoint : ResourceEndpointBase - { - const int MAX_RECV_BUF_SIZE = 1000 * 1024; - const int MIN_RECV_BUF_SIZE = 8 * 1024; - const int MAX_HEAD_BUF_SIZE = 2048; - const int MIN_MESSAGE_SIZE = 10 * 1024; - const int MAX_MESSAGE_SIZE = 1000 * 1024; - const int MIN_HEAD_BUF_SIZE = 128; - const int MAX_EVENT_QUEUE_SIZE = 10000; - const int MAX_RESPONSE_BUFFER_SIZE = 10 * 1024; - - private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30); - - private readonly string AudienceLocalServerId; - private readonly ObjectCacheStore Store; - private readonly PluginBase Pbase; - - private readonly ConcurrentDictionary<string, AsyncQueue<ChangeEvent>> StatefulEventQueue; - - private uint _connectedClients; - - public uint ConnectedClients => _connectedClients; - - //Loosen up protection settings - protected override ProtectionSettings EndpointProtectionSettings { get; } = new() - { - DisableBrowsersOnly = true, - DisableSessionsRequired = true, - DisableCrossSiteDenied = true - }; - - public ConnectEndpoint(string path, ObjectCacheStore store, PluginBase pbase) - { - InitPathAndLog(path, pbase.Log); - Store = store;//Load client public key to verify signed messages - Pbase = pbase; - - StatefulEventQueue = new(StringComparer.OrdinalIgnoreCase); - - //Start the queue worker - _ = pbase.DeferTask(() => ChangeWorkerAsync(pbase.UnloadToken), 10); - - AudienceLocalServerId = Guid.NewGuid().ToString("N"); - } - - /* - * Used as a client negotiation and verification request - * - * The token created during this request will be verified by the client - * and is already verified by this server, will be passed back - * via the authorization header during the websocket upgrade. - * - * This server must verify the authenticity of the returned token - * - * The tokens are very short lived as requests are intended to be made - * directly after verification - */ - - protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity) - { - //Parse jwt from authoriation - string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; - if (string.IsNullOrWhiteSpace(jwtAuth)) - { - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; - } - - string? nodeId = null; - string? challenge = null; - bool isPeer = false; - - // Parse jwt - using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth)) - { - bool verified = false; - - //Get the client public key certificate to verify the client's message - using(ReadOnlyJsonWebKey cert = await GetClientPubAsync()) - { - //verify signature for client - if (jwt.VerifyFromJwk(cert)) - { - verified = true; - } - //May be signed by a cahce server - else - { - using ReadOnlyJsonWebKey cacheCert = await GetCachePubAsync(); - - //Set peer and verified flag since the another cache server signed the request - isPeer = verified = jwt.VerifyFromJwk(cacheCert); - } - } - - //Check flag - if (!verified) - { - Log.Information("Client signature verification failed"); - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; - } - - //Recover json body - using JsonDocument doc = jwt.GetPayload(); - if (doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl)) - { - nodeId = servIdEl.GetString(); - } - if (doc.RootElement.TryGetProperty("chl", out JsonElement challengeEl)) - { - challenge = challengeEl.GetString(); - } - } - - Log.Debug("Received negotiation request from node {node}", nodeId); - //Verified, now we can create an auth message with a short expiration - using JsonWebToken auth = new(); - //Sign the auth message from the cache certificate's private key - using (ReadOnlyJsonWebKey cert = await GetCachePrivateKeyAsync()) - { - auth.WriteHeader(cert.JwtHeader); - auth.InitPayloadClaim() - .AddClaim("aud", AudienceLocalServerId) - .AddClaim("exp", DateTimeOffset.UtcNow.Add(AuthTokenExpiration).ToUnixTimeSeconds()) - .AddClaim("nonce", RandomHash.GetRandomBase32(8)) - .AddClaim("chl", challenge!) - //Set the ispeer flag if the request was signed by a cache server - .AddClaim("isPeer", isPeer) - //Specify the server's node id if set - .AddClaim("sub", nodeId!) - //Add negotiaion args - .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, MAX_HEAD_BUF_SIZE) - .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, MAX_RECV_BUF_SIZE) - .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, MAX_MESSAGE_SIZE) - .CommitClaims(); - - auth.SignFromJwk(cert); - } - - //Close response - entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer); - return VfReturnType.VirtualSkip; - } - - private async Task<ReadOnlyJsonWebKey> GetClientPubAsync() - { - return await Pbase.TryGetSecretAsync("client_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); - } - private async Task<ReadOnlyJsonWebKey> GetCachePubAsync() - { - return await Pbase.TryGetSecretAsync("cache_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); - } - private async Task<ReadOnlyJsonWebKey> GetCachePrivateKeyAsync() - { - return await Pbase.TryGetSecretAsync("cache_private_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key"); - } - - private async Task ChangeWorkerAsync(CancellationToken cancellation) - { - try - { - //Listen for changes - while (true) - { - ChangeEvent ev = await Store.EventQueue.DequeueAsync(cancellation); - //Add event to queues - foreach (AsyncQueue<ChangeEvent> queue in StatefulEventQueue.Values) - { - if (!queue.TryEnque(ev)) - { - Log.Debug("Listener queue has exeeded capacity, change events will be lost"); - } - } - } - } - catch (OperationCanceledException) - { } - catch (Exception ex) - { - Log.Error(ex); - } - } - - private class WsUserState - { - public int RecvBufferSize { get; init; } - public int MaxHeaderBufferSize { get; init; } - public int MaxMessageSize { get; init; } - public int MaxResponseBufferSize { get; init; } - public AsyncQueue<ChangeEvent>? SyncQueue { get; init; } - } - - protected override async ValueTask<VfReturnType> WebsocketRequestedAsync(HttpEntity entity) - { - try - { - //Parse jwt from authorization - string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; - if (string.IsNullOrWhiteSpace(jwtAuth)) - { - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; - } - - string? nodeId = null; - //Parse jwt - using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth)) - { - //Get the client public key certificate to verify the client's message - using (ReadOnlyJsonWebKey cert = await GetCachePubAsync()) - { - //verify signature against the cache public key, since this server must have signed it - if (!jwt.VerifyFromJwk(cert)) - { - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; - } - } - - //Recover json body - using JsonDocument doc = jwt.GetPayload(); - - //Verify audience, expiration - - if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl) || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase)) - { - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; - } - - if (!doc.RootElement.TryGetProperty("exp", out JsonElement expEl) - || DateTimeOffset.FromUnixTimeSeconds(expEl.GetInt64()) < DateTimeOffset.UtcNow) - { - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; - } - - //Check if the client is a peer - bool isPeer = doc.RootElement.TryGetProperty("isPeer", out JsonElement isPeerEl) && isPeerEl.GetBoolean(); - - //The node id is optional and stored in the 'sub' field, ignore if the client is not a peer - if (isPeer && doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl)) - { - nodeId = servIdEl.GetString(); - } - } - - //Get query config suggestions from the client - string recvBufCmd = entity.QueryArgs[FBMClient.REQ_RECV_BUF_QUERY_ARG]; - string maxHeaderCharCmd = entity.QueryArgs[FBMClient.REQ_HEAD_BUF_QUERY_ARG]; - string maxMessageSizeCmd = entity.QueryArgs[FBMClient.REQ_MAX_MESS_QUERY_ARG]; - - //Parse recv buffer size - int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : MIN_RECV_BUF_SIZE; - int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : MIN_HEAD_BUF_SIZE; - int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : MIN_MESSAGE_SIZE; - - AsyncQueue<ChangeEvent>? nodeQueue = null; - //The connection may be a caching server node, so get its node-id - if (!string.IsNullOrWhiteSpace(nodeId)) - { - /* - * Store a new async queue, or get an old queue for the current node - * - * We should use a bounded queue and disacard LRU items, we also know - * only a single writer is needed as the queue is processed on a single thread - * and change events may be processed on mutliple threads. - */ - - BoundedChannelOptions queueOptions = new(MAX_EVENT_QUEUE_SIZE) - { - AllowSynchronousContinuations = true, - SingleReader = false, - SingleWriter = true, - //Drop oldest item in queue if full - FullMode = BoundedChannelFullMode.DropOldest, - }; - - _ = StatefulEventQueue.TryAdd(nodeId, new(queueOptions)); - //Get the queue - nodeQueue = StatefulEventQueue[nodeId]; - } - - //Init new ws state object and clamp the suggested buffer sizes - WsUserState state = new() - { - RecvBufferSize = Math.Clamp(recvBufSize, MIN_RECV_BUF_SIZE, MAX_RECV_BUF_SIZE), - MaxHeaderBufferSize = Math.Clamp(maxHeadBufSize, MIN_HEAD_BUF_SIZE, MAX_HEAD_BUF_SIZE), - MaxMessageSize = Math.Clamp(maxMessageSize, MIN_MESSAGE_SIZE, MAX_MESSAGE_SIZE), - MaxResponseBufferSize = Math.Min(maxMessageSize, MAX_RESPONSE_BUFFER_SIZE), - SyncQueue = nodeQueue - }; - - Log.Debug("Client recv buffer suggestion {recv}, header buffer size {head}, response buffer size {r}", recvBufCmd, maxHeaderCharCmd, state.MaxResponseBufferSize); - - //Accept socket and pass state object - entity.AcceptWebSocket(WebsocketAcceptedAsync, state); - return VfReturnType.VirtualSkip; - } - catch (KeyNotFoundException) - { - return VfReturnType.BadRequest; - } - } - - private async Task WebsocketAcceptedAsync(WebSocketSession wss) - { - //Inc connected count - Interlocked.Increment(ref _connectedClients); - //Register plugin exit token to cancel the connected socket - CancellationTokenRegistration reg = Pbase.UnloadToken.Register(wss.CancelAll); - try - { - WsUserState state = (wss.UserState as WsUserState)!; - - //Init listener args from request - FBMListenerSessionParams args = new() - { - MaxMessageSize = state.MaxMessageSize, - RecvBufferSize = state.RecvBufferSize, - ResponseBufferSize = state.MaxResponseBufferSize, - MaxHeaderBufferSize = state.MaxHeaderBufferSize, - HeaderEncoding = Helpers.DefaultEncoding, - }; - - //Listen for requests - await Store.ListenAsync(wss, args, state.SyncQueue); - } - catch (OperationCanceledException) - { - Log.Debug("Websocket connection was canceled"); - //Disconnect the socket - await wss.CloseSocketOutputAsync(System.Net.WebSockets.WebSocketCloseStatus.NormalClosure, "unload", CancellationToken.None); - } - catch (Exception ex) - { - Log.Debug(ex); - } - finally - { - //Dec connected count - Interlocked.Decrement(ref _connectedClients); - //Unregister the - reg.Unregister(); - } - Log.Debug("Server websocket exited"); - } - } -} diff --git a/Plugins/SessionCacheServer/ObjectCacheServer.csproj b/Plugins/SessionCacheServer/ObjectCacheServer.csproj deleted file mode 100644 index ff239cc..0000000 --- a/Plugins/SessionCacheServer/ObjectCacheServer.csproj +++ /dev/null @@ -1,63 +0,0 @@ -<Project Sdk="Microsoft.NET.Sdk"> - <PropertyGroup> - <TargetFramework>net6.0</TargetFramework> - <Nullable>enable</Nullable> - <Authors>Vaughn Nugent</Authors> - <Version>1.0.1.1</Version> - <RootNamespace>VNLib.Plugins.Essentials.Sessions.Server</RootNamespace> - <Copyright>Copyright © 2022 Vaughn Nugent</Copyright> - <SignAssembly>True</SignAssembly> - <AssemblyOriginatorKeyFile>\\vaughnnugent.com\Internal\Folder Redirection\vman\Documents\Programming\Software\StrongNameingKey.snk</AssemblyOriginatorKeyFile> - </PropertyGroup> - - <!-- Resolve nuget dll files and store them in the output dir --> - <PropertyGroup> - <EnableDynamicLoading>true</EnableDynamicLoading> - <GenerateDocumentationFile>False</GenerateDocumentationFile> - <PackageProjectUrl>https://www.vaughnnugent.com/resources</PackageProjectUrl> - <AnalysisLevel>latest-all</AnalysisLevel> - - </PropertyGroup> - <ItemGroup> - <Compile Remove="liveplugin2\**" /> - <Compile Remove="liveplugin\**" /> - <EmbeddedResource Remove="liveplugin2\**" /> - <EmbeddedResource Remove="liveplugin\**" /> - <None Remove="liveplugin2\**" /> - <None Remove="liveplugin\**" /> - </ItemGroup> - <ItemGroup> - <PackageReference Include="ErrorProne.NET.CoreAnalyzers" Version="0.1.2"> - <PrivateAssets>all</PrivateAssets> - <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> - </PackageReference> - <PackageReference Include="ErrorProne.NET.Structs" Version="0.1.2"> - <PrivateAssets>all</PrivateAssets> - <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> - </PackageReference> - </ItemGroup> - <ItemGroup> - <ProjectReference Include="..\..\..\..\VNLib\Essentials\src\VNLib.Plugins.Essentials.csproj" /> - <ProjectReference Include="..\..\..\DataCaching\VNLib.Data.Caching.Extensions\VNLib.Data.Caching.Extensions.csproj" /> - <ProjectReference Include="..\..\..\DataCaching\VNLib.Data.Caching.ObjectCache\VNLib.Data.Caching.ObjectCache.csproj" /> - <ProjectReference Include="..\..\..\PluginBase\VNLib.Plugins.PluginBase.csproj" /> - <ProjectReference Include="..\CacheBroker\CacheBroker.csproj" /> - </ItemGroup> - - - <ItemGroup> - <None Update="ObjectCacheServer.json"> - <CopyToOutputDirectory>Always</CopyToOutputDirectory> - </None> - </ItemGroup> - - <Target Name="PostBuild" AfterTargets="PostBuildEvent"> - - <Exec Command="erase "\vaughnnugent.com\Internal\Folder Redirection\vman\Documents\Programming\Software\VNLib\Web Plugins/$(TargetName)" /q > nul
start xcopy "$(TargetDir)" "$(ProjectDir)/liveplugin/$(TargetName)" /E /Y /R
start xcopy "$(TargetDir)" "\\vaughnnugent.com\Internal\Folder Redirection\vman\Documents\Programming\Software\VNLib\Web Plugins/$(TargetName)" /E /Y /R" /> - </Target> - - <Target Name="PostBuild" AfterTargets="PostBuildEvent"> - <Exec Command="erase "F:\Programming\Web Plugins\DevPlugins\$(TargetName)" /q > nul" /> - </Target> - -</Project> diff --git a/Plugins/SessionCacheServer/ObjectCacheServerEntry.cs b/Plugins/SessionCacheServer/ObjectCacheServerEntry.cs deleted file mode 100644 index 85a7996..0000000 --- a/Plugins/SessionCacheServer/ObjectCacheServerEntry.cs +++ /dev/null @@ -1,567 +0,0 @@ -/* -* Copyright (c) 2022 Vaughn Nugent -* -* Library: VNLib -* Package: ObjectCacheServer -* File: ObjectCacheServerEntry.cs -* -* ObjectCacheServerEntry.cs is part of ObjectCacheServer which is part of the larger -* VNLib collection of libraries and utilities. -* -* ObjectCacheServer is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* ObjectCacheServer is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.IO; -using System.Net; -using System.Linq; -using System.Net.Http; -using System.Text.Json; -using System.Threading; -using System.Net.Sockets; -using System.Threading.Tasks; -using System.Collections.Generic; -using System.Security.Cryptography; - -using VNLib.Utils.Memory; -using VNLib.Utils.Logging; -using VNLib.Utils.Extensions; -using VNLib.Hashing; -using VNLib.Hashing.IdentityUtility; -using VNLib.Data.Caching; -using VNLib.Data.Caching.Extensions; -using VNLib.Data.Caching.ObjectCache; -using static VNLib.Data.Caching.Constants; -using VNLib.Net.Messaging.FBM; -using VNLib.Net.Messaging.FBM.Client; -using VNLib.Plugins.Cache.Broker.Endpoints; -using VNLib.Plugins.Extensions.Loading; -using VNLib.Plugins.Extensions.Loading.Routing; -using VNLib.Plugins.Essentials.Sessions.Server.Endpoints; - - -namespace VNLib.Plugins.Essentials.Sessions.Server -{ - public sealed class ObjectCacheServerEntry : PluginBase - { - public override string PluginName => "ObjectCache.Service"; - - private string? BrokerHeartBeatToken; - - private readonly object ServerLock = new(); - private readonly HashSet<ActiveServer> ListeningServers = new(); - - - private void RemoveServer(ActiveServer server) - { - lock (ServerLock) - { - ListeningServers.Remove(server); - } - } - - protected override void OnLoad() - { - //Create default heap - IUnmangedHeap CacheHeap = Memory.InitializeNewHeapForProcess(); - try - { - IReadOnlyDictionary<string, JsonElement> clusterConf = this.GetConfig("cluster"); - - string brokerAddress = clusterConf["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'"); - - string swapDir = PluginConfig.GetProperty("swap_dir").GetString() ?? throw new KeyNotFoundException("Missing required key 'swap_dir' for config"); - int cacheSize = PluginConfig.GetProperty("max_cache").GetInt32(); - string connectPath = PluginConfig.GetProperty("connect_path").GetString() ?? throw new KeyNotFoundException("Missing required element 'connect_path' for config 'cluster'"); - //TimeSpan cleanupInterval = PluginConfig.GetProperty("cleanup_interval_sec").GetTimeSpan(TimeParseType.Seconds); - //TimeSpan validFor = PluginConfig.GetProperty("valid_for_sec").GetTimeSpan(TimeParseType.Seconds); - int maxMessageSize = PluginConfig.GetProperty("max_blob_size").GetInt32(); - - //Init dir - DirectoryInfo dir = new(swapDir); - dir.Create(); - //Init cache listener, single threaded reader - ObjectCacheStore CacheListener = new(dir, cacheSize, Log, CacheHeap, true); - - //Init connect endpoint - { - //Init connect endpoint - ConnectEndpoint endpoint = new(connectPath, CacheListener, this); - Route(endpoint); - } - - //Setup broker and regitration - { - //init mre to pass the broker heartbeat signal to the registration worker - ManualResetEvent mre = new(false); - //Route the broker endpoint - BrokerHeartBeat brokerEp = new(() => BrokerHeartBeatToken!, mre, new Uri(brokerAddress), this); - Route(brokerEp); - - //start registration - _ = this.DeferTask(() => RegisterServerAsync(mre), 200); - } - - //Setup cluster worker - { - //Get pre-configured fbm client config for caching - FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(CacheHeap, maxMessageSize, this.IsDebug() ? Log : null); - - //Start Client runner - _ = this.DeferTask(() => RunClientAsync(CacheListener, new Uri(brokerAddress), conf), 300); - } - - //Load a cache broker to the current server if the config is defined - { - if(this.HasConfigForType<BrokerRegistrationEndpoint>()) - { - this.Route<BrokerRegistrationEndpoint>(); - } - } - - void Cleanup() - { - CacheHeap.Dispose(); - CacheListener.Dispose(); - } - - //Regsiter cleanup - _ = UnloadToken.RegisterUnobserved(Cleanup); - - Log.Information("Plugin loaded"); - } - catch (KeyNotFoundException kne) - { - CacheHeap.Dispose(); - Log.Error("Missing required configuration variables {m}", kne.Message); - } - catch - { - CacheHeap.Dispose(); - throw; - } - } - - protected override void OnUnLoad() - { - Log.Information("Plugin unloaded"); - } - - #region Registration - - private async Task RegisterServerAsync(ManualResetEvent keepaliveWait) - { - try - { - //Get the broker config element - IReadOnlyDictionary<string, JsonElement> clusterConfig = this.GetConfig("cluster"); - - //Server id is just dns name for now - string serverId = Dns.GetHostName(); - int heartBeatDelayMs = clusterConfig["heartbeat_timeout_sec"].GetInt32() * 1000; - - string? connectPath = PluginConfig.GetProperty("connect_path").GetString(); - - //Get the port of the primary webserver - int port; - bool usingTls; - { - JsonElement firstHost = HostConfig.GetProperty("virtual_hosts").EnumerateArray().First(); - - port = firstHost.GetProperty("interface") - .GetProperty("port") - .GetInt32(); - - //If a certificate is specified, tls is enabled on the port - usingTls = firstHost.TryGetProperty("cert", out _); - } - - using BrokerRegistrationRequest request = new(); - { - string addr = clusterConfig["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'"); - - //Recover the certificate - ReadOnlyJsonWebKey cacheCert = await GetCachePrivate(); - - //Init url builder for payload, see if tls is enabled - Uri connectAddress = new UriBuilder(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, Dns.GetHostName(), port, connectPath).Uri; - - request.WithBroker(new(addr)) - .WithRegistrationAddress(connectAddress.ToString()) - .WithNodeId(serverId) - .WithSigningKey(cacheCert, true); - } - - while (true) - { - try - { - //Gen a random reg token before registering - BrokerHeartBeatToken = RandomHash.GetRandomHex(32); - //Assign new hb token - request.WithHeartbeatToken(BrokerHeartBeatToken); - - Log.Information("Registering with cache broker {addr}, with node-id {id}", request.BrokerAddress, serverId); - - //Register with the broker - await FBMDataCacheExtensions.ResgisterWithBrokerAsync(request); - - Log.Debug("Successfully registered with cache broker"); - - /* - * Wait in a loop for the broker to send a keepalive - * request with the specified token. When the event - * is signaled the task will be completed - */ - while (true) - { - await Task.Delay(heartBeatDelayMs, UnloadToken); - //Set the timeout to 0 to it will just check the status without blocking - if (!keepaliveWait.WaitOne(0)) - { - //server miseed a keepalive event, time to break the loop and retry - Log.Debug("Broker missed a heartbeat request, attempting to re-register"); - break; - } - //Reset the msr - keepaliveWait.Reset(); - } - } - catch (TaskCanceledException) - { - throw; - } - catch (TimeoutException) - { - Log.Warn("Failed to connect to cache broker server within the specified timeout period"); - } - catch (HttpRequestException re) when (re.InnerException is SocketException) - { - Log.Warn("Cache broker is unavailable or network is unavailable"); - } - catch (Exception ex) - { - Log.Warn(ex, "Failed to update broker registration"); - } - - //Gen random ms delay - int randomMsDelay = RandomNumberGenerator.GetInt32(500, 2000); - //Delay - await Task.Delay(randomMsDelay, UnloadToken); - } - } - catch (KeyNotFoundException kne) - { - Log.Error("Missing required broker configuration variables {ke}", kne.Message); - } - catch (TaskCanceledException) - { - //Normal unload/exit - } - catch (Exception ex) - { - Log.Error(ex); - } - finally - { - keepaliveWait.Dispose(); - BrokerHeartBeatToken = null; - } - Log.Debug("Registration worker exited"); - } - - #endregion - - #region Cluster - - private async Task<ReadOnlyJsonWebKey> GetCachePrivate() - { - return await this.TryGetSecretAsync("cache_private_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Failed to load the cache private key"); - } - - private async Task<ReadOnlyJsonWebKey> GetBrokerPublic() - { - return await this.TryGetSecretAsync("broker_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Failed to load the broker's public key"); - } - - - /// <summary> - /// Starts a self-contained process-long task to discover other cache servers - /// from a shared broker server - /// </summary> - /// <param name="cacheStore">The cache store to synchronize</param> - /// <param name="brokerAddress">The broker server's address</param> - /// <param name="serverId">The node-id of the current server</param> - /// <param name="clientConf">The configuration to use when initializing synchronization clients</param> - /// <returns>A task that resolves when the plugin unloads</returns> - private async Task RunClientAsync(ObjectCacheStore cacheStore, Uri brokerAddress, FBMClientConfig clientConf) - { - TimeSpan noServerDelay = TimeSpan.FromSeconds(10); - string nodeId = Dns.GetHostName(); - ListServerRequest listRequest = new(brokerAddress); - try - { - //Get the broker config element - IReadOnlyDictionary<string, JsonElement> clusterConf = this.GetConfig("cluster"); - int serverCheckMs = clusterConf["update_interval_sec"].GetInt32() * 1000; - - //Setup signing and verification certificates - ReadOnlyJsonWebKey cacheSig = await GetCachePrivate(); - ReadOnlyJsonWebKey brokerPub = await GetBrokerPublic(); - //Import certificates - listRequest.WithVerificationKey(brokerPub) - .WithSigningKey(cacheSig); - - //Main event loop - Log.Information("Begining cluster node discovery"); - - ILogProvider? debugLog = this.IsDebug() ? Log : null; - - while (true) - { - //Load the server list - ActiveServer[]? servers; - while (true) - { - try - { - debugLog?.Information("[CACHE] Requesting server list from broker"); - - //Get server list - servers = await FBMDataCacheExtensions.ListServersAsync(listRequest, UnloadToken); - //Servers are loaded, so continue - break; - } - catch(HttpRequestException he) when(he.InnerException is SocketException) - { - Log.Warn("Failed to connect to cache broker, trying again"); - } - catch (TimeoutException) - { - Log.Warn("Failed to connect to cache broker server within the specified timeout period"); - } - catch (Exception ex) - { - Log.Warn(ex, "Failed to get server list from broker"); - } - //Gen random ms delay - int randomMsDelay = RandomNumberGenerator.GetInt32(1000, 2000); - //Delay - await Task.Delay(randomMsDelay, UnloadToken); - } - - if(servers == null || servers.Length == 0) - { - Log.Information("No cluster nodes found, retrying"); - //Delay - await Task.Delay(noServerDelay, UnloadToken); - continue; - } - - - //Lock on sever set while enumerating - lock (ServerLock) - { - //Select servers that are not the current server and are not already being monitored - IEnumerable<ActiveServer> serversToConnectTo = servers.Where(s => !nodeId.Equals(s.ServerId, StringComparison.OrdinalIgnoreCase)); - - //Connect to servers - foreach (ActiveServer server in serversToConnectTo) - { - //Make sure were not currently connected to the server - if (!ListeningServers.Contains(server)) - { - //Add the server to the set - ListeningServers.Add(server); - - //Run listener background task - _ = this.DeferTask(() => RunSyncTaskAsync(server, cacheStore, clientConf, nodeId)); - } - } - } - - //Delay until next check cycle - await Task.Delay(serverCheckMs, UnloadToken); - } - } - catch (FileNotFoundException) - { - Log.Error("Client/cluster private cluster key file was not found or could not be read"); - } - catch (KeyNotFoundException) - { - Log.Error("Missing required cluster configuration varables"); - } - catch (TaskCanceledException) - { - } - catch (Exception ex) - { - Log.Error(ex); - } - finally - { - listRequest.Dispose(); - } - Log.Debug("Cluster sync worker exited"); - } - - private async Task RunSyncTaskAsync(ActiveServer server, ObjectCacheStore cacheStore, FBMClientConfig conf, string nodeId) - { - //Setup client - FBMClient client = new(conf); - try - { - async Task UpdateRecordAsync(string objectId, string newId) - { - //Get request message - FBMRequest modRequest = client.RentRequest(); - try - { - //Set action as get/create - modRequest.WriteHeader(HeaderCommand.Action, Actions.Get); - //Set session-id header - modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId); - - //Make request - using FBMResponse response = await client.SendAsync(modRequest, UnloadToken); - - response.ThrowIfNotSet(); - - //Check response code - string status = response.Headers.First(static s => s.Key == HeaderCommand.Status).Value.ToString(); - if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal)) - { - //Update the record - await cacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response); - Log.Debug("Updated object {id}", objectId); - } - else - { - Log.Warn("Object {id} was missing on the remote server", objectId); - } - } - finally - { - client.ReturnRequest(modRequest); - } - } - - { - //Sign and verify requests with the cache private key since we are a peer - ReadOnlyJsonWebKey cachePriv = await GetCachePrivate(); - - //Configure cache - client.GetCacheConfiguration() - .WithVerificationKey(cachePriv) - .WithSigningCertificate(cachePriv) - .WithNodeId(nodeId) //set nodeid since were listening for changes - .WithTls(false); - } - - Log.Information("Connecting to {server}...", server.ServerId); - - //Connect to the server - await client.ConnectToCacheAsync(server, UnloadToken); - - //Wroker task callback method - async Task BgWorkerAsync() - { - //Listen for changes - while (true) - { - //Wait for changes - WaitForChangeResult changedObject = await client.WaitForChangeAsync(UnloadToken); - - Log.Debug("Object changed {typ} {obj}", changedObject.Status, changedObject.CurrentId); - - switch (changedObject.Status) - { - case ResponseCodes.NotFound: - Log.Warn("Server cache not properly configured, worker exiting"); - return; - case "deleted": - //Delete the object from the store - _ = cacheStore.DeleteItemAsync(changedObject.CurrentId).ConfigureAwait(false); - break; - case "modified": - //Reload the record from the store - await UpdateRecordAsync(changedObject.CurrentId, changedObject.NewId); - break; - } - } - } - - Log.Information("Connected to {server}, starting queue listeners", server.ServerId); - - //Start worker tasks - List<Task> workerTasks = new(); - for(int i = 0; i < Environment.ProcessorCount; i++) - { - workerTasks.Add(Task.Run(BgWorkerAsync)); - } - - //Wait for sync workers to exit - await Task.WhenAll(workerTasks); - } - catch (InvalidResponseException ie) - { - //See if the plugin is unloading - if (!UnloadToken.IsCancellationRequested) - { - Log.Debug("Server responded with invalid response packet, disconnected. reason {reason}", ie); - } - //Disconnect client gracefully - try - { - await client.DisconnectAsync(); - } - catch (Exception ex) - { - Log.Error(ex); - } - } - catch (OperationCanceledException) - { - //Plugin unloading, Try to disconnect - try - { - await client.DisconnectAsync(); - } - catch(Exception ex) - { - Log.Error(ex); - } - } - catch(Exception ex) - { - Log.Warn("Lost connection to server {h}, {m}", server.ServerId, ex); - } - finally - { - //Remove server from active list, since its been disconnected - RemoveServer(server); - client.Dispose(); - } - } - - protected override void ProcessHostCommand(string cmd) - { - Log.Debug(cmd); - } - - - #endregion - } -} |