aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLibravatar vman <public@vaughnnugent.com>2022-12-15 01:45:03 -0500
committerLibravatar vman <public@vaughnnugent.com>2022-12-15 01:45:03 -0500
commit1f2b3530ebeafa162fe4df41e691c33cb2ff0009 (patch)
tree7f60d7c761cee2df89303c3ef0550743790a63e2
parenta0d5a8d40de9806e21e64475e3297a2a84effe22 (diff)
JWK sigs, session cleanup v1
-rw-r--r--Libs/VNLib.Plugins.Essentials.Sessions.OAuth/Endpoints/AccessTokenEndpoint.cs12
-rw-r--r--Libs/VNLib.Plugins.Essentials.Sessions.OAuth/O2SessionProviderEntry.cs11
-rw-r--r--Libs/VNLib.Plugins.Essentials.Sessions.OAuth/OAuth2SessionProvider.cs22
-rw-r--r--Libs/VNLib.Plugins.Essentials.Sessions.OAuth/VNLib.Plugins.Essentials.Sessions.OAuth.csproj2
-rw-r--r--Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VNLib.Plugins.Essentials.Sessions.Runtime.csproj4
-rw-r--r--Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VnCacheClient.cs35
-rw-r--r--Libs/VNLib.Plugins.Essentials.Sessions.VNCache/VNLib.Plugins.Essentials.Sessions.VNCache.csproj2
-rw-r--r--Libs/VNLib.Plugins.Essentials.Sessions.VNCache/WebSessionProvider.cs22
-rw-r--r--Libs/VNLib.Plugins.Essentials.Sessions.VNCache/WebSessionProviderEntry.cs6
-rw-r--r--Libs/VNLib.Plugins.Essentials.Sessions/MemorySession.cs30
-rw-r--r--Libs/VNLib.Plugins.Essentials.Sessions/MemorySessionEntrypoint.cs3
-rw-r--r--Libs/VNLib.Plugins.Essentials.Sessions/MemorySessionStore.cs34
-rw-r--r--Libs/VNLib.Plugins.Essentials.Sessions/VNLib.Plugins.Essentials.Sessions.Memory.csproj4
-rw-r--r--Libs/VNLib.Plugins.Sessions.Cache.Client/Exceptions/SessionStatusException.cs3
-rw-r--r--Libs/VNLib.Plugins.Sessions.Cache.Client/RemoteSession.cs17
-rw-r--r--Libs/VNLib.Plugins.Sessions.Cache.Client/SessionCacheClient.cs123
-rw-r--r--Libs/VNLib.Plugins.Sessions.Cache.Client/VNLib.Plugins.Sessions.Cache.Client.csproj3
-rw-r--r--Plugins/CacheBroker/CacheBroker.csproj2
-rw-r--r--Plugins/CacheBroker/Endpoints/BrokerRegistrationEndpoint.cs163
-rw-r--r--Plugins/SessionCacheServer/Endpoints/BrokerHeartBeat.cs38
-rw-r--r--Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs136
-rw-r--r--Plugins/SessionCacheServer/ObjectCacheServer.csproj2
-rw-r--r--Plugins/SessionCacheServer/ObjectCacheServerEntry.cs137
-rw-r--r--Plugins/SessionProvider/SessionProvider.csproj2
-rw-r--r--Plugins/SessionProvider/null1
25 files changed, 480 insertions, 334 deletions
diff --git a/Libs/VNLib.Plugins.Essentials.Sessions.OAuth/Endpoints/AccessTokenEndpoint.cs b/Libs/VNLib.Plugins.Essentials.Sessions.OAuth/Endpoints/AccessTokenEndpoint.cs
index a159456..d968398 100644
--- a/Libs/VNLib.Plugins.Essentials.Sessions.OAuth/Endpoints/AccessTokenEndpoint.cs
+++ b/Libs/VNLib.Plugins.Essentials.Sessions.OAuth/Endpoints/AccessTokenEndpoint.cs
@@ -30,12 +30,12 @@ using VNLib.Utils.Memory;
using VNLib.Hashing.IdentityUtility;
using VNLib.Plugins.Essentials.Oauth;
using VNLib.Plugins.Essentials.Endpoints;
+using VNLib.Plugins.Essentials.Oauth.Tokens;
using VNLib.Plugins.Essentials.Oauth.Applications;
using VNLib.Plugins.Essentials.Extensions;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Extensions.Loading.Sql;
using VNLib.Plugins.Extensions.Validation;
-using VNLib.Plugins.Essentials.Oauth.Tokens;
namespace VNLib.Plugins.Essentials.Sessions.OAuth.Endpoints
{
@@ -56,9 +56,9 @@ namespace VNLib.Plugins.Essentials.Sessions.OAuth.Endpoints
//override protection settings to allow most connections to authenticate
protected override ProtectionSettings EndpointProtectionSettings { get; } = new()
{
- BrowsersOnly = false,
- SessionsRequired = false,
- VerifySessionCors = false
+ DisableBrowsersOnly = true,
+ DisableSessionsRequired = true,
+ DisableVerifySessionCors = true
};
public AccessTokenEndpoint(string path, PluginBase pbase, CreateTokenImpl tokenStore, Task<JsonDocument?> verificationKey)
@@ -117,7 +117,7 @@ namespace VNLib.Plugins.Essentials.Sessions.OAuth.Endpoints
secret = secret.ToLower();
//Convert secret to private string that is unreferrenced
- PrivateString secretPv = new(secret, false);
+ using PrivateString secretPv = new(secret, false);
//Get the application from apps store
UserApplication? app = await Applications.VerifyAppAsync(clientId, secretPv);
@@ -171,7 +171,7 @@ namespace VNLib.Plugins.Essentials.Sessions.OAuth.Endpoints
if (result == null)
{
- entity.CloseResponseError(HttpStatusCode.ServiceUnavailable, ErrorType.TemporarilyUnabavailable, "You have reached the maximum number of valid tokens for this application");
+ entity.CloseResponseError(HttpStatusCode.TooManyRequests, ErrorType.TemporarilyUnabavailable, "You have reached the maximum number of valid tokens for this application");
return VfReturnType.VirtualSkip;
}
diff --git a/Libs/VNLib.Plugins.Essentials.Sessions.OAuth/O2SessionProviderEntry.cs b/Libs/VNLib.Plugins.Essentials.Sessions.OAuth/O2SessionProviderEntry.cs
index 07b6530..f4462a4 100644
--- a/Libs/VNLib.Plugins.Essentials.Sessions.OAuth/O2SessionProviderEntry.cs
+++ b/Libs/VNLib.Plugins.Essentials.Sessions.OAuth/O2SessionProviderEntry.cs
@@ -27,16 +27,17 @@ using System.Text.Json;
using VNLib.Net.Http;
using VNLib.Utils.Logging;
using VNLib.Utils.Extensions;
+using VNLib.Data.Caching.Extensions;
using VNLib.Plugins.Essentials.Oauth.Tokens;
using VNLib.Plugins.Essentials.Oauth.Applications;
using VNLib.Plugins.Essentials.Sessions.OAuth;
+using VNLib.Plugins.Essentials.Sessions.Runtime;
using VNLib.Plugins.Essentials.Sessions.OAuth.Endpoints;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Extensions.Loading.Routing;
using VNLib.Plugins.Extensions.Loading.Sql;
using VNLib.Plugins.Extensions.Loading.Events;
-using VNLib.Plugins.Essentials.Sessions.Runtime;
-using VNLib.Data.Caching.Extensions;
+
namespace VNLib.Plugins.Essentials.Sessions.Oauth
{
@@ -69,7 +70,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Oauth
//Optional application jwt token
Task<JsonDocument?> jwtTokenSecret = plugin.TryGetSecretAsync("application_token_key")
- .ContinueWith(static t => t.Result == null ? null : JsonDocument.Parse(t.Result), TaskScheduler.Default);
+ .ContinueWith(static t => t.Result == null ? null : t.Result.GetJsonDocument(), TaskScheduler.Default);
//Access token endpoint is optional
if (oauth2Config.TryGetValue("token_path", out JsonElement el))
@@ -107,7 +108,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Oauth
IReadOnlyDictionary<string, JsonElement> oauth2Config)
{
//Init cache client
- using VnCacheClient cache = new(plugin.IsDebug() ? plugin.Log : null, Utils.Memory.Memory.Shared);
+ using VnCacheClient cache = new(plugin.IsDebug() ? localized : null, Utils.Memory.Memory.Shared);
try
{
@@ -125,7 +126,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Oauth
await cache.LoadConfigAsync(plugin, cacheConfig);
//Init session provider now that client is loaded
- _sessions = new(cache.Resource!, cacheLimit, idProv, plugin.GetContextOptions());
+ _sessions = new(cache.Resource!, cacheLimit, 100, idProv, plugin.GetContextOptions());
//Schedule cleanup interval with the plugin scheduler
plugin.ScheduleInterval(_sessions, cleanupInterval);
diff --git a/Libs/VNLib.Plugins.Essentials.Sessions.OAuth/OAuth2SessionProvider.cs b/Libs/VNLib.Plugins.Essentials.Sessions.OAuth/OAuth2SessionProvider.cs
index d698c81..106029f 100644
--- a/Libs/VNLib.Plugins.Essentials.Sessions.OAuth/OAuth2SessionProvider.cs
+++ b/Libs/VNLib.Plugins.Essentials.Sessions.OAuth/OAuth2SessionProvider.cs
@@ -55,18 +55,20 @@ namespace VNLib.Plugins.Essentials.Sessions.OAuth
private readonly IOauthSessionIdFactory factory;
private readonly TokenStore TokenStore;
-
- public OAuth2SessionProvider(FBMClient client, int maxCacheItems, IOauthSessionIdFactory idFactory, DbContextOptions dbCtx)
+ private readonly uint MaxConnections;
+
+ public OAuth2SessionProvider(FBMClient client, int maxCacheItems, uint maxConnections, IOauthSessionIdFactory idFactory, DbContextOptions dbCtx)
: base(client, maxCacheItems)
{
factory = idFactory;
TokenStore = new(dbCtx);
+ MaxConnections = maxConnections;
}
///<inheritdoc/>
- protected override RemoteSession SessionCtor(string sessionId) => new OAuth2Session(sessionId, Client, BackgroundTimeout, InvlidatateCache);
+ protected override RemoteSession SessionCtor(string sessionId) => new OAuth2Session(sessionId, Client, BackgroundTimeout, InvalidatateCache);
- private void InvlidatateCache(OAuth2Session session)
+ private void InvalidatateCache(OAuth2Session session)
{
lock (CacheLock)
{
@@ -91,6 +93,14 @@ namespace VNLib.Plugins.Essentials.Sessions.OAuth
return SessionHandle.Empty;
}
+ //Limit max number of waiting clients
+ if (WaitingConnections > MaxConnections)
+ {
+ //Set 503 for temporary unavail
+ entity.CloseResponse(System.Net.HttpStatusCode.ServiceUnavailable);
+ return new SessionHandle(null, FileProcessArgs.VirtualSkip, null);
+ }
+
//Recover the session
RemoteSession session = await base.GetSessionAsync(entity, sessionId, cancellationToken);
@@ -174,14 +184,14 @@ namespace VNLib.Plugins.Essentials.Sessions.OAuth
/*
- * Interval for remving expired tokens
+ * Interval for removing expired tokens
*/
///<inheritdoc/>
async Task IIntervalScheduleable.OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken)
{
//Calculate valid token time
- DateTimeOffset validAfter = DateTimeOffset.UtcNow.Subtract(factory.SessionValidFor);
+ DateTime validAfter = DateTime.UtcNow.Subtract(factory.SessionValidFor);
//Remove tokens from db store
IReadOnlyCollection<ActiveToken> revoked = await TokenStore.CleanupExpiredTokensAsync(validAfter, cancellationToken);
//exception list
diff --git a/Libs/VNLib.Plugins.Essentials.Sessions.OAuth/VNLib.Plugins.Essentials.Sessions.OAuth.csproj b/Libs/VNLib.Plugins.Essentials.Sessions.OAuth/VNLib.Plugins.Essentials.Sessions.OAuth.csproj
index d75a1c0..e9927b5 100644
--- a/Libs/VNLib.Plugins.Essentials.Sessions.OAuth/VNLib.Plugins.Essentials.Sessions.OAuth.csproj
+++ b/Libs/VNLib.Plugins.Essentials.Sessions.OAuth/VNLib.Plugins.Essentials.Sessions.OAuth.csproj
@@ -31,7 +31,7 @@
<Exec Command="erase &quot;F:\Programming\Web Plugins\DevPlugins\RuntimeAssets\$(TargetName)&quot; /q &gt; nul" />
</Target>
<ItemGroup>
- <ProjectReference Include="..\..\..\..\VNLib\Http\VNLib.Net.Http.csproj" />
+ <ProjectReference Include="..\..\..\..\VNLib\Http\src\VNLib.Net.Http.csproj" />
<ProjectReference Include="..\..\..\..\VNLib\Plugins\src\VNLib.Plugins.csproj" />
<ProjectReference Include="..\..\..\..\VNLib\Utils\src\VNLib.Utils.csproj" />
<ProjectReference Include="..\..\..\DataCaching\VNLib.Data.Caching.Extensions\VNLib.Data.Caching.Extensions.csproj" />
diff --git a/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VNLib.Plugins.Essentials.Sessions.Runtime.csproj b/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VNLib.Plugins.Essentials.Sessions.Runtime.csproj
index d72d6b9..cf6d0c9 100644
--- a/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VNLib.Plugins.Essentials.Sessions.Runtime.csproj
+++ b/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VNLib.Plugins.Essentials.Sessions.Runtime.csproj
@@ -14,8 +14,8 @@
</PropertyGroup>
<ItemGroup>
- <ProjectReference Include="..\..\..\..\VNLib\Essentials\VNLib.Plugins.Essentials.csproj" />
- <ProjectReference Include="..\..\..\..\VNLib\Http\VNLib.Net.Http.csproj" />
+ <ProjectReference Include="..\..\..\..\VNLib\Essentials\src\VNLib.Plugins.Essentials.csproj" />
+ <ProjectReference Include="..\..\..\..\VNLib\Http\src\VNLib.Net.Http.csproj" />
<ProjectReference Include="..\..\..\DataCaching\VNLib.Data.Caching.Extensions\VNLib.Data.Caching.Extensions.csproj" />
<ProjectReference Include="..\..\..\Extensions\VNLib.Plugins.Extensions.Loading\VNLib.Plugins.Extensions.Loading.csproj" />
<ProjectReference Include="..\..\..\PluginBase\VNLib.Plugins.PluginBase.csproj" />
diff --git a/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VnCacheClient.cs b/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VnCacheClient.cs
index 3b348d2..2f7bdf2 100644
--- a/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VnCacheClient.cs
+++ b/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VnCacheClient.cs
@@ -28,13 +28,14 @@ using System.Net.Sockets;
using System.Net.WebSockets;
using System.Security.Cryptography;
-using VNLib.Utils;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
+using VNLib.Utils.Resources;
using VNLib.Utils.Extensions;
using VNLib.Data.Caching.Extensions;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Extensions.Loading;
+using VNLib.Hashing.IdentityUtility;
namespace VNLib.Plugins.Essentials.Sessions.Runtime
{
@@ -70,7 +71,8 @@ namespace VNLib.Plugins.Essentials.Sessions.Runtime
ClientHeap = heap;
}
-
+
+ ///<inheritdoc/>
protected override void Free()
{
_client?.Dispose();
@@ -90,14 +92,20 @@ namespace VNLib.Plugins.Essentials.Sessions.Runtime
string? brokerAddress = config["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required configuration variable broker_address");
//Get keys async
- Task<string?> clientPrivTask = pbase.TryGetSecretAsync("client_private_key");
- Task<string?> brokerPubTask = pbase.TryGetSecretAsync("broker_public_key");
+ Task<SecretResult?> clientPrivTask = pbase.TryGetSecretAsync("client_private_key");
+ Task<SecretResult?> brokerPubTask = pbase.TryGetSecretAsync("broker_public_key");
+ Task<SecretResult?> cachePubTask = pbase.TryGetSecretAsync("cache_public_key");
//Wait for all tasks to complete
- string?[] keys = await Task.WhenAll(clientPrivTask, brokerPubTask);
+ _ = await Task.WhenAll(clientPrivTask, brokerPubTask, cachePubTask);
+
+ using SecretResult clientPriv = await clientPrivTask ?? throw new KeyNotFoundException("Missing required secret client_private_key");
+ using SecretResult brokerPub = await brokerPubTask ?? throw new KeyNotFoundException("Missing required secret broker_public_key");
+ using SecretResult cachePub = await cachePubTask ?? throw new KeyNotFoundException("Missing required secret cache_public_key");
- byte[] privKey = Convert.FromBase64String(keys[0] ?? throw new KeyNotFoundException("Missing required secret client_private_key"));
- byte[] brokerPub = Convert.FromBase64String(keys[1] ?? throw new KeyNotFoundException("Missing required secret broker_public_key"));
+ ReadOnlyJsonWebKey clientCert = clientPriv.GetJsonWebKey();
+ ReadOnlyJsonWebKey brokerPubKey = brokerPub.GetJsonWebKey();
+ ReadOnlyJsonWebKey cachePubKey = cachePub.GetJsonWebKey();
RetryInterval = config["retry_interval_sec"].GetTimeSpan(TimeParseType.Seconds);
@@ -107,16 +115,14 @@ namespace VNLib.Plugins.Essentials.Sessions.Runtime
FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(ClientHeap ?? Memory.Shared, maxMessageSize, DebugLog);
_client = new(conf);
- //Add the configuration
+
+ //Add the configuration to the client
_client.GetCacheConfiguration()
.WithBroker(brokerUri)
- .ImportVerificationKey(brokerPub)
- .ImportSigningKey(privKey)
+ .WithVerificationKey(cachePubKey)
+ .WithSigningCertificate(clientCert)
+ .WithBrokerVerificationKey(brokerPubKey)
.WithTls(brokerUri.Scheme == Uri.UriSchemeHttps);
-
- //Zero the key memory
- Memory.InitializeBlock(privKey.AsSpan());
- Memory.InitializeBlock(brokerPub.AsSpan());
}
/// <summary>
@@ -157,6 +163,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Runtime
int randomMsDelay = RandomNumberGenerator.GetInt32(1000, 2000);
await Task.Delay(randomMsDelay, cancellationToken);
}
+
if (servers?.Length == 0)
{
Log.Warn("No cluster nodes found, retrying");
diff --git a/Libs/VNLib.Plugins.Essentials.Sessions.VNCache/VNLib.Plugins.Essentials.Sessions.VNCache.csproj b/Libs/VNLib.Plugins.Essentials.Sessions.VNCache/VNLib.Plugins.Essentials.Sessions.VNCache.csproj
index 8c3e23a..dc73ed8 100644
--- a/Libs/VNLib.Plugins.Essentials.Sessions.VNCache/VNLib.Plugins.Essentials.Sessions.VNCache.csproj
+++ b/Libs/VNLib.Plugins.Essentials.Sessions.VNCache/VNLib.Plugins.Essentials.Sessions.VNCache.csproj
@@ -21,7 +21,7 @@
</ItemGroup>
<ItemGroup>
- <ProjectReference Include="..\..\..\..\VNLib\Http\VNLib.Net.Http.csproj" />
+ <ProjectReference Include="..\..\..\..\VNLib\Http\src\VNLib.Net.Http.csproj" />
<ProjectReference Include="..\..\..\..\VNLib\Utils\src\VNLib.Utils.csproj" />
<ProjectReference Include="..\..\..\DataCaching\VNLib.Data.Caching.Extensions\VNLib.Data.Caching.Extensions.csproj" />
<ProjectReference Include="..\..\..\DataCaching\VNLib.Data.Caching\src\VNLib.Data.Caching.csproj" />
diff --git a/Libs/VNLib.Plugins.Essentials.Sessions.VNCache/WebSessionProvider.cs b/Libs/VNLib.Plugins.Essentials.Sessions.VNCache/WebSessionProvider.cs
index 057e308..5747175 100644
--- a/Libs/VNLib.Plugins.Essentials.Sessions.VNCache/WebSessionProvider.cs
+++ b/Libs/VNLib.Plugins.Essentials.Sessions.VNCache/WebSessionProvider.cs
@@ -71,9 +71,6 @@ namespace VNLib.Plugins.Essentials.Sessions.VNCache
protected override RemoteSession SessionCtor(string sessionId) => new WebSession(sessionId, Client, BackgroundUpdateTimeout, UpdateSessionId);
-
- private uint _waitingCount;
-
public async ValueTask<SessionHandle> GetSessionAsync(IHttpEvent entity, CancellationToken cancellationToken)
{
//Callback to close the session when the handle is closeed
@@ -92,27 +89,15 @@ namespace VNLib.Plugins.Essentials.Sessions.VNCache
}
//Limit max number of waiting clients
- if (_waitingCount > MaxConnections)
+ if (WaitingConnections > MaxConnections)
{
//Set 503 for temporary unavail
entity.CloseResponse(System.Net.HttpStatusCode.ServiceUnavailable);
return new SessionHandle(null, FileProcessArgs.VirtualSkip, null);
}
- RemoteSession session;
-
- //Inc waiting count
- Interlocked.Increment(ref _waitingCount);
- try
- {
- //Recover the session
- session = await GetSessionAsync(entity, sessionId, cancellationToken);
- }
- finally
- {
- //Dec on exit
- Interlocked.Decrement(ref _waitingCount);
- }
+ //Get session
+ RemoteSession session = await GetSessionAsync(entity, sessionId, cancellationToken);
//If the session is new (not in cache), then overwrite the session id with a new one as user may have specified their own
if (session.IsNew)
@@ -131,6 +116,7 @@ namespace VNLib.Plugins.Essentials.Sessions.VNCache
session.Privilages = 0;
session.SetLoginToken(null);
}
+
return new SessionHandle(session, HandleClosedAsync);
}
catch (OperationCanceledException)
diff --git a/Libs/VNLib.Plugins.Essentials.Sessions.VNCache/WebSessionProviderEntry.cs b/Libs/VNLib.Plugins.Essentials.Sessions.VNCache/WebSessionProviderEntry.cs
index 382717b..8e25416 100644
--- a/Libs/VNLib.Plugins.Essentials.Sessions.VNCache/WebSessionProviderEntry.cs
+++ b/Libs/VNLib.Plugins.Essentials.Sessions.VNCache/WebSessionProviderEntry.cs
@@ -84,7 +84,7 @@ namespace VNLib.Plugins.Essentials.Sessions.VNCache
IReadOnlyDictionary<string, JsonElement> webSessionConfig)
{
//Init cache client
- using VnCacheClient cache = new(plugin.IsDebug() ? plugin.Log : null, Memory.Shared);
+ using VnCacheClient cache = new(plugin.IsDebug() ? localized : null, Memory.Shared);
try
{
@@ -97,9 +97,11 @@ namespace VNLib.Plugins.Essentials.Sessions.VNCache
//Init provider
_sessions = new(cache.Resource!, cacheLimit, maxConnections, idFactory);
-
localized.Information("Session provider loaded");
+ //Listen for cache table events
+ _ = plugin.DeferTask(() => _sessions.CleanupExpiredSessionsAsync(localized, plugin.UnloadToken));
+
//Run and wait for exit
await cache.RunAsync(localized, plugin.UnloadToken);
diff --git a/Libs/VNLib.Plugins.Essentials.Sessions/MemorySession.cs b/Libs/VNLib.Plugins.Essentials.Sessions/MemorySession.cs
index a806438..d365cd2 100644
--- a/Libs/VNLib.Plugins.Essentials.Sessions/MemorySession.cs
+++ b/Libs/VNLib.Plugins.Essentials.Sessions/MemorySession.cs
@@ -28,23 +28,27 @@ using System.Threading.Tasks;
using System.Collections.Generic;
using VNLib.Plugins.Essentials.Extensions;
+using VNLib.Utils.Async;
using VNLib.Net.Http;
+using VNLib.Utils.Memory.Caching;
using static VNLib.Plugins.Essentials.Sessions.ISessionExtensions;
#nullable enable
namespace VNLib.Plugins.Essentials.Sessions.Memory
{
- internal class MemorySession : SessionBase
+ internal class MemorySession : SessionBase, ICacheable
{
private readonly Dictionary<string, string> DataStorage;
private readonly Func<IHttpEvent, string, string> OnSessionUpdate;
+ private readonly AsyncQueue<MemorySession> ExpiredTable;
- public MemorySession(string sessionId, IPAddress ipAddress, Func<IHttpEvent, string, string> onSessionUpdate)
+ public MemorySession(string sessionId, IPAddress ipAddress, Func<IHttpEvent, string, string> onSessionUpdate, AsyncQueue<MemorySession> expired)
{
//Set the initial is-new flag
DataStorage = new Dictionary<string, string>(10);
+ ExpiredTable = expired;
OnSessionUpdate = onSessionUpdate;
//Get new session id
@@ -90,14 +94,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Memory
//Memory session always completes
return ValueTask.FromResult<Task?>(null);
}
-
- protected override Task OnEvictedAsync()
- {
- //Clear all session data
- DataStorage.Clear();
- return Task.CompletedTask;
- }
-
+
protected override string IndexerGet(string key)
{
return DataStorage.GetValueOrDefault(key, string.Empty);
@@ -116,5 +113,18 @@ namespace VNLib.Plugins.Essentials.Sessions.Memory
}
DataStorage[key] = value;
}
+
+
+ DateTime ICacheable.Expires { get; set; }
+
+ void ICacheable.Evicted()
+ {
+ DataStorage.Clear();
+ //Enque cleanup
+ _ = ExpiredTable.TryEnque(this);
+ }
+
+ bool IEquatable<ICacheable>.Equals(ICacheable? other) => other is ISession ses && SessionID.Equals(ses.SessionID, StringComparison.Ordinal);
+
}
}
diff --git a/Libs/VNLib.Plugins.Essentials.Sessions/MemorySessionEntrypoint.cs b/Libs/VNLib.Plugins.Essentials.Sessions/MemorySessionEntrypoint.cs
index 91a3a0e..f58129a 100644
--- a/Libs/VNLib.Plugins.Essentials.Sessions/MemorySessionEntrypoint.cs
+++ b/Libs/VNLib.Plugins.Essentials.Sessions/MemorySessionEntrypoint.cs
@@ -73,6 +73,9 @@ namespace VNLib.Plugins.Essentials.Sessions.Memory
_sessions = new(config);
+ //Begin listening for expired records
+ _ = plugin.DeferTask(() => _sessions.CleanupExiredAsync(localized, plugin.UnloadToken));
+
//Schedule garbage collector
_ = plugin.ScheduleInterval(this, TimeSpan.FromMinutes(1));
diff --git a/Libs/VNLib.Plugins.Essentials.Sessions/MemorySessionStore.cs b/Libs/VNLib.Plugins.Essentials.Sessions/MemorySessionStore.cs
index e76d7a4..1af885e 100644
--- a/Libs/VNLib.Plugins.Essentials.Sessions/MemorySessionStore.cs
+++ b/Libs/VNLib.Plugins.Essentials.Sessions/MemorySessionStore.cs
@@ -28,11 +28,12 @@ using System.Threading.Tasks;
using System.Collections.Generic;
using VNLib.Net.Http;
+using VNLib.Net.Http.Core;
using VNLib.Utils;
using VNLib.Utils.Async;
+using VNLib.Utils.Logging;
using VNLib.Utils.Extensions;
using VNLib.Plugins.Essentials.Extensions;
-using VNLib.Net.Http.Core;
#nullable enable
@@ -47,12 +48,14 @@ namespace VNLib.Plugins.Essentials.Sessions.Memory
internal readonly MemorySessionConfig Config;
internal readonly SessionIdFactory IdFactory;
+ internal readonly AsyncQueue<MemorySession> ExpiredSessions;
public MemorySessionStore(MemorySessionConfig config)
{
Config = config;
SessionsStore = new(config.MaxAllowedSessions, StringComparer.Ordinal);
IdFactory = new(config.SessionIdSizeBytes, config.SessionCookieID, config.SessionTimeout);
+ ExpiredSessions = new(false, true);
}
///<inheritdoc/>
@@ -68,7 +71,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Memory
if (IdFactory.TryGetSessionId(entity, out string? sessionId))
{
//Try to get the old record or evict it
- ERRNO result = SessionsStore.TryGetOrEvictRecord(sessionId, out MemorySession session);
+ ERRNO result = SessionsStore.TryGetOrEvictRecord(sessionId, out MemorySession? session);
if(result > 0)
{
//Valid, now wait for exclusive access
@@ -89,7 +92,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Memory
return new(null, FileProcessArgs.VirtualSkip, null);
}
//Initialze a new session
- session = new(sessionId, entity.Server.GetTrustedIp(), UpdateSessionId);
+ session = new(sessionId, entity.Server.GetTrustedIp(), UpdateSessionId, ExpiredSessions);
//Increment the semaphore
(session as IWaitHandle).WaitOne();
//store the session in cache while holding semaphore, and set its expiration
@@ -104,6 +107,31 @@ namespace VNLib.Plugins.Essentials.Sessions.Memory
}
}
+ public async Task CleanupExiredAsync(ILogProvider log, CancellationToken token)
+ {
+ while (true)
+ {
+ try
+ {
+ //Wait for expired session and dispose it
+ using MemorySession session = await ExpiredSessions.DequeueAsync(token);
+
+ //Obtain lock on session
+ await session.WaitOneAsync(CancellationToken.None);
+
+ log.Verbose("Removed expired session {id}", session.SessionID);
+ }
+ catch (OperationCanceledException)
+ {
+ break;
+ }
+ catch (Exception ex)
+ {
+ log.Error(ex);
+ }
+ }
+ }
+
private string UpdateSessionId(IHttpEvent entity, string oldId)
{
//Generate and set a new sessionid
diff --git a/Libs/VNLib.Plugins.Essentials.Sessions/VNLib.Plugins.Essentials.Sessions.Memory.csproj b/Libs/VNLib.Plugins.Essentials.Sessions/VNLib.Plugins.Essentials.Sessions.Memory.csproj
index 0d3cb40..c4a65f5 100644
--- a/Libs/VNLib.Plugins.Essentials.Sessions/VNLib.Plugins.Essentials.Sessions.Memory.csproj
+++ b/Libs/VNLib.Plugins.Essentials.Sessions/VNLib.Plugins.Essentials.Sessions.Memory.csproj
@@ -36,8 +36,8 @@
</PackageReference>
</ItemGroup>
<ItemGroup>
- <ProjectReference Include="..\..\..\..\VNLib\Essentials\VNLib.Plugins.Essentials.csproj" />
- <ProjectReference Include="..\..\..\..\VNLib\Http\VNLib.Net.Http.csproj" />
+ <ProjectReference Include="..\..\..\..\VNLib\Essentials\src\VNLib.Plugins.Essentials.csproj" />
+ <ProjectReference Include="..\..\..\..\VNLib\Http\src\VNLib.Net.Http.csproj" />
<ProjectReference Include="..\..\..\Extensions\VNLib.Plugins.Extensions.Loading\VNLib.Plugins.Extensions.Loading.csproj" />
<ProjectReference Include="..\..\..\PluginBase\VNLib.Plugins.PluginBase.csproj" />
<ProjectReference Include="..\VNLib.Plugins.Essentials.Sessions.Runtime\VNLib.Plugins.Essentials.Sessions.Runtime.csproj" />
diff --git a/Libs/VNLib.Plugins.Sessions.Cache.Client/Exceptions/SessionStatusException.cs b/Libs/VNLib.Plugins.Sessions.Cache.Client/Exceptions/SessionStatusException.cs
index b857638..bc6b217 100644
--- a/Libs/VNLib.Plugins.Sessions.Cache.Client/Exceptions/SessionStatusException.cs
+++ b/Libs/VNLib.Plugins.Sessions.Cache.Client/Exceptions/SessionStatusException.cs
@@ -29,6 +29,9 @@ using VNLib.Plugins.Essentials.Sessions;
namespace VNLib.Plugins.Sessions.Cache.Client.Exceptions
{
+ /// <summary>
+ /// Raised when the status of the session is invalid and cannot be used
+ /// </summary>
public class SessionStatusException : SessionException
{
public SessionStatusException()
diff --git a/Libs/VNLib.Plugins.Sessions.Cache.Client/RemoteSession.cs b/Libs/VNLib.Plugins.Sessions.Cache.Client/RemoteSession.cs
index d2d4200..3b61f68 100644
--- a/Libs/VNLib.Plugins.Sessions.Cache.Client/RemoteSession.cs
+++ b/Libs/VNLib.Plugins.Sessions.Cache.Client/RemoteSession.cs
@@ -37,8 +37,6 @@ using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Essentials.Sessions;
using VNLib.Plugins.Essentials.Extensions;
-#nullable enable
-
namespace VNLib.Plugins.Sessions.Cache.Client
{
/// <summary>
@@ -48,9 +46,9 @@ namespace VNLib.Plugins.Sessions.Cache.Client
public abstract class RemoteSession : SessionBase
{
protected const string CREATED_TIME_ENTRY = "__.i.ctime";
-
- protected readonly FBMClient Client;
- protected readonly TimeSpan UpdateTimeout;
+
+ protected FBMClient Client { get; }
+ protected TimeSpan UpdateTimeout { get; }
private readonly AsyncLazyInitializer Initializer;
@@ -119,6 +117,8 @@ namespace VNLib.Plugins.Sessions.Cache.Client
protected set => this.SetValueType(CREATED_TIME_ENTRY, value.ToUnixTimeMilliseconds());
}
+
+
///<inheritdoc/>
protected override string IndexerGet(string key)
{
@@ -186,12 +186,5 @@ namespace VNLib.Plugins.Sessions.Cache.Client
throw;
}
}
- ///<inheritdoc/>
- protected override Task OnEvictedAsync()
- {
- //empty the dict to help the GC
- DataStore!.Clear();
- return Task.CompletedTask;
- }
}
}
diff --git a/Libs/VNLib.Plugins.Sessions.Cache.Client/SessionCacheClient.cs b/Libs/VNLib.Plugins.Sessions.Cache.Client/SessionCacheClient.cs
index 8eed404..2e72391 100644
--- a/Libs/VNLib.Plugins.Sessions.Cache.Client/SessionCacheClient.cs
+++ b/Libs/VNLib.Plugins.Sessions.Cache.Client/SessionCacheClient.cs
@@ -29,37 +29,64 @@ using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using VNLib.Net.Http;
-using VNLib.Utils;
+using VNLib.Utils.Async;
+using VNLib.Utils.Logging;
using VNLib.Utils.Memory.Caching;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Essentials.Sessions;
-#nullable enable
-
namespace VNLib.Plugins.Sessions.Cache.Client
{
/// <summary>
/// A client that allows access to sessions located on external servers
/// </summary>
- public abstract class SessionCacheClient : VnDisposeable, ICacheHolder
+ public abstract class SessionCacheClient : ICacheHolder
{
- public class LRUSessionStore<T> : LRUCache<string, T> where T : ISession, ICacheable
+ public class LRUSessionStore<T> : LRUCache<string, T>, ICacheHolder where T : ISession
{
+ internal AsyncQueue<T> ExpiredSessions { get; }
+
+ ///<inheritdoc/>
public override bool IsReadOnly => false;
+ ///<inheritdoc/>
protected override int MaxCapacity { get; }
- public LRUSessionStore(int maxCapacity) : base(StringComparer.Ordinal) => MaxCapacity = maxCapacity;
-
+
+ public LRUSessionStore(int maxCapacity) : base(StringComparer.Ordinal)
+ {
+ MaxCapacity = maxCapacity;
+ ExpiredSessions = new (true, true);
+ }
+
+ ///<inheritdoc/>
protected override bool CacheMiss(string key, [NotNullWhen(true)] out T? value)
{
value = default;
return false;
}
+
+ ///<inheritdoc/>
protected override void Evicted(KeyValuePair<string, T> evicted)
{
- //Evice record
- evicted.Value.Evicted();
+ //add to queue, the list lock should be held during this operatio
+ _ = ExpiredSessions.TryEnque(evicted.Value);
+ }
+
+ ///<inheritdoc/>
+ public void CacheClear()
+ {
+ foreach (KeyValuePair<string, T> value in List)
+ {
+ Evicted(value);
+ }
+ Clear();
+ }
+
+ ///<inheritdoc/>
+ public void CacheHardClear()
+ {
+ CacheClear();
}
}
@@ -67,6 +94,9 @@ namespace VNLib.Plugins.Sessions.Cache.Client
protected readonly object CacheLock;
protected readonly int MaxLoadedEntires;
+ /// <summary>
+ /// The client used to communicate with the cache server
+ /// </summary>
protected FBMClient Client { get; }
/// <summary>
@@ -80,11 +110,14 @@ namespace VNLib.Plugins.Sessions.Cache.Client
CacheLock = new();
CacheTable = new(maxCacheItems);
Client = client;
- //Listen for close events
- Client.ConnectionClosed += Client_ConnectionClosed;
}
- private void Client_ConnectionClosed(object? sender, EventArgs e) => CacheHardClear();
+ private ulong _waitingCount;
+
+ /// <summary>
+ /// The number of pending connections waiting for results from the cache server
+ /// </summary>
+ public ulong WaitingConnections => _waitingCount;
/// <summary>
/// Attempts to get a session from the cache identified by its sessionId asynchronously
@@ -96,7 +129,6 @@ namespace VNLib.Plugins.Sessions.Cache.Client
/// <exception cref="SessionException"></exception>
public virtual async ValueTask<RemoteSession> GetSessionAsync(IHttpEvent entity, string sessionId, CancellationToken cancellationToken)
{
- Check();
try
{
RemoteSession? session;
@@ -113,6 +145,10 @@ namespace VNLib.Plugins.Sessions.Cache.Client
}
//Valid entry found in cache
}
+
+ //Inc waiting count
+ Interlocked.Increment(ref _waitingCount);
+
try
{
//Load session-data
@@ -128,6 +164,11 @@ namespace VNLib.Plugins.Sessions.Cache.Client
}
throw;
}
+ finally
+ {
+ //Dec waiting count
+ Interlocked.Decrement(ref _waitingCount);
+ }
}
catch (SessionException)
{
@@ -151,6 +192,47 @@ namespace VNLib.Plugins.Sessions.Cache.Client
/// <param name="sessionId">The session identifier</param>
/// <returns>The new session for the given ID</returns>
protected abstract RemoteSession SessionCtor(string sessionId);
+
+ /// <summary>
+ /// Begins waiting for expired sessions to be evicted from the cache table that
+ /// may have pending synchronization operations
+ /// </summary>
+ /// <param name="log"></param>
+ /// <param name="token"></param>
+ /// <returns></returns>
+ public async Task CleanupExpiredSessionsAsync(ILogProvider log, CancellationToken token)
+ {
+ //Close handler
+ void OnConnectionClosed(object? sender, EventArgs e) => CacheHardClear();
+
+ //Attach event
+ Client.ConnectionClosed += OnConnectionClosed;
+
+ while (true)
+ {
+ try
+ {
+ //Wait for expired session and dispose it
+ using RemoteSession session = await CacheTable.ExpiredSessions.DequeueAsync(token);
+
+ //Obtain lock on session
+ await session.WaitOneAsync(CancellationToken.None);
+
+ log.Verbose("Removed expired session {id}", session.SessionID);
+ }
+ catch (OperationCanceledException)
+ {
+ break;
+ }
+ catch(Exception ex)
+ {
+ log.Error(ex);
+ }
+ }
+
+ //remove handler
+ Client.ConnectionClosed -= OnConnectionClosed;
+ }
///<inheritdoc/>
public void CacheClear()
@@ -163,21 +245,8 @@ namespace VNLib.Plugins.Sessions.Cache.Client
//Cleanup cache when disconnected
lock (CacheLock)
{
- CacheTable.Clear();
- foreach (RemoteSession session in (IEnumerable<RemoteSession>)CacheTable)
- {
- session.Evicted();
- }
- CacheTable.Clear();
+ CacheTable.CacheHardClear();
}
}
-
- protected override void Free()
- {
- //Unsub from events
- Client.ConnectionClosed -= Client_ConnectionClosed;
- //Clear all cached sessions
- CacheHardClear();
- }
}
}
diff --git a/Libs/VNLib.Plugins.Sessions.Cache.Client/VNLib.Plugins.Sessions.Cache.Client.csproj b/Libs/VNLib.Plugins.Sessions.Cache.Client/VNLib.Plugins.Sessions.Cache.Client.csproj
index fc99bbf..a75ebe3 100644
--- a/Libs/VNLib.Plugins.Sessions.Cache.Client/VNLib.Plugins.Sessions.Cache.Client.csproj
+++ b/Libs/VNLib.Plugins.Sessions.Cache.Client/VNLib.Plugins.Sessions.Cache.Client.csproj
@@ -12,6 +12,7 @@
<GenerateDocumentationFile>True</GenerateDocumentationFile>
<PackageProjectUrl>https://www.vaughnnugent.com/resources</PackageProjectUrl>
<AnalysisLevel>latest-all</AnalysisLevel>
+ <Nullable>enable</Nullable>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
@@ -36,7 +37,7 @@
</ItemGroup>
<ItemGroup>
- <ProjectReference Include="..\..\..\..\VNLib\Essentials\VNLib.Plugins.Essentials.csproj" />
+ <ProjectReference Include="..\..\..\..\VNLib\Essentials\src\VNLib.Plugins.Essentials.csproj" />
<ProjectReference Include="..\..\..\DataCaching\VNLib.Data.Caching\src\VNLib.Data.Caching.csproj" />
</ItemGroup>
diff --git a/Plugins/CacheBroker/CacheBroker.csproj b/Plugins/CacheBroker/CacheBroker.csproj
index 97a7e30..f4ea139 100644
--- a/Plugins/CacheBroker/CacheBroker.csproj
+++ b/Plugins/CacheBroker/CacheBroker.csproj
@@ -45,7 +45,7 @@
<ItemGroup>
- <ProjectReference Include="..\..\..\..\VNLib\Essentials\VNLib.Plugins.Essentials.csproj" />
+ <ProjectReference Include="..\..\..\..\VNLib\Essentials\src\VNLib.Plugins.Essentials.csproj" />
<ProjectReference Include="..\..\..\..\VNLib\Hashing\src\VNLib.Hashing.Portable.csproj" />
<ProjectReference Include="..\..\..\Extensions\VNLib.Plugins.Extensions.Loading\VNLib.Plugins.Extensions.Loading.csproj" />
<ProjectReference Include="..\..\..\PluginBase\VNLib.Plugins.PluginBase.csproj" />
diff --git a/Plugins/CacheBroker/Endpoints/BrokerRegistrationEndpoint.cs b/Plugins/CacheBroker/Endpoints/BrokerRegistrationEndpoint.cs
index 340c47e..be700d1 100644
--- a/Plugins/CacheBroker/Endpoints/BrokerRegistrationEndpoint.cs
+++ b/Plugins/CacheBroker/Endpoints/BrokerRegistrationEndpoint.cs
@@ -35,11 +35,11 @@ using System.Threading.Tasks;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text.Json.Serialization;
+using System.Security.Cryptography.X509Certificates;
using RestSharp;
using VNLib.Net.Http;
-using VNLib.Utils;
using VNLib.Utils.IO;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
@@ -50,8 +50,8 @@ using VNLib.Plugins.Essentials.Endpoints;
using VNLib.Plugins.Essentials.Extensions;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Extensions.Loading.Events;
+using VNLib.Plugins.Extensions.Loading.Routing;
using VNLib.Net.Rest.Client;
-using VaultSharp.V1.SystemBackend;
#nullable enable
@@ -69,16 +69,7 @@ namespace VNLib.Plugins.Cache.Broker.Endpoints
MaxTimeout = 10 * 1000,
ThrowOnAnyError = true
}, null);
-
- private static readonly HashAlgorithmName SignatureHashAlg = HashAlgorithmName.SHA384;
- //using the es384 algorithm for signing
- private static readonly ECCurve DefaultCurve = ECCurve.CreateFromFriendlyName("secp384r1");
-
- private static readonly IReadOnlyDictionary<string, string> BrokerJwtHeader = new Dictionary<string, string>()
- {
- { "alg","ES384" },
- { "typ", "JWT"}
- };
+
private class ActiveServer
{
@@ -98,41 +89,18 @@ namespace VNLib.Plugins.Cache.Broker.Endpoints
private readonly object ListLock;
private readonly Dictionary<string, ActiveServer> ActiveServers;
- private readonly Task<byte[]> CachePubKey;
- private readonly Task<byte[]> ClientPubKey;
- private readonly Task<byte[]> BrokerPrivateKey;
-
//Loosen up protection settings since this endpoint is not desinged for browsers or sessions
protected override ProtectionSettings EndpointProtectionSettings { get; } = new()
{
- BrowsersOnly = false,
- CrossSiteDenied = false,
- SessionsRequired = false,
- VerifySessionCors = false,
+ DisableBrowsersOnly = true,
+ DisableCrossSiteDenied = true,
+ DisableSessionsRequired = true,
+ DisableVerifySessionCors = true,
};
public BrokerRegistrationEndpoint(PluginBase plugin, IReadOnlyDictionary<string, JsonElement> config)
{
string? path = config["path"].GetString();
-
- //Get the keys from the vault
- BrokerPrivateKey = plugin.TryGetSecretAsync("broker_private_key").ContinueWith((Task<string?> secret) =>
- {
- _ = secret.Result ?? throw new InvalidOperationException("Broker private key not found in vault");
- return Convert.FromBase64String(secret.Result);
- }, TaskScheduler.Default);
-
- CachePubKey = plugin.TryGetSecretAsync("cache_public_key").ContinueWith((Task<string?> secret) =>
- {
- _ = secret.Result ?? throw new InvalidOperationException("Cache public key not found in vault");
- return Convert.FromBase64String(secret.Result);
- }, TaskScheduler.Default);
-
- ClientPubKey = plugin.TryGetSecretAsync("client_public_key").ContinueWith((Task<string?> secret) =>
- {
- _ = secret.Result ?? throw new InvalidOperationException("Client public key not found in vault");
- return Convert.FromBase64String(secret.Result);
- }, TaskScheduler.Default);
InitPathAndLog(path, plugin.Log);
@@ -140,23 +108,62 @@ namespace VNLib.Plugins.Cache.Broker.Endpoints
ActiveServers = new();
}
+ private async Task<ReadOnlyJsonWebKey> GetClientPublic()
+ {
+ using SecretResult secret = await this.GetPlugin().TryGetSecretAsync("client_public_key") ?? throw new InvalidOperationException("Client public key not found in vault");
+ return secret.GetJsonWebKey();
+ }
+
+ private async Task<ReadOnlyJsonWebKey> GetCachePublic()
+ {
+ using SecretResult secret = await this.GetPlugin().TryGetSecretAsync("cache_public_key") ?? throw new InvalidOperationException("Cache public key not found in vault");
+ return secret.GetJsonWebKey();
+ }
+
+ private async Task<ReadOnlyJsonWebKey> GetBrokerCertificate()
+ {
+ using SecretResult secret = await this.GetPlugin().TryGetSecretAsync("broker_private_key") ?? throw new InvalidOperationException("Broker private key not found in vault");
+ return secret.GetJsonWebKey();
+ }
+
+ /*
+ * Clients and servers use the post method to discover cache
+ * server nodes
+ */
protected override async ValueTask<VfReturnType> PostAsync(HttpEntity entity)
{
//Parse jwt
- using JsonWebToken jwt = await entity.ParseFileAsAsync(ParseJwtAsync) ?? throw new Exception("Invalid JWT");
+ using JsonWebToken? jwt = await entity.ParseFileAsAsync(ParseJwtAsync);
+
+ if(jwt == null)
+ {
+ return VfReturnType.BadRequest;
+ }
+
//Verify with the client's pub key
- using (ECDsa alg = ECDsa.Create(DefaultCurve))
+ using (ReadOnlyJsonWebKey cpCert = await GetClientPublic())
{
- ReadOnlyMemory<byte> client = await ClientPubKey;
- alg.ImportSubjectPublicKeyInfo(client.Span, out _);
- //Verify with client public key
- if (!jwt.Verify(alg, in SignatureHashAlg))
+ if (jwt.VerifyFromJwk(cpCert))
{
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
+ goto Accepted;
+ }
+ }
+
+ //Accept usng the cache server key
+ using (ReadOnlyJsonWebKey cacheCert = await GetCachePublic())
+ {
+ if (jwt.VerifyFromJwk(cacheCert))
+ {
+ goto Accepted;
}
}
+
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
+
+ Accepted:
+
try
{
//Get all active servers
@@ -165,23 +172,17 @@ namespace VNLib.Plugins.Cache.Broker.Endpoints
{
servers = ActiveServers.Values.ToArray();
}
-
+ using ReadOnlyJsonWebKey bpCert = await GetBrokerCertificate();
+
//Create response payload with list of active servers and sign it
using JsonWebToken response = new();
- response.WriteHeader(BrokerJwtHeader);
+ response.WriteHeader(bpCert.JwtHeader);
response.InitPayloadClaim(1)
.AddClaim("servers", servers)
.CommitClaims();
//Sign the jwt using the broker key
- using(ECDsa alg = ECDsa.Create(DefaultCurve))
- {
- ReadOnlyMemory<byte> brokerPrivate = await BrokerPrivateKey;
-
- alg.ImportPkcs8PrivateKey(brokerPrivate.Span, out _);
-
- response.Sign(alg, in SignatureHashAlg, 128);
- }
+ response.SignFromJwk(bpCert);
entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, response.DataBuffer);
return VfReturnType.VirtualSkip;
@@ -211,15 +212,18 @@ namespace VNLib.Plugins.Cache.Broker.Endpoints
protected override async ValueTask<VfReturnType> PutAsync(HttpEntity entity)
{
//Parse jwt
- using JsonWebToken? jwt = await entity.ParseFileAsAsync(ParseJwtAsync) ?? throw new Exception("");
- //Verify with the cache server's pub key
- using (ECDsa alg = ECDsa.Create(DefaultCurve))
+ using JsonWebToken? jwt = await entity.ParseFileAsAsync(ParseJwtAsync);
+
+ if(jwt == null)
+ {
+ return VfReturnType.BadRequest;
+ }
+
+ //Only cache servers may update the list
+ using (ReadOnlyJsonWebKey cpCert = await GetCachePublic())
{
- ReadOnlyMemory<byte> cache = await CachePubKey;
-
- alg.ImportSubjectPublicKeyInfo(cache.Span, out _);
//Verify the jwt
- if (!jwt.Verify(alg, in SignatureHashAlg))
+ if (!jwt.VerifyFromJwk(cpCert))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
@@ -227,8 +231,7 @@ namespace VNLib.Plugins.Cache.Broker.Endpoints
}
try
- {
-
+ {
//Get message body
using JsonDocument requestBody = jwt.GetPayload();
@@ -331,17 +334,21 @@ namespace VNLib.Plugins.Cache.Broker.Endpoints
{
servers = ActiveServers.Values.ToArray();
}
+
+ using ReadOnlyJsonWebKey jwk = await GetBrokerCertificate();
+
LinkedList<Task> all = new();
//Run keeplaive request for all active servers
foreach (ActiveServer server in servers)
{
- all.AddLast(RunHeartbeatAsync(server));
+ all.AddLast(RunHeartbeatAsync(server, jwk));
}
+
//Wait for all to complete
await Task.WhenAll(all);
}
-
- private async Task RunHeartbeatAsync(ActiveServer server)
+
+ private async Task RunHeartbeatAsync(ActiveServer server, ReadOnlyJsonWebKey privKey)
{
try
{
@@ -350,27 +357,23 @@ namespace VNLib.Plugins.Cache.Broker.Endpoints
{
Path = HEARTBEAT_PATH
};
+
string authMessage;
//Init jwt for signing auth messages
using (JsonWebToken jwt = new())
{
- jwt.WriteHeader(BrokerJwtHeader);
+ jwt.WriteHeader(privKey.JwtHeader);
jwt.InitPayloadClaim()
.AddClaim("token", server.Token)
.CommitClaims();
-
- //Sign the jwt using the broker key
- using (ECDsa alg = ECDsa.Create(DefaultCurve))
- {
- ReadOnlyMemory<byte> broker = await BrokerPrivateKey;
-
- alg.ImportPkcs8PrivateKey(broker.Span, out _);
- //Sign with broker key
- jwt.Sign(alg, in SignatureHashAlg, 128);
- }
+
+ //Sign the jwt
+ jwt.SignFromJwk(privKey);
+
//compile
authMessage = jwt.Compile();
}
+
//Build keeplaive request
RestRequest keepaliveRequest = new(uri.Uri, Method.Get);
//Add authorization token
diff --git a/Plugins/SessionCacheServer/Endpoints/BrokerHeartBeat.cs b/Plugins/SessionCacheServer/Endpoints/BrokerHeartBeat.cs
index bad0f7a..2e380a3 100644
--- a/Plugins/SessionCacheServer/Endpoints/BrokerHeartBeat.cs
+++ b/Plugins/SessionCacheServer/Endpoints/BrokerHeartBeat.cs
@@ -29,9 +29,7 @@ using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
-using System.Security.Cryptography;
-using VNLib.Data.Caching.Extensions;
using VNLib.Hashing.IdentityUtility;
using VNLib.Plugins.Essentials.Endpoints;
using VNLib.Plugins.Essentials.Extensions;
@@ -48,7 +46,12 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
private readonly Task<IPAddress[]> BrokerIpList;
private readonly PluginBase Pbase;
- protected override ProtectionSettings EndpointProtectionSettings { get; }
+ protected override ProtectionSettings EndpointProtectionSettings { get; } = new()
+ {
+ DisableBrowsersOnly = true,
+ DisableSessionsRequired = true,
+ DisableVerifySessionCors = true
+ };
public BrokerHeartBeat(Func<string> token, ManualResetEvent keepaliveSet, Uri brokerUri, PluginBase pbase)
{
@@ -57,20 +60,13 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
BrokerIpList = Dns.GetHostAddressesAsync(brokerUri.DnsSafeHost);
this.Pbase = pbase;
-
- EndpointProtectionSettings = new()
- {
- BrowsersOnly = false,
- SessionsRequired = false,
- VerifySessionCors = false,
- };
}
- private async Task<byte[]> GetBrokerPubAsync()
+ private async Task<ReadOnlyJsonWebKey> GetBrokerPubAsync()
{
- string? brokerPubKey = await Pbase.TryGetSecretAsync("broker_public_key") ?? throw new KeyNotFoundException("Missing required secret : broker_public_key");
+ using SecretResult brokerPubKey = await Pbase.TryGetSecretAsync("broker_public_key") ?? throw new KeyNotFoundException("Missing required secret : broker_public_key");
- return Convert.FromBase64String(brokerPubKey);
+ return brokerPubKey.GetJsonWebKey();
}
protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity)
@@ -89,35 +85,36 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
}
//Get the authorization jwt
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
+
if (string.IsNullOrWhiteSpace(jwtAuth))
{
//Token invalid
entity.CloseResponse(HttpStatusCode.Forbidden);
return VfReturnType.VirtualSkip;
}
+
//Parse the jwt
using JsonWebToken jwt = JsonWebToken.Parse(jwtAuth);
- //Init signature alg
- using (ECDsa alg = ECDsa.Create(FBMDataCacheExtensions.CacheCurve))
- {
- //Get pub key
- byte[] key = await GetBrokerPubAsync();
- alg.ImportSubjectPublicKeyInfo(key, out _);
+ //Verify the jwt using the broker's public key certificate
+ using (ReadOnlyJsonWebKey cert = await GetBrokerPubAsync())
+ {
//Verify the jwt
- if (!jwt.Verify(alg, FBMDataCacheExtensions.CacheJwtAlgorithm))
+ if (!jwt.VerifyFromJwk(cert))
{
//Token invalid
entity.CloseResponse(HttpStatusCode.Forbidden);
return VfReturnType.VirtualSkip;
}
}
+
string? auth;
//Recover the auth token from the jwt
using (JsonDocument doc = jwt.GetPayload())
{
auth = doc.RootElement.GetProperty("token").GetString();
}
+
//Verify token
if(Token().Equals(auth, StringComparison.Ordinal))
{
@@ -126,6 +123,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
entity.CloseResponse(HttpStatusCode.OK);
return VfReturnType.VirtualSkip;
}
+
//Token invalid
entity.CloseResponse(HttpStatusCode.Forbidden);
return VfReturnType.VirtualSkip;
diff --git a/Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs b/Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs
index fee1ea7..77acb13 100644
--- a/Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs
+++ b/Plugins/SessionCacheServer/Endpoints/ConnectEndpoint.cs
@@ -24,25 +24,21 @@
using System;
using System.Net;
-using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Channels;
using System.Collections.Generic;
-using System.Security.Cryptography;
using System.Collections.Concurrent;
using VNLib.Net.Http;
using VNLib.Hashing;
using VNLib.Utils.Async;
-using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
using VNLib.Hashing.IdentityUtility;
using VNLib.Net.Messaging.FBM;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Net.Messaging.FBM.Server;
-using VNLib.Data.Caching.Extensions;
using VNLib.Data.Caching.ObjectCache;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Essentials.Endpoints;
@@ -77,9 +73,9 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
//Loosen up protection settings
protected override ProtectionSettings EndpointProtectionSettings { get; } = new()
{
- BrowsersOnly = false,
- SessionsRequired = false,
- CrossSiteDenied = false
+ DisableBrowsersOnly = true,
+ DisableSessionsRequired = true,
+ DisableCrossSiteDenied = true
};
public ConnectEndpoint(string path, ObjectCacheStore store, PluginBase pbase)
@@ -121,20 +117,33 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
string? nodeId = null;
string? challenge = null;
+ bool isPeer = false;
// Parse jwt
using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
{
- //Get the client public key
- byte[] clientPub = await GetClientPubAsync();
+ bool verified = false;
- //Init sig alg
- using ECDsa verAlg = ECDsa.Create(FBMDataCacheExtensions.CacheCurve);
- //Import client pub key
- verAlg.ImportSubjectPublicKeyInfo(clientPub, out _);
-
- //verify signature for client
- if (!jwt.Verify(verAlg, in FBMDataCacheExtensions.CacheJwtAlgorithm))
+ //Get the client public key certificate to verify the client's message
+ using(ReadOnlyJsonWebKey cert = await GetClientPubAsync())
+ {
+ //verify signature for client
+ if (jwt.VerifyFromJwk(cert))
+ {
+ verified = true;
+ }
+ //May be signed by a cahce server
+ else
+ {
+ using ReadOnlyJsonWebKey cacheCert = await GetCachePubAsync();
+
+ //Set peer and verified flag since the another cache server signed the request
+ isPeer = verified = jwt.VerifyFromJwk(cacheCert);
+ }
+ }
+
+ //Check flag
+ if (!verified)
{
Log.Information("Client signature verification failed");
entity.CloseResponse(HttpStatusCode.Unauthorized);
@@ -154,36 +163,28 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
}
Log.Debug("Received negotiation request from node {node}", nodeId);
-
//Verified, now we can create an auth message with a short expiration
using JsonWebToken auth = new();
- auth.WriteHeader(FBMDataCacheExtensions.JwtMessageHeader.Span);
- auth.InitPayloadClaim()
- .AddClaim("aud", AudienceLocalServerId)
- .AddClaim("exp", DateTimeOffset.UtcNow.Add(AuthTokenExpiration).ToUnixTimeSeconds())
- .AddClaim("nonce", RandomHash.GetRandomHex(8))
- .AddClaim("chl", challenge)
- //Specify the server's node id if set
- .AddClaim("sub", nodeId)
- //Add negotiaion args
- .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, MAX_HEAD_BUF_SIZE)
- .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, MAX_RECV_BUF_SIZE)
- .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, MAX_MESSAGE_SIZE)
- .CommitClaims();
-
- //Sign the auth message
- ECDsa sigAlg = ECDsa.Create(FBMDataCacheExtensions.CacheCurve);
- byte[] cachePrivate = await GetCachePrivateKeyAsync();
- try
+ //Sign the auth message from the cache certificate's private key
+ using (ReadOnlyJsonWebKey cert = await GetCachePrivateKeyAsync())
{
- sigAlg.ImportPkcs8PrivateKey(cachePrivate, out _);
- //sign jwt
- auth.Sign(sigAlg, in FBMDataCacheExtensions.CacheJwtAlgorithm, 256);
- }
- finally
- {
- Memory.InitializeBlock(cachePrivate.AsSpan());
- sigAlg.Dispose();
+ auth.WriteHeader(cert.JwtHeader);
+ auth.InitPayloadClaim()
+ .AddClaim("aud", AudienceLocalServerId)
+ .AddClaim("exp", DateTimeOffset.UtcNow.Add(AuthTokenExpiration).ToUnixTimeSeconds())
+ .AddClaim("nonce", RandomHash.GetRandomBase32(8))
+ .AddClaim("chl", challenge!)
+ //Set the ispeer flag if the request was signed by a cache server
+ .AddClaim("isPeer", isPeer)
+ //Specify the server's node id if set
+ .AddClaim("sub", nodeId!)
+ //Add negotiaion args
+ .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, MAX_HEAD_BUF_SIZE)
+ .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, MAX_RECV_BUF_SIZE)
+ .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, MAX_MESSAGE_SIZE)
+ .CommitClaims();
+
+ auth.SignFromJwk(cert);
}
//Close response
@@ -191,27 +192,23 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
return VfReturnType.VirtualSkip;
}
- private async Task<byte[]> GetClientPubAsync()
+ private async Task<ReadOnlyJsonWebKey> GetClientPubAsync()
{
- string? brokerPubKey = await Pbase.TryGetSecretAsync("client_public_key") ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
-
- return Convert.FromBase64String(brokerPubKey);
+ using SecretResult brokerPubKey = await Pbase.TryGetSecretAsync("client_public_key") ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
+
+ return brokerPubKey.GetJsonWebKey();
}
- private async Task<byte[]> GetCachePubAsync()
+ private async Task<ReadOnlyJsonWebKey> GetCachePubAsync()
{
- string? brokerPubKey = await Pbase.TryGetSecretAsync("cache_public_key") ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
+ using SecretResult cachPublic = await Pbase.TryGetSecretAsync("cache_public_key") ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
- return Convert.FromBase64String(brokerPubKey);
+ return cachPublic.GetJsonWebKey();
}
- private async Task<byte[]> GetCachePrivateKeyAsync()
+ private async Task<ReadOnlyJsonWebKey> GetCachePrivateKeyAsync()
{
- string? cachePrivate = await Pbase.TryGetSecretAsync("cache_private_key") ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
-
- byte[] data = Convert.FromBase64String(cachePrivate);
+ using SecretResult cachePrivate = await Pbase.TryGetSecretAsync("cache_private_key") ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
- Memory.UnsafeZeroMemory<char>(cachePrivate);
-
- return data;
+ return cachePrivate.GetJsonWebKey();
}
private async Task ChangeWorkerAsync(CancellationToken cancellation)
@@ -265,16 +262,11 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
//Parse jwt
using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
{
- //Init sig alg, we will verify that the token was signed by this server
- using (ECDsa sigAlg = ECDsa.Create(FBMDataCacheExtensions.CacheCurve))
+ //Get the client public key certificate to verify the client's message
+ using (ReadOnlyJsonWebKey cert = await GetCachePubAsync())
{
- //Get the cache public key
- byte[] cachePub = await GetCachePubAsync();
-
- sigAlg.ImportSubjectPublicKeyInfo(cachePub, out _);
-
- //verify signature against the cache public key, since this server have signed it
- if (!jwt.Verify(sigAlg, in FBMDataCacheExtensions.CacheJwtAlgorithm))
+ //verify signature against the cache public key, since this server must have signed it
+ if (!jwt.VerifyFromJwk(cert))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
@@ -286,8 +278,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
//Verify audience, expiration
- if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl)
- || AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase))
+ if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl) || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
@@ -300,8 +291,11 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
return VfReturnType.VirtualSkip;
}
- //The node id is optional and stored in the 'sub' field
- if (doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl))
+ //Check if the client is a peer
+ bool isPeer = doc.RootElement.TryGetProperty("isPeer", out JsonElement isPeerEl) && isPeerEl.GetBoolean();
+
+ //The node id is optional and stored in the 'sub' field, ignore if the client is not a peer
+ if (isPeer && doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl))
{
nodeId = servIdEl.GetString();
}
@@ -342,6 +336,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
//Get the queue
nodeQueue = StatefulEventQueue[nodeId];
}
+
//Init new ws state object and clamp the suggested buffer sizes
WsUserState state = new()
{
@@ -363,6 +358,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server.Endpoints
return VfReturnType.BadRequest;
}
}
+
private async Task WebsocketAcceptedAsync(WebSocketSession wss)
{
//Inc connected count
diff --git a/Plugins/SessionCacheServer/ObjectCacheServer.csproj b/Plugins/SessionCacheServer/ObjectCacheServer.csproj
index 01b33f3..2cf298d 100644
--- a/Plugins/SessionCacheServer/ObjectCacheServer.csproj
+++ b/Plugins/SessionCacheServer/ObjectCacheServer.csproj
@@ -37,7 +37,7 @@
</PackageReference>
</ItemGroup>
<ItemGroup>
- <ProjectReference Include="..\..\..\..\VNLib\Essentials\VNLib.Plugins.Essentials.csproj" />
+ <ProjectReference Include="..\..\..\..\VNLib\Essentials\src\VNLib.Plugins.Essentials.csproj" />
<ProjectReference Include="..\..\..\DataCaching\VNLib.Data.Caching.Extensions\VNLib.Data.Caching.Extensions.csproj" />
<ProjectReference Include="..\..\..\DataCaching\VNLib.Data.Caching.ObjectCache\VNLib.Data.Caching.ObjectCache.csproj" />
<ProjectReference Include="..\..\..\PluginBase\VNLib.Plugins.PluginBase.csproj" />
diff --git a/Plugins/SessionCacheServer/ObjectCacheServerEntry.cs b/Plugins/SessionCacheServer/ObjectCacheServerEntry.cs
index 17d8ba5..20a6268 100644
--- a/Plugins/SessionCacheServer/ObjectCacheServerEntry.cs
+++ b/Plugins/SessionCacheServer/ObjectCacheServerEntry.cs
@@ -33,12 +33,12 @@ using System.Net.Sockets;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Security.Cryptography;
-using System.Collections.Concurrent;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
using VNLib.Utils.Extensions;
using VNLib.Hashing;
+using VNLib.Hashing.IdentityUtility;
using VNLib.Data.Caching;
using VNLib.Data.Caching.Extensions;
using VNLib.Data.Caching.ObjectCache;
@@ -49,7 +49,7 @@ using VNLib.Plugins.Cache.Broker.Endpoints;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Extensions.Loading.Routing;
using VNLib.Plugins.Essentials.Sessions.Server.Endpoints;
-using VNLib.Utils.Memory.Caching;
+
namespace VNLib.Plugins.Essentials.Sessions.Server
{
@@ -58,7 +58,19 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
public override string PluginName => "ObjectCache.Service";
private string? BrokerHeartBeatToken;
-
+
+ private readonly object ServerLock = new();
+ private readonly HashSet<ActiveServer> ListeningServers = new();
+
+
+ private void RemoveServer(ActiveServer server)
+ {
+ lock (ServerLock)
+ {
+ ListeningServers.Remove(server);
+ }
+ }
+
protected override void OnLoad()
{
//Create default heap
@@ -153,8 +165,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
try
{
//Get the broker config element
- IReadOnlyDictionary<string, JsonElement> clusterConfig = this.GetConfig("cluster");
-
+ IReadOnlyDictionary<string, JsonElement> clusterConfig = this.GetConfig("cluster");
//Server id is just dns name for now
string serverId = Dns.GetHostName();
@@ -179,11 +190,9 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
using BrokerRegistrationRequest request = new();
{
string addr = clusterConfig["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'");
-
- //Try to get the cache private key
- string base64Priv = await this.TryGetSecretAsync("cache_private_key") ?? throw new KeyNotFoundException("Failed to load the cache private key");
- ReadOnlyMemory<byte> privKey = Convert.FromBase64String(base64Priv);
+ //Recover the certificate
+ ReadOnlyJsonWebKey cacheCert = await GetCachePrivate();
//Init url builder for payload, see if tls is enabled
Uri connectAddress = new UriBuilder(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, Dns.GetHostName(), port, connectPath).Uri;
@@ -191,10 +200,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
request.WithBroker(new(addr))
.WithRegistrationAddress(connectAddress.ToString())
.WithNodeId(serverId)
- .WithPrivateKey(privKey.Span);
- //Wipe memory
- Memory.UnsafeZeroMemory<char>(base64Priv);
- Memory.UnsafeZeroMemory(privKey);
+ .WithSigningKey(cacheCert, true);
}
while (true)
@@ -279,6 +285,19 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
#region Cluster
+ private async Task<ReadOnlyJsonWebKey> GetCachePrivate()
+ {
+ using SecretResult secret = await this.TryGetSecretAsync("cache_private_key") ?? throw new KeyNotFoundException("Failed to load the cache private key");
+ return secret.GetJsonWebKey();
+ }
+
+ private async Task<ReadOnlyJsonWebKey> GetBrokerPublic()
+ {
+ using SecretResult secret = await this.TryGetSecretAsync("broker_public_key") ?? throw new KeyNotFoundException("Failed to load the broker's public key");
+ return secret.GetJsonWebKey();
+ }
+
+
/// <summary>
/// Starts a self-contained process-long task to discover other cache servers
/// from a shared broker server
@@ -291,31 +310,27 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
private async Task RunClientAsync(ObjectCacheStore cacheStore, Uri brokerAddress, FBMClientConfig clientConf)
{
TimeSpan noServerDelay = TimeSpan.FromSeconds(10);
- //Init signing algs
- ReadOnlyMemory<byte> clientPrivKey = null;
- ReadOnlyMemory<byte> brokerPubKey = null;
string nodeId = Dns.GetHostName();
+ ListServerRequest listRequest = new(brokerAddress);
try
{
//Get the broker config element
IReadOnlyDictionary<string, JsonElement> clusterConf = this.GetConfig("cluster");
int serverCheckMs = clusterConf["update_interval_sec"].GetInt32() * 1000;
- //Get client priv key from secret store
- string cpk = await this.TryGetSecretAsync("client_private_key") ?? throw new KeyNotFoundException("Failed to get the client private key from config");
- string bpub = await this.TryGetSecretAsync("broker_public_key") ?? throw new KeyNotFoundException("Failed to get the broker public key from config");
+ //Setup signing and verification certificates
+ ReadOnlyJsonWebKey cacheSig = await GetCachePrivate();
+ ReadOnlyJsonWebKey brokerPub = await GetBrokerPublic();
+ //Import certificates
+ listRequest.WithVerificationKey(brokerPub)
+ .WithSigningKey(cacheSig);
- //Load client private key
- clientPrivKey = Convert.FromBase64String(cpk);
- //Import broker public key
- brokerPubKey = Convert.FromBase64String(bpub);
-
- //Concurrent dict to track remote servers
- ConcurrentDictionary<string, ActiveServer> ActiveServers = new();
//Main event loop
- Log.Information("Discovering available cluster nodes in broker");
+ Log.Information("Begining cluster node discovery");
+
+ ILogProvider? debugLog = this.IsDebug() ? Log : null;
- while (!UnloadToken.IsCancellationRequested)
+ while (true)
{
//Load the server list
ActiveServer[]? servers;
@@ -323,8 +338,10 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
{
try
{
+ debugLog?.Information("[CACHE] Requesting server list from broker");
+
//Get server list
- servers = await FBMDataCacheExtensions.ListServersAsync(brokerAddress, clientPrivKey, brokerPubKey, UnloadToken);
+ servers = await FBMDataCacheExtensions.ListServersAsync(listRequest, UnloadToken);
//Servers are loaded, so continue
break;
}
@@ -345,23 +362,37 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
//Delay
await Task.Delay(randomMsDelay, UnloadToken);
}
- if(servers?.Length == 0)
+
+ if(servers == null || servers.Length == 0)
{
Log.Information("No cluster nodes found, retrying");
//Delay
await Task.Delay(noServerDelay, UnloadToken);
continue;
}
+
- //Select servers that are not the current server and are not already being monitored
- IEnumerable<ActiveServer> serversToConnectTo = servers.Where(s => s.ServerId != nodeId)
- .Where(s => !ActiveServers.ContainsKey(s.ServerId!));
- //Connect to servers
- foreach (ActiveServer server in serversToConnectTo)
+ //Lock on sever set while enumerating
+ lock (ServerLock)
{
- _ = RunSyncTaskAsync(server, ActiveServers, cacheStore, clientConf, clientPrivKey, nodeId)
- .ConfigureAwait(false);
+ //Select servers that are not the current server and are not already being monitored
+ IEnumerable<ActiveServer> serversToConnectTo = servers.Where(s => !nodeId.Equals(s.ServerId, StringComparison.OrdinalIgnoreCase));
+
+ //Connect to servers
+ foreach (ActiveServer server in serversToConnectTo)
+ {
+ //Make sure were not currently connected to the server
+ if (!ListeningServers.Contains(server))
+ {
+ //Add the server to the set
+ ListeningServers.Add(server);
+
+ //Run listener background task
+ _ = this.DeferTask(() => RunSyncTaskAsync(server, cacheStore, clientConf, nodeId));
+ }
+ }
}
+
//Delay until next check cycle
await Task.Delay(serverCheckMs, UnloadToken);
}
@@ -383,19 +414,15 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
}
finally
{
- Memory.UnsafeZeroMemory(clientPrivKey);
- Memory.UnsafeZeroMemory(brokerPubKey);
+ listRequest.Dispose();
}
Log.Debug("Cluster sync worker exited");
}
- private async Task RunSyncTaskAsync(ActiveServer server, ConcurrentDictionary<string, ActiveServer> activeList,
- ObjectCacheStore cacheStore, FBMClientConfig conf, ReadOnlyMemory<byte> privateKey, string nodeId)
+ private async Task RunSyncTaskAsync(ActiveServer server, ObjectCacheStore cacheStore, FBMClientConfig conf, string nodeId)
{
//Setup client
FBMClient client = new(conf);
- //Add server to active list, or replace its old value with the new one
- activeList.AddOrUpdate(server.ServerId, server, (old, update) => server);
try
{
async Task UpdateRecordAsync(string objectId, string newId)
@@ -413,6 +440,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
using FBMResponse response = await client.SendAsync(modRequest, UnloadToken);
response.ThrowIfNotSet();
+
//Check response code
string status = response.Headers.First(static s => s.Key == HeaderCommand.Status).Value.ToString();
if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal))
@@ -431,14 +459,21 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
client.ReturnRequest(modRequest);
}
}
+
+ {
+ //Sign and verify requests with the cache private key since we are a peer
+ ReadOnlyJsonWebKey cachePriv = await GetCachePrivate();
+
+ //Configure cache
+ client.GetCacheConfiguration()
+ .WithVerificationKey(cachePriv)
+ .WithSigningCertificate(cachePriv)
+ .WithNodeId(nodeId) //set nodeid since were listening for changes
+ .WithTls(false);
+ }
+
+ Log.Information("Connecting to {server}...", server.ServerId);
- //Configure cache
- client.GetCacheConfiguration()
- .ImportVerificationKey(privateKey.Span)
- .ImportVerificationKey(null)
- .WithNodeId(nodeId) //set nodeid since were listening for changes
- .WithTls(false);
-
//Connect to the server
await client.ConnectToCacheAsync(server, UnloadToken);
@@ -518,7 +553,7 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
finally
{
//Remove server from active list, since its been disconnected
- _ = activeList.TryRemove(server.ServerId, out _);
+ RemoveServer(server);
client.Dispose();
}
}
diff --git a/Plugins/SessionProvider/SessionProvider.csproj b/Plugins/SessionProvider/SessionProvider.csproj
index 4ed7dc1..98fb5da 100644
--- a/Plugins/SessionProvider/SessionProvider.csproj
+++ b/Plugins/SessionProvider/SessionProvider.csproj
@@ -32,6 +32,8 @@
</PropertyGroup>
<ItemGroup>
+ <ProjectReference Include="..\..\..\..\VNLib\Essentials\src\VNLib.Plugins.Essentials.csproj" />
+ <ProjectReference Include="..\..\..\..\VNLib\Http\src\VNLib.Net.Http.csproj" />
<ProjectReference Include="..\..\..\..\VNLib\Utils\src\VNLib.Utils.csproj" />
<ProjectReference Include="..\..\..\Extensions\VNLib.Plugins.Extensions.Loading\VNLib.Plugins.Extensions.Loading.csproj" />
<ProjectReference Include="..\..\Libs\VNLib.Plugins.Essentials.Sessions.Runtime\VNLib.Plugins.Essentials.Sessions.Runtime.csproj" />
diff --git a/Plugins/SessionProvider/null b/Plugins/SessionProvider/null
deleted file mode 100644
index 48b2100..0000000
--- a/Plugins/SessionProvider/null
+++ /dev/null
@@ -1 +0,0 @@
-F:\Programming\Web Plugins\DevPlugins\SessionProvider\*, Are you sure (Y/N)?