diff options
Diffstat (limited to 'plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs')
-rw-r--r-- | plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs | 540 |
1 files changed, 63 insertions, 477 deletions
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs index 2fa6220..c1a6ad2 100644 --- a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs +++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs @@ -27,7 +27,6 @@ using System.IO; using System.Net; using System.Linq; using System.Net.Http; -using System.Text.Json; using System.Threading; using System.Net.Sockets; using System.Threading.Tasks; @@ -38,7 +37,6 @@ using VNLib.Plugins; using VNLib.Utils.Memory; using VNLib.Utils.Logging; using VNLib.Utils.Memory.Diagnostics; -using VNLib.Hashing; using VNLib.Hashing.IdentityUtility; using VNLib.Data.Caching.Extensions; using static VNLib.Data.Caching.Constants; @@ -47,18 +45,64 @@ using VNLib.Net.Messaging.FBM.Client; using VNLib.Plugins.Cache.Broker.Endpoints; using VNLib.Plugins.Extensions.Loading; using VNLib.Plugins.Extensions.Loading.Routing; - +using VNLib.Data.Caching.ObjectCache.Server.Endpoints; namespace VNLib.Data.Caching.ObjectCache.Server { + sealed record class CacheAuthKeyStore(PluginBase Plugin) + { + public Task<ReadOnlyJsonWebKey> GetCachePublicAsync() + { + return Plugin.TryGetSecretAsync("cache_private_key").ToJsonWebKey(true); + } + + public Task<ReadOnlyJsonWebKey> GetCachePrivateAsync() + { + return Plugin.TryGetSecretAsync("cache_private_key").ToJsonWebKey(true); + } + + public Task<ReadOnlyJsonWebKey> GetBrokerPublicAsync() + { + return Plugin.TryGetSecretAsync("broker_public_key").ToJsonWebKey(true); + } + + public Task<ReadOnlyJsonWebKey> GetClientPublicKeyAsync() + { + return Plugin.TryGetSecretAsync("client_public_key").ToJsonWebKey(true); + } + } + + internal interface IBrokerHeartbeatNotifier + { + /// <summary> + /// Called when the heartbeat endpoint receives a heartbeat from the broker + /// </summary> + void HearbeatReceived(); + + /// <summary> + /// Gets the current auth token sent to the broker, which is expected to be sent back in the heartbeat + /// </summary> + /// <returns>The heartbeat auth token if set</returns> + string? GetAuthToken(); + + /// <summary> + /// Gets the address of the broker server + /// </summary> + /// <returns>The full address of the broker server to connect to</returns> + Uri GetBrokerAddress(); + + /// <summary> + /// Gets the public key of the broker server + /// </summary> + /// <returns>The broker's public key</returns> + ReadOnlyJsonWebKey GetBrokerPublicKey(); + } + public sealed class ObjectCacheServerEntry : PluginBase { public override string PluginName => "ObjectCache.Service"; - private readonly Lazy<IUnmangedHeap> _cacheHeap; - private readonly object ServerLock; - private readonly HashSet<ActiveServer> ListeningServers; - private readonly ManualResetEvent BrokerSyncHandle; + private readonly Lazy<IUnmangedHeap> _cacheHeap; /// <summary> /// Gets the shared heap for the plugin @@ -69,12 +113,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server { //Init heap _cacheHeap = new Lazy<IUnmangedHeap>(InitializeHeap, LazyThreadSafetyMode.PublicationOnly); - - ServerLock = new(); - ListeningServers = new(); - - //Set sync handle - BrokerSyncHandle = new(false); } private IUnmangedHeap InitializeHeap() @@ -94,64 +132,30 @@ namespace VNLib.Data.Caching.ObjectCache.Server } - private string? BrokerHeartBeatToken; - - private void RemoveServer(ActiveServer server) - { - lock (ServerLock) - { - ListeningServers.Remove(server); - } - } - - private FBMClientConfig ClientConfig; - - protected override void OnLoad() { try { - IConfigScope clusterConf = this.GetConfig("cluster"); - - Uri brokerAddress = new(clusterConf["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'")); + //Setup Node config + NodeConfig nodeConf = this.GetOrCreateSingleton<NodeConfig>(); //Init connect endpoint ConnectEndpoint endpoint = this.Route<ConnectEndpoint>(); - //Get the cache store from the connection endpoint - ICacheStore store = endpoint.GetCacheStore(); + //Route the broker endpoint + this.Route<BrokerHeartBeatEndpoint>(); + + //Setup discovery endpoint + if(this.HasConfigForType<PeerDiscoveryEndpoint>()) + { + this.Route<PeerDiscoveryEndpoint>(); + } ulong maxByteSize = ((ulong)endpoint.CacheConfig.MaxCacheEntries * (ulong)endpoint.CacheConfig.BucketCount * (ulong)endpoint.CacheConfig.MaxMessageSize); //Log max memory usage Log.Debug("Maxium memory consumption {mx}Mb", maxByteSize / (ulong)(1024 * 1000)); - - //Setup broker and regitration - { - //Route the broker endpoint - BrokerHeartBeat brokerEp = new(() => BrokerHeartBeatToken!, BrokerSyncHandle, brokerAddress, this); - Route(brokerEp); - - //start registration - _ = this.ObserveWork(() => RegisterServerAsync(endpoint.Path), 200); - } - - //Setup cluster worker - { - //Get pre-configured fbm client config for caching - ClientConfig = FBMDataCacheExtensions.GetDefaultConfig(CacheHeap, endpoint.CacheConfig.MaxMessageSize / 2, default, this.IsDebug() ? Log : null); - - //Start Client runner - _ = this.ObserveWork(() => RunClientAsync(store, brokerAddress), 300); - } - - //Load a cache broker to the current server if the config is defined - { - if(this.HasConfigForType<BrokerRegistrationEndpoint>()) - { - this.Route<BrokerRegistrationEndpoint>(); - } - } + Log.Information("Plugin loaded"); } @@ -169,430 +173,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server _cacheHeap.Value.Dispose(); } - //Dispose mre sync handle - BrokerSyncHandle.Dispose(); - Log.Information("Plugin unloaded"); } - #region Registration - - private async Task RegisterServerAsync(string connectPath) - { - try - { - //Get the broker config element - IConfigScope clusterConfig = this.GetConfig("cluster"); - - //Server id is just dns name for now - string serverId = Dns.GetHostName(); - - int heartBeatDelayMs = clusterConfig["heartbeat_timeout_sec"].GetInt32() * 1000; - - - //Get the port of the primary webserver - int port; - bool usingTls; - { - JsonElement firstHost = HostConfig.GetProperty("virtual_hosts").EnumerateArray().First(); - - port = firstHost.GetProperty("interface") - .GetProperty("port") - .GetInt32(); - - //If a certificate is specified, tls is enabled on the port - usingTls = firstHost.TryGetProperty("cert", out _); - } - - using BrokerRegistrationRequest request = new(); - { - string addr = clusterConfig["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'"); - - //Recover the certificate - ReadOnlyJsonWebKey cacheCert = await GetCachePrivate(); - - //Init url builder for payload, see if tls is enabled - Uri connectAddress = new UriBuilder(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, Dns.GetHostName(), port, connectPath).Uri; - - request.WithBroker(new(addr)) - .WithRegistrationAddress(connectAddress.ToString()) - .WithNodeId(serverId) - .WithSigningKey(cacheCert, true); - } - - while (true) - { - try - { - //Gen a random reg token before registering - BrokerHeartBeatToken = RandomHash.GetRandomHex(32); - - //Assign new hb token - request.WithHeartbeatToken(BrokerHeartBeatToken); - - Log.Information("Registering with cache broker {addr}, with node-id {id}", request.BrokerAddress, serverId); - - //Register with the broker - await FBMDataCacheExtensions.ResgisterWithBrokerAsync(request); - - Log.Debug("Successfully registered with cache broker"); - - /* - * Wait in a loop for the broker to send a keepalive - * request with the specified token. When the event - * is signaled the task will be completed - */ - while (true) - { - await Task.Delay(heartBeatDelayMs, UnloadToken); - - //Set the timeout to 0 to it will just check the status without blocking - if (!BrokerSyncHandle.WaitOne(0)) - { - //server miseed a keepalive event, time to break the loop and retry - Log.Debug("Broker missed a heartbeat request, attempting to re-register"); - break; - } - - //Reset the msr - BrokerSyncHandle.Reset(); - } - } - catch (TaskCanceledException) - { - throw; - } - catch (TimeoutException) - { - Log.Warn("Failed to connect to cache broker server within the specified timeout period"); - } - catch (HttpRequestException re) when (re.InnerException is SocketException) - { - Log.Warn("Cache broker is unavailable or network is unavailable"); - } - catch (Exception ex) - { - Log.Warn(ex, "Failed to update broker registration"); - } - - //Gen random ms delay - int randomMsDelay = RandomNumberGenerator.GetInt32(500, 2000); - //Delay - await Task.Delay(randomMsDelay, UnloadToken); - } - } - catch (KeyNotFoundException kne) - { - Log.Error("Missing required broker configuration variables {ke}", kne.Message); - } - catch (TaskCanceledException) - { - //Normal unload/exit - } - catch (Exception ex) - { - Log.Error(ex); - } - finally - { - BrokerHeartBeatToken = null; - } - Log.Debug("Registration worker exited"); - } - - #endregion - - #region Cluster - - private async Task<ReadOnlyJsonWebKey> GetCachePrivate() - { - return await this.TryGetSecretAsync("cache_private_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Failed to load the cache private key"); - } - - private async Task<ReadOnlyJsonWebKey> GetBrokerPublic() - { - return await this.TryGetSecretAsync("broker_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Failed to load the broker's public key"); - } - - - /// <summary> - /// Starts a self-contained process-long task to discover other cache servers - /// from a shared broker server - /// </summary> - /// <param name="cacheStore">The cache store to synchronize</param> - /// <param name="brokerAddress">The broker server's address</param> - /// <param name="serverId">The node-id of the current server</param> - /// <param name="clientConf">The configuration to use when initializing synchronization clients</param> - /// <returns>A task that resolves when the plugin unloads</returns> - private async Task RunClientAsync(ICacheStore cacheStore, Uri brokerAddress) - { - TimeSpan noServerDelay = TimeSpan.FromSeconds(10); - - //The node id is just the dns hostname of the current machine - string nodeId = Dns.GetHostName(); - - ListServerRequest listRequest = new(brokerAddress); - try - { - //Get the broker config element - IConfigScope clusterConf = this.GetConfig("cluster"); - - int serverCheckMs = clusterConf["update_interval_sec"].GetInt32() * 1000; - - //Setup signing and verification certificates - ReadOnlyJsonWebKey cacheSig = await GetCachePrivate(); - ReadOnlyJsonWebKey brokerPub = await GetBrokerPublic(); - - //Import certificates - listRequest.WithVerificationKey(brokerPub) - .WithSigningKey(cacheSig); - - //Main event loop - Log.Information("Begining cluster node discovery"); - - ILogProvider? debugLog = this.IsDebug() ? Log : null; - - while (true) - { - //Load the server list - ActiveServer[]? servers; - while (true) - { - try - { - debugLog?.Information("[CACHE] Requesting server list from broker"); - - //Get server list - servers = await FBMDataCacheExtensions.ListServersAsync(listRequest, UnloadToken); - - //Servers are loaded, so continue - break; - } - catch(HttpRequestException he) when(he.InnerException is SocketException) - { - Log.Warn("Failed to connect to cache broker, trying again"); - } - catch (TimeoutException) - { - Log.Warn("Failed to connect to cache broker server within the specified timeout period"); - } - catch (Exception ex) - { - Log.Warn(ex, "Failed to get server list from broker"); - } - - //Gen random ms delay - int randomMsDelay = RandomNumberGenerator.GetInt32(1000, 2000); - - //Delay - await Task.Delay(randomMsDelay, UnloadToken); - } - - if(servers == null || servers.Length == 0) - { - Log.Information("No cluster nodes found, retrying"); - //Delay - await Task.Delay(noServerDelay, UnloadToken); - continue; - } - - - //Lock on sever set while enumerating - lock (ServerLock) - { - //Select servers that are not the current server and are not already being monitored - IEnumerable<ActiveServer> serversToConnectTo = servers.Where(s => !nodeId.Equals(s.ServerId, StringComparison.OrdinalIgnoreCase)); - - //Connect to servers - foreach (ActiveServer server in serversToConnectTo) - { - //Make sure were not currently connected to the server - if (!ListeningServers.Contains(server)) - { - //Add the server to the set - ListeningServers.Add(server); - - //Run listener background task - _ = this.ObserveWork(() => RunSyncTaskAsync(server, cacheStore, nodeId)); - } - } - } - - //Delay until next check cycle - await Task.Delay(serverCheckMs, UnloadToken); - } - } - catch (FileNotFoundException) - { - Log.Error("Client/cluster private cluster key file was not found or could not be read"); - } - catch (KeyNotFoundException) - { - Log.Error("Missing required cluster configuration varables"); - } - catch (TaskCanceledException) - { - //normal exit/unload - } - catch (Exception ex) - { - Log.Error(ex); - } - finally - { - listRequest.Dispose(); - } - Log.Debug("Cluster sync worker exited"); - } - - private async Task RunSyncTaskAsync(ActiveServer server, ICacheStore cacheStore, string nodeId) - { - //Setup timeout for get operations to avoid deadlocks - TimeSpan getTimeout = TimeSpan.FromSeconds(30); - - //Setup client - FBMClient client = new(ClientConfig); - try - { - async Task UpdateRecordAsync(string objectId, string newId) - { - //Get request message - FBMRequest modRequest = client.RentRequest(); - try - { - //Set action as get/create - modRequest.WriteHeader(HeaderCommand.Action, Actions.Get); - //Set session-id header - modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId); - - //Make request - using FBMResponse response = await client.SendAsync(modRequest, getTimeout, UnloadToken); - - response.ThrowIfNotSet(); - - //Check response code - string status = response.Headers.First(static s => s.Header == HeaderCommand.Status).Value.ToString(); - if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal)) - { - //Update the record - await cacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response); - Log.Debug("Updated object {id}", objectId); - } - else - { - Log.Warn("Object {id} was missing on the remote server", objectId); - } - } - finally - { - client.ReturnRequest(modRequest); - } - } - - { - //Sign and verify requests with the cache private key since we are a peer - ReadOnlyJsonWebKey cachePriv = await GetCachePrivate(); - - //Configure cache - client.GetCacheConfiguration() - .WithVerificationKey(cachePriv) - .WithSigningCertificate(cachePriv) - .WithNodeId(nodeId) //set nodeid since were listening for changes - .WithTls(false); - } - - Log.Information("Connecting to {server}...", server.ServerId); - - //Connect to the server - await client.ConnectToCacheAsync(server, UnloadToken); - - //Wroker task callback method - async Task BgWorkerAsync() - { - //Listen for changes - while (true) - { - //Wait for changes - WaitForChangeResult changedObject = await client.WaitForChangeAsync(UnloadToken); - - Log.Debug("Object changed {typ} {obj}", changedObject.Status, changedObject.CurrentId); - - switch (changedObject.Status) - { - case ResponseCodes.NotFound: - Log.Warn("Server cache not properly configured, worker exiting"); - return; - case "deleted": - //Delete the object from the store - await cacheStore.DeleteItemAsync(changedObject.CurrentId); - break; - case "modified": - //Reload the record from the store - await UpdateRecordAsync(changedObject.CurrentId, changedObject.NewId); - break; - } - } - } - - Log.Information("Connected to {server}, starting queue listeners", server.ServerId); - - //Start worker tasks - List<Task> workerTasks = new(); - for(int i = 0; i < Environment.ProcessorCount; i++) - { - workerTasks.Add(Task.Run(BgWorkerAsync)); - } - - //Wait for sync workers to exit - await Task.WhenAll(workerTasks); - } - catch (InvalidResponseException ie) - { - //See if the plugin is unloading - if (!UnloadToken.IsCancellationRequested) - { - Log.Debug("Server responded with invalid response packet, disconnected. reason {reason}", ie); - } - //Disconnect client gracefully - try - { - await client.DisconnectAsync(); - } - catch (Exception ex) - { - Log.Error(ex); - } - } - catch (OperationCanceledException) - { - //Plugin unloading, Try to disconnect - try - { - await client.DisconnectAsync(); - } - catch(Exception ex) - { - Log.Error(ex); - } - } - catch(Exception ex) - { - Log.Warn("Lost connection to server {h}, {m}", server.ServerId, ex); - } - finally - { - //Remove server from active list, since its been disconnected - RemoveServer(server); - client.Dispose(); - } - } - protected override void ProcessHostCommand(string cmd) { - Log.Debug(cmd); + throw new NotImplementedException(); } - - - #endregion } } |