aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-07-15 13:06:00 -0400
committerLibravatar vnugent <public@vaughnnugent.com>2023-07-15 13:06:00 -0400
commit8b4fb26473256da5eaa89f3e9d2ac5d44f1e9b88 (patch)
tree6ff979b6110b9e6c61ff9f22bb0dbdd2094e08cf /lib
parent2f674e79d42e7d36225fa9ac7ecefbc5bc62d325 (diff)
Latest working draft
Diffstat (limited to 'lib')
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ApiModel/ServiceEndpoints.cs9
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs6
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs3
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/Clustering/ClusterNodeIndex.cs75
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/Clustering/IClusterNodeIndex.cs49
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs94
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs12
7 files changed, 205 insertions, 43 deletions
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ApiModel/ServiceEndpoints.cs b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/ServiceEndpoints.cs
index c0de4a3..413bd3c 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/ApiModel/ServiceEndpoints.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/ServiceEndpoints.cs
@@ -89,6 +89,15 @@ namespace VNLib.Data.Caching.Extensions.ApiModel
}
}
+ /*
+ * These methods define consitend endpoint definitions that must match the server
+ * endpoints.
+ *
+ * Most requests will send an authorization header with a signed JWT that includes
+ * a random challenge that the server will return in the response.
+ *
+ * All responses will be signed JWTs that must be verified before continuing
+ */
private static string BuildDiscoveryAuthToken(ICacheConnectionRequest request)
{
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs
index 0fe0663..d69c6bb 100644
--- a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs
@@ -160,21 +160,21 @@ namespace VNLib.Data.Caching.ObjectCache
}
//Determine if the queue is enabled for the user
- if(!EventQueue.IsEnabled(userState!))
+ if(!EventQueue.IsEnabled(userState))
{
context.CloseResponse(ResponseCodes.NotFound);
return;
}
//try to deq without awaiting
- if (EventQueue.TryDequeue(userState!, out ChangeEvent? change))
+ if (EventQueue.TryDequeue(userState, out ChangeEvent? change))
{
SetResponse(change, context);
}
else
{
//Wait for a new message to process
- ChangeEvent ev = await EventQueue.DequeueAsync(userState!, exitToken);
+ ChangeEvent ev = await EventQueue.DequeueAsync(userState, exitToken);
//Set the response
SetResponse(ev, context);
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs b/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs
index 06be4fa..439de94 100644
--- a/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs
@@ -25,6 +25,7 @@
using System.Threading;
using System.Threading.Tasks;
+using System.Diagnostics.CodeAnalysis;
namespace VNLib.Data.Caching.ObjectCache
{
@@ -38,7 +39,7 @@ namespace VNLib.Data.Caching.ObjectCache
/// </summary>
/// <param name="userState">The unique state of the connection</param>
/// <returns>True if event queuing is enabled</returns>
- bool IsEnabled(object userState);
+ bool IsEnabled([NotNullWhen(true)] object? userState);
/// <summary>
/// Attempts to dequeue a single event from the queue without blocking
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/Clustering/ClusterNodeIndex.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/Clustering/ClusterNodeIndex.cs
new file mode 100644
index 0000000..487a4f9
--- /dev/null
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/Clustering/ClusterNodeIndex.cs
@@ -0,0 +1,75 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Plugins.Extensions.VNCache
+* File: ClusterNodeIndex.cs
+*
+* ClusterNodeIndex.cs is part of VNLib.Plugins.Extensions.VNCache which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Threading;
+using System.Threading.Tasks;
+
+using VNLib.Utils.Logging;
+using VNLib.Data.Caching.Extensions;
+using VNLib.Data.Caching.Extensions.Clustering;
+using VNLib.Plugins.Extensions.Loading.Events;
+
+namespace VNLib.Plugins.Extensions.VNCache.Clustering
+{
+ internal sealed class ClusterNodeIndex : IClusterNodeIndex, IIntervalScheduleable
+ {
+ private readonly CacheClientConfiguration _config;
+ private Task _currentUpdate;
+
+
+ public ClusterNodeIndex(CacheClientConfiguration config)
+ {
+ _config = config;
+ _currentUpdate = Task.CompletedTask;
+ }
+
+ ///<inheritdoc/>
+ public CacheNodeAdvertisment? GetNextNode()
+ {
+ //Get all nodes
+ CacheNodeAdvertisment[] ads = _config.NodeCollection.GetAllNodes();
+ //Just get a random node from the collection for now
+ return ads.Length > 0 ? ads.SelectRandom() : null;
+ }
+
+ ///<inheritdoc/>
+ public Task WaitForDiscoveryAsync(CancellationToken cancellationToken)
+ {
+ return _currentUpdate.WaitAsync(cancellationToken);
+ }
+
+ /// <summary>
+ /// Runs the discovery process and updates the current update task
+ /// </summary>
+ /// <param name="log"></param>
+ /// <param name="cancellationToken">A token to cancel the operation</param>
+ /// <returns>A task that completes when the discovery process is complete</returns>
+ public Task OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken)
+ {
+ //Run discovery operation and update the task
+ _currentUpdate = _config.DiscoverNodesAsync(cancellationToken);
+ return Task.CompletedTask;
+ }
+ }
+} \ No newline at end of file
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/Clustering/IClusterNodeIndex.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/Clustering/IClusterNodeIndex.cs
new file mode 100644
index 0000000..ffbfa0d
--- /dev/null
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/Clustering/IClusterNodeIndex.cs
@@ -0,0 +1,49 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Plugins.Extensions.VNCache
+* File: IClusterNodeIndex.cs
+*
+* IClusterNodeIndex.cs is part of VNLib.Plugins.Extensions.VNCache which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Plugins.Extensions.VNCache is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Plugins.Extensions.VNCache is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Threading;
+using System.Threading.Tasks;
+
+using VNLib.Data.Caching.Extensions.Clustering;
+
+namespace VNLib.Plugins.Extensions.VNCache.Clustering
+{
+ internal interface IClusterNodeIndex
+ {
+ /// <summary>
+ /// Gets the next available node using the configured balancing policy
+ /// or null if no nodes are available
+ /// </summary>
+ /// <returns>The next available node to connect to if any are available</returns>
+ CacheNodeAdvertisment? GetNextNode();
+
+ /// <summary>
+ /// Waits for the discovery process to complete. This is just incase a
+ /// connection wants to happen while a long discovery is processing.
+ /// </summary>
+ /// <param name="cancellationToken">A token to cancel the operation</param>
+ /// <returns>A task that resolves when the discovery process completes</returns>
+ Task WaitForDiscoveryAsync(CancellationToken cancellationToken);
+ }
+} \ No newline at end of file
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
index aa3d88f..679c91d 100644
--- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
@@ -41,6 +41,8 @@ using VNLib.Data.Caching.ObjectCache;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.Extensions.Clustering;
+using VNLib.Plugins.Extensions.Loading.Events;
+using VNLib.Plugins.Extensions.VNCache.Clustering;
namespace VNLib.Plugins.Extensions.VNCache
{
@@ -51,14 +53,18 @@ namespace VNLib.Plugins.Extensions.VNCache
TimeSpan RefreshInterval { get; }
}
-
/// <summary>
/// A base class that manages
/// </summary>
[ConfigurationName(VNCacheExtensions.CACHE_CONFIG_KEY)]
internal class VnCacheClient : IGlobalCacheProvider, IAsyncBackgroundWork
{
+ private const string LOG_NAME = "CLIENT";
+ private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(10);
+ private static readonly TimeSpan NoNodeDelay = TimeSpan.FromSeconds(10);
+
private readonly VnCacheClientConfig _config;
+ private readonly ClusterNodeIndex _index;
/// <summary>
/// The internal client
@@ -76,10 +82,23 @@ namespace VNLib.Plugins.Extensions.VNCache
plugin.IsDebug() ? plugin.Log : null
)
{
+ ILogProvider scoped = plugin.Log.CreateScope(LOG_NAME);
+
//Set authenticator and error handler
Client.GetCacheConfiguration()
.WithAuthenticator(new AuthManager(plugin))
- .WithErrorHandler(new DiscoveryErrHAndler(plugin.Log));
+ .WithErrorHandler(new DiscoveryErrHAndler(scoped));
+
+ //Schedule discovery interval
+ plugin.ScheduleInterval(_index, _config.DiscoveryInterval);
+
+ //Run discovery after initial delay if interval is greater than initial delay
+ if(_config.DiscoveryInterval > InitialDelay)
+ {
+ //Run a manual initial load
+ scoped.Information("Running initial discovery in {delay}", InitialDelay);
+ _ = plugin.ObserveWork(() => _index.OnIntervalAsync(scoped, plugin.UnloadToken), (int)InitialDelay.TotalMilliseconds);
+ }
}
public VnCacheClient(VnCacheClientConfig config, ILogProvider? debugLog)
@@ -98,6 +117,9 @@ namespace VNLib.Plugins.Extensions.VNCache
Client.GetCacheConfiguration()
.WithTls(config.UseTls)
.WithInitialPeers(config.GetInitialNodeUris());
+
+ //Init index
+ _index = new ClusterNodeIndex(Client.GetCacheConfiguration());
}
/*
@@ -106,46 +128,50 @@ namespace VNLib.Plugins.Extensions.VNCache
*/
public virtual async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
{
+ //Scope log
+ pluginLog = pluginLog.CreateScope(LOG_NAME);
+
try
{
+ //Initial delay
+ pluginLog.Debug("Worker started, waiting for startup delay");
+ await Task.Delay((int)InitialDelay.TotalMilliseconds + 1000, exitToken);
+
while (true)
{
- //Discover nodes in the network
-
- while (true)
+ //Wait for a discovery to complete
+ await _index.WaitForDiscoveryAsync(exitToken);
+
+ //Get the next node to connect to
+ CacheNodeAdvertisment? node = _index.GetNextNode();
+
+ if (node is null)
{
- try
- {
- pluginLog.Debug("Discovering cluster nodes in network");
- //Get server list
- await Client.DiscoverAvailableNodesAsync(exitToken);
- break;
- }
- catch (HttpRequestException re) when (re.InnerException is SocketException)
- {
- pluginLog.Warn("Broker server is unreachable");
- }
- catch(CacheDiscoveryFailureException ce)
- {
- pluginLog.Warn("Failed to discover cache servers, reason {r}", ce);
- }
- catch (Exception ex)
+ pluginLog.Warn("No nodes available to connect to, trying again in {delay}", NoNodeDelay);
+ await Task.Delay(NoNodeDelay, exitToken);
+
+ //Run another manual discovery if the interval is greater than the delay
+ if (_config.DiscoveryInterval > NoNodeDelay)
{
- pluginLog.Warn("Failed to get server list from broker, reason {r}", ex.Message);
+ pluginLog.Debug("Forcing a manual discovery");
+
+ //We dont need to await this because it is awaited at the top of the loop
+ _ = _index.OnIntervalAsync(pluginLog, exitToken);
}
- //Gen random ms delay
- int randomMsDelay = RandomNumberGenerator.GetInt32(1000, 2000);
- await Task.Delay(randomMsDelay, exitToken);
+ continue;
}
+ //Ready to connect
+
try
{
- pluginLog.Debug("Connecting to random cache server");
+ pluginLog.Debug("Connecting to {node}", node);
- //Connect to a random server
- CacheNodeAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken);
- pluginLog.Debug("Connected to cache server {s}", selected.NodeId);
+ //Connect to the node
+ await Client.ConnectToCacheAsync(node, exitToken);
+
+ pluginLog.Debug("Successfully connected to {s}", node);
//Set connection status flag
IsConnected = true;
@@ -157,7 +183,7 @@ namespace VNLib.Plugins.Extensions.VNCache
}
catch (WebSocketException wse)
{
- pluginLog.Warn("Failed to connect to cache server {reason}", wse.Message);
+ pluginLog.Warn("Failed to connect to cache server {reason}", wse);
continue;
}
catch (HttpRequestException he) when (he.InnerException is SocketException)
@@ -170,6 +196,8 @@ namespace VNLib.Plugins.Extensions.VNCache
{
IsConnected = false;
}
+
+ //Loop again
}
}
catch (OperationCanceledException)
@@ -279,14 +307,14 @@ namespace VNLib.Plugins.Extensions.VNCache
public byte[] SignMessageHash(byte[] hash, HashAlg alg)
{
//try to get the rsa alg for the signing key
- using RSA? rsa = _sigKey.Value.GetRSAPublicKey();
+ using RSA? rsa = _sigKey.Value.GetRSAPrivateKey();
if(rsa != null)
{
return rsa.SignHash(hash, alg.GetAlgName(), RSASignaturePadding.Pkcs1);
}
//try to get the ecdsa alg for the signing key
- using ECDsa? ecdsa = _sigKey.Value.GetECDsaPublicKey();
+ using ECDsa? ecdsa = _sigKey.Value.GetECDsaPrivateKey();
if(ecdsa != null)
{
return ecdsa.SignHash(hash);
@@ -326,7 +354,7 @@ namespace VNLib.Plugins.Extensions.VNCache
{
public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
{
- Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", errorNode.NodeId, ex);
+ Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", errorNode?.NodeId, ex);
}
}
}
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs
index 73fb70f..bfa9d92 100644
--- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs
@@ -53,8 +53,8 @@ namespace VNLib.Plugins.Extensions.VNCache
/// The time (in seconds) to randomly delay polling the broker server
/// for available servers
/// </summary>
- [JsonPropertyName("retry_interval_sec")]
- public int? RetryIntervalSeconds { get; set; }
+ [JsonPropertyName("discovery_interval_Sec")]
+ public int? DiscoveryIntervalSeconds { get; set; }
/// <summary>
/// The maximum time (in seconds) for FBM cache operations are allowed
@@ -71,7 +71,7 @@ namespace VNLib.Plugins.Extensions.VNCache
/// <summary>
/// Retry interval in a timespan
/// </summary>
- internal TimeSpan RetryInterval => TimeSpan.FromSeconds(RetryIntervalSeconds!.Value);
+ internal TimeSpan DiscoveryInterval => TimeSpan.FromSeconds(DiscoveryIntervalSeconds!.Value);
/// <summary>
/// FBM Request timeout
@@ -92,7 +92,7 @@ namespace VNLib.Plugins.Extensions.VNCache
public Uri[] GetInitialNodeUris()
{
_ = InitialNodes ?? throw new InvalidOperationException("Initial nodes have not been set");
- return InitialNodes.Select(x => new Uri(x)).ToArray();
+ return InitialNodes.Select(static x => new Uri(x, UriKind.Absolute)).ToArray();
}
void IOnConfigValidation.Validate()
@@ -102,7 +102,7 @@ namespace VNLib.Plugins.Extensions.VNCache
throw new ArgumentException("Your maxium message size should be a reasonable value greater than 0", "max_message_size");
}
- if (!RetryIntervalSeconds.HasValue || RetryIntervalSeconds.Value < 1)
+ if (!DiscoveryIntervalSeconds.HasValue || DiscoveryIntervalSeconds.Value < 1)
{
throw new ArgumentException("You must specify a retry interval period greater than 0", "retry_interval_sec");
}
@@ -128,7 +128,7 @@ namespace VNLib.Plugins.Extensions.VNCache
}
//Verify http connection
- if(peer.Scheme != Uri.UriSchemeHttp || peer.Scheme != Uri.UriSchemeHttps)
+ if(peer.Scheme != Uri.UriSchemeHttp && peer.Scheme != Uri.UriSchemeHttps)
{
throw new ArgumentException("You must specify an HTTP or HTTPS URI for each initial node", "initial_nodes");
}