aboutsummaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/CacheBroker/src/CacheBroker.csproj26
-rw-r--r--plugins/CacheBroker/src/Endpoints/BrokerRegistrationEndpoint.cs13
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs4
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs54
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs220
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs56
-rw-r--r--plugins/ObjectCacheServer/src/ObjectCacheServer.csproj31
-rw-r--r--plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs150
8 files changed, 395 insertions, 159 deletions
diff --git a/plugins/CacheBroker/src/CacheBroker.csproj b/plugins/CacheBroker/src/CacheBroker.csproj
index 285d797..fa28dce 100644
--- a/plugins/CacheBroker/src/CacheBroker.csproj
+++ b/plugins/CacheBroker/src/CacheBroker.csproj
@@ -2,21 +2,15 @@
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
- <Copyright>Copyright © 2023 Vaughn Nugent</Copyright>
<RootNamespace>VNLib.Plugins.Cache.Broker</RootNamespace>
- <Authors>Vaughn Nugent</Authors>
<Version>1.0.1.2</Version>
+ <GenerateDocumentationFile>True</GenerateDocumentationFile>
+ <AnalysisLevel>latest-all</AnalysisLevel>
<SignAssembly>True</SignAssembly>
<AssemblyOriginatorKeyFile>\\vaughnnugent.com\Internal\Folder Redirection\vman\Documents\Programming\Software\StrongNameingKey.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>
<ItemGroup>
- <Compile Remove="liveplugin\**" />
- <EmbeddedResource Remove="liveplugin\**" />
- <None Remove="liveplugin\**" />
- </ItemGroup>
-
- <ItemGroup>
<PackageReference Include="ErrorProne.NET.CoreAnalyzers" Version="0.1.2">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
@@ -26,14 +20,13 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>
-
+
<PropertyGroup>
<!--Enable dynamic loading-->
<EnableDynamicLoading>true</EnableDynamicLoading>
- <GenerateDocumentationFile>True</GenerateDocumentationFile>
+ <Authors>Vaughn Nugent</Authors>
+ <Copyright>Copyright © 2023 Vaughn Nugent</Copyright>
<PackageProjectUrl>https://www.vaughnnugent.com/resources/software</PackageProjectUrl>
- <AnalysisLevel>latest-all</AnalysisLevel>
-
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
@@ -50,12 +43,5 @@
<ProjectReference Include="..\..\..\..\..\core\lib\Utils\src\VNLib.Utils.csproj" />
<ProjectReference Include="..\..\..\..\Extensions\lib\VNLib.Plugins.Extensions.Loading\src\VNLib.Plugins.Extensions.Loading.csproj" />
</ItemGroup>
-
-
- <ItemGroup>
- <None Update="CacheBroker.json">
- <CopyToOutputDirectory>Always</CopyToOutputDirectory>
- </None>
- </ItemGroup>
-
+
</Project>
diff --git a/plugins/CacheBroker/src/Endpoints/BrokerRegistrationEndpoint.cs b/plugins/CacheBroker/src/Endpoints/BrokerRegistrationEndpoint.cs
index c2e0b84..8f983ac 100644
--- a/plugins/CacheBroker/src/Endpoints/BrokerRegistrationEndpoint.cs
+++ b/plugins/CacheBroker/src/Endpoints/BrokerRegistrationEndpoint.cs
@@ -56,11 +56,12 @@ using VNLib.Net.Rest.Client;
namespace VNLib.Plugins.Cache.Broker.Endpoints
{
[ConfigurationName("broker_endpoint")]
- public sealed class BrokerRegistrationEndpoint : ResourceEndpointBase
+ public sealed class BrokerRegistrationEndpoint : ResourceEndpointBase, IDisposable
{
const string HEARTBEAT_PATH = "/heartbeat";
- private static readonly RestClientPool ClientPool = new(10,new RestClientOptions()
+ //Client pool is instance based and may be disposed when the plugin is unloaded
+ private readonly RestClientPool ClientPool = new(10,new RestClientOptions()
{
Encoding = Encoding.UTF8,
FollowRedirects = false,
@@ -69,6 +70,7 @@ namespace VNLib.Plugins.Cache.Broker.Endpoints
}, null);
+ //Matches the json schema set by the FBM caching extensions library
private class ActiveServer
{
[JsonIgnore]
@@ -408,5 +410,12 @@ namespace VNLib.Plugins.Cache.Broker.Endpoints
_ = ActiveServers.Remove(server.ServerId!);
}
}
+
+
+ void IDisposable.Dispose()
+ {
+ //Cleanup client pool when exiting
+ ClientPool.Dispose();
+ }
}
}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs b/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs
index bd1233e..3930f90 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeat.cs
@@ -30,12 +30,14 @@ using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
+using VNLib.Plugins;
+using VNLib.Plugins.Essentials;
using VNLib.Hashing.IdentityUtility;
using VNLib.Plugins.Essentials.Endpoints;
using VNLib.Plugins.Essentials.Extensions;
using VNLib.Plugins.Extensions.Loading;
-namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
+namespace VNLib.Data.Caching.ObjectCache.Server
{
internal sealed class BrokerHeartBeat : ResourceEndpointBase
{
diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs
new file mode 100644
index 0000000..e9584b6
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Endpoints/CacheConfiguration.cs
@@ -0,0 +1,54 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: ConnectEndpoint.cs
+*
+* ConnectEndpoint.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Text.Json.Serialization;
+
+namespace VNLib.Data.Caching.ObjectCache.Server
+{
+ internal sealed class CacheConfiguration
+ {
+ [JsonPropertyName("buffer_recv_max")]
+ public int MaxRecvBufferSize { get; set; } = 1000 * 1024;
+ [JsonPropertyName("buffer_recv_min")]
+ public int MinRecvBufferSize { get; set; } = 8 * 1024;
+
+
+ [JsonPropertyName("buffer_header_max")]
+ public int MaxHeaderBufferSize { get; set; } = 2 * 1024;
+ [JsonPropertyName("buffer_header_min")]
+ public int MinHeaderBufferSize { get; set; } = 128;
+
+
+ [JsonPropertyName("max_message_size")]
+ public int MaxMessageSize { get; set; } = 1000 * 1024;
+
+
+ [JsonPropertyName("change_queue_max_depth")]
+ public int MaxEventQueueDepth { get; set; } = 10 * 1000;
+
+
+ [JsonPropertyName("max_cache")]
+ public int MaxCacheEntries { get; set; } = 10000;
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
index 15cc086..9a1ece0 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
@@ -31,34 +31,29 @@ using System.Threading.Channels;
using System.Collections.Generic;
using System.Collections.Concurrent;
-using VNLib.Net.Http;
+using VNLib.Plugins;
using VNLib.Hashing;
+using VNLib.Net.Http;
using VNLib.Utils.Async;
+using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
using VNLib.Hashing.IdentityUtility;
using VNLib.Net.Messaging.FBM;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Net.Messaging.FBM.Server;
-using VNLib.Data.Caching.ObjectCache;
+using VNLib.Plugins.Essentials;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Essentials.Endpoints;
using VNLib.Plugins.Essentials.Extensions;
+using System.Text.Json.Serialization;
-
-namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
+namespace VNLib.Data.Caching.ObjectCache.Server
{
- internal sealed class ConnectEndpoint : ResourceEndpointBase
- {
- const int MAX_RECV_BUF_SIZE = 1000 * 1024;
- const int MIN_RECV_BUF_SIZE = 8 * 1024;
- const int MAX_HEAD_BUF_SIZE = 2048;
- const int MIN_MESSAGE_SIZE = 10 * 1024;
- const int MAX_MESSAGE_SIZE = 1000 * 1024;
- const int MIN_HEAD_BUF_SIZE = 128;
- const int MAX_EVENT_QUEUE_SIZE = 10000;
- const int MAX_RESPONSE_BUFFER_SIZE = 10 * 1024;
- private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
+ [ConfigurationName("store")]
+ internal sealed class ConnectEndpoint : ResourceEndpointBase, IDisposable, IAsyncBackgroundWork
+ {
+ private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
private readonly string AudienceLocalServerId;
private readonly ObjectCacheStore Store;
@@ -68,8 +63,16 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
private uint _connectedClients;
+ /// <summary>
+ /// Gets the number of active connections
+ /// </summary>
public uint ConnectedClients => _connectedClients;
+ /// <summary>
+ /// The cache store configuration
+ /// </summary>
+ public CacheConfiguration CacheConfig { get; }
+
//Loosen up protection settings
protected override ProtectionSettings EndpointProtectionSettings { get; } = new()
{
@@ -78,20 +81,89 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
DisableCrossSiteDenied = true
};
- public ConnectEndpoint(string path, ObjectCacheStore store, PluginBase pbase)
+ public ConnectEndpoint(PluginBase plugin, IReadOnlyDictionary<string, JsonElement> config)
{
- InitPathAndLog(path, pbase.Log);
- Store = store;//Load client public key to verify signed messages
- Pbase = pbase;
+ string? path = config["path"].GetString();
+ InitPathAndLog(path, plugin.Log);
+
+ Pbase = plugin;
+
+ //Parse cache config or use default
+ if(config.TryGetValue("cache", out JsonElement confEl))
+ {
+ CacheConfig = confEl.Deserialize<CacheConfiguration>()!;
+ }
+ else
+ {
+ //Init default config if not fount
+ CacheConfig = new();
+
+ Log.Verbose("Loading default cache buffer configuration");
+ }
+
+ //Create event queue client lookup table
StatefulEventQueue = new(StringComparer.OrdinalIgnoreCase);
- //Start the queue worker
- _ = pbase.DeferTask(() => ChangeWorkerAsync(pbase.UnloadToken), 10);
+ //Init the cache store
+ Store = InitializeCache((ObjectCacheServerEntry)plugin, CacheConfig.MaxCacheEntries);
+ /*
+ * Generate a random guid for the current server when created so we
+ * know client tokens belong to us when singed by the same key
+ */
AudienceLocalServerId = Guid.NewGuid().ToString("N");
+
+ //Schedule the queue worker to be run
+ _ = plugin.ObserveWork(this, 100);
+ }
+
+ private static ObjectCacheStore InitializeCache(ObjectCacheServerEntry plugin, int maxCache)
+ {
+ if(maxCache < 2)
+ {
+ throw new ArgumentException("You must configure a 'max_cache' size larger than 1 item");
+ }
+
+ //Suggestion
+ if(maxCache < 200)
+ {
+ plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache");
+ }
+
+ //Endpoint only allows for a single reader
+ return new (maxCache, plugin.Log, plugin.CacheHeap, true);
}
+ /// <summary>
+ /// Gets the configured cache store
+ /// </summary>
+ /// <returns></returns>
+ public ICacheStore GetCacheStore() => new CacheStore(Store);
+
+
+ //Dispose will be called by the host plugin on unload
+ void IDisposable.Dispose()
+ {
+ //Dispose the store on cleanup
+ Store.Dispose();
+ }
+
+
+ private async Task<ReadOnlyJsonWebKey> GetClientPubAsync()
+ {
+ return await Pbase.TryGetSecretAsync("client_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
+ }
+ private async Task<ReadOnlyJsonWebKey> GetCachePubAsync()
+ {
+ return await Pbase.TryGetSecretAsync("cache_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
+ }
+ private async Task<ReadOnlyJsonWebKey> GetCachePrivateKeyAsync()
+ {
+ return await Pbase.TryGetSecretAsync("cache_private_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
+ }
+
+
/*
* Used as a client negotiation and verification request
*
@@ -132,7 +204,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
{
verified = true;
}
- //May be signed by a cahce server
+ //May be signed by a cache server
else
{
using ReadOnlyJsonWebKey cacheCert = await GetCachePubAsync();
@@ -163,8 +235,10 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
}
Log.Debug("Received negotiation request from node {node}", nodeId);
+
//Verified, now we can create an auth message with a short expiration
using JsonWebToken auth = new();
+
//Sign the auth message from the cache certificate's private key
using (ReadOnlyJsonWebKey cert = await GetCachePrivateKeyAsync())
{
@@ -179,9 +253,9 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
//Specify the server's node id if set
.AddClaim("sub", nodeId!)
//Add negotiaion args
- .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, MAX_HEAD_BUF_SIZE)
- .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, MAX_RECV_BUF_SIZE)
- .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, MAX_MESSAGE_SIZE)
+ .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, CacheConfig.MaxHeaderBufferSize)
+ .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, CacheConfig.MaxRecvBufferSize)
+ .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, CacheConfig.MaxMessageSize)
.CommitClaims();
auth.SignFromJwk(cert);
@@ -192,27 +266,17 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
return VfReturnType.VirtualSkip;
}
- private async Task<ReadOnlyJsonWebKey> GetClientPubAsync()
- {
- return await Pbase.TryGetSecretAsync("client_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
- }
- private async Task<ReadOnlyJsonWebKey> GetCachePubAsync()
- {
- return await Pbase.TryGetSecretAsync("cache_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
- }
- private async Task<ReadOnlyJsonWebKey> GetCachePrivateKeyAsync()
- {
- return await Pbase.TryGetSecretAsync("cache_private_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
- }
- private async Task ChangeWorkerAsync(CancellationToken cancellation)
+ //Background worker to process event queue items
+ async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
{
try
{
//Listen for changes
while (true)
{
- ChangeEvent ev = await Store.EventQueue.DequeueAsync(cancellation);
+ ChangeEvent ev = await Store.EventQueue.DequeueAsync(exitToken);
+
//Add event to queues
foreach (AsyncQueue<ChangeEvent> queue in StatefulEventQueue.Values)
{
@@ -224,10 +288,8 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
}
}
catch (OperationCanceledException)
- { }
- catch (Exception ex)
{
- Log.Error(ex);
+ //Normal exit
}
}
@@ -238,6 +300,12 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
public int MaxMessageSize { get; init; }
public int MaxResponseBufferSize { get; init; }
public AsyncQueue<ChangeEvent>? SyncQueue { get; init; }
+
+ public override string ToString()
+ {
+ return
+ $"{nameof(RecvBufferSize)}:{RecvBufferSize}, {nameof(MaxHeaderBufferSize)}: {MaxHeaderBufferSize}, {nameof(MaxMessageSize)}:{MaxMessageSize}, {nameof(MaxResponseBufferSize)}:{MaxResponseBufferSize}";
+ }
}
protected override async ValueTask<VfReturnType> WebsocketRequestedAsync(HttpEntity entity)
@@ -246,6 +314,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
{
//Parse jwt from authorization
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
+
if (string.IsNullOrWhiteSpace(jwtAuth))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
@@ -253,6 +322,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
}
string? nodeId = null;
+
//Parse jwt
using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
{
@@ -301,11 +371,12 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
string maxMessageSizeCmd = entity.QueryArgs[FBMClient.REQ_MAX_MESS_QUERY_ARG];
//Parse recv buffer size
- int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : MIN_RECV_BUF_SIZE;
- int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : MIN_HEAD_BUF_SIZE;
- int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : MIN_MESSAGE_SIZE;
+ int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : CacheConfig.MinRecvBufferSize;
+ int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : CacheConfig.MinHeaderBufferSize;
+ int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : CacheConfig.MaxMessageSize;
AsyncQueue<ChangeEvent>? nodeQueue = null;
+
//The connection may be a caching server node, so get its node-id
if (!string.IsNullOrWhiteSpace(nodeId))
{
@@ -317,7 +388,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
* and change events may be processed on mutliple threads.
*/
- BoundedChannelOptions queueOptions = new(MAX_EVENT_QUEUE_SIZE)
+ BoundedChannelOptions queueOptions = new(CacheConfig.MaxEventQueueDepth)
{
AllowSynchronousContinuations = true,
SingleReader = false,
@@ -327,21 +398,41 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
};
_ = StatefulEventQueue.TryAdd(nodeId, new(queueOptions));
+
//Get the queue
nodeQueue = StatefulEventQueue[nodeId];
}
-
+
+ /*
+ * Buffer sizing can get messy as the response/resquest sizes can vary
+ * and will include headers, this is a drawback of the FBM protocol
+ * so we need to properly calculate efficient buffer sizes as
+ * negotiated with the client.
+ */
+
+ int maxMessageSizeClamp = Math.Clamp(maxMessageSize, CacheConfig.MinRecvBufferSize, CacheConfig.MaxRecvBufferSize);
+
//Init new ws state object and clamp the suggested buffer sizes
WsUserState state = new()
{
- RecvBufferSize = Math.Clamp(recvBufSize, MIN_RECV_BUF_SIZE, MAX_RECV_BUF_SIZE),
- MaxHeaderBufferSize = Math.Clamp(maxHeadBufSize, MIN_HEAD_BUF_SIZE, MAX_HEAD_BUF_SIZE),
- MaxMessageSize = Math.Clamp(maxMessageSize, MIN_MESSAGE_SIZE, MAX_MESSAGE_SIZE),
- MaxResponseBufferSize = Math.Min(maxMessageSize, MAX_RESPONSE_BUFFER_SIZE),
+ RecvBufferSize = Math.Clamp(recvBufSize, CacheConfig.MinRecvBufferSize, CacheConfig.MaxRecvBufferSize),
+ MaxHeaderBufferSize = Math.Clamp(maxHeadBufSize, CacheConfig.MinHeaderBufferSize, CacheConfig.MaxHeaderBufferSize),
+
+ MaxMessageSize = maxMessageSizeClamp,
+
+ /*
+ * Response buffer needs to be large enough to store a max message
+ * as a response along with all response headers
+ */
+ MaxResponseBufferSize = (int)MemoryUtil.NearestPage(maxMessageSizeClamp),
+
SyncQueue = nodeQueue
};
Log.Debug("Client recv buffer suggestion {recv}, header buffer size {head}, response buffer size {r}", recvBufCmd, maxHeaderCharCmd, state.MaxResponseBufferSize);
+
+ //Print state message to console
+ Log.Verbose("Client buffer state {state}", state);
//Accept socket and pass state object
entity.AcceptWebSocket(WebsocketAcceptedAsync, state);
@@ -370,6 +461,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
RecvBufferSize = state.RecvBufferSize,
ResponseBufferSize = state.MaxResponseBufferSize,
MaxHeaderBufferSize = state.MaxHeaderBufferSize,
+
HeaderEncoding = Helpers.DefaultEncoding,
};
@@ -395,5 +487,31 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
}
Log.Debug("Server websocket exited");
}
+
+
+ private sealed class CacheStore : ICacheStore
+ {
+ private readonly ObjectCacheStore _cache;
+
+ public CacheStore(ObjectCacheStore cache)
+ {
+ _cache = cache;
+ }
+
+ ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token)
+ {
+ return _cache.AddOrUpdateBlobAsync(objectId, alternateId, bodyData, state, token);
+ }
+
+ void ICacheStore.Clear()
+ {
+ throw new NotImplementedException();
+ }
+
+ ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token)
+ {
+ return _cache.DeleteItemAsync(id, token);
+ }
+ }
}
}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs b/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs
new file mode 100644
index 0000000..3776269
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Endpoints/ICacheStore.cs
@@ -0,0 +1,56 @@
+/*
+* Copyright (c) 2022 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: ConnectEndpoint.cs
+*
+* ConnectEndpoint.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace VNLib.Data.Caching.ObjectCache.Server
+{
+ internal interface ICacheStore
+ {
+ /// <summary>
+ /// Asynchronously adds or updates an object in the store and optionally update's its id
+ /// </summary>
+ /// <param name="objectId">The current (or old) id of the object</param>
+ /// <param name="alternateId">An optional id to update the blob to</param>
+ /// <param name="bodyData">A callback that returns the data for the blob</param>
+ /// <param name="state">The state parameter to pass to the data callback</param>
+ /// <param name="token">A token to cancel the async operation</param>
+ /// <returns>A value task that represents the async operation</returns>
+ ValueTask AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token = default);
+
+ /// <summary>
+ /// Clears all items from the store
+ /// </summary>
+ void Clear();
+
+ /// <summary>
+ /// Asynchronously deletes a previously stored item
+ /// </summary>
+ /// <param name="id">The id of the object to delete</param>
+ /// <param name="token">A token to cancel the async lock await</param>
+ /// <returns>A task that completes when the item has been deleted</returns>
+ ValueTask<bool> DeleteItemAsync(string id, CancellationToken token = default);
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj
index 672597b..38f5b97 100644
--- a/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj
+++ b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj
@@ -2,32 +2,23 @@
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<Nullable>enable</Nullable>
- <Authors>Vaughn Nugent</Authors>
+ <RootNamespace>VNLib.Data.Caching.ObjectCache.Server</RootNamespace>
<Version>1.0.1.1</Version>
- <RootNamespace>VNLib.Plugins.Essentials.Sessions.Server</RootNamespace>
- <Copyright>Copyright © 2023 Vaughn Nugent</Copyright>
<SignAssembly>True</SignAssembly>
<AssemblyOriginatorKeyFile>\\vaughnnugent.com\Internal\Folder Redirection\vman\Documents\Programming\Software\StrongNameingKey.snk</AssemblyOriginatorKeyFile>
+ <EnableDynamicLoading>true</EnableDynamicLoading>
+ <AnalysisLevel>latest-all</AnalysisLevel>
+ <ProduceReferenceAssembly>True</ProduceReferenceAssembly>
+ <GenerateDocumentationFile>False</GenerateDocumentationFile>
</PropertyGroup>
<!-- Resolve nuget dll files and store them in the output dir -->
<PropertyGroup>
- <EnableDynamicLoading>true</EnableDynamicLoading>
- <GenerateDocumentationFile>False</GenerateDocumentationFile>
- <PackageProjectUrl>https://www.vaughnnugent.com/resources</PackageProjectUrl>
- <AnalysisLevel>latest-all</AnalysisLevel>
- <ProduceReferenceAssembly>True</ProduceReferenceAssembly>
-
+ <Authors>Vaughn Nugent</Authors>
+ <Copyright>Copyright © 2023 Vaughn Nugent</Copyright>
+ <PackageProjectUrl>https://www.vaughnnugent.com/resources/software</PackageProjectUrl>
</PropertyGroup>
<ItemGroup>
- <Compile Remove="liveplugin2\**" />
- <Compile Remove="liveplugin\**" />
- <EmbeddedResource Remove="liveplugin2\**" />
- <EmbeddedResource Remove="liveplugin\**" />
- <None Remove="liveplugin2\**" />
- <None Remove="liveplugin\**" />
- </ItemGroup>
- <ItemGroup>
<PackageReference Include="ErrorProne.NET.CoreAnalyzers" Version="0.1.2">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
@@ -42,10 +33,4 @@
<ProjectReference Include="..\..\..\lib\VNLib.Data.Caching.ObjectCache\src\VNLib.Data.Caching.ObjectCache.csproj" />
<ProjectReference Include="..\..\CacheBroker\src\CacheBroker.csproj" />
</ItemGroup>
- <ItemGroup>
- <None Update="ObjectCacheServer.json">
- <CopyToOutputDirectory>Always</CopyToOutputDirectory>
- </None>
- </ItemGroup>
-
</Project>
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
index 7b7b9fb..d6dbd9b 100644
--- a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
+++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
@@ -34,34 +34,67 @@ using System.Threading.Tasks;
using System.Collections.Generic;
using System.Security.Cryptography;
+using VNLib.Plugins;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
-using VNLib.Utils.Extensions;
+using VNLib.Utils.Memory.Diagnostics;
using VNLib.Hashing;
using VNLib.Hashing.IdentityUtility;
-using VNLib.Data.Caching;
using VNLib.Data.Caching.Extensions;
-using VNLib.Data.Caching.ObjectCache;
using static VNLib.Data.Caching.Constants;
using VNLib.Net.Messaging.FBM;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Cache.Broker.Endpoints;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Extensions.Loading.Routing;
-using VNLib.Plugins.Essentials.Sessions.Server.Endpoints;
-namespace VNLib.Plugins.Essentials.Sessions.Server
+namespace VNLib.Data.Caching.ObjectCache.Server
{
public sealed class ObjectCacheServerEntry : PluginBase
{
public override string PluginName => "ObjectCache.Service";
- private string? BrokerHeartBeatToken;
+ private readonly Lazy<IUnmangedHeap> _cacheHeap;
+ private readonly object ServerLock;
+ private readonly HashSet<ActiveServer> ListeningServers;
+ private readonly ManualResetEvent BrokerSyncHandle;
+
+ /// <summary>
+ /// Gets the shared heap for the plugin
+ /// </summary>
+ internal IUnmangedHeap CacheHeap => _cacheHeap.Value;
- private readonly object ServerLock = new();
- private readonly HashSet<ActiveServer> ListeningServers = new();
+ public ObjectCacheServerEntry()
+ {
+ //Init heap
+ _cacheHeap = new Lazy<IUnmangedHeap>(InitializeHeap, LazyThreadSafetyMode.PublicationOnly);
+
+ ServerLock = new();
+ ListeningServers = new();
+ //Set sync handle
+ BrokerSyncHandle = new(false);
+ }
+
+ private IUnmangedHeap InitializeHeap()
+ {
+ //Create default heap
+ IUnmangedHeap _heap = MemoryUtil.InitializeNewHeapForProcess();
+ try
+ {
+ //If the plugin is in debug mode enable heap tracking
+ return this.IsDebug() ? new TrackedHeapWrapper(_heap) : _heap;
+ }
+ catch
+ {
+ _heap.Dispose();
+ throw;
+ }
+ }
+
+
+ private string? BrokerHeartBeatToken;
private void RemoveServer(ActiveServer server)
{
@@ -71,55 +104,45 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
}
}
+ private FBMClientConfig ClientConfig;
+
+
protected override void OnLoad()
{
- //Create default heap
- IUnmangedHeap CacheHeap = MemoryUtil.InitializeNewHeapForProcess();
try
{
IReadOnlyDictionary<string, JsonElement> clusterConf = this.GetConfig("cluster");
- string brokerAddress = clusterConf["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'");
-
- string swapDir = PluginConfig.GetProperty("swap_dir").GetString() ?? throw new KeyNotFoundException("Missing required key 'swap_dir' for config");
- int cacheSize = PluginConfig.GetProperty("max_cache").GetInt32();
- string connectPath = PluginConfig.GetProperty("connect_path").GetString() ?? throw new KeyNotFoundException("Missing required element 'connect_path' for config 'cluster'");
- //TimeSpan cleanupInterval = PluginConfig.GetProperty("cleanup_interval_sec").GetTimeSpan(TimeParseType.Seconds);
- //TimeSpan validFor = PluginConfig.GetProperty("valid_for_sec").GetTimeSpan(TimeParseType.Seconds);
- int maxMessageSize = PluginConfig.GetProperty("max_blob_size").GetInt32();
+ Uri brokerAddress = new(clusterConf["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'"));
- //Init dir
- DirectoryInfo dir = new(swapDir);
- dir.Create();
- //Init cache listener, single threaded reader
- ObjectCacheStore CacheListener = new(dir, cacheSize, Log, CacheHeap, true);
-
+
//Init connect endpoint
- {
- //Init connect endpoint
- ConnectEndpoint endpoint = new(connectPath, CacheListener, this);
- Route(endpoint);
- }
-
+ ConnectEndpoint endpoint = this.Route<ConnectEndpoint>();
+
+ //Get the cache store from the connection endpoint
+ ICacheStore store = endpoint.GetCacheStore();
+
+ //Log max memory usage
+ Log.Debug("Maxium memory consumption {mx}Mb", ((ulong)endpoint.CacheConfig.MaxCacheEntries * (ulong)endpoint.CacheConfig.MaxMessageSize) / (ulong)(1024 * 1000));
+
//Setup broker and regitration
{
- //init mre to pass the broker heartbeat signal to the registration worker
- ManualResetEvent mre = new(false);
+
//Route the broker endpoint
- BrokerHeartBeat brokerEp = new(() => BrokerHeartBeatToken!, mre, new Uri(brokerAddress), this);
+ BrokerHeartBeat brokerEp = new(() => BrokerHeartBeatToken!, BrokerSyncHandle, brokerAddress, this);
Route(brokerEp);
//start registration
- _ = this.DeferTask(() => RegisterServerAsync(mre), 200);
+ _ = this.ObserveTask(() => RegisterServerAsync(endpoint.Path), 200);
}
//Setup cluster worker
{
//Get pre-configured fbm client config for caching
- FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(CacheHeap, maxMessageSize, this.IsDebug() ? Log : null);
+ ClientConfig = FBMDataCacheExtensions.GetDefaultConfig(CacheHeap, endpoint.CacheConfig.MaxMessageSize / 2, this.IsDebug() ? Log : null);
//Start Client runner
- _ = this.DeferTask(() => RunClientAsync(CacheListener, new Uri(brokerAddress), conf), 300);
+ _ = this.ObserveTask(() => RunClientAsync(store, brokerAddress), 300);
}
//Load a cache broker to the current server if the config is defined
@@ -129,38 +152,32 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
this.Route<BrokerRegistrationEndpoint>();
}
}
-
- void Cleanup()
- {
- CacheHeap.Dispose();
- CacheListener.Dispose();
- }
-
- //Regsiter cleanup
- _ = UnloadToken.RegisterUnobserved(Cleanup);
Log.Information("Plugin loaded");
}
catch (KeyNotFoundException kne)
{
- CacheHeap.Dispose();
Log.Error("Missing required configuration variables {m}", kne.Message);
}
- catch
- {
- CacheHeap.Dispose();
- throw;
- }
}
protected override void OnUnLoad()
{
+ //dispose heap if initialized
+ if(_cacheHeap.IsValueCreated)
+ {
+ _cacheHeap.Value.Dispose();
+ }
+
+ //Dispose mre sync handle
+ BrokerSyncHandle.Dispose();
+
Log.Information("Plugin unloaded");
}
#region Registration
- private async Task RegisterServerAsync(ManualResetEvent keepaliveWait)
+ private async Task RegisterServerAsync(string connectPath)
{
try
{
@@ -169,9 +186,9 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
//Server id is just dns name for now
string serverId = Dns.GetHostName();
- int heartBeatDelayMs = clusterConfig["heartbeat_timeout_sec"].GetInt32() * 1000;
- string? connectPath = PluginConfig.GetProperty("connect_path").GetString();
+ int heartBeatDelayMs = clusterConfig["heartbeat_timeout_sec"].GetInt32() * 1000;
+
//Get the port of the primary webserver
int port;
@@ -227,15 +244,17 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
while (true)
{
await Task.Delay(heartBeatDelayMs, UnloadToken);
+
//Set the timeout to 0 to it will just check the status without blocking
- if (!keepaliveWait.WaitOne(0))
+ if (!BrokerSyncHandle.WaitOne(0))
{
//server miseed a keepalive event, time to break the loop and retry
Log.Debug("Broker missed a heartbeat request, attempting to re-register");
break;
}
+
//Reset the msr
- keepaliveWait.Reset();
+ BrokerSyncHandle.Reset();
}
}
catch (TaskCanceledException)
@@ -275,7 +294,6 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
}
finally
{
- keepaliveWait.Dispose();
BrokerHeartBeatToken = null;
}
Log.Debug("Registration worker exited");
@@ -305,20 +323,25 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
/// <param name="serverId">The node-id of the current server</param>
/// <param name="clientConf">The configuration to use when initializing synchronization clients</param>
/// <returns>A task that resolves when the plugin unloads</returns>
- private async Task RunClientAsync(ObjectCacheStore cacheStore, Uri brokerAddress, FBMClientConfig clientConf)
+ private async Task RunClientAsync(ICacheStore cacheStore, Uri brokerAddress)
{
TimeSpan noServerDelay = TimeSpan.FromSeconds(10);
+
+ //The node id is just the dns hostname of the current machine
string nodeId = Dns.GetHostName();
+
ListServerRequest listRequest = new(brokerAddress);
try
{
//Get the broker config element
IReadOnlyDictionary<string, JsonElement> clusterConf = this.GetConfig("cluster");
+
int serverCheckMs = clusterConf["update_interval_sec"].GetInt32() * 1000;
//Setup signing and verification certificates
ReadOnlyJsonWebKey cacheSig = await GetCachePrivate();
ReadOnlyJsonWebKey brokerPub = await GetBrokerPublic();
+
//Import certificates
listRequest.WithVerificationKey(brokerPub)
.WithSigningKey(cacheSig);
@@ -340,6 +363,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
//Get server list
servers = await FBMDataCacheExtensions.ListServersAsync(listRequest, UnloadToken);
+
//Servers are loaded, so continue
break;
}
@@ -355,8 +379,10 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
{
Log.Warn(ex, "Failed to get server list from broker");
}
+
//Gen random ms delay
int randomMsDelay = RandomNumberGenerator.GetInt32(1000, 2000);
+
//Delay
await Task.Delay(randomMsDelay, UnloadToken);
}
@@ -386,7 +412,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
ListeningServers.Add(server);
//Run listener background task
- _ = this.DeferTask(() => RunSyncTaskAsync(server, cacheStore, clientConf, nodeId));
+ _ = this.ObserveTask(() => RunSyncTaskAsync(server, cacheStore, nodeId));
}
}
}
@@ -417,10 +443,10 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
Log.Debug("Cluster sync worker exited");
}
- private async Task RunSyncTaskAsync(ActiveServer server, ObjectCacheStore cacheStore, FBMClientConfig conf, string nodeId)
+ private async Task RunSyncTaskAsync(ActiveServer server, ICacheStore cacheStore, string nodeId)
{
//Setup client
- FBMClient client = new(conf);
+ FBMClient client = new(ClientConfig);
try
{
async Task UpdateRecordAsync(string objectId, string newId)
@@ -440,7 +466,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
response.ThrowIfNotSet();
//Check response code
- string status = response.Headers.First(static s => s.Key == HeaderCommand.Status).Value.ToString();
+ string status = response.Headers.First(static s => s.Header == HeaderCommand.Status).Value.ToString();
if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal))
{
//Update the record