diff options
Diffstat (limited to 'Plugins/SessionCacheServer/ObjectCacheServerEntry.cs')
-rw-r--r-- | Plugins/SessionCacheServer/ObjectCacheServerEntry.cs | 100 |
1 files changed, 49 insertions, 51 deletions
diff --git a/Plugins/SessionCacheServer/ObjectCacheServerEntry.cs b/Plugins/SessionCacheServer/ObjectCacheServerEntry.cs index 6c969ba..17d8ba5 100644 --- a/Plugins/SessionCacheServer/ObjectCacheServerEntry.cs +++ b/Plugins/SessionCacheServer/ObjectCacheServerEntry.cs @@ -29,7 +29,6 @@ using System.Linq; using System.Net.Http; using System.Text.Json; using System.Threading; -using System.Diagnostics; using System.Net.Sockets; using System.Threading.Tasks; using System.Collections.Generic; @@ -50,6 +49,7 @@ using VNLib.Plugins.Cache.Broker.Endpoints; using VNLib.Plugins.Extensions.Loading; using VNLib.Plugins.Extensions.Loading.Routing; using VNLib.Plugins.Essentials.Sessions.Server.Endpoints; +using VNLib.Utils.Memory.Caching; namespace VNLib.Plugins.Essentials.Sessions.Server { @@ -72,16 +72,16 @@ namespace VNLib.Plugins.Essentials.Sessions.Server string swapDir = PluginConfig.GetProperty("swap_dir").GetString() ?? throw new KeyNotFoundException("Missing required key 'swap_dir' for config"); int cacheSize = PluginConfig.GetProperty("max_cache").GetInt32(); string connectPath = PluginConfig.GetProperty("connect_path").GetString() ?? throw new KeyNotFoundException("Missing required element 'connect_path' for config 'cluster'"); - TimeSpan cleanupInterval = PluginConfig.GetProperty("cleanup_interval_sec").GetTimeSpan(TimeParseType.Seconds); - TimeSpan validFor = PluginConfig.GetProperty("valid_for_sec").GetTimeSpan(TimeParseType.Seconds); + //TimeSpan cleanupInterval = PluginConfig.GetProperty("cleanup_interval_sec").GetTimeSpan(TimeParseType.Seconds); + //TimeSpan validFor = PluginConfig.GetProperty("valid_for_sec").GetTimeSpan(TimeParseType.Seconds); int maxMessageSize = PluginConfig.GetProperty("max_blob_size").GetInt32(); - TimeSpan initialCleanupDelay = TimeSpan.FromSeconds(2); //Init dir DirectoryInfo dir = new(swapDir); dir.Create(); //Init cache listener, single threaded reader ObjectCacheStore CacheListener = new(dir, cacheSize, Log, CacheHeap, true); + //Init connect endpoint { //Init connect endpoint @@ -96,19 +96,20 @@ namespace VNLib.Plugins.Essentials.Sessions.Server //Route the broker endpoint BrokerHeartBeat brokerEp = new(() => BrokerHeartBeatToken!, mre, new Uri(brokerAddress), this); Route(brokerEp); + //start registration - _ = RegisterServerAsync(mre) - .ConfigureAwait(false); + _ = this.DeferTask(() => RegisterServerAsync(mre), 200); } + //Setup cluster worker { //Get pre-configured fbm client config for caching FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(CacheHeap, maxMessageSize, this.IsDebug() ? Log : null); //Start Client runner - _ = RunClientAsync(CacheListener, new Uri(brokerAddress), conf) - .ConfigureAwait(false); + _ = this.DeferTask(() => RunClientAsync(CacheListener, new Uri(brokerAddress), conf), 300); } + //Load a cache broker to the current server if the config is defined { if(this.HasConfigForType<BrokerRegistrationEndpoint>()) @@ -116,13 +117,10 @@ namespace VNLib.Plugins.Essentials.Sessions.Server this.Route<BrokerRegistrationEndpoint>(); } } - //Init timer and fire immediatly to cleanup - Timer CleanupTimer = new((object? state) => OnCleanupElapsed(state, validFor), CacheListener, initialCleanupDelay, cleanupInterval); - + void Cleanup() { CacheHeap.Dispose(); - CleanupTimer.Dispose(); CacheListener.Dispose(); } @@ -148,24 +146,6 @@ namespace VNLib.Plugins.Essentials.Sessions.Server Log.Information("Plugin unloaded"); } - private void OnCleanupElapsed(object? state, TimeSpan validFor) - { - try - { - ObjectCacheStore listener = state as ObjectCacheStore; - Stopwatch sw = new(); - sw.Start(); - //Cleanup - //await listener.CleanupExpiredAsync(validFor); - sw.Stop(); - Log.Debug("Expired cache records cleaned in {ms} ms", sw.Elapsed.TotalMilliseconds); - } - catch (Exception ex) - { - Log.Error(ex); - } - } - #region Registration private async Task RegisterServerAsync(ManualResetEvent keepaliveWait) @@ -175,12 +155,12 @@ namespace VNLib.Plugins.Essentials.Sessions.Server //Get the broker config element IReadOnlyDictionary<string, JsonElement> clusterConfig = this.GetConfig("cluster"); - Uri brokerAddress = new(clusterConfig["broker_address"].GetString()); + //Server id is just dns name for now string serverId = Dns.GetHostName(); int heartBeatDelayMs = clusterConfig["heartbeat_timeout_sec"].GetInt32() * 1000; - string connectPath = PluginConfig.GetProperty("connect_path").GetString(); + string? connectPath = PluginConfig.GetProperty("connect_path").GetString(); //Get the port of the primary webserver int port; @@ -195,14 +175,27 @@ namespace VNLib.Plugins.Essentials.Sessions.Server //If a certificate is specified, tls is enabled on the port usingTls = firstHost.TryGetProperty("cert", out _); } - - //Try to get the cache private key - string base64Priv = await this.TryGetSecretAsync("cache_private_key") ?? throw new KeyNotFoundException("Failed to load the cache private key"); - - byte[] privKey = Convert.FromBase64String(base64Priv); - //Init url builder for payload, see if tls is enabled - Uri connectAddress = new UriBuilder(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, Dns.GetHostName(), port, connectPath).Uri; + using BrokerRegistrationRequest request = new(); + { + string addr = clusterConfig["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'"); + + //Try to get the cache private key + string base64Priv = await this.TryGetSecretAsync("cache_private_key") ?? throw new KeyNotFoundException("Failed to load the cache private key"); + + ReadOnlyMemory<byte> privKey = Convert.FromBase64String(base64Priv); + + //Init url builder for payload, see if tls is enabled + Uri connectAddress = new UriBuilder(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, Dns.GetHostName(), port, connectPath).Uri; + + request.WithBroker(new(addr)) + .WithRegistrationAddress(connectAddress.ToString()) + .WithNodeId(serverId) + .WithPrivateKey(privKey.Span); + //Wipe memory + Memory.UnsafeZeroMemory<char>(base64Priv); + Memory.UnsafeZeroMemory(privKey); + } while (true) { @@ -210,11 +203,13 @@ namespace VNLib.Plugins.Essentials.Sessions.Server { //Gen a random reg token before registering BrokerHeartBeatToken = RandomHash.GetRandomHex(32); + //Assign new hb token + request.WithHeartbeatToken(BrokerHeartBeatToken); - Log.Information("Registering with cache broker {addr}, with node-id {id}", brokerAddress, serverId); + Log.Information("Registering with cache broker {addr}, with node-id {id}", request.BrokerAddress, serverId); //Register with the broker - await FBMDataCacheExtensions.ResgisterWithBrokerAsync(brokerAddress, privKey, connectAddress.ToString(), serverId, BrokerHeartBeatToken); + await FBMDataCacheExtensions.ResgisterWithBrokerAsync(request); Log.Debug("Successfully registered with cache broker"); @@ -357,15 +352,12 @@ namespace VNLib.Plugins.Essentials.Sessions.Server await Task.Delay(noServerDelay, UnloadToken); continue; } + //Select servers that are not the current server and are not already being monitored - IEnumerable<ActiveServer> serversToConnectTo = from s in - (from ss in servers - where ss.ServerId != nodeId - select ss) - where !ActiveServers.ContainsKey(s.ServerId) - select s; + IEnumerable<ActiveServer> serversToConnectTo = servers.Where(s => s.ServerId != nodeId) + .Where(s => !ActiveServers.ContainsKey(s.ServerId!)); //Connect to servers - foreach(ActiveServer server in serversToConnectTo) + foreach (ActiveServer server in serversToConnectTo) { _ = RunSyncTaskAsync(server, ActiveServers, cacheStore, clientConf, clientPrivKey, nodeId) .ConfigureAwait(false); @@ -439,10 +431,16 @@ namespace VNLib.Plugins.Essentials.Sessions.Server client.ReturnRequest(modRequest); } } - - string challenge = RandomHash.GetRandomBase64(24); + + //Configure cache + client.GetCacheConfiguration() + .ImportVerificationKey(privateKey.Span) + .ImportVerificationKey(null) + .WithNodeId(nodeId) //set nodeid since were listening for changes + .WithTls(false); + //Connect to the server - await client.ConnectAsync(server.HostName, privateKey, challenge, nodeId, false, UnloadToken); + await client.ConnectToCacheAsync(server, UnloadToken); //Wroker task callback method async Task BgWorkerAsync() |