aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/ObjectCacheServer/src')
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs130
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs399
-rw-r--r--plugins/ObjectCacheServer/src/ObjectCacheServer.csproj51
-rw-r--r--plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs567
4 files changed, 1147 insertions, 0 deletions
diff --git a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs b/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs
new file mode 100644
index 0000000..bd1233e
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs
@@ -0,0 +1,130 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: BrokerHeartBeat.cs
+*
+* BrokerHeartBeat.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Net;
+using System.Linq;
+using System.Text.Json;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+
+using VNLib.Hashing.IdentityUtility;
+using VNLib.Plugins.Essentials.Endpoints;
+using VNLib.Plugins.Essentials.Extensions;
+using VNLib.Plugins.Extensions.Loading;
+
+namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
+{
+ internal sealed class BrokerHeartBeat : ResourceEndpointBase
+ {
+ public override string Path => "/heartbeat";
+
+ private readonly Func<string> Token;
+ private readonly ManualResetEvent KeepaliveSet;
+ private readonly Task<IPAddress[]> BrokerIpList;
+ private readonly PluginBase Pbase;
+
+ protected override ProtectionSettings EndpointProtectionSettings { get; } = new()
+ {
+ DisableBrowsersOnly = true,
+ DisableSessionsRequired = true,
+ DisableVerifySessionCors = true
+ };
+
+ public BrokerHeartBeat(Func<string> token, ManualResetEvent keepaliveSet, Uri brokerUri, PluginBase pbase)
+ {
+ Token = token;
+ KeepaliveSet = keepaliveSet;
+ BrokerIpList = Dns.GetHostAddressesAsync(brokerUri.DnsSafeHost);
+
+ this.Pbase = pbase;
+ }
+
+ private async Task<ReadOnlyJsonWebKey> GetBrokerPubAsync()
+ {
+ return await Pbase.TryGetSecretAsync("broker_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : broker_public_key");
+ }
+
+ protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity)
+ {
+ //If-not loopback then verify server address
+ if (!entity.Server.IsLoopBack())
+ {
+ //Load and verify the broker's ip address matches with an address we have stored
+ IPAddress[] addresses = await BrokerIpList;
+ if (!addresses.Contains(entity.TrustedRemoteIp))
+ {
+ //Token invalid
+ entity.CloseResponse(HttpStatusCode.Forbidden);
+ return VfReturnType.VirtualSkip;
+ }
+ }
+ //Get the authorization jwt
+ string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
+
+ if (string.IsNullOrWhiteSpace(jwtAuth))
+ {
+ //Token invalid
+ entity.CloseResponse(HttpStatusCode.Forbidden);
+ return VfReturnType.VirtualSkip;
+ }
+
+ //Parse the jwt
+ using JsonWebToken jwt = JsonWebToken.Parse(jwtAuth);
+
+ //Verify the jwt using the broker's public key certificate
+ using (ReadOnlyJsonWebKey cert = await GetBrokerPubAsync())
+ {
+ //Verify the jwt
+ if (!jwt.VerifyFromJwk(cert))
+ {
+ //Token invalid
+ entity.CloseResponse(HttpStatusCode.Forbidden);
+ return VfReturnType.VirtualSkip;
+ }
+ }
+
+ string? auth;
+ //Recover the auth token from the jwt
+ using (JsonDocument doc = jwt.GetPayload())
+ {
+ auth = doc.RootElement.GetProperty("token").GetString();
+ }
+
+ //Verify token
+ if(Token().Equals(auth, StringComparison.Ordinal))
+ {
+ //Signal keepalive
+ KeepaliveSet.Set();
+ entity.CloseResponse(HttpStatusCode.OK);
+ return VfReturnType.VirtualSkip;
+ }
+
+ //Token invalid
+ entity.CloseResponse(HttpStatusCode.Forbidden);
+ return VfReturnType.VirtualSkip;
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
new file mode 100644
index 0000000..2fe0994
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
@@ -0,0 +1,399 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: ConnectEndpoint.cs
+*
+* ConnectEndpoint.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Net;
+using System.Text.Json;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Threading.Channels;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+using VNLib.Net.Http;
+using VNLib.Hashing;
+using VNLib.Utils.Async;
+using VNLib.Utils.Logging;
+using VNLib.Hashing.IdentityUtility;
+using VNLib.Net.Messaging.FBM;
+using VNLib.Net.Messaging.FBM.Client;
+using VNLib.Net.Messaging.FBM.Server;
+using VNLib.Data.Caching.ObjectCache;
+using VNLib.Plugins.Extensions.Loading;
+using VNLib.Plugins.Essentials.Endpoints;
+using VNLib.Plugins.Essentials.Extensions;
+
+
+namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
+{
+ internal sealed class ConnectEndpoint : ResourceEndpointBase
+ {
+ const int MAX_RECV_BUF_SIZE = 1000 * 1024;
+ const int MIN_RECV_BUF_SIZE = 8 * 1024;
+ const int MAX_HEAD_BUF_SIZE = 2048;
+ const int MIN_MESSAGE_SIZE = 10 * 1024;
+ const int MAX_MESSAGE_SIZE = 1000 * 1024;
+ const int MIN_HEAD_BUF_SIZE = 128;
+ const int MAX_EVENT_QUEUE_SIZE = 10000;
+ const int MAX_RESPONSE_BUFFER_SIZE = 10 * 1024;
+
+ private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
+
+ private readonly string AudienceLocalServerId;
+ private readonly ObjectCacheStore Store;
+ private readonly PluginBase Pbase;
+
+ private readonly ConcurrentDictionary<string, AsyncQueue<ChangeEvent>> StatefulEventQueue;
+
+ private uint _connectedClients;
+
+ public uint ConnectedClients => _connectedClients;
+
+ //Loosen up protection settings
+ protected override ProtectionSettings EndpointProtectionSettings { get; } = new()
+ {
+ DisableBrowsersOnly = true,
+ DisableSessionsRequired = true,
+ DisableCrossSiteDenied = true
+ };
+
+ public ConnectEndpoint(string path, ObjectCacheStore store, PluginBase pbase)
+ {
+ InitPathAndLog(path, pbase.Log);
+ Store = store;//Load client public key to verify signed messages
+ Pbase = pbase;
+
+ StatefulEventQueue = new(StringComparer.OrdinalIgnoreCase);
+
+ //Start the queue worker
+ _ = pbase.DeferTask(() => ChangeWorkerAsync(pbase.UnloadToken), 10);
+
+ AudienceLocalServerId = Guid.NewGuid().ToString("N");
+ }
+
+ /*
+ * Used as a client negotiation and verification request
+ *
+ * The token created during this request will be verified by the client
+ * and is already verified by this server, will be passed back
+ * via the authorization header during the websocket upgrade.
+ *
+ * This server must verify the authenticity of the returned token
+ *
+ * The tokens are very short lived as requests are intended to be made
+ * directly after verification
+ */
+
+ protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity)
+ {
+ //Parse jwt from authoriation
+ string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
+ if (string.IsNullOrWhiteSpace(jwtAuth))
+ {
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
+ }
+
+ string? nodeId = null;
+ string? challenge = null;
+ bool isPeer = false;
+
+ // Parse jwt
+ using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
+ {
+ bool verified = false;
+
+ //Get the client public key certificate to verify the client's message
+ using(ReadOnlyJsonWebKey cert = await GetClientPubAsync())
+ {
+ //verify signature for client
+ if (jwt.VerifyFromJwk(cert))
+ {
+ verified = true;
+ }
+ //May be signed by a cahce server
+ else
+ {
+ using ReadOnlyJsonWebKey cacheCert = await GetCachePubAsync();
+
+ //Set peer and verified flag since the another cache server signed the request
+ isPeer = verified = jwt.VerifyFromJwk(cacheCert);
+ }
+ }
+
+ //Check flag
+ if (!verified)
+ {
+ Log.Information("Client signature verification failed");
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
+ }
+
+ //Recover json body
+ using JsonDocument doc = jwt.GetPayload();
+ if (doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl))
+ {
+ nodeId = servIdEl.GetString();
+ }
+ if (doc.RootElement.TryGetProperty("chl", out JsonElement challengeEl))
+ {
+ challenge = challengeEl.GetString();
+ }
+ }
+
+ Log.Debug("Received negotiation request from node {node}", nodeId);
+ //Verified, now we can create an auth message with a short expiration
+ using JsonWebToken auth = new();
+ //Sign the auth message from the cache certificate's private key
+ using (ReadOnlyJsonWebKey cert = await GetCachePrivateKeyAsync())
+ {
+ auth.WriteHeader(cert.JwtHeader);
+ auth.InitPayloadClaim()
+ .AddClaim("aud", AudienceLocalServerId)
+ .AddClaim("exp", DateTimeOffset.UtcNow.Add(AuthTokenExpiration).ToUnixTimeSeconds())
+ .AddClaim("nonce", RandomHash.GetRandomBase32(8))
+ .AddClaim("chl", challenge!)
+ //Set the ispeer flag if the request was signed by a cache server
+ .AddClaim("isPeer", isPeer)
+ //Specify the server's node id if set
+ .AddClaim("sub", nodeId!)
+ //Add negotiaion args
+ .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, MAX_HEAD_BUF_SIZE)
+ .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, MAX_RECV_BUF_SIZE)
+ .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, MAX_MESSAGE_SIZE)
+ .CommitClaims();
+
+ auth.SignFromJwk(cert);
+ }
+
+ //Close response
+ entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer);
+ return VfReturnType.VirtualSkip;
+ }
+
+ private async Task<ReadOnlyJsonWebKey> GetClientPubAsync()
+ {
+ return await Pbase.TryGetSecretAsync("client_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
+ }
+ private async Task<ReadOnlyJsonWebKey> GetCachePubAsync()
+ {
+ return await Pbase.TryGetSecretAsync("cache_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
+ }
+ private async Task<ReadOnlyJsonWebKey> GetCachePrivateKeyAsync()
+ {
+ return await Pbase.TryGetSecretAsync("cache_private_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
+ }
+
+ private async Task ChangeWorkerAsync(CancellationToken cancellation)
+ {
+ try
+ {
+ //Listen for changes
+ while (true)
+ {
+ ChangeEvent ev = await Store.EventQueue.DequeueAsync(cancellation);
+ //Add event to queues
+ foreach (AsyncQueue<ChangeEvent> queue in StatefulEventQueue.Values)
+ {
+ if (!queue.TryEnque(ev))
+ {
+ Log.Debug("Listener queue has exeeded capacity, change events will be lost");
+ }
+ }
+ }
+ }
+ catch (OperationCanceledException)
+ { }
+ catch (Exception ex)
+ {
+ Log.Error(ex);
+ }
+ }
+
+ private class WsUserState
+ {
+ public int RecvBufferSize { get; init; }
+ public int MaxHeaderBufferSize { get; init; }
+ public int MaxMessageSize { get; init; }
+ public int MaxResponseBufferSize { get; init; }
+ public AsyncQueue<ChangeEvent>? SyncQueue { get; init; }
+ }
+
+ protected override async ValueTask<VfReturnType> WebsocketRequestedAsync(HttpEntity entity)
+ {
+ try
+ {
+ //Parse jwt from authorization
+ string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
+ if (string.IsNullOrWhiteSpace(jwtAuth))
+ {
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
+ }
+
+ string? nodeId = null;
+ //Parse jwt
+ using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
+ {
+ //Get the client public key certificate to verify the client's message
+ using (ReadOnlyJsonWebKey cert = await GetCachePubAsync())
+ {
+ //verify signature against the cache public key, since this server must have signed it
+ if (!jwt.VerifyFromJwk(cert))
+ {
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
+ }
+ }
+
+ //Recover json body
+ using JsonDocument doc = jwt.GetPayload();
+
+ //Verify audience, expiration
+
+ if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl) || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase))
+ {
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
+ }
+
+ if (!doc.RootElement.TryGetProperty("exp", out JsonElement expEl)
+ || DateTimeOffset.FromUnixTimeSeconds(expEl.GetInt64()) < DateTimeOffset.UtcNow)
+ {
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
+ }
+
+ //Check if the client is a peer
+ bool isPeer = doc.RootElement.TryGetProperty("isPeer", out JsonElement isPeerEl) && isPeerEl.GetBoolean();
+
+ //The node id is optional and stored in the 'sub' field, ignore if the client is not a peer
+ if (isPeer && doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl))
+ {
+ nodeId = servIdEl.GetString();
+ }
+ }
+
+ //Get query config suggestions from the client
+ string recvBufCmd = entity.QueryArgs[FBMClient.REQ_RECV_BUF_QUERY_ARG];
+ string maxHeaderCharCmd = entity.QueryArgs[FBMClient.REQ_HEAD_BUF_QUERY_ARG];
+ string maxMessageSizeCmd = entity.QueryArgs[FBMClient.REQ_MAX_MESS_QUERY_ARG];
+
+ //Parse recv buffer size
+ int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : MIN_RECV_BUF_SIZE;
+ int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : MIN_HEAD_BUF_SIZE;
+ int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : MIN_MESSAGE_SIZE;
+
+ AsyncQueue<ChangeEvent>? nodeQueue = null;
+ //The connection may be a caching server node, so get its node-id
+ if (!string.IsNullOrWhiteSpace(nodeId))
+ {
+ /*
+ * Store a new async queue, or get an old queue for the current node
+ *
+ * We should use a bounded queue and disacard LRU items, we also know
+ * only a single writer is needed as the queue is processed on a single thread
+ * and change events may be processed on mutliple threads.
+ */
+
+ BoundedChannelOptions queueOptions = new(MAX_EVENT_QUEUE_SIZE)
+ {
+ AllowSynchronousContinuations = true,
+ SingleReader = false,
+ SingleWriter = true,
+ //Drop oldest item in queue if full
+ FullMode = BoundedChannelFullMode.DropOldest,
+ };
+
+ _ = StatefulEventQueue.TryAdd(nodeId, new(queueOptions));
+ //Get the queue
+ nodeQueue = StatefulEventQueue[nodeId];
+ }
+
+ //Init new ws state object and clamp the suggested buffer sizes
+ WsUserState state = new()
+ {
+ RecvBufferSize = Math.Clamp(recvBufSize, MIN_RECV_BUF_SIZE, MAX_RECV_BUF_SIZE),
+ MaxHeaderBufferSize = Math.Clamp(maxHeadBufSize, MIN_HEAD_BUF_SIZE, MAX_HEAD_BUF_SIZE),
+ MaxMessageSize = Math.Clamp(maxMessageSize, MIN_MESSAGE_SIZE, MAX_MESSAGE_SIZE),
+ MaxResponseBufferSize = Math.Min(maxMessageSize, MAX_RESPONSE_BUFFER_SIZE),
+ SyncQueue = nodeQueue
+ };
+
+ Log.Debug("Client recv buffer suggestion {recv}, header buffer size {head}, response buffer size {r}", recvBufCmd, maxHeaderCharCmd, state.MaxResponseBufferSize);
+
+ //Accept socket and pass state object
+ entity.AcceptWebSocket(WebsocketAcceptedAsync, state);
+ return VfReturnType.VirtualSkip;
+ }
+ catch (KeyNotFoundException)
+ {
+ return VfReturnType.BadRequest;
+ }
+ }
+
+ private async Task WebsocketAcceptedAsync(WebSocketSession wss)
+ {
+ //Inc connected count
+ Interlocked.Increment(ref _connectedClients);
+ //Register plugin exit token to cancel the connected socket
+ CancellationTokenRegistration reg = Pbase.UnloadToken.Register(wss.CancelAll);
+ try
+ {
+ WsUserState state = (wss.UserState as WsUserState)!;
+
+ //Init listener args from request
+ FBMListenerSessionParams args = new()
+ {
+ MaxMessageSize = state.MaxMessageSize,
+ RecvBufferSize = state.RecvBufferSize,
+ ResponseBufferSize = state.MaxResponseBufferSize,
+ MaxHeaderBufferSize = state.MaxHeaderBufferSize,
+ HeaderEncoding = Helpers.DefaultEncoding,
+ };
+
+ //Listen for requests
+ await Store.ListenAsync(wss, args, state.SyncQueue);
+ }
+ catch (OperationCanceledException)
+ {
+ Log.Debug("Websocket connection was canceled");
+ //Disconnect the socket
+ await wss.CloseSocketOutputAsync(System.Net.WebSockets.WebSocketCloseStatus.NormalClosure, "unload", CancellationToken.None);
+ }
+ catch (Exception ex)
+ {
+ Log.Debug(ex);
+ }
+ finally
+ {
+ //Dec connected count
+ Interlocked.Decrement(ref _connectedClients);
+ //Unregister the
+ reg.Unregister();
+ }
+ Log.Debug("Server websocket exited");
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj
new file mode 100644
index 0000000..672597b
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj
@@ -0,0 +1,51 @@
+<Project Sdk="Microsoft.NET.Sdk">
+ <PropertyGroup>
+ <TargetFramework>net6.0</TargetFramework>
+ <Nullable>enable</Nullable>
+ <Authors>Vaughn Nugent</Authors>
+ <Version>1.0.1.1</Version>
+ <RootNamespace>VNLib.Plugins.Essentials.Sessions.Server</RootNamespace>
+ <Copyright>Copyright © 2023 Vaughn Nugent</Copyright>
+ <SignAssembly>True</SignAssembly>
+ <AssemblyOriginatorKeyFile>\\vaughnnugent.com\Internal\Folder Redirection\vman\Documents\Programming\Software\StrongNameingKey.snk</AssemblyOriginatorKeyFile>
+ </PropertyGroup>
+
+ <!-- Resolve nuget dll files and store them in the output dir -->
+ <PropertyGroup>
+ <EnableDynamicLoading>true</EnableDynamicLoading>
+ <GenerateDocumentationFile>False</GenerateDocumentationFile>
+ <PackageProjectUrl>https://www.vaughnnugent.com/resources</PackageProjectUrl>
+ <AnalysisLevel>latest-all</AnalysisLevel>
+ <ProduceReferenceAssembly>True</ProduceReferenceAssembly>
+
+ </PropertyGroup>
+ <ItemGroup>
+ <Compile Remove="liveplugin2\**" />
+ <Compile Remove="liveplugin\**" />
+ <EmbeddedResource Remove="liveplugin2\**" />
+ <EmbeddedResource Remove="liveplugin\**" />
+ <None Remove="liveplugin2\**" />
+ <None Remove="liveplugin\**" />
+ </ItemGroup>
+ <ItemGroup>
+ <PackageReference Include="ErrorProne.NET.CoreAnalyzers" Version="0.1.2">
+ <PrivateAssets>all</PrivateAssets>
+ <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
+ </PackageReference>
+ <PackageReference Include="ErrorProne.NET.Structs" Version="0.1.2">
+ <PrivateAssets>all</PrivateAssets>
+ <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
+ </PackageReference>
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\..\..\lib\VNLib.Data.Caching.Extensions\src\VNLib.Data.Caching.Extensions.csproj" />
+ <ProjectReference Include="..\..\..\lib\VNLib.Data.Caching.ObjectCache\src\VNLib.Data.Caching.ObjectCache.csproj" />
+ <ProjectReference Include="..\..\CacheBroker\src\CacheBroker.csproj" />
+ </ItemGroup>
+ <ItemGroup>
+ <None Update="ObjectCacheServer.json">
+ <CopyToOutputDirectory>Always</CopyToOutputDirectory>
+ </None>
+ </ItemGroup>
+
+</Project>
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
new file mode 100644
index 0000000..85a7996
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
@@ -0,0 +1,567 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: ObjectCacheServerEntry.cs
+*
+* ObjectCacheServerEntry.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.IO;
+using System.Net;
+using System.Linq;
+using System.Net.Http;
+using System.Text.Json;
+using System.Threading;
+using System.Net.Sockets;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Security.Cryptography;
+
+using VNLib.Utils.Memory;
+using VNLib.Utils.Logging;
+using VNLib.Utils.Extensions;
+using VNLib.Hashing;
+using VNLib.Hashing.IdentityUtility;
+using VNLib.Data.Caching;
+using VNLib.Data.Caching.Extensions;
+using VNLib.Data.Caching.ObjectCache;
+using static VNLib.Data.Caching.Constants;
+using VNLib.Net.Messaging.FBM;
+using VNLib.Net.Messaging.FBM.Client;
+using VNLib.Plugins.Cache.Broker.Endpoints;
+using VNLib.Plugins.Extensions.Loading;
+using VNLib.Plugins.Extensions.Loading.Routing;
+using VNLib.Plugins.Essentials.Sessions.Server.Endpoints;
+
+
+namespace VNLib.Plugins.Essentials.Sessions.Server
+{
+ public sealed class ObjectCacheServerEntry : PluginBase
+ {
+ public override string PluginName => "ObjectCache.Service";
+
+ private string? BrokerHeartBeatToken;
+
+ private readonly object ServerLock = new();
+ private readonly HashSet<ActiveServer> ListeningServers = new();
+
+
+ private void RemoveServer(ActiveServer server)
+ {
+ lock (ServerLock)
+ {
+ ListeningServers.Remove(server);
+ }
+ }
+
+ protected override void OnLoad()
+ {
+ //Create default heap
+ IUnmangedHeap CacheHeap = Memory.InitializeNewHeapForProcess();
+ try
+ {
+ IReadOnlyDictionary<string, JsonElement> clusterConf = this.GetConfig("cluster");
+
+ string brokerAddress = clusterConf["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'");
+
+ string swapDir = PluginConfig.GetProperty("swap_dir").GetString() ?? throw new KeyNotFoundException("Missing required key 'swap_dir' for config");
+ int cacheSize = PluginConfig.GetProperty("max_cache").GetInt32();
+ string connectPath = PluginConfig.GetProperty("connect_path").GetString() ?? throw new KeyNotFoundException("Missing required element 'connect_path' for config 'cluster'");
+ //TimeSpan cleanupInterval = PluginConfig.GetProperty("cleanup_interval_sec").GetTimeSpan(TimeParseType.Seconds);
+ //TimeSpan validFor = PluginConfig.GetProperty("valid_for_sec").GetTimeSpan(TimeParseType.Seconds);
+ int maxMessageSize = PluginConfig.GetProperty("max_blob_size").GetInt32();
+
+ //Init dir
+ DirectoryInfo dir = new(swapDir);
+ dir.Create();
+ //Init cache listener, single threaded reader
+ ObjectCacheStore CacheListener = new(dir, cacheSize, Log, CacheHeap, true);
+
+ //Init connect endpoint
+ {
+ //Init connect endpoint
+ ConnectEndpoint endpoint = new(connectPath, CacheListener, this);
+ Route(endpoint);
+ }
+
+ //Setup broker and regitration
+ {
+ //init mre to pass the broker heartbeat signal to the registration worker
+ ManualResetEvent mre = new(false);
+ //Route the broker endpoint
+ BrokerHeartBeat brokerEp = new(() => BrokerHeartBeatToken!, mre, new Uri(brokerAddress), this);
+ Route(brokerEp);
+
+ //start registration
+ _ = this.DeferTask(() => RegisterServerAsync(mre), 200);
+ }
+
+ //Setup cluster worker
+ {
+ //Get pre-configured fbm client config for caching
+ FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(CacheHeap, maxMessageSize, this.IsDebug() ? Log : null);
+
+ //Start Client runner
+ _ = this.DeferTask(() => RunClientAsync(CacheListener, new Uri(brokerAddress), conf), 300);
+ }
+
+ //Load a cache broker to the current server if the config is defined
+ {
+ if(this.HasConfigForType<BrokerRegistrationEndpoint>())
+ {
+ this.Route<BrokerRegistrationEndpoint>();
+ }
+ }
+
+ void Cleanup()
+ {
+ CacheHeap.Dispose();
+ CacheListener.Dispose();
+ }
+
+ //Regsiter cleanup
+ _ = UnloadToken.RegisterUnobserved(Cleanup);
+
+ Log.Information("Plugin loaded");
+ }
+ catch (KeyNotFoundException kne)
+ {
+ CacheHeap.Dispose();
+ Log.Error("Missing required configuration variables {m}", kne.Message);
+ }
+ catch
+ {
+ CacheHeap.Dispose();
+ throw;
+ }
+ }
+
+ protected override void OnUnLoad()
+ {
+ Log.Information("Plugin unloaded");
+ }
+
+ #region Registration
+
+ private async Task RegisterServerAsync(ManualResetEvent keepaliveWait)
+ {
+ try
+ {
+ //Get the broker config element
+ IReadOnlyDictionary<string, JsonElement> clusterConfig = this.GetConfig("cluster");
+
+ //Server id is just dns name for now
+ string serverId = Dns.GetHostName();
+ int heartBeatDelayMs = clusterConfig["heartbeat_timeout_sec"].GetInt32() * 1000;
+
+ string? connectPath = PluginConfig.GetProperty("connect_path").GetString();
+
+ //Get the port of the primary webserver
+ int port;
+ bool usingTls;
+ {
+ JsonElement firstHost = HostConfig.GetProperty("virtual_hosts").EnumerateArray().First();
+
+ port = firstHost.GetProperty("interface")
+ .GetProperty("port")
+ .GetInt32();
+
+ //If a certificate is specified, tls is enabled on the port
+ usingTls = firstHost.TryGetProperty("cert", out _);
+ }
+
+ using BrokerRegistrationRequest request = new();
+ {
+ string addr = clusterConfig["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'");
+
+ //Recover the certificate
+ ReadOnlyJsonWebKey cacheCert = await GetCachePrivate();
+
+ //Init url builder for payload, see if tls is enabled
+ Uri connectAddress = new UriBuilder(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, Dns.GetHostName(), port, connectPath).Uri;
+
+ request.WithBroker(new(addr))
+ .WithRegistrationAddress(connectAddress.ToString())
+ .WithNodeId(serverId)
+ .WithSigningKey(cacheCert, true);
+ }
+
+ while (true)
+ {
+ try
+ {
+ //Gen a random reg token before registering
+ BrokerHeartBeatToken = RandomHash.GetRandomHex(32);
+ //Assign new hb token
+ request.WithHeartbeatToken(BrokerHeartBeatToken);
+
+ Log.Information("Registering with cache broker {addr}, with node-id {id}", request.BrokerAddress, serverId);
+
+ //Register with the broker
+ await FBMDataCacheExtensions.ResgisterWithBrokerAsync(request);
+
+ Log.Debug("Successfully registered with cache broker");
+
+ /*
+ * Wait in a loop for the broker to send a keepalive
+ * request with the specified token. When the event
+ * is signaled the task will be completed
+ */
+ while (true)
+ {
+ await Task.Delay(heartBeatDelayMs, UnloadToken);
+ //Set the timeout to 0 to it will just check the status without blocking
+ if (!keepaliveWait.WaitOne(0))
+ {
+ //server miseed a keepalive event, time to break the loop and retry
+ Log.Debug("Broker missed a heartbeat request, attempting to re-register");
+ break;
+ }
+ //Reset the msr
+ keepaliveWait.Reset();
+ }
+ }
+ catch (TaskCanceledException)
+ {
+ throw;
+ }
+ catch (TimeoutException)
+ {
+ Log.Warn("Failed to connect to cache broker server within the specified timeout period");
+ }
+ catch (HttpRequestException re) when (re.InnerException is SocketException)
+ {
+ Log.Warn("Cache broker is unavailable or network is unavailable");
+ }
+ catch (Exception ex)
+ {
+ Log.Warn(ex, "Failed to update broker registration");
+ }
+
+ //Gen random ms delay
+ int randomMsDelay = RandomNumberGenerator.GetInt32(500, 2000);
+ //Delay
+ await Task.Delay(randomMsDelay, UnloadToken);
+ }
+ }
+ catch (KeyNotFoundException kne)
+ {
+ Log.Error("Missing required broker configuration variables {ke}", kne.Message);
+ }
+ catch (TaskCanceledException)
+ {
+ //Normal unload/exit
+ }
+ catch (Exception ex)
+ {
+ Log.Error(ex);
+ }
+ finally
+ {
+ keepaliveWait.Dispose();
+ BrokerHeartBeatToken = null;
+ }
+ Log.Debug("Registration worker exited");
+ }
+
+ #endregion
+
+ #region Cluster
+
+ private async Task<ReadOnlyJsonWebKey> GetCachePrivate()
+ {
+ return await this.TryGetSecretAsync("cache_private_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Failed to load the cache private key");
+ }
+
+ private async Task<ReadOnlyJsonWebKey> GetBrokerPublic()
+ {
+ return await this.TryGetSecretAsync("broker_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Failed to load the broker's public key");
+ }
+
+
+ /// <summary>
+ /// Starts a self-contained process-long task to discover other cache servers
+ /// from a shared broker server
+ /// </summary>
+ /// <param name="cacheStore">The cache store to synchronize</param>
+ /// <param name="brokerAddress">The broker server's address</param>
+ /// <param name="serverId">The node-id of the current server</param>
+ /// <param name="clientConf">The configuration to use when initializing synchronization clients</param>
+ /// <returns>A task that resolves when the plugin unloads</returns>
+ private async Task RunClientAsync(ObjectCacheStore cacheStore, Uri brokerAddress, FBMClientConfig clientConf)
+ {
+ TimeSpan noServerDelay = TimeSpan.FromSeconds(10);
+ string nodeId = Dns.GetHostName();
+ ListServerRequest listRequest = new(brokerAddress);
+ try
+ {
+ //Get the broker config element
+ IReadOnlyDictionary<string, JsonElement> clusterConf = this.GetConfig("cluster");
+ int serverCheckMs = clusterConf["update_interval_sec"].GetInt32() * 1000;
+
+ //Setup signing and verification certificates
+ ReadOnlyJsonWebKey cacheSig = await GetCachePrivate();
+ ReadOnlyJsonWebKey brokerPub = await GetBrokerPublic();
+ //Import certificates
+ listRequest.WithVerificationKey(brokerPub)
+ .WithSigningKey(cacheSig);
+
+ //Main event loop
+ Log.Information("Begining cluster node discovery");
+
+ ILogProvider? debugLog = this.IsDebug() ? Log : null;
+
+ while (true)
+ {
+ //Load the server list
+ ActiveServer[]? servers;
+ while (true)
+ {
+ try
+ {
+ debugLog?.Information("[CACHE] Requesting server list from broker");
+
+ //Get server list
+ servers = await FBMDataCacheExtensions.ListServersAsync(listRequest, UnloadToken);
+ //Servers are loaded, so continue
+ break;
+ }
+ catch(HttpRequestException he) when(he.InnerException is SocketException)
+ {
+ Log.Warn("Failed to connect to cache broker, trying again");
+ }
+ catch (TimeoutException)
+ {
+ Log.Warn("Failed to connect to cache broker server within the specified timeout period");
+ }
+ catch (Exception ex)
+ {
+ Log.Warn(ex, "Failed to get server list from broker");
+ }
+ //Gen random ms delay
+ int randomMsDelay = RandomNumberGenerator.GetInt32(1000, 2000);
+ //Delay
+ await Task.Delay(randomMsDelay, UnloadToken);
+ }
+
+ if(servers == null || servers.Length == 0)
+ {
+ Log.Information("No cluster nodes found, retrying");
+ //Delay
+ await Task.Delay(noServerDelay, UnloadToken);
+ continue;
+ }
+
+
+ //Lock on sever set while enumerating
+ lock (ServerLock)
+ {
+ //Select servers that are not the current server and are not already being monitored
+ IEnumerable<ActiveServer> serversToConnectTo = servers.Where(s => !nodeId.Equals(s.ServerId, StringComparison.OrdinalIgnoreCase));
+
+ //Connect to servers
+ foreach (ActiveServer server in serversToConnectTo)
+ {
+ //Make sure were not currently connected to the server
+ if (!ListeningServers.Contains(server))
+ {
+ //Add the server to the set
+ ListeningServers.Add(server);
+
+ //Run listener background task
+ _ = this.DeferTask(() => RunSyncTaskAsync(server, cacheStore, clientConf, nodeId));
+ }
+ }
+ }
+
+ //Delay until next check cycle
+ await Task.Delay(serverCheckMs, UnloadToken);
+ }
+ }
+ catch (FileNotFoundException)
+ {
+ Log.Error("Client/cluster private cluster key file was not found or could not be read");
+ }
+ catch (KeyNotFoundException)
+ {
+ Log.Error("Missing required cluster configuration varables");
+ }
+ catch (TaskCanceledException)
+ {
+ }
+ catch (Exception ex)
+ {
+ Log.Error(ex);
+ }
+ finally
+ {
+ listRequest.Dispose();
+ }
+ Log.Debug("Cluster sync worker exited");
+ }
+
+ private async Task RunSyncTaskAsync(ActiveServer server, ObjectCacheStore cacheStore, FBMClientConfig conf, string nodeId)
+ {
+ //Setup client
+ FBMClient client = new(conf);
+ try
+ {
+ async Task UpdateRecordAsync(string objectId, string newId)
+ {
+ //Get request message
+ FBMRequest modRequest = client.RentRequest();
+ try
+ {
+ //Set action as get/create
+ modRequest.WriteHeader(HeaderCommand.Action, Actions.Get);
+ //Set session-id header
+ modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId);
+
+ //Make request
+ using FBMResponse response = await client.SendAsync(modRequest, UnloadToken);
+
+ response.ThrowIfNotSet();
+
+ //Check response code
+ string status = response.Headers.First(static s => s.Key == HeaderCommand.Status).Value.ToString();
+ if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal))
+ {
+ //Update the record
+ await cacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response);
+ Log.Debug("Updated object {id}", objectId);
+ }
+ else
+ {
+ Log.Warn("Object {id} was missing on the remote server", objectId);
+ }
+ }
+ finally
+ {
+ client.ReturnRequest(modRequest);
+ }
+ }
+
+ {
+ //Sign and verify requests with the cache private key since we are a peer
+ ReadOnlyJsonWebKey cachePriv = await GetCachePrivate();
+
+ //Configure cache
+ client.GetCacheConfiguration()
+ .WithVerificationKey(cachePriv)
+ .WithSigningCertificate(cachePriv)
+ .WithNodeId(nodeId) //set nodeid since were listening for changes
+ .WithTls(false);
+ }
+
+ Log.Information("Connecting to {server}...", server.ServerId);
+
+ //Connect to the server
+ await client.ConnectToCacheAsync(server, UnloadToken);
+
+ //Wroker task callback method
+ async Task BgWorkerAsync()
+ {
+ //Listen for changes
+ while (true)
+ {
+ //Wait for changes
+ WaitForChangeResult changedObject = await client.WaitForChangeAsync(UnloadToken);
+
+ Log.Debug("Object changed {typ} {obj}", changedObject.Status, changedObject.CurrentId);
+
+ switch (changedObject.Status)
+ {
+ case ResponseCodes.NotFound:
+ Log.Warn("Server cache not properly configured, worker exiting");
+ return;
+ case "deleted":
+ //Delete the object from the store
+ _ = cacheStore.DeleteItemAsync(changedObject.CurrentId).ConfigureAwait(false);
+ break;
+ case "modified":
+ //Reload the record from the store
+ await UpdateRecordAsync(changedObject.CurrentId, changedObject.NewId);
+ break;
+ }
+ }
+ }
+
+ Log.Information("Connected to {server}, starting queue listeners", server.ServerId);
+
+ //Start worker tasks
+ List<Task> workerTasks = new();
+ for(int i = 0; i < Environment.ProcessorCount; i++)
+ {
+ workerTasks.Add(Task.Run(BgWorkerAsync));
+ }
+
+ //Wait for sync workers to exit
+ await Task.WhenAll(workerTasks);
+ }
+ catch (InvalidResponseException ie)
+ {
+ //See if the plugin is unloading
+ if (!UnloadToken.IsCancellationRequested)
+ {
+ Log.Debug("Server responded with invalid response packet, disconnected. reason {reason}", ie);
+ }
+ //Disconnect client gracefully
+ try
+ {
+ await client.DisconnectAsync();
+ }
+ catch (Exception ex)
+ {
+ Log.Error(ex);
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ //Plugin unloading, Try to disconnect
+ try
+ {
+ await client.DisconnectAsync();
+ }
+ catch(Exception ex)
+ {
+ Log.Error(ex);
+ }
+ }
+ catch(Exception ex)
+ {
+ Log.Warn("Lost connection to server {h}, {m}", server.ServerId, ex);
+ }
+ finally
+ {
+ //Remove server from active list, since its been disconnected
+ RemoveServer(server);
+ client.Dispose();
+ }
+ }
+
+ protected override void ProcessHostCommand(string cmd)
+ {
+ Log.Debug(cmd);
+ }
+
+
+ #endregion
+ }
+}