aboutsummaryrefslogtreecommitdiff
path: root/Plugins/SessionCacheServer/ObjectCacheServerEntry.cs
diff options
context:
space:
mode:
Diffstat (limited to 'Plugins/SessionCacheServer/ObjectCacheServerEntry.cs')
-rw-r--r--Plugins/SessionCacheServer/ObjectCacheServerEntry.cs100
1 files changed, 49 insertions, 51 deletions
diff --git a/Plugins/SessionCacheServer/ObjectCacheServerEntry.cs b/Plugins/SessionCacheServer/ObjectCacheServerEntry.cs
index 6c969ba..17d8ba5 100644
--- a/Plugins/SessionCacheServer/ObjectCacheServerEntry.cs
+++ b/Plugins/SessionCacheServer/ObjectCacheServerEntry.cs
@@ -29,7 +29,6 @@ using System.Linq;
using System.Net.Http;
using System.Text.Json;
using System.Threading;
-using System.Diagnostics;
using System.Net.Sockets;
using System.Threading.Tasks;
using System.Collections.Generic;
@@ -50,6 +49,7 @@ using VNLib.Plugins.Cache.Broker.Endpoints;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Extensions.Loading.Routing;
using VNLib.Plugins.Essentials.Sessions.Server.Endpoints;
+using VNLib.Utils.Memory.Caching;
namespace VNLib.Plugins.Essentials.Sessions.Server
{
@@ -72,16 +72,16 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
string swapDir = PluginConfig.GetProperty("swap_dir").GetString() ?? throw new KeyNotFoundException("Missing required key 'swap_dir' for config");
int cacheSize = PluginConfig.GetProperty("max_cache").GetInt32();
string connectPath = PluginConfig.GetProperty("connect_path").GetString() ?? throw new KeyNotFoundException("Missing required element 'connect_path' for config 'cluster'");
- TimeSpan cleanupInterval = PluginConfig.GetProperty("cleanup_interval_sec").GetTimeSpan(TimeParseType.Seconds);
- TimeSpan validFor = PluginConfig.GetProperty("valid_for_sec").GetTimeSpan(TimeParseType.Seconds);
+ //TimeSpan cleanupInterval = PluginConfig.GetProperty("cleanup_interval_sec").GetTimeSpan(TimeParseType.Seconds);
+ //TimeSpan validFor = PluginConfig.GetProperty("valid_for_sec").GetTimeSpan(TimeParseType.Seconds);
int maxMessageSize = PluginConfig.GetProperty("max_blob_size").GetInt32();
- TimeSpan initialCleanupDelay = TimeSpan.FromSeconds(2);
//Init dir
DirectoryInfo dir = new(swapDir);
dir.Create();
//Init cache listener, single threaded reader
ObjectCacheStore CacheListener = new(dir, cacheSize, Log, CacheHeap, true);
+
//Init connect endpoint
{
//Init connect endpoint
@@ -96,19 +96,20 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
//Route the broker endpoint
BrokerHeartBeat brokerEp = new(() => BrokerHeartBeatToken!, mre, new Uri(brokerAddress), this);
Route(brokerEp);
+
//start registration
- _ = RegisterServerAsync(mre)
- .ConfigureAwait(false);
+ _ = this.DeferTask(() => RegisterServerAsync(mre), 200);
}
+
//Setup cluster worker
{
//Get pre-configured fbm client config for caching
FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(CacheHeap, maxMessageSize, this.IsDebug() ? Log : null);
//Start Client runner
- _ = RunClientAsync(CacheListener, new Uri(brokerAddress), conf)
- .ConfigureAwait(false);
+ _ = this.DeferTask(() => RunClientAsync(CacheListener, new Uri(brokerAddress), conf), 300);
}
+
//Load a cache broker to the current server if the config is defined
{
if(this.HasConfigForType<BrokerRegistrationEndpoint>())
@@ -116,13 +117,10 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
this.Route<BrokerRegistrationEndpoint>();
}
}
- //Init timer and fire immediatly to cleanup
- Timer CleanupTimer = new((object? state) => OnCleanupElapsed(state, validFor), CacheListener, initialCleanupDelay, cleanupInterval);
-
+
void Cleanup()
{
CacheHeap.Dispose();
- CleanupTimer.Dispose();
CacheListener.Dispose();
}
@@ -148,24 +146,6 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
Log.Information("Plugin unloaded");
}
- private void OnCleanupElapsed(object? state, TimeSpan validFor)
- {
- try
- {
- ObjectCacheStore listener = state as ObjectCacheStore;
- Stopwatch sw = new();
- sw.Start();
- //Cleanup
- //await listener.CleanupExpiredAsync(validFor);
- sw.Stop();
- Log.Debug("Expired cache records cleaned in {ms} ms", sw.Elapsed.TotalMilliseconds);
- }
- catch (Exception ex)
- {
- Log.Error(ex);
- }
- }
-
#region Registration
private async Task RegisterServerAsync(ManualResetEvent keepaliveWait)
@@ -175,12 +155,12 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
//Get the broker config element
IReadOnlyDictionary<string, JsonElement> clusterConfig = this.GetConfig("cluster");
- Uri brokerAddress = new(clusterConfig["broker_address"].GetString());
+
//Server id is just dns name for now
string serverId = Dns.GetHostName();
int heartBeatDelayMs = clusterConfig["heartbeat_timeout_sec"].GetInt32() * 1000;
- string connectPath = PluginConfig.GetProperty("connect_path").GetString();
+ string? connectPath = PluginConfig.GetProperty("connect_path").GetString();
//Get the port of the primary webserver
int port;
@@ -195,14 +175,27 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
//If a certificate is specified, tls is enabled on the port
usingTls = firstHost.TryGetProperty("cert", out _);
}
-
- //Try to get the cache private key
- string base64Priv = await this.TryGetSecretAsync("cache_private_key") ?? throw new KeyNotFoundException("Failed to load the cache private key");
-
- byte[] privKey = Convert.FromBase64String(base64Priv);
- //Init url builder for payload, see if tls is enabled
- Uri connectAddress = new UriBuilder(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, Dns.GetHostName(), port, connectPath).Uri;
+ using BrokerRegistrationRequest request = new();
+ {
+ string addr = clusterConfig["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'");
+
+ //Try to get the cache private key
+ string base64Priv = await this.TryGetSecretAsync("cache_private_key") ?? throw new KeyNotFoundException("Failed to load the cache private key");
+
+ ReadOnlyMemory<byte> privKey = Convert.FromBase64String(base64Priv);
+
+ //Init url builder for payload, see if tls is enabled
+ Uri connectAddress = new UriBuilder(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, Dns.GetHostName(), port, connectPath).Uri;
+
+ request.WithBroker(new(addr))
+ .WithRegistrationAddress(connectAddress.ToString())
+ .WithNodeId(serverId)
+ .WithPrivateKey(privKey.Span);
+ //Wipe memory
+ Memory.UnsafeZeroMemory<char>(base64Priv);
+ Memory.UnsafeZeroMemory(privKey);
+ }
while (true)
{
@@ -210,11 +203,13 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
{
//Gen a random reg token before registering
BrokerHeartBeatToken = RandomHash.GetRandomHex(32);
+ //Assign new hb token
+ request.WithHeartbeatToken(BrokerHeartBeatToken);
- Log.Information("Registering with cache broker {addr}, with node-id {id}", brokerAddress, serverId);
+ Log.Information("Registering with cache broker {addr}, with node-id {id}", request.BrokerAddress, serverId);
//Register with the broker
- await FBMDataCacheExtensions.ResgisterWithBrokerAsync(brokerAddress, privKey, connectAddress.ToString(), serverId, BrokerHeartBeatToken);
+ await FBMDataCacheExtensions.ResgisterWithBrokerAsync(request);
Log.Debug("Successfully registered with cache broker");
@@ -357,15 +352,12 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
await Task.Delay(noServerDelay, UnloadToken);
continue;
}
+
//Select servers that are not the current server and are not already being monitored
- IEnumerable<ActiveServer> serversToConnectTo = from s in
- (from ss in servers
- where ss.ServerId != nodeId
- select ss)
- where !ActiveServers.ContainsKey(s.ServerId)
- select s;
+ IEnumerable<ActiveServer> serversToConnectTo = servers.Where(s => s.ServerId != nodeId)
+ .Where(s => !ActiveServers.ContainsKey(s.ServerId!));
//Connect to servers
- foreach(ActiveServer server in serversToConnectTo)
+ foreach (ActiveServer server in serversToConnectTo)
{
_ = RunSyncTaskAsync(server, ActiveServers, cacheStore, clientConf, clientPrivKey, nodeId)
.ConfigureAwait(false);
@@ -439,10 +431,16 @@ namespace VNLib.Plugins.Essentials.Sessions.Server
client.ReturnRequest(modRequest);
}
}
-
- string challenge = RandomHash.GetRandomBase64(24);
+
+ //Configure cache
+ client.GetCacheConfiguration()
+ .ImportVerificationKey(privateKey.Span)
+ .ImportVerificationKey(null)
+ .WithNodeId(nodeId) //set nodeid since were listening for changes
+ .WithTls(false);
+
//Connect to the server
- await client.ConnectAsync(server.HostName, privateKey, challenge, nodeId, false, UnloadToken);
+ await client.ConnectToCacheAsync(server, UnloadToken);
//Wroker task callback method
async Task BgWorkerAsync()