aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-06-22 21:16:28 -0400
committerLibravatar vnugent <public@vaughnnugent.com>2023-06-22 21:16:28 -0400
commit1a8ab1457244d15b19ddcc94958f645f5ec2abc7 (patch)
tree3994806e0737cf6f519a72cca8836c6e81eac7e2
parentdc0fc53fd3c3f6c32c8b0d063922c7018fa2c48f (diff)
Save checkpoint
-rw-r--r--README.md6
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs2
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/BrokerRegistrationRequest.cs116
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs106
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs82
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs14
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs317
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs76
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ICacheDiscoveryErrorHandler.cs41
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ICacheNodeAdvertisment.cs (renamed from lib/VNLib.Data.Caching.Extensions/src/ICachePeerAdvertisment.cs)6
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryCollection.cs (renamed from plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryCollection.cs)26
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryEnumerator.cs (renamed from plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryEnumerator.cs)27
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/NodeDiscoveryCollection.cs (renamed from plugins/ObjectCacheServer/src/Distribution/NodeDiscoveryCollection.cs)68
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs38
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs65
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs132
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs48
-rw-r--r--plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs117
-rw-r--r--plugins/ObjectCacheServer/src/CacheEventQueueManager.cs17
-rw-r--r--plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs117
-rw-r--r--plugins/ObjectCacheServer/src/CacheStore.cs (renamed from plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs)19
-rw-r--r--plugins/ObjectCacheServer/src/CacheSystemUtil.cs (renamed from plugins/ObjectCacheServer/src/Endpoints/CacheSystemUtil.cs)0
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs16
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs29
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs10
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs4
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/KnownPeerList.cs100
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs91
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs149
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs105
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs14
-rw-r--r--plugins/ObjectCacheServer/src/ICacheEventQueueManager.cs9
-rw-r--r--plugins/ObjectCacheServer/src/ICachePeer.cs2
-rw-r--r--plugins/ObjectCacheServer/src/IPeerEventQueue.cs50
-rw-r--r--plugins/ObjectCacheServer/src/NodeConfig.cs177
-rw-r--r--plugins/ObjectCacheServer/src/ObjectCacheServer.csproj2
-rw-r--r--plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs78
37 files changed, 1149 insertions, 1127 deletions
diff --git a/README.md b/README.md
index 1316eb4..a462769 100644
--- a/README.md
+++ b/README.md
@@ -1,9 +1,11 @@
# DataCaching
*A collection of data-caching libraries and server plugins for cache clients, servers, and clustering*
-#### Builds
-Debug build w/ symbols & xml docs, release builds, NuGet packages, and individually packaged source code are available on my [website](https://www.vaughnnugent.com/resources/software). All tar-gzip (.tgz) files will have an associated .sha384 appended checksum of the desired download file.
+## Docs
+You can find the documentation for this module and all child extension projects on my website [here](https://www.vaughnnugent.com/resources/software/articles?tags=docs,_VNLib.Data.Caching).
+## Builds
+Debug build w/ symbols & xml docs, release builds, NuGet packages, and individually packaged source code are available on my [website](https://www.vaughnnugent.com/resources/software/modules/VNLib.Data.Caching). All tar-gzip (.tgz) files will have an associated .sha384 appended checksum of the desired download file.
## Licensing
Projects contained in this repository are individually licensed, either GNU GPL V2+ or GNU GPL GPL Aferro V3+. Builds contain the required license txt files in the archive. \ No newline at end of file
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs b/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs
index 2d02491..3020376 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs
@@ -27,7 +27,7 @@ using System.Text.Json.Serialization;
namespace VNLib.Data.Caching.Extensions
{
- public class ActiveServer : ICachePeerAdvertisment
+ public class ActiveServer : ICacheNodeAdvertisment
{
[JsonPropertyName("address")]
public string? HostName { get; set; }
diff --git a/lib/VNLib.Data.Caching.Extensions/src/BrokerRegistrationRequest.cs b/lib/VNLib.Data.Caching.Extensions/src/BrokerRegistrationRequest.cs
deleted file mode 100644
index 9ec559a..0000000
--- a/lib/VNLib.Data.Caching.Extensions/src/BrokerRegistrationRequest.cs
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
-* Copyright (c) 2023 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Data.Caching.Extensions
-* File: BrokerRegistrationRequest.cs
-*
-* BrokerRegistrationRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Collections.Generic;
-
-using VNLib.Utils;
-using VNLib.Hashing.IdentityUtility;
-
-
-namespace VNLib.Data.Caching.Extensions
-{
- /// <summary>
- /// A broker registration request message in a fluent api
- /// format. This message may be disposed when no longer in use
- /// </summary>
- public sealed class BrokerRegistrationRequest : VnDisposeable
- {
- private bool ownsKey;
- private ReadOnlyJsonWebKey? SigningKey;
-
- /// <summary>
- /// The cache server node id
- /// </summary>
- public string? NodeId { get; private set; }
- /// <summary>
- /// The broker server's address
- /// </summary>
- public Uri? BrokerAddress { get; private set; }
- /// <summary>
- /// The security token used by the broker server to
- /// authenticate during heartbeat connections
- /// </summary>
- public string? HeartbeatToken { get; private set; }
- /// <summary>
- /// The address for remote clients to use to
- /// connect to this server
- /// </summary>
- public string? RegistrationAddress { get; private set; }
-
- /// <summary>
- /// Recovers the private key from the supplied certificate
- /// </summary>
- /// <param name="jwk">The private key used to sign messages</param>
- /// <param name="ownsKey">A value that indicates if the current instance owns the key</param>
- /// <returns></returns>
- /// <exception cref="ArgumentException"></exception>
- public BrokerRegistrationRequest WithSigningKey(ReadOnlyJsonWebKey jwk, bool ownsKey)
- {
- this.ownsKey = ownsKey;
- SigningKey = jwk ?? throw new ArgumentNullException(nameof(jwk));
- return this;
- }
-
- public BrokerRegistrationRequest WithBroker(Uri brokerUri)
- {
- BrokerAddress = brokerUri;
- return this;
- }
-
- public BrokerRegistrationRequest WithRegistrationAddress(string address)
- {
- RegistrationAddress = address;
- return this;
- }
-
- public BrokerRegistrationRequest WithHeartbeatToken(string token)
- {
- HeartbeatToken = token;
- return this;
- }
-
- public BrokerRegistrationRequest WithNodeId(string nodeId)
- {
- NodeId = nodeId;
- return this;
- }
-
- internal void SignJwt(JsonWebToken jwt)
- {
- jwt.SignFromJwk(SigningKey);
- }
-
- internal IReadOnlyDictionary<string, string?> JsonHeader => SigningKey!.JwtHeader;
-
- ///<inheritdoc/>
- protected override void Free()
- {
- if (ownsKey)
- {
- SigningKey?.Dispose();
- }
- }
- }
-}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs
index 05e4928..9229c89 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs
@@ -3,9 +3,9 @@
*
* Library: VNLib
* Package: VNLib.Data.Caching.Extensions
-* File: ClientCacheConfiguration.cs
+* File: CacheClientConfiguration.cs
*
-* ClientCacheConfiguration.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* CacheClientConfiguration.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
* VNLib collection of libraries and utilities.
*
* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
@@ -22,69 +22,47 @@
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
-using System;
+using System.Linq;
using System.Collections.Generic;
-using System.Security.Cryptography;
-
-using VNLib.Hashing;
-using VNLib.Hashing.IdentityUtility;
namespace VNLib.Data.Caching.Extensions
{
- public interface ICacheJwtManager
- {
- IReadOnlyDictionary<string, string?> GetJwtHeader();
-
- void SignJwt(JsonWebToken jwt);
-
- bool VerifyCache(JsonWebToken jwt);
-
- bool VerifyBroker(JsonWebToken jwt);
- }
-
/// <summary>
/// A fluent api configuration object for configuring a <see cref="FBMClient"/>
/// to connect to cache servers.
/// </summary>
- public class CacheClientConfiguration : ICacheJwtManager, ICacheListServerRequest
+ public class CacheClientConfiguration
{
- public ReadOnlyJsonWebKey? SigningKey { get; private set; }
- public ReadOnlyJsonWebKey? VerificationKey { get; private set; }
- public ReadOnlyJsonWebKey? BrokerVerificationKey { get; private set; }
-
- public Uri? DiscoveryEndpoint { get; private set; }
- public bool UseTls { get; private set; }
- internal ICachePeerAdvertisment[]? CacheServers { get; set; }
+ /// <summary>
+ /// Stores available cache servers to be used for discovery, and connections
+ /// </summary>
+ public INodeDiscoveryCollection NodeCollection { get; } = new NodeDiscoveryCollection();
/// <summary>
- /// Imports the private key used to sign messages
+ /// The authentication manager to use for signing and verifying messages to and from the cache servers
/// </summary>
- /// <param name="jwk">The <see cref="ReadOnlyJsonWebKey"/> with a private key loaded</param>
- /// <returns>Chainable fluent object</returns>
- /// <exception cref="ArgumentException"></exception>
- /// <exception cref="CryptographicException"></exception>
- public CacheClientConfiguration WithSigningKey(ReadOnlyJsonWebKey jwk)
- {
- SigningKey = jwk ?? throw new ArgumentNullException(nameof(jwk));
- return this;
- }
+ public ICacheAuthManager AuthManager { get; private set; }
/// <summary>
- /// Imports the public key used to verify messages from the remote server
+ /// The error handler to use for handling errors that occur during the discovery process
/// </summary>
- /// <param name="jwk">The <see cref="ReadOnlyJsonWebKey"/> public key only used for message verification</param>
- /// <returns>Chainable fluent object</returns>
- /// <exception cref="ArgumentException"></exception>
- /// <exception cref="CryptographicException"></exception>
- public CacheClientConfiguration WithVerificationKey(ReadOnlyJsonWebKey jwk)
- {
- VerificationKey = jwk ?? throw new ArgumentNullException(nameof(jwk));
- return this;
- }
+ public ICacheDiscoveryErrorHandler? ErrorHandler { get; private set; }
- public CacheClientConfiguration WithBrokerVerificationKey(ReadOnlyJsonWebKey jwk)
+ /// <summary>
+ /// Specifies if all connections should use TLS
+ /// </summary>
+ public bool UseTls { get; private set; }
+
+ internal ICacheNodeAdvertisment[]? InitialPeers { get; set; }
+
+ /// <summary>
+ /// Specifies the JWT authentication manager to use for signing and verifying JWTs
+ /// </summary>
+ /// <param name="manager">The authentication manager</param>
+ /// <returns>Chainable fluent object</returns>
+ public CacheClientConfiguration WithAuthenticator(ICacheAuthManager manager)
{
- BrokerVerificationKey = jwk ?? throw new ArgumentNullException(nameof(jwk));
+ AuthManager = manager;
return this;
}
@@ -92,7 +70,6 @@ namespace VNLib.Data.Caching.Extensions
/// Specifies if all connections should be using TLS
/// </summary>
/// <param name="useTls">A value that indicates if connections should use TLS</param>
- /// <returns>Chainable fluent object</returns>
public CacheClientConfiguration WithTls(bool useTls)
{
UseTls = useTls;
@@ -100,28 +77,25 @@ namespace VNLib.Data.Caching.Extensions
}
/// <summary>
- /// Specifies the broker address to discover cache nodes from
+ /// Specifies the initial cache peers to connect to
/// </summary>
- /// <param name="brokerAddress">The address of the server broker</param>
+ /// <param name="peers">The collection of servers to discover peers from and connect to</param>
/// <returns>Chainable fluent object</returns>
- /// <exception cref="ArgumentNullException"></exception>
- public CacheClientConfiguration WithBroker(Uri brokerAddress)
+ public CacheClientConfiguration WithInitialPeers(IEnumerable<ICacheNodeAdvertisment> peers)
{
- DiscoveryEndpoint = brokerAddress ?? throw new ArgumentNullException(nameof(brokerAddress));
+ InitialPeers = peers.ToArray();
return this;
}
-
- ///<inheritdoc/>
- public void SignJwt(JsonWebToken jwt) => jwt.SignFromJwk(SigningKey!);
-
- ///<inheritdoc/>
- public bool VerifyCache(JsonWebToken jwt) => jwt.VerifyFromJwk(VerificationKey!);
-
- ///<inheritdoc/>
- public bool VerifyBroker(JsonWebToken jwt) => jwt.VerifyFromJwk(BrokerVerificationKey!);
-
- ///<inheritdoc/>
- public IReadOnlyDictionary<string, string?> GetJwtHeader() => SigningKey!.JwtHeader;
+ /// <summary>
+ /// Specifies the error handler to use for handling errors that occur during the discovery process
+ /// </summary>
+ /// <param name="handler">The error handler to use during a discovery</param>
+ /// <returns>Chainable fluent object</returns>
+ public CacheClientConfiguration WithErrorHandler(ICacheDiscoveryErrorHandler handler)
+ {
+ ErrorHandler = handler;
+ return this;
+ }
}
}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs b/lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs
deleted file mode 100644
index 76d4ad8..0000000
--- a/lib/VNLib.Data.Caching.Extensions/src/CacheListServerRequest.cs
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
-* Copyright (c) 2023 Vaughn Nugent
-*
-* Library: VNLib
-* Package: VNLib.Data.Caching.Extensions
-* File: ListServerRequest.cs
-*
-* ListServerRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Collections.Generic;
-
-using VNLib.Utils;
-using VNLib.Hashing.IdentityUtility;
-
-namespace VNLib.Data.Caching.Extensions
-{
- public interface ICacheListServerRequest : ICacheJwtManager
- {
- Uri DiscoveryEndpoint { get; }
- }
-
- /// <summary>
- /// A request container for a ListServer request
- /// </summary>
- public sealed class CacheListServerRequest : ICacheListServerRequest
- {
- private readonly ICacheJwtManager _manager;
-
-
- /// <summary>
- /// The address of the broker server to connect to
- /// </summary>
- public Uri DiscoveryEndpoint { get; private set; }
-
- public CacheListServerRequest(ICacheJwtManager keyManager, Uri? brokerAddress = null)
- {
- _manager = keyManager;
- DiscoveryEndpoint = brokerAddress!;
- }
-
-
- /// <summary>
- /// Sets the broker address for the request
- /// </summary>
- /// <param name="brokerAddr">The broker server's address to connect to</param>
- /// <returns>A fluent chainable value</returns>
- /// <exception cref="ArgumentNullException"></exception>
- public CacheListServerRequest WithDiscoveryEndpoint(Uri brokerAddr)
- {
- DiscoveryEndpoint = brokerAddr ?? throw new ArgumentNullException(nameof(brokerAddr));
- return this;
- }
-
- /// <inheritdoc/>
- public void SignJwt(JsonWebToken jwt) => _manager.SignJwt(jwt);
-
- /// <inheritdoc/>
- public bool VerifyCache(JsonWebToken jwt) => _manager.VerifyCache(jwt);
-
- /// <inheritdoc/>
- public bool VerifyBroker(JsonWebToken jwt) => _manager.VerifyBroker(jwt);
-
- /// <inheritdoc/>
- public IReadOnlyDictionary<string, string?> GetJwtHeader() => _manager.GetJwtHeader();
- }
-}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs
index 21a99e1..29a763c 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs
@@ -24,11 +24,13 @@
using System;
-
namespace VNLib.Data.Caching.Extensions
{
- public class CacheNodeConfiguration: CacheClientConfiguration, ICachePeerAdvertisment
+ /// <summary>
+ /// A cache configuration for cache servers (nodes)
+ /// </summary>
+ public class CacheNodeConfiguration: CacheClientConfiguration, ICacheNodeAdvertisment
{
/// <summary>
/// The address for clients to connect to
@@ -56,9 +58,13 @@ namespace VNLib.Data.Caching.Extensions
return this;
}
- public CacheNodeConfiguration EnableAdvertisment(bool enable, Uri? discoveryEndpoint)
+ /// <summary>
+ /// Enables or disables the advertisement of this node to other nodes
+ /// </summary>
+ /// <param name="discoveryEndpoint">The absolute endpoint clients will use to connect to</param>
+ public CacheNodeConfiguration EnableAdvertisment(Uri? discoveryEndpoint)
{
- BroadcastAdverisment = enable;
+ BroadcastAdverisment = discoveryEndpoint != null;
DiscoveryEndpoint = discoveryEndpoint;
return this;
}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
index 9efe16a..634b6de 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
@@ -139,91 +139,114 @@ namespace VNLib.Data.Caching.Extensions
client.Config.DebugLog?.Debug("{debug}: {data}", "[CACHE]", message);
}
+
/// <summary>
- /// Creats a new <see cref="CacheListServerRequest"/> from an existing <see cref="CacheClientConfiguration"/>
+ /// Discovers ALL possible cache nodes itteritivley from the current collection of initial peers.
+ /// This will make connections to all discoverable servers
/// </summary>
- /// <param name="conf">The prepared client configuration</param>
- /// <returns>The new <see cref="CacheListServerRequest"/></returns>
- public static CacheListServerRequest GetListMessage(this CacheClientConfiguration conf)
+ /// <param name="config"></param>
+ /// <param name="cancellation">A token to cancel the operation</param>
+ /// <returns></returns>
+ /// <exception cref="ArgumentException"></exception>
+ public static async Task DiscoverNodesAsync(this CacheClientConfiguration config, CancellationToken cancellation)
{
- return new(conf, conf.DiscoveryEndpoint);
+ //Make sure at least one node defined
+ if(config?.InitialPeers == null || config.InitialPeers.Length == 0)
+ {
+ throw new ArgumentException("There must be at least one cache server defined in the client configuration");
+ }
+
+ //Get the discovery enumerator with the initial peers
+ INodeDiscoveryEnumerator enumerator = config.NodeCollection.BeginDiscovery(config.InitialPeers);
+
+ //Start the discovery process
+ await DiscoverNodesAsync(enumerator, config.AuthManager, config.ErrorHandler, cancellation);
+
+ //Commit nodes
+ config.NodeCollection.CompleteDiscovery(enumerator);
}
- /// <summary>
- /// Discovers peer nodes from a given initial peer and returns a list of discovered nodes. If the config
- /// is for a cache peer node, the current peer is removed from the list of discovered nodes.
- /// </summary>
- /// <param name="cacheConfig"></param>
- /// <param name="initialPeer">The initial peer to discover nodes from</param>
- /// <param name="cancellation">A token to cancel the discovery operation</param>
- /// <returns>The collection of discovered nodes</returns>
- /// <exception cref="ArgumentNullException"></exception>
- public static async Task<ICachePeerAdvertisment[]?> DiscoverClusterNodesAsync(
- this CacheClientConfiguration cacheConfig,
- ICachePeerAdvertisment initialPeer,
- CancellationToken cancellation
+ private static async Task DiscoverNodesAsync(
+ INodeDiscoveryEnumerator enumerator,
+ ICacheAuthManager auth,
+ ICacheDiscoveryErrorHandler? errHandler,
+ CancellationToken cancellation
)
{
- _ = initialPeer?.DiscoveryEndpoint ?? throw new ArgumentException("Advertisment does not expose an advertisment endpoint");
-
- //Create list request
- CacheListServerRequest request = cacheConfig.GetListMessage();
+ //Loop through servers
+ while (enumerator.MoveNext())
+ {
+ //Make sure the node has a discovery endpoint
+ if (enumerator.Current.DiscoveryEndpoint == null)
+ {
+ //Skip this node
+ continue;
+ }
- //Override with the initial peer's discovery endpoint
- request.WithDiscoveryEndpoint(initialPeer.DiscoveryEndpoint);
+ /*
+ * We are allowed to save nodes that do not have a discovery endpoint, but we cannot discover nodes from them
+ * we can only use them as cache
+ */
- //Get the list of servers
- ICachePeerAdvertisment[]? servers = await ListServersAsync(request, cancellation);
+ //add a random delay to avoid spamming the server
+ await Task.Delay((int)Random.Shared.NextInt64(50, 500), cancellation);
- if (servers == null)
- {
- return null;
- }
-
- if(cacheConfig is CacheNodeConfiguration cnc)
- {
- //Filter out the current node
- return servers.Where(s => !cnc.NodeId.Equals(s.NodeId, StringComparison.OrdinalIgnoreCase)).ToArray();
+ try
+ {
+ //Discover nodes from the current node
+ ICacheNodeAdvertisment[]? nodes = await GetCacheNodesAsync(enumerator.Current, auth, cancellation);
+
+ if (nodes != null)
+ {
+ //Add nodes to the collection
+ enumerator.OnPeerDiscoveryComplete(nodes);
+ }
+ }
+ //Catch exceptions when an error handler is defined
+ catch(Exception ex) when (errHandler != null)
+ {
+ //Handle the error
+ errHandler.OnDiscoveryError(enumerator.Current, ex);
+ }
}
- else
- {
- //Do not filter
- return servers;
- }
}
/// <summary>
/// Contacts the cache broker to get a list of active servers to connect to
/// </summary>
- /// <param name="request">The request message used to connecto the broker server</param>
+ /// <param name="advert">An advertisment of a server to discover other nodes from</param>
/// <param name="cancellationToken">A token to cancel the operationS</param>
+ /// <param name="auth">The authentication manager</param>
/// <returns>The list of active servers</returns>
/// <exception cref="SecurityException"></exception>
+ /// <exception cref="ArgumentException"></exception>
/// <exception cref="ArgumentNullException"></exception>
- public static async Task<ICachePeerAdvertisment[]?> ListServersAsync(ICacheListServerRequest request, CancellationToken cancellationToken = default)
+ public static async Task<ICacheNodeAdvertisment[]?> GetCacheNodesAsync(ICacheNodeAdvertisment advert, ICacheAuthManager auth, CancellationToken cancellationToken = default)
{
- _ = request ?? throw new ArgumentNullException(nameof(request));
+ _ = advert ?? throw new ArgumentNullException(nameof(advert));
+ _ = auth ?? throw new ArgumentNullException(nameof(auth));
+ _ = advert.DiscoveryEndpoint ?? throw new ArgumentException("Advertisment does not expose an advertisment endpoint");
string jwtBody;
//Build request jwt
using (JsonWebToken requestJwt = new())
{
- requestJwt.WriteHeader(request.GetJwtHeader());
+ requestJwt.WriteHeader(auth.GetJwtHeader());
requestJwt.InitPayloadClaim()
.AddClaim("iat", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds())
.AddClaim("nonce", RandomHash.GetRandomBase32(16))
.CommitClaims();
//sign the jwt
- request.SignJwt(requestJwt);
+ auth.SignJwt(requestJwt);
//Compile the jwt
jwtBody = requestJwt.Compile();
}
//New list request
- RestRequest listRequest = new(request.DiscoveryEndpoint, Method.Post);
+ RestRequest listRequest = new(advert.DiscoveryEndpoint, Method.Post);
//Add the jwt as a string to the request body
listRequest.AddStringBody(jwtBody, DataFormat.None);
@@ -248,87 +271,18 @@ namespace VNLib.Data.Caching.Extensions
//Response is jwt
using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data);
-
+
//Verify the jwt
- if (!request.VerifyBroker(responseJwt))
+ if (!auth.VerifyJwt(responseJwt))
{
throw new SecurityException("Failed to verify the broker's challenge, cannot continue");
}
-
+
using JsonDocument doc = responseJwt.GetPayload();
return doc.RootElement.GetProperty("peers").Deserialize<Advertisment[]>();
}
/// <summary>
- /// Registers the current node with the broker
- /// </summary>
- /// <returns>A task that completes when the regitration has been made successfully</returns>
- /// <exception cref="ArgumentException"></exception>
- public static async Task RegisterWithBrokerAsync(this CacheNodeConfiguration config, string authToken)
- {
- //Recover the certificate
- ReadOnlyJsonWebKey cacheCert = config?.SigningKey ?? throw new ArgumentException(nameof(config.SigningKey));
-
- //init broker request
- using BrokerRegistrationRequest request = new();
-
- request.WithBroker(config.DiscoveryEndpoint!)
- .WithRegistrationAddress(config.ConnectEndpoint!.ToString())
- .WithNodeId(config.NodeId!)
- .WithSigningKey(cacheCert, true)
- .WithHeartbeatToken(authToken);
-
-
- //Send the request
- await RegisterWithBrokerAsync(request);
- }
-
- /// <summary>
- /// Registers the current server as active with the specified broker
- /// </summary>
- /// <param name="registration">The registration request</param>
- public static async Task RegisterWithBrokerAsync(BrokerRegistrationRequest registration)
- {
- _ = registration ?? throw new ArgumentNullException(nameof(registration));
- _ = registration.HeartbeatToken ?? throw new ArgumentException("Missing required heartbeat access token");
- _ = registration.NodeId ?? throw new ArgumentException("Missing required cache server NodeId");
- _ = registration.BrokerAddress ?? throw new ArgumentException("Broker server address has not been configured");
- _ = registration.RegistrationAddress ?? throw new ArgumentException("Missing required registration address", nameof(registration));
-
- string requestData;
- //Create the jwt for signed registration message
- using (JsonWebToken jwt = new())
- {
- //Shared jwt header
- jwt.WriteHeader(registration.JsonHeader);
- //build jwt claim
- jwt.InitPayloadClaim()
- .AddClaim("address", registration.RegistrationAddress)
- .AddClaim("sub", registration.NodeId)
- .AddClaim("token", registration.HeartbeatToken)
- .CommitClaims();
-
- //Sign the jwt
- registration.SignJwt(jwt);
- //Compile and save
- requestData = jwt.Compile();
- }
-
- //Create reg request message
- RestRequest regRequest = new(registration.BrokerAddress);
- regRequest.AddStringBody(requestData, DataFormat.None);
- regRequest.AddHeader("Content-Type", "text/plain");
-
- //Rent client
- using ClientContract cc = ClientPool.Lease();
-
- //Exec the regitration request
- RestResponse response = await cc.Resource.ExecutePutAsync(regRequest);
- response.ThrowIfError();
- }
-
-
- /// <summary>
/// Allows for configuration of an <see cref="FBMClient"/>
/// for a connection to a cache server
/// </summary>
@@ -359,32 +313,7 @@ namespace VNLib.Data.Caching.Extensions
ClientCacheConfig.AddOrUpdate(client, nodeConfig);
return nodeConfig;
}
-
- /// <summary>
- /// Discovers cache nodes in the broker configured for the current client.
- /// </summary>
- /// <param name="client"></param>
- /// <param name="token">A token to cancel the discovery</param>
- /// <returns>A task the resolves the list of active servers on the broker server</returns>
- public static Task<ICachePeerAdvertisment[]?> DiscoverCacheNodesAsync(this FBMClientWorkerBase client, CancellationToken token = default)
- {
- return client.Client.DiscoverCacheNodesAsync(token);
- }
-
- /// <summary>
- /// Discovers cache nodes in the broker configured for the current client.
- /// </summary>
- /// <param name="client"></param>
- /// <param name="token">A token to cancel the discovery </param>
- /// <returns>A task the resolves the list of active servers on the broker server</returns>
- public static async Task<ICachePeerAdvertisment[]?> DiscoverCacheNodesAsync(this FBMClient client, CancellationToken token = default)
- {
- //Get the stored client config
- CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
-
- //List servers async
- return conf.CacheServers = await ListServersAsync(conf, token);
- }
+
/// <summary>
/// Waits for the client to disconnect from the server while observing
@@ -428,15 +357,18 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="SecurityException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
- public static async Task<ICachePeerAdvertisment> ConnectToRandomCacheAsync(this FBMClient client, CancellationToken cancellation = default)
+ public static async Task<ICacheNodeAdvertisment> ConnectToRandomCacheAsync(this FBMClient client, CancellationToken cancellation = default)
{
//Get stored config
CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
- //Select random
- ICachePeerAdvertisment? randomServer = conf.CacheServers?.SelectRandom()
- ?? throw new ArgumentException("No servers detected, cannot connect");
+ //Get all available nodes, or at least the initial peers
+ ICacheNodeAdvertisment[]? adverts = conf.NodeCollection.GetAllNodes() ?? conf.InitialPeers ?? throw new ArgumentException("No cache nodes discovered, cannot connect");
+ //Select random node from all available nodes
+ ICacheNodeAdvertisment randomServer = adverts.SelectRandom();
+
+ //Connect to the random server
await ConnectToCacheAsync(client, randomServer, cancellation);
//Return the random server we connected to
@@ -456,13 +388,14 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="SecurityException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
- public static Task ConnectToCacheAsync(this FBMClient client, ICachePeerAdvertisment server, CancellationToken token = default)
+ public static Task ConnectToCacheAsync(this FBMClient client, ICacheNodeAdvertisment server, CancellationToken token = default)
{
_ = client ?? throw new ArgumentNullException(nameof(client));
_ = server ?? throw new ArgumentNullException(nameof(server));
//Get stored config
CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
+
//Connect to server (no server id because client not replication server)
return ConnectToCacheAsync(client, conf, server, token);
}
@@ -481,7 +414,7 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="SecurityException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
- public static Task ConnectToCacheAsync(this FBMClient client, ICachePeerAdvertisment server, CacheClientConfiguration explicitConfig, CancellationToken token = default)
+ public static Task ConnectToCacheAsync(this FBMClient client, ICacheNodeAdvertisment server, CacheClientConfiguration explicitConfig, CancellationToken token = default)
{
_ = client ?? throw new ArgumentNullException(nameof(client));
_ = server ?? throw new ArgumentNullException(nameof(server));
@@ -494,7 +427,7 @@ namespace VNLib.Data.Caching.Extensions
private static async Task ConnectToCacheAsync(
FBMClient client,
CacheClientConfiguration config,
- ICachePeerAdvertisment server,
+ ICacheNodeAdvertisment server,
CancellationToken token = default
)
{
@@ -513,7 +446,7 @@ namespace VNLib.Data.Caching.Extensions
//Init jwt for connecting to server
using (JsonWebToken jwt = new())
{
- jwt.WriteHeader(config.GetJwtHeader());
+ jwt.WriteHeader(config.AuthManager.GetJwtHeader());
//Init claim
JwtPayload claim = jwt.InitPayloadClaim();
@@ -532,7 +465,7 @@ namespace VNLib.Data.Caching.Extensions
claim.CommitClaims();
//Sign jwt
- config.SignJwt(jwt);
+ config.AuthManager.SignJwt(jwt);
//Compile to string
jwtMessage = jwt.Compile();
@@ -576,7 +509,7 @@ namespace VNLib.Data.Caching.Extensions
using (JsonWebToken jwt = JsonWebToken.Parse(authToken))
{
//Verify the jwt
- if (!config.VerifyCache(jwt))
+ if (!config.AuthManager.VerifyJwt(jwt))
{
throw new SecurityException("Failed to verify the cache server's negotiation message, cannot continue");
}
@@ -591,7 +524,7 @@ namespace VNLib.Data.Caching.Extensions
client.ClientSocket.Headers[HttpRequestHeader.Authorization] = authToken;
//Compute the signature of the upgrade token
- client.ClientSocket.Headers[X_UPGRADE_SIG_HEADER] = GetBase64UpgradeSingature(authToken, config.SigningKey!);
+ client.ClientSocket.Headers[X_UPGRADE_SIG_HEADER] = config.AuthManager.GetBase64UpgradeSingature(authToken);
//Check to see if adversize self is enabled
if (cnc?.BroadcastAdverisment == true)
@@ -660,38 +593,16 @@ namespace VNLib.Data.Caching.Extensions
* compute a signature of the upgrade token and send it to the server to prove we hold the private key.
*/
- private static string GetBase64UpgradeSingature(string? token, ReadOnlyJsonWebKey key)
+ private static string GetBase64UpgradeSingature(this ICacheAuthManager man, string? token)
{
- //try to get the ecdsa key first
- using ECDsa? ec = key.GetECDsaPrivateKey();
-
- if(ec != null)
- {
- //Compute hash of the token
- byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256);
-
- //Sign the hash
- byte[] sig = ec.SignHash(hash, DSASignatureFormat.IeeeP1363FixedFieldConcatenation);
-
- //Return the base64 string
- return Convert.ToBase64String(sig);
- }
-
- //Check rsa next
- using RSA? rsa = key.GetRSAPrivateKey();
- if(rsa != null)
- {
- //Compute hash of the token
- byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256);
+ //Compute hash of the token
+ byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256);
- //Sign the hash
- byte[] sig = rsa.SignHash(hash, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1);
+ //Sign the hash
+ byte[] sig = man.SignMessageHash(hash, HashAlg.SHA256);
- //Return the base64 string
- return Convert.ToBase64String(sig);
- }
-
- throw new CryptographicException("Cache JKW does not export a supported private key for upgrade challenges");
+ //Return the base64 string
+ return Convert.ToBase64String(sig);
}
/// <summary>
@@ -704,21 +615,21 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="CryptographicException"></exception>
public static bool VerifyUpgradeToken(this CacheClientConfiguration nodeConfig, string signature, string token)
{
- return VerifyUpgradeToken(signature, token, nodeConfig.VerificationKey);
+ return VerifyUpgradeToken(nodeConfig.AuthManager, signature, token);
}
/// <summary>
/// Verifies the signed auth token against the given verification key
/// </summary>
+ /// <param name="man"></param>
/// <param name="signature">The base64 signature of the token</param>
/// <param name="token">The raw token to compute the hash of</param>
- /// <param name="verifcationKey">The key used to verify the singature with</param>
/// <returns>True if the singature matches, false otherwise</returns>
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="CryptographicException"></exception>
- public static bool VerifyUpgradeToken(string signature, string token, ReadOnlyJsonWebKey verifcationKey)
+ public static bool VerifyUpgradeToken(this ICacheAuthManager man, string signature, string token)
{
- _ = verifcationKey ?? throw new ArgumentNullException(nameof(verifcationKey));
+ _ = man ?? throw new ArgumentNullException(nameof(man));
//get the hash of the token
byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256);
@@ -726,23 +637,7 @@ namespace VNLib.Data.Caching.Extensions
//decode the signature
byte[] sig = Convert.FromBase64String(signature);
- //try to get the ecdsa key first
- using ECDsa? ec = verifcationKey.GetECDsaPublicKey();
- if(ec != null)
- {
- //Verify the signature
- return ec.VerifyHash(hash, sig, DSASignatureFormat.IeeeP1363FixedFieldConcatenation);
- }
-
- //Check rsa next
- using RSA? rsa = verifcationKey.GetRSAPublicKey();
- if(rsa != null)
- {
- //Verify the signature
- return rsa.VerifyHash(hash, sig, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1);
- }
-
- throw new CryptographicException("Cache JKW does not export a supported public key for upgrade challenges");
+ return man.VerifyMessageHash(hash, HashAlg.SHA256, sig);
}
private static string GetAdvertismentHeader(CacheNodeConfiguration nodeConfiguration)
@@ -756,7 +651,7 @@ namespace VNLib.Data.Caching.Extensions
using JsonWebToken jwt = new();
//Get the jwt header
- jwt.WriteHeader(nodeConfiguration.GetJwtHeader());
+ jwt.WriteHeader(nodeConfiguration.AuthManager.GetJwtHeader());
jwt.InitPayloadClaim()
.AddClaim("nonce", RandomHash.GetRandomBase32(16))
@@ -768,7 +663,7 @@ namespace VNLib.Data.Caching.Extensions
.CommitClaims();
//Sign message
- nodeConfiguration.SignJwt(jwt);
+ nodeConfiguration.AuthManager.SignJwt(jwt);
return jwt.Compile();
}
@@ -780,12 +675,12 @@ namespace VNLib.Data.Caching.Extensions
/// <param name="message">The advertisment message to verify</param>
/// <returns>The advertisment message if successfully verified, or null otherwise</returns>
/// <exception cref="FormatException"></exception>
- public static ICachePeerAdvertisment? VerifyPeerAdvertisment(this ICacheJwtManager config, string message)
+ public static ICacheNodeAdvertisment? VerifyPeerAdvertisment(this ICacheAuthManager config, string message)
{
using JsonWebToken jwt = JsonWebToken.Parse(message);
//Verify the signature
- if (!config.VerifyCache(jwt))
+ if (!config.VerifyJwt(jwt))
{
return null;
}
@@ -800,7 +695,7 @@ namespace VNLib.Data.Caching.Extensions
/// </summary>
/// <param name="servers"></param>
/// <returns>A server selected at random</returns>
- public static ICachePeerAdvertisment SelectRandom(this ICollection<ICachePeerAdvertisment> servers)
+ public static ICacheNodeAdvertisment SelectRandom(this ICollection<ICacheNodeAdvertisment> servers)
{
//select random server
int randServer = RandomNumberGenerator.GetInt32(0, servers.Count);
@@ -808,7 +703,7 @@ namespace VNLib.Data.Caching.Extensions
}
- private class Advertisment : ICachePeerAdvertisment
+ private class Advertisment : ICacheNodeAdvertisment
{
[JsonIgnore]
public Uri? ConnectEndpoint { get; set; }
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs b/lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs
new file mode 100644
index 0000000..e3ab868
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs
@@ -0,0 +1,76 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: ClientCacheConfiguration.cs
+*
+* ClientCacheConfiguration.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Collections.Generic;
+
+using VNLib.Hashing;
+using VNLib.Hashing.IdentityUtility;
+
+namespace VNLib.Data.Caching.Extensions
+{
+ /// <summary>
+ /// Provides authentication services for cache clients and
+ /// servers.
+ /// </summary>
+ public interface ICacheAuthManager
+ {
+ /// <summary>
+ /// Gets the JWT header to use for signing messages with the
+ /// given key
+ /// </summary>
+ /// <returns>The JWT header collection</returns>
+ IReadOnlyDictionary<string, string?> GetJwtHeader();
+
+ /// <summary>
+ /// Signs the given JWT
+ /// </summary>
+ /// <param name="jwt">The message to sign</param>
+ void SignJwt(JsonWebToken jwt);
+
+ /// <summary>
+ /// Verifies the given JWT
+ /// </summary>
+ /// <param name="jwt">The message to verify authenticity</param>
+ /// <returns>True of the JWT could be verified, false otherwise</returns>
+ bool VerifyJwt(JsonWebToken jwt);
+
+ /// <summary>
+ /// Signs the given message hash
+ /// </summary>
+ /// <param name="hash">The message hash to sign</param>
+ /// <param name="alg">The algorithm used to sign the message hash</param>
+ /// <returns>The signature of the hash</returns>
+ byte[] SignMessageHash(byte[] hash, HashAlg alg);
+
+ /// <summary>
+ /// Verifies the given message hash against the signature.
+ /// </summary>
+ /// <param name="hash">The message hash to compare</param>
+ /// <param name="alg">The algorithm used to produce the message hash</param>
+ /// <param name="signature">The message signature to verify the message against</param>
+ /// <returns>True of the signature could be verified</returns>
+ bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature);
+ }
+}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ICacheDiscoveryErrorHandler.cs b/lib/VNLib.Data.Caching.Extensions/src/ICacheDiscoveryErrorHandler.cs
new file mode 100644
index 0000000..3493d48
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/ICacheDiscoveryErrorHandler.cs
@@ -0,0 +1,41 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: ClientCacheConfiguration.cs
+*
+* ClientCacheConfiguration.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+
+namespace VNLib.Data.Caching.Extensions
+{
+ /// <summary>
+ /// Represents an type that will handle errors that occur during the discovery process
+ /// </summary>
+ public interface ICacheDiscoveryErrorHandler
+ {
+ /// <summary>
+ /// Invoked when an error occurs during the discovery process
+ /// </summary>
+ /// <param name="errorNode">The node that the error occured on</param>
+ /// <param name="ex">The exception that caused the invocation</param>
+ void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex);
+ }
+}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ICachePeerAdvertisment.cs b/lib/VNLib.Data.Caching.Extensions/src/ICacheNodeAdvertisment.cs
index acf883e..fc29955 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/ICachePeerAdvertisment.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/ICacheNodeAdvertisment.cs
@@ -3,9 +3,9 @@
*
* Library: VNLib
* Package: VNLib.Data.Caching.Extensions
-* File: ICachePeerAdvertisment.cs
+* File: ICacheNodeAdvertisment.cs
*
-* ICachePeerAdvertisment.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* ICacheNodeAdvertisment.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
* VNLib collection of libraries and utilities.
*
* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
@@ -30,7 +30,7 @@ namespace VNLib.Data.Caching.Extensions
/// <summary>
/// Represents a node that can be advertised to clients
/// </summary>
- public interface ICachePeerAdvertisment
+ public interface ICacheNodeAdvertisment
{
/// <summary>
/// The endpoint for clients to connect to to access the cache
diff --git a/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryCollection.cs b/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryCollection.cs
index d69da40..9adebdc 100644
--- a/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryCollection.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryCollection.cs
@@ -2,18 +2,18 @@
* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
-* Package: ObjectCacheServer
+* Package: VNLib.Data.Caching.Extensions
* File: INodeDiscoveryCollection.cs
*
-* INodeDiscoveryCollection.cs is part of ObjectCacheServer which is
-* part of the larger VNLib collection of libraries and utilities.
+* INodeDiscoveryCollection.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
*
-* ObjectCacheServer is free software: you can redistribute it and/or modify
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
-* ObjectCacheServer is distributed in the hope that it will be useful,
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
@@ -22,13 +22,17 @@
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
-using System.Collections.Generic;
-using VNLib.Data.Caching.Extensions;
+using System.Collections.Generic;
-namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+namespace VNLib.Data.Caching.Extensions
{
- internal interface INodeDiscoveryCollection
+ /// <summary>
+ /// Represents a collection of discovered nodes
+ /// </summary>
+#pragma warning disable CA1711 // Identifiers should not have incorrect suffix
+ public interface INodeDiscoveryCollection
+#pragma warning restore CA1711 // Identifiers should not have incorrect suffix
{
/// <summary>
/// Begins a new discovery and gets an enumerator for the discovery process
@@ -41,13 +45,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
/// </summary>
/// <param name="initialPeers">An initial collection of peers to add to the enumeration</param>
/// <returns>An enumerator that simplifies discovery of unique nodes</returns>
- INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICachePeerAdvertisment> initialPeers);
+ INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICacheNodeAdvertisment> initialPeers);
/// <summary>
/// Gets a snapshot of all discovered nodes in the current collection.
/// </summary>
/// <returns>The current collection of notes</returns>
- ICachePeerAdvertisment[] GetAllNodes();
+ ICacheNodeAdvertisment[] GetAllNodes();
/// <summary>
/// Completes a discovery process and updates the collection with the results
diff --git a/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryEnumerator.cs b/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryEnumerator.cs
index 5cddf9c..f6d5f40 100644
--- a/plugins/ObjectCacheServer/src/Distribution/INodeDiscoveryEnumerator.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryEnumerator.cs
@@ -2,18 +2,18 @@
* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
-* Package: ObjectCacheServer
+* Package: VNLib.Data.Caching.Extensions
* File: INodeDiscoveryEnumerator.cs
*
-* INodeDiscoveryEnumerator.cs is part of ObjectCacheServer which is part
-* of the larger VNLib collection of libraries and utilities.
+* INodeDiscoveryEnumerator.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
*
-* ObjectCacheServer is free software: you can redistribute it and/or modify
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
-* ObjectCacheServer is distributed in the hope that it will be useful,
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
@@ -22,24 +22,21 @@
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
+
using System.Collections.Generic;
-using VNLib.Data.Caching.Extensions;
-namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+namespace VNLib.Data.Caching.Extensions
{
- internal interface INodeDiscoveryEnumerator
+ /// <summary>
+ /// A custom enumerator for the node discovery process
+ /// </summary>
+ public interface INodeDiscoveryEnumerator : IEnumerator<ICacheNodeAdvertisment>
{
/// <summary>
- /// Moves the enumerator to the next peer in the discovery process and returns the result
- /// </summary>
- /// <returns>The next peer advertisment in the enumeration</returns>
- ICachePeerAdvertisment? GetNextPeer();
-
- /// <summary>
/// Adds the specified peer to the collection of discovered peers
/// </summary>
/// <param name="discoveredPeers">The peer collection</param>
- void OnPeerDiscoveryComplete(IEnumerable<ICachePeerAdvertisment> discoveredPeers);
+ void OnPeerDiscoveryComplete(IEnumerable<ICacheNodeAdvertisment> discoveredPeers);
}
}
diff --git a/plugins/ObjectCacheServer/src/Distribution/NodeDiscoveryCollection.cs b/lib/VNLib.Data.Caching.Extensions/src/NodeDiscoveryCollection.cs
index f773a2e..305f5de 100644
--- a/plugins/ObjectCacheServer/src/Distribution/NodeDiscoveryCollection.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/NodeDiscoveryCollection.cs
@@ -2,18 +2,18 @@
* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
-* Package: ObjectCacheServer
-* File: PeerDiscoveryManager.cs
+* Package: VNLib.Data.Caching.Extensions
+* File: INodeDiscoveryCollection.cs
*
-* PeerDiscoveryManager.cs is part of ObjectCacheServer which is part of the larger
+* INodeDiscoveryCollection.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
* VNLib collection of libraries and utilities.
*
-* ObjectCacheServer is free software: you can redistribute it and/or modify
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
-* ObjectCacheServer is distributed in the hope that it will be useful,
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
@@ -25,21 +25,24 @@
using System;
using System.Linq;
using System.Collections.Generic;
+using System.Collections;
-using VNLib.Plugins;
-using VNLib.Data.Caching.Extensions;
-
-namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+namespace VNLib.Data.Caching.Extensions
{
- sealed class NodeDiscoveryCollection : INodeDiscoveryCollection
+ /// <summary>
+ /// Represents a collection of available cache nodes from a discovery process
+ /// </summary>
+ public sealed class NodeDiscoveryCollection : INodeDiscoveryCollection
{
- private LinkedList<ICachePeerAdvertisment> _peers;
-
+ private LinkedList<ICacheNodeAdvertisment> _peers;
- public NodeDiscoveryCollection(PluginBase plugin)
+ /// <summary>
+ /// Initializes a new empty <see cref="NodeDiscoveryCollection"/>
+ /// </summary>
+ public NodeDiscoveryCollection()
{
- _peers = new();
- }
+ _peers = new();
+ }
///<inheritdoc/>
public INodeDiscoveryEnumerator BeginDiscovery()
@@ -48,7 +51,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
}
///<inheritdoc/>
- public INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICachePeerAdvertisment> initialPeers)
+ public INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICacheNodeAdvertisment> initialPeers)
{
//Init new enumerator with the initial peers
return new NodeEnumerator(new(initialPeers));
@@ -64,36 +67,51 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
}
///<inheritdoc/>
- public ICachePeerAdvertisment[] GetAllNodes()
+ public ICacheNodeAdvertisment[] GetAllNodes()
{
//Capture all current peers
return _peers.ToArray();
}
- private sealed record class NodeEnumerator(LinkedList<ICachePeerAdvertisment> Peers) : INodeDiscoveryEnumerator
+ private sealed record class NodeEnumerator(LinkedList<ICacheNodeAdvertisment> Peers) : INodeDiscoveryEnumerator
{
//Keep track of the current node in the collection so we can move down the list
- private LinkedListNode<ICachePeerAdvertisment>? _currentNode = Peers.First;
+ private LinkedListNode<ICacheNodeAdvertisment>? _currentNode = Peers.First;
+
+ public ICacheNodeAdvertisment Current => _currentNode?.Value;
+ object IEnumerator.Current => _currentNode?.Value;
+
- public ICachePeerAdvertisment? GetNextPeer()
+ ///<inheritdoc/>
+ public bool MoveNext()
{
//Move to the next peer in the collection
_currentNode = _currentNode?.Next;
- return _currentNode?.Value;
+ return _currentNode?.Value != null;
}
- public void OnPeerDiscoveryComplete(IEnumerable<ICachePeerAdvertisment> discoveredPeers)
+ ///<inheritdoc/>
+ public void OnPeerDiscoveryComplete(IEnumerable<ICacheNodeAdvertisment> discoveredPeers)
{
//Get only the peers from the discovery that are not already in the collection
- IEnumerable<ICachePeerAdvertisment> newPeers = discoveredPeers.Except(Peers);
-
+ IEnumerable<ICacheNodeAdvertisment> newPeers = discoveredPeers.Except(Peers);
+
//Add them to the end of the collection
- foreach(ICachePeerAdvertisment ad in newPeers)
+ foreach (ICacheNodeAdvertisment ad in newPeers)
{
Peers.AddLast(ad);
}
}
+
+ public void Reset()
+ {
+ //Go to the first node
+ _currentNode = Peers.First;
+ }
+
+ public void Dispose()
+ { }
}
}
}
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs
index f69c2a4..0fe0663 100644
--- a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCacheLIstener.cs
@@ -42,7 +42,6 @@ using System;
using System.Threading;
using System.Threading.Tasks;
-using VNLib.Utils.Async;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
using VNLib.Net.Messaging.FBM.Server;
@@ -65,7 +64,7 @@ namespace VNLib.Data.Caching.ObjectCache
/// <summary>
/// A queue that stores update and delete events
/// </summary>
- public AsyncQueue<ChangeEvent> EventQueue { get; }
+ public ICacheListenerEventQueue EventQueue { get; }
/// <summary>
/// The Cache store to access data blobs
@@ -77,18 +76,17 @@ namespace VNLib.Data.Caching.ObjectCache
/// Initialzies a new <see cref="BlobCacheListener"/>
/// </summary>
/// <param name="cache">The cache table to work from</param>
+ /// <param name="queue">The event queue to publish changes to</param>
/// <param name="log">Writes error and debug logging information</param>
/// <param name="heap">The heap to alloc FBM buffers and <see cref="CacheEntry"/> cache buffers from</param>
- /// <param name="singleReader">A value that indicates if a single thread is processing events</param>
/// <exception cref="ArgumentNullException"></exception>
- public BlobCacheListener(IBlobCacheTable cache, ILogProvider log, IUnmangedHeap heap, bool singleReader)
+ public BlobCacheListener(IBlobCacheTable cache, ICacheListenerEventQueue queue, ILogProvider log, IUnmangedHeap heap)
{
Log = log;
Cache = cache ?? throw new ArgumentNullException(nameof(cache));
- //Writes may happen from multple threads with bucket design and no lock
- EventQueue = new(false, singleReader);
+ EventQueue = queue ?? throw new ArgumentNullException(nameof(queue));
InitListener(heap);
}
@@ -161,32 +159,25 @@ namespace VNLib.Data.Caching.ObjectCache
}
}
- static async Task DequeAsync(AsyncQueue<ChangeEvent> queue, FBMContext context, CancellationToken exitToken)
- {
- //Wait for a new message to process
- ChangeEvent ev = await queue.DequeueAsync(exitToken);
-
- //Set the response
- SetResponse(ev, context);
- }
-
- //If no event bus is registered, then this is not a legal command
- if (userState is not AsyncQueue<ChangeEvent> eventBus)
+ //Determine if the queue is enabled for the user
+ if(!EventQueue.IsEnabled(userState!))
{
context.CloseResponse(ResponseCodes.NotFound);
-
return;
}
//try to deq without awaiting
- if (eventBus.TryDequeue(out ChangeEvent? change))
+ if (EventQueue.TryDequeue(userState!, out ChangeEvent? change))
{
SetResponse(change, context);
}
else
{
- //Process async
- await DequeAsync(eventBus, context, exitToken);
+ //Wait for a new message to process
+ ChangeEvent ev = await EventQueue.DequeueAsync(userState!, exitToken);
+
+ //Set the response
+ SetResponse(ev, context);
}
return;
@@ -257,10 +248,7 @@ namespace VNLib.Data.Caching.ObjectCache
private void EnqueEvent(ChangeEvent change)
{
- if (!EventQueue.TryEnque(change))
- {
- Log.Warn("Change event {ev} was not enqued because the event queue is overflowing!", change.CurrentId);
- }
+ EventQueue.PublishEvent(change);
}
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs b/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs
new file mode 100644
index 0000000..06be4fa
--- /dev/null
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/ICacheListenerEventQueue.cs
@@ -0,0 +1,65 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.ObjectCache
+* File: ICacheListenerEventQueue.cs
+*
+* ICacheListenerEventQueue.cs is part of VNLib.Data.Caching.ObjectCache which
+* is part of the larger VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace VNLib.Data.Caching.ObjectCache
+{
+ /// <summary>
+ /// Represents a single client's event queue
+ /// </summary>
+ public interface ICacheListenerEventQueue
+ {
+ /// <summary>
+ /// Determines if the queue is enabled for the given user state
+ /// </summary>
+ /// <param name="userState">The unique state of the connection</param>
+ /// <returns>True if event queuing is enabled</returns>
+ bool IsEnabled(object userState);
+
+ /// <summary>
+ /// Attempts to dequeue a single event from the queue without blocking
+ /// </summary>
+ /// <param name="userState">A user state object to associate with the wait operation</param>
+ /// <param name="changeEvent">The dequeued event if successfully dequeued</param>
+ /// <returns>True if an event was waiting and could be dequeued, false otherwise</returns>
+ bool TryDequeue(object userState, out ChangeEvent changeEvent);
+
+ /// <summary>
+ /// Waits asynchronously for an event to be dequeued
+ /// </summary>
+ /// <param name="userState">A user state object to associate with the wait operation</param>
+ /// <param name="cancellation">A token to cancel the wait operation</param>
+ /// <returns>The <see cref="ChangeEvent"/> that as a result of the dequeue operation</returns>
+ ValueTask<ChangeEvent> DequeueAsync(object userState, CancellationToken cancellation);
+
+ /// <summary>
+ /// Publishes an event to the queue
+ /// </summary>
+ /// <param name="changeEvent">The change event to publish</param>
+ void PublishEvent(ChangeEvent changeEvent);
+ }
+}
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
index 9f5ccfe..f4f059b 100644
--- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
@@ -31,15 +31,17 @@ using System.Net.WebSockets;
using System.Collections.Generic;
using System.Security.Cryptography;
+using VNLib.Hashing;
+using VNLib.Hashing.IdentityUtility;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
-using VNLib.Hashing.IdentityUtility;
using VNLib.Data.Caching;
using VNLib.Data.Caching.Extensions;
using VNLib.Data.Caching.ObjectCache;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Extensions.Loading;
+
namespace VNLib.Plugins.Extensions.VNCache
{
public interface ICacheRefreshPolicy
@@ -49,6 +51,7 @@ namespace VNLib.Plugins.Extensions.VNCache
TimeSpan RefreshInterval { get; }
}
+
/// <summary>
/// A base class that manages
/// </summary>
@@ -81,8 +84,6 @@ namespace VNLib.Plugins.Extensions.VNCache
_config = config;
- Uri brokerUri = new(config.BrokerAddress!);
-
//Init the client with default settings
FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(MemoryUtil.Shared, config.MaxMessageSize!.Value, config.RequestTimeout, debugLog);
@@ -90,30 +91,18 @@ namespace VNLib.Plugins.Extensions.VNCache
//Add the configuration to the client
Client.GetCacheConfiguration()
- .WithBroker(brokerUri)
- .WithTls(brokerUri.Scheme == Uri.UriSchemeHttps);
+ .WithTls(config.UseTls)
+ .WithInitialPeers(config.InitialNodes!);
}
-
- public virtual async Task ConfigureServiceAsync(PluginBase plugin)
+ public Task ConfigureServiceAsync(PluginBase plugin)
{
- //Get keys async
- Task<ReadOnlyJsonWebKey?> clientPrivTask = plugin.TryGetSecretAsync("client_private_key").ToJsonWebKey();
- Task<ReadOnlyJsonWebKey?> brokerPubTask = plugin.TryGetSecretAsync("broker_public_key").ToJsonWebKey();
- Task<ReadOnlyJsonWebKey?> cachePubTask = plugin.TryGetSecretAsync("cache_public_key").ToJsonWebKey();
-
- //Wait for all tasks to complete
- _ = await Task.WhenAll(clientPrivTask, brokerPubTask, cachePubTask);
-
- ReadOnlyJsonWebKey clientPriv = await clientPrivTask ?? throw new KeyNotFoundException("Missing required secret client_private_key");
- ReadOnlyJsonWebKey brokerPub = await brokerPubTask ?? throw new KeyNotFoundException("Missing required secret broker_public_key");
- ReadOnlyJsonWebKey cachePub = await cachePubTask ?? throw new KeyNotFoundException("Missing required secret cache_public_key");
-
- //Connection authentication methods
+ //Set authenticator
Client.GetCacheConfiguration()
- .WithVerificationKey(cachePub)
- .WithSigningKey(clientPriv)
- .WithBrokerVerificationKey(brokerPub);
+ .WithAuthenticator(new AuthManager(plugin))
+ .WithErrorHandler(new DiscoveryErrHAndler(plugin.Log));
+
+ return Task.CompletedTask;
}
/*
@@ -127,7 +116,7 @@ namespace VNLib.Plugins.Extensions.VNCache
while (true)
{
//Load the server list
- ICachePeerAdvertisment[]? servers;
+ ICacheNodeAdvertisment[]? servers;
while (true)
{
try
@@ -163,7 +152,7 @@ namespace VNLib.Plugins.Extensions.VNCache
pluginLog.Debug("Connecting to random cache server");
//Connect to a random server
- ICachePeerAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken);
+ ICacheNodeAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken);
pluginLog.Debug("Connected to cache server {s}", selected.NodeId);
//Set connection status flag
@@ -255,5 +244,98 @@ namespace VNLib.Plugins.Extensions.VNCache
? throw new InvalidOperationException("The underlying client is not connected to a cache node")
: Client!.AddOrUpdateObjectAsync(key, newKey, value, serialzer, cancellation);
}
+
+
+ private sealed class AuthManager : ICacheAuthManager
+ {
+
+ private IAsyncLazy<ReadOnlyJsonWebKey> _sigKey;
+ private IAsyncLazy<ReadOnlyJsonWebKey> _verKey;
+
+ public AuthManager(PluginBase plugin)
+ {
+ //Lazy load keys
+
+ //Get the signing key
+ _sigKey = plugin.GetSecretAsync("client_private_key").ToLazy(static r => r.GetJsonWebKey());
+
+ //Lazy load cache public key
+ _verKey = plugin.GetSecretAsync("cache_public_key").ToLazy(static r => r.GetJsonWebKey());
+ }
+
+ public async Task AwaitLazyKeyLoad()
+ {
+ await _sigKey;
+ await _verKey;
+ }
+
+ ///<inheritdoc/>
+ public IReadOnlyDictionary<string, string?> GetJwtHeader()
+ {
+ //Get the signing key jwt header
+ return _sigKey.Value.JwtHeader;
+ }
+
+ ///<inheritdoc/>
+ public void SignJwt(JsonWebToken jwt)
+ {
+ //Sign the jwt with signing key
+ jwt.SignFromJwk(_sigKey.Value);
+ }
+
+ ///<inheritdoc/>
+ public byte[] SignMessageHash(byte[] hash, HashAlg alg)
+ {
+ //try to get the rsa alg for the signing key
+ using RSA? rsa = _sigKey.Value.GetRSAPublicKey();
+ if(rsa != null)
+ {
+ return rsa.SignHash(hash, alg.GetAlgName(), RSASignaturePadding.Pkcs1);
+ }
+
+ //try to get the ecdsa alg for the signing key
+ using ECDsa? ecdsa = _sigKey.Value.GetECDsaPublicKey();
+ if(ecdsa != null)
+ {
+ return ecdsa.SignHash(hash);
+ }
+
+ throw new NotSupportedException("The signing key is not a valid RSA or ECDSA key");
+ }
+
+ ///<inheritdoc/>
+ public bool VerifyJwt(JsonWebToken jwt)
+ {
+ return jwt.VerifyFromJwk(_verKey.Value);
+ }
+
+ ///<inheritdoc/>
+ public bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature)
+ {
+ //try to get the rsa alg for the signing key
+ using RSA? rsa = _verKey.Value.GetRSAPublicKey();
+ if (rsa != null)
+ {
+ return rsa.VerifyHash(hash, signature, alg.GetAlgName(), RSASignaturePadding.Pkcs1);
+ }
+
+ //try to get the ecdsa alg for the signing key
+ using ECDsa? ecdsa = _verKey.Value.GetECDsaPublicKey();
+ if (ecdsa != null)
+ {
+ return ecdsa.VerifyHash(hash, signature);
+ }
+
+ throw new NotSupportedException("The current key is not an RSA or ECDSA key and is not supported");
+ }
+ }
+
+ private sealed record class DiscoveryErrHAndler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
+ {
+ public void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex)
+ {
+ Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", errorNode.NodeId, ex);
+ }
+ }
}
} \ No newline at end of file
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs
index 1d888ec..64d3e07 100644
--- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs
@@ -25,6 +25,7 @@
using System;
using System.Text.Json.Serialization;
+using VNLib.Data.Caching.Extensions;
using VNLib.Plugins.Extensions.Loading;
namespace VNLib.Plugins.Extensions.VNCache
@@ -45,8 +46,8 @@ namespace VNLib.Plugins.Extensions.VNCache
/// <summary>
/// The broker server address
/// </summary>
- [JsonPropertyName("broker_address")]
- public string? BrokerAddress { get; set; }
+ [JsonPropertyName("use_tls")]
+ public bool UseTls { get; set; } = true;
/// <summary>
/// The time (in seconds) to randomly delay polling the broker server
@@ -77,6 +78,12 @@ namespace VNLib.Plugins.Extensions.VNCache
/// </summary>
internal TimeSpan RequestTimeout => TimeSpan.FromSeconds(RequestTimeoutSeconds!.Value);
+ /// <summary>
+ /// The initial peers to connect to
+ /// </summary>
+ [JsonPropertyName("initial_nodes")]
+ public InitialNode[]? InitialNodes { get; set; }
+
void IOnConfigValidation.Validate()
{
if (!MaxMessageSize.HasValue || MaxMessageSize.Value < 1)
@@ -95,9 +102,42 @@ namespace VNLib.Plugins.Extensions.VNCache
throw new ArgumentException("You must specify a positive integer FBM message timoeut", "request_timeout_sec");
}
- if(!Uri.TryCreate(BrokerAddress, UriKind.RelativeOrAbsolute, out _))
+ //Validate initial nodes
+ if (InitialNodes == null || InitialNodes.Length == 0)
+ {
+ throw new ArgumentException("You must specify at least one initial peer", "initial_peers");
+ }
+
+ foreach (InitialNode peer in InitialNodes)
+ {
+ _ = peer.ConnectEndpoint ?? throw new ArgumentException("You must specify a connect endpoint for each initial node", "initial_nodes");
+ _ = peer.NodeId ?? throw new ArgumentException("You must specify a node id for each initial node", "initial_nodes");
+ }
+ }
+
+ public sealed record class InitialNode : ICacheNodeAdvertisment
+ {
+ [JsonIgnore]
+ public Uri ConnectEndpoint { get; private set; }
+
+ [JsonIgnore]
+ public Uri? DiscoveryEndpoint { get; private set; }
+
+ [JsonPropertyName("node_id")]
+ public string? NodeId { get; set; }
+
+ [JsonPropertyName("connect_endpoint")]
+ public string? ConnectEndpointString
+ {
+ get => ConnectEndpoint.ToString();
+ set => ConnectEndpoint = new Uri(value!);
+ }
+
+ [JsonPropertyName("discovery_endpoint")]
+ public string? DiscoveryEndpointString
{
- throw new ArgumentException("You must specify a valid HTTP uri broker address", "broker_address");
+ get => DiscoveryEndpoint?.ToString();
+ set => DiscoveryEndpoint = value == null ? null : new Uri(value);
}
}
}
diff --git a/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs b/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs
new file mode 100644
index 0000000..6725fbe
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs
@@ -0,0 +1,117 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: ObjectCacheServerEntry.cs
+*
+* ObjectCacheServerEntry.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Collections.Generic;
+using System.Security.Cryptography;
+
+using VNLib.Hashing;
+using VNLib.Plugins;
+using VNLib.Hashing.IdentityUtility;
+using VNLib.Plugins.Extensions.Loading;
+using VNLib.Data.Caching.Extensions;
+
+
+namespace VNLib.Data.Caching.ObjectCache.Server
+{
+ sealed record class CacheAuthKeyStore : ICacheAuthManager
+ {
+ private readonly IAsyncLazy<ReadOnlyJsonWebKey> _clientPub;
+ private readonly IAsyncLazy<ReadOnlyJsonWebKey> _cachePriv;
+
+ public CacheAuthKeyStore(PluginBase plugin)
+ {
+ _clientPub = plugin.GetSecretAsync("client_public_key").ToLazy(r => r.GetJsonWebKey());
+ _cachePriv = plugin.GetSecretAsync("cache_private_key").ToLazy(r => r.GetJsonWebKey());
+ }
+
+ ///<inheritdoc/>
+ public IReadOnlyDictionary<string, string?> GetJwtHeader()
+ {
+ return _cachePriv.Value.JwtHeader;
+ }
+
+ ///<inheritdoc/>
+ public void SignJwt(JsonWebToken jwt)
+ {
+ jwt.SignFromJwk(_cachePriv.Value);
+ }
+
+ ///<inheritdoc/>
+ public bool VerifyJwt(JsonWebToken jwt)
+ {
+ return jwt.VerifyFromJwk(_clientPub.Value);
+ }
+
+ /// <summary>
+ /// Verifies the message against the stored cache key
+ /// </summary>
+ /// <param name="jwt">The token to verify</param>
+ /// <returns>True if the token was verified, false otherwise</returns>
+ public bool VerifyCachePeer(JsonWebToken jwt)
+ {
+ return jwt.VerifyFromJwk(_cachePriv.Value);
+ }
+
+ ///<inheritdoc/>
+ public byte[] SignMessageHash(byte[] hash, HashAlg alg)
+ {
+ //try to get the rsa alg for the signing key
+ using RSA? rsa = _cachePriv.Value.GetRSAPublicKey();
+ if (rsa != null)
+ {
+ return rsa.SignHash(hash, alg.GetAlgName(), RSASignaturePadding.Pkcs1);
+ }
+
+ //try to get the ecdsa alg for the signing key
+ using ECDsa? ecdsa = _cachePriv.Value.GetECDsaPublicKey();
+ if (ecdsa != null)
+ {
+ return ecdsa.SignHash(hash);
+ }
+
+ throw new NotSupportedException("The signing key is not a valid RSA or ECDSA key");
+ }
+
+ ///<inheritdoc/>
+ public bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature)
+ {
+ //try to get the rsa alg for the signing key
+ using RSA? rsa = _clientPub.Value.GetRSAPublicKey();
+ if (rsa != null)
+ {
+ return rsa.VerifyHash(hash, signature, alg.GetAlgName(), RSASignaturePadding.Pkcs1);
+ }
+
+ //try to get the ecdsa alg for the signing key
+ using ECDsa? ecdsa = _clientPub.Value.GetECDsaPublicKey();
+ if (ecdsa != null)
+ {
+ return ecdsa.VerifyHash(hash, signature);
+ }
+
+ throw new NotSupportedException("The current key is not an RSA or ECDSA key and is not supported");
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs b/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs
index 5fb6d2a..049069e 100644
--- a/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs
+++ b/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs
@@ -39,6 +39,7 @@ using VNLib.Plugins.Extensions.Loading.Events;
namespace VNLib.Data.Caching.ObjectCache.Server
{
+
[ConfigurationName("event_manager")]
internal sealed class CacheEventQueueManager : ICacheEventQueueManager, IDisposable, IIntervalScheduleable
{
@@ -70,7 +71,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
}
///<inheritdoc/>
- public AsyncQueue<ChangeEvent> Subscribe(ICachePeer peer)
+ public IPeerEventQueue Subscribe(ICachePeer peer)
{
NodeQueue? nq;
@@ -198,7 +199,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
* attached to the queue
*/
- private sealed class NodeQueue
+ private sealed class NodeQueue : IPeerEventQueue
{
public int Listeners;
@@ -243,6 +244,18 @@ namespace VNLib.Data.Caching.ObjectCache.Server
Queue.TryEnque(changes[i]);
}
}
+
+ ///<inheritdoc/>
+ public ValueTask<ChangeEvent> DequeueAsync(CancellationToken cancellation)
+ {
+ return Queue.DequeueAsync(cancellation);
+ }
+
+ ///<inheritdoc/>
+ public bool TryDequeue(out ChangeEvent change)
+ {
+ return Queue.TryDequeue(out change);
+ }
}
}
}
diff --git a/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs b/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs
new file mode 100644
index 0000000..52b6abf
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs
@@ -0,0 +1,117 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: CacheListenerPubQueue.cs
+*
+* CacheListenerPubQueue.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+using VNLib.Utils.Async;
+using VNLib.Utils.Logging;
+using VNLib.Plugins;
+using VNLib.Plugins.Extensions.Loading;
+
+
+namespace VNLib.Data.Caching.ObjectCache.Server
+{
+ internal sealed class CacheListenerPubQueue : ICacheListenerEventQueue, IAsyncBackgroundWork
+ {
+ private readonly AsyncQueue<ChangeEvent> _listenerQueue;
+ private readonly ILogProvider _logProvider;
+ private readonly ICacheEventQueueManager _queueManager;
+
+ public CacheListenerPubQueue(PluginBase plugin)
+ {
+ _queueManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>();
+ _logProvider = plugin.Log;
+ _listenerQueue = new AsyncQueue<ChangeEvent>(false, true);
+
+ //Register processing worker
+ _ = plugin.ObserveWork(this, 500);
+ }
+
+ ///<inheritdoc/>
+ public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
+ {
+ const int accumulatorSize = 64;
+
+ try
+ {
+ //Accumulator for events
+ ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize];
+ int ptr = 0;
+
+ //Listen for changes
+ while (true)
+ {
+ //Wait for next event
+ accumulator[ptr++] = await _listenerQueue.DequeueAsync(exitToken);
+
+ //try to accumulate more events until we can't anymore
+ while (_listenerQueue.TryDequeue(out ChangeEvent? ev) && ptr < accumulatorSize)
+ {
+ accumulator[ptr++] = ev;
+ }
+
+ //Publish all events to subscribers
+ _queueManager.PublishMultiple(accumulator.AsSpan(0, ptr));
+
+ //Reset pointer
+ ptr = 0;
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ //Normal exit
+ pluginLog.Debug("Change queue listener worker exited");
+ }
+ }
+
+ ///<inheritdoc/>
+ public bool IsEnabled(object userState)
+ {
+ return userState is IPeerEventQueue;
+ }
+
+ ///<inheritdoc/>
+ public void PublishEvent(ChangeEvent changeEvent)
+ {
+ if (!_listenerQueue.TryEnque(changeEvent))
+ {
+ _logProvider.Warn("Cache listener event queue is overflowing");
+ }
+ }
+
+ ///<inheritdoc/>
+ public bool TryDequeue(object userState, out ChangeEvent changeEvent)
+ {
+ return (userState as IPeerEventQueue)!.TryDequeue(out changeEvent);
+ }
+
+ ///<inheritdoc/>
+ public ValueTask<ChangeEvent> DequeueAsync(object userState, CancellationToken cancellation)
+ {
+ return (userState as IPeerEventQueue)!.DequeueAsync(cancellation);
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs b/plugins/ObjectCacheServer/src/CacheStore.cs
index 67db433..e7a7c63 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/CacheStore.cs
+++ b/plugins/ObjectCacheServer/src/CacheStore.cs
@@ -3,9 +3,9 @@
*
* Library: VNLib
* Package: ObjectCacheServer
-* File: ConnectEndpoint.cs
+* File: CacheStore.cs
*
-* ConnectEndpoint.cs is part of ObjectCacheServer which is part of the larger
+* CacheStore.cs is part of ObjectCacheServer which is part of the larger
* VNLib collection of libraries and utilities.
*
* ObjectCacheServer is free software: you can redistribute it and/or modify
@@ -25,14 +25,16 @@
using System;
using System.Threading;
using System.Threading.Tasks;
+
using VNLib.Utils.Logging;
using VNLib.Plugins;
using VNLib.Plugins.Extensions.Loading;
+
namespace VNLib.Data.Caching.ObjectCache.Server
{
[ConfigurationName("cache")]
- sealed class CacheStore : ICacheStore, IDisposable
+ internal sealed class CacheStore : ICacheStore, IDisposable
{
public BlobCacheListener Listener { get; }
@@ -76,11 +78,20 @@ namespace VNLib.Data.Caching.ObjectCache.Server
plugin.Log.Verbose("Creating cache store with {bc} buckets, with {mc} items/bucket", cacheConf.BucketCount, cacheConf.MaxCacheEntries);
+ //calculate the max memory usage
+ ulong maxByteSize = ((ulong)cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize);
+
+ //Log max memory usage
+ plugin.Log.Debug("Maxium memory consumption {mx}Mb", maxByteSize / (ulong)(1024 * 1000));
+
//Load the blob cache table system
IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, plugin.CacheHeap, cacheConf);
+ //Get the event listener
+ ICacheListenerEventQueue queue = plugin.GetOrCreateSingleton<CacheListenerPubQueue>();
+
//Endpoint only allows for a single reader
- return new(bc, plugin.Log, plugin.CacheHeap, true);
+ return new(bc, queue, plugin.Log, plugin.CacheHeap);
}
public void Dispose()
diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheSystemUtil.cs b/plugins/ObjectCacheServer/src/CacheSystemUtil.cs
index 669b84f..669b84f 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/CacheSystemUtil.cs
+++ b/plugins/ObjectCacheServer/src/CacheSystemUtil.cs
diff --git a/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs
index b453dcc..a55e8e2 100644
--- a/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs
+++ b/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs
@@ -58,7 +58,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
//Get peer adapter
PeerAdapter = plugin.GetOrCreateSingleton<PeerDiscoveryManager>();
-
+ CacheStore = plugin.GetOrCreateSingleton<CacheStore>();
}
public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
@@ -70,15 +70,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
while (true)
{
//Get all new peers
- ICachePeerAdvertisment[] peers = PeerAdapter.GetNewPeers();
+ ICacheNodeAdvertisment[] peers = PeerAdapter.GetNewPeers();
if (peers.Length == 0)
{
pluginLog.Verbose("[REPL] No new peers to connect to");
}
- //Connect to each peer
- foreach (ICachePeerAdvertisment peer in peers)
+ //Connect to each peer as a background task
+ foreach (ICacheNodeAdvertisment peer in peers)
{
_ = Plugin.ObserveWork(() => OnNewPeerDoWorkAsync(peer, pluginLog, exitToken));
}
@@ -104,13 +104,16 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
pluginLog.Information("[REPL] Node replication worker exited");
}
- private async Task OnNewPeerDoWorkAsync(ICachePeerAdvertisment newPeer, ILogProvider log, CancellationToken exitToken)
+ private async Task OnNewPeerDoWorkAsync(ICacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken)
{
_ = newPeer ?? throw new ArgumentNullException(nameof(newPeer));
//Setup client
FBMClient client = new(ClientConfig);
+ //Add peer to monitor
+ PeerAdapter.OnPeerListenerAttached(newPeer);
+
try
{
log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId);
@@ -198,7 +201,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
return;
case "deleted":
//Delete the object from the store
- await CacheStore.DeleteItemAsync(changedObject.CurrentId);
+ await CacheStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None);
break;
case "modified":
//Reload the record from the store
@@ -226,6 +229,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
//Check response code
string status = response.Headers.First(static s => s.Header == HeaderCommand.Status).Value.ToString();
+
if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal))
{
//Update the record
diff --git a/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs b/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs
index 82b280c..f191c9d 100644
--- a/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs
+++ b/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs
@@ -22,7 +22,7 @@
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
-using System;
+using System.Linq;
using System.Collections.Generic;
using VNLib.Plugins;
@@ -32,24 +32,39 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
internal sealed class CachePeerMonitor : IPeerMonitor
{
- public CachePeerMonitor(PluginBase plugin)
- {
- }
+ private readonly LinkedList<ICachePeer> peers = new();
+
+ public CachePeerMonitor(PluginBase plugin)
+ { }
+ ///<inheritdoc/>
public IEnumerable<ICachePeer> GetAllPeers()
{
- throw new NotImplementedException();
+ lock(peers)
+ {
+ return peers.ToArray();
+ }
}
+ ///<inheritdoc/>
public void OnPeerConnected(ICachePeer peer)
{
- throw new NotImplementedException();
+ //When a peer is connected we can add it to the list so the replication manager can see it
+ lock(peers)
+ {
+ peers.AddLast(peer);
+ }
}
+ ///<inheritdoc/>
public void OnPeerDisconnected(ICachePeer peer)
{
- throw new NotImplementedException();
+ //When a peer is disconnected we can remove it from the list
+ lock(peers)
+ {
+ peers.Remove(peer);
+ }
}
}
}
diff --git a/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs b/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs
index d029f10..c3fb022 100644
--- a/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs
+++ b/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs
@@ -3,9 +3,9 @@
*
* Library: VNLib
* Package: ObjectCacheServer
-* File: ObjectCacheServerEntry.cs
+* File: ICachePeerAdapter.cs
*
-* ObjectCacheServerEntry.cs is part of ObjectCacheServer which is part of the larger
+* ICachePeerAdapter.cs is part of ObjectCacheServer which is part of the larger
* VNLib collection of libraries and utilities.
*
* ObjectCacheServer is free software: you can redistribute it and/or modify
@@ -32,18 +32,18 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
/// Gets the peers that have been discovered but not yet connected to
/// </summary>
/// <returns>A collection of peers that have not been connected to yet</returns>
- ICachePeerAdvertisment[] GetNewPeers();
+ ICacheNodeAdvertisment[] GetNewPeers();
/// <summary>
/// Called when a peer has been connected to
/// </summary>
/// <param name="peer">The peer that has been connected</param>
- void OnPeerListenerAttached(ICachePeerAdvertisment peer);
+ void OnPeerListenerAttached(ICacheNodeAdvertisment peer);
/// <summary>
/// Called when a peer has been disconnected from
/// </summary>
/// <param name="peer">The disconnected peer</param>
- void OnPeerListenerDetatched(ICachePeerAdvertisment peer);
+ void OnPeerListenerDetatched(ICacheNodeAdvertisment peer);
}
}
diff --git a/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs b/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs
index b4cb840..028171f 100644
--- a/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs
+++ b/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs
@@ -45,9 +45,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
void OnPeerDisconnected(ICachePeer peer);
/// <summary>
- /// Gets an enumerable of all peers currently active in the current peer
+ /// Gets an enumerable of all peers currently connected to this node
/// </summary>
- /// <returns></returns>
+ /// <returns>The collection of all connected peers</returns>
IEnumerable<ICachePeer> GetAllPeers();
}
}
diff --git a/plugins/ObjectCacheServer/src/Distribution/KnownPeerList.cs b/plugins/ObjectCacheServer/src/Distribution/KnownPeerList.cs
new file mode 100644
index 0000000..74df81f
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Distribution/KnownPeerList.cs
@@ -0,0 +1,100 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: KnownPeerList.cs
+*
+* KnownPeerList.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Linq;
+using System.Collections.Generic;
+using System.Text.Json.Serialization;
+
+using VNLib.Plugins;
+using VNLib.Data.Caching.Extensions;
+using VNLib.Plugins.Extensions.Loading;
+
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+{
+ [ConfigurationName("known_peers")]
+ internal sealed class KnownPeerList
+ {
+ private readonly List<KnownPeer> _peers;
+
+ public KnownPeerList(PluginBase plugin, IConfigScope config)
+ {
+ //Deserialze the known peers into an array
+ KnownPeer[] peers = config.Deserialze<KnownPeer[]>();
+
+ foreach (KnownPeer peer in peers)
+ {
+ //Validate the peer
+ peer.Validate();
+ }
+
+ _peers = peers?.ToList() ?? new();
+ }
+
+ public IEnumerable<ICacheNodeAdvertisment> GetPeers()
+ {
+ return _peers;
+ }
+
+ private sealed class KnownPeer : ICacheNodeAdvertisment
+ {
+ public Uri? ConnectEndpoint { get; set; }
+ public Uri? DiscoveryEndpoint { get; set; }
+
+ [JsonPropertyName("node_id")]
+ public string NodeId { get; set; }
+
+ [JsonPropertyName("connect_url")]
+ public string? ConnectEpPath
+ {
+ get => ConnectEndpoint?.ToString() ?? string.Empty;
+ set => ConnectEndpoint = new Uri(value ?? string.Empty);
+ }
+
+ [JsonPropertyName("discovery_url")]
+ public string? DiscoveryEpPath
+ {
+ get => DiscoveryEndpoint?.ToString() ?? string.Empty;
+ set => DiscoveryEndpoint = new Uri(value ?? string.Empty);
+ }
+
+ public void Validate()
+ {
+ if (string.IsNullOrWhiteSpace(NodeId))
+ {
+ throw new ArgumentException("Node ID cannot be null or whitespace");
+ }
+ if (ConnectEndpoint is null)
+ {
+ throw new ArgumentException("Connect endpoint cannot be null");
+ }
+ if (DiscoveryEndpoint is null)
+ {
+ throw new ArgumentException("Discovery endpoint cannot be null");
+ }
+ }
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs
index 54e4258..26ec565 100644
--- a/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs
+++ b/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs
@@ -33,14 +33,16 @@ using VNLib.Utils.Logging;
using VNLib.Data.Caching.Extensions;
using VNLib.Plugins.Extensions.Loading;
+
namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
{
sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter
{
+ private readonly List<ICacheNodeAdvertisment> _connectedPeers;
private readonly NodeConfig _config;
- private readonly IPeerMonitor _monitor;
- private readonly INodeDiscoveryCollection _peers;
+ private readonly IPeerMonitor _monitor;
+ private readonly KnownPeerList _knownPeers;
public PeerDiscoveryManager(PluginBase plugin)
{
@@ -50,14 +52,23 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
//Get the peer monitor
_monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
- //Get the node collection
- _peers = plugin.GetOrCreateSingleton<NodeDiscoveryCollection>();
+ //Get the known peer list
+ _knownPeers = plugin.GetOrCreateSingleton<KnownPeerList>();
_connectedPeers = new();
+
+ //Setup discovery error handler
+ _config.Config.WithErrorHandler(new ErrorHandler(plugin.Log));
}
async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
{
+ /*
+ * This loop uses the peer monitor to keep track of all connected peers, then gets
+ * all the advertising peers (including known peers) and resolves all nodes across
+ * the network.
+ */
+
pluginLog.Information("Node discovery worker started");
try
@@ -66,7 +77,19 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
{
try
{
- await DiscoverAllNodesAsync(pluginLog, exitToken);
+ //Use the monitor to get the initial peers
+ IEnumerable<ICacheNodeAdvertisment> ads = _monitor.GetAllPeers()
+ .Where(static p => p.Advertisment != null)
+ .Select(static p => p.Advertisment!);
+
+ //Add known peers to the initial list
+ ads = ads.Union(_knownPeers.GetPeers());
+
+ //Set initial peers
+ _config.Config.WithInitialPeers(ads);
+
+ //Discover all nodes
+ await _config.Config.DiscoverNodesAsync(exitToken);
}
catch(OperationCanceledException)
{
@@ -91,54 +114,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
}
}
-
- async Task DiscoverAllNodesAsync(ILogProvider log, CancellationToken cancellation)
- {
- //Use the monitor to get the initial peers
- IEnumerable<ICachePeerAdvertisment> ads = _monitor.GetAllPeers()
- .Where(static p => p.Advertisment != null)
- .Select(static p => p.Advertisment!);
-
- //Init enumerator with initial peers
- INodeDiscoveryEnumerator enumerator = _peers.BeginDiscovery(ads);
-
- do
- {
- //Load the initial peer
- ICachePeerAdvertisment? peer = enumerator.GetNextPeer();
-
- if (peer == null)
- {
- break;
- }
-
- log.Verbose("Discovering peer nodes from {Peer}", peer.NodeId);
-
- //Discover nodes from this peer
- ICachePeerAdvertisment[]? newNodes = await _config.Config.DiscoverClusterNodesAsync(peer, cancellation);
-
- //Add nodes to the enumerator
- if (newNodes != null)
- {
- enumerator.OnPeerDiscoveryComplete(newNodes);
- }
-
- } while (true);
-
- //Commit peer updates
- _peers.CompleteDiscovery(enumerator);
- }
-
-
- private readonly List<ICachePeerAdvertisment> _connectedPeers;
+
///<inheritdoc/>
- public ICachePeerAdvertisment[] GetNewPeers()
+ public ICacheNodeAdvertisment[] GetNewPeers()
{
lock (_connectedPeers)
{
//Get all discovered peers
- ICachePeerAdvertisment[] peers = _peers.GetAllNodes();
+ ICacheNodeAdvertisment[] peers = _config.Config.NodeCollection.GetAllNodes();
//Get the difference between the discovered peers and the connected peers
return peers.Except(_connectedPeers).ToArray();
@@ -146,7 +130,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
}
///<inheritdoc/>
- public void OnPeerListenerAttached(ICachePeerAdvertisment peer)
+ public void OnPeerListenerAttached(ICacheNodeAdvertisment peer)
{
lock (_connectedPeers)
{
@@ -156,7 +140,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
}
///<inheritdoc/>
- public void OnPeerListenerDetatched(ICachePeerAdvertisment peer)
+ public void OnPeerListenerDetatched(ICacheNodeAdvertisment peer)
{
//remove from connected peers
lock (_connectedPeers)
@@ -164,5 +148,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
_connectedPeers.Remove(peer);
}
}
+
+
+ private sealed record class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
+ {
+ public void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex)
+ {
+ Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex);
+ }
+ }
}
}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs
deleted file mode 100644
index b9c00e6..0000000
--- a/plugins/ObjectCacheServer/src/Endpoints/BrokerHeartBeatEndpoint.cs
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
-* Copyright (c) 2023 Vaughn Nugent
-*
-* Library: VNLib
-* Package: ObjectCacheServer
-* File: BrokerHeartBeatEndpoint.cs
-*
-* BrokerHeartBeatEndpoint.cs is part of ObjectCacheServer which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* ObjectCacheServer is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* ObjectCacheServer is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Net;
-using System.Linq;
-using System.Text.Json;
-using System.Threading.Tasks;
-
-
-using VNLib.Plugins;
-using VNLib.Utils.Logging;
-using VNLib.Plugins.Essentials;
-using VNLib.Hashing.IdentityUtility;
-using VNLib.Plugins.Essentials.Endpoints;
-using VNLib.Plugins.Essentials.Extensions;
-using VNLib.Plugins.Extensions.Loading;
-
-namespace VNLib.Data.Caching.ObjectCache.Server
-{
- internal sealed class BrokerHeartBeatEndpoint : ResourceEndpointBase
- {
- private readonly IBrokerHeartbeatNotifier _heartBeat;
- private readonly Task<IPAddress[]> BrokerIpList;
- private readonly bool DebugMode;
-
- ///<inheritdoc/>
- protected override ProtectionSettings EndpointProtectionSettings { get; } = new()
- {
- DisableBrowsersOnly = true,
- DisableSessionsRequired = true
- };
-
- public BrokerHeartBeatEndpoint(PluginBase plugin)
- {
- //Get debug flag
- DebugMode = plugin.IsDebug();
-
- //Get or create the current node config
- _heartBeat = plugin.GetOrCreateSingleton<NodeConfig>();
-
- /*
- * Resolve the ip address of the broker and store it to verify connections
- * later
- */
- BrokerIpList = Dns.GetHostAddressesAsync(_heartBeat.GetBrokerAddress().DnsSafeHost);
-
- //Setup endpoint
- InitPathAndLog("/heartbeat", plugin.Log);
- }
-
-
- protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity)
- {
- //If-not loopback then verify server address
- if (!entity.Server.IsLoopBack())
- {
- //Load and verify the broker's ip address matches with an address we have stored
- IPAddress[] addresses = await BrokerIpList;
-
- if (!addresses.Contains(entity.TrustedRemoteIp))
- {
- if (DebugMode)
- {
- Log.Debug("Received connection {ip} that was not a DNS safe address for the broker server, access denied");
- }
-
- //Token invalid
- entity.CloseResponse(HttpStatusCode.Forbidden);
- return VfReturnType.VirtualSkip;
- }
- }
-
- //Get the authorization jwt
- string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
-
- if (string.IsNullOrWhiteSpace(jwtAuth))
- {
- //Token invalid
- entity.CloseResponse(HttpStatusCode.Forbidden);
- return VfReturnType.VirtualSkip;
- }
-
- //Parse the jwt
- using JsonWebToken jwt = JsonWebToken.Parse(jwtAuth);
-
- //Verify the jwt using the broker's public key certificate
- using (ReadOnlyJsonWebKey cert = _heartBeat.GetBrokerPublicKey())
- {
- //Verify the jwt
- if (!jwt.VerifyFromJwk(cert))
- {
- //Token invalid
- entity.CloseResponse(HttpStatusCode.Forbidden);
- return VfReturnType.VirtualSkip;
- }
- }
-
- string? auth;
- //Recover the auth token from the jwt
- using (JsonDocument doc = jwt.GetPayload())
- {
- auth = doc.RootElement.GetProperty("token").GetString();
- }
-
- //Get our stored token used for registration
- string? selfToken = _heartBeat.GetAuthToken();
-
- //Verify token
- if (selfToken != null && selfToken.Equals(auth, StringComparison.Ordinal))
- {
- //Signal keepalive
- _heartBeat.HearbeatReceived();
- entity.CloseResponse(HttpStatusCode.OK);
- return VfReturnType.VirtualSkip;
- }
-
- if (DebugMode)
- {
- Log.Debug("Invalid auth token recieved from broker sever, access denied");
- }
-
- //Token invalid
- entity.CloseResponse(HttpStatusCode.Forbidden);
- return VfReturnType.VirtualSkip;
- }
- }
-}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
index 167a7e9..8352635 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
@@ -31,10 +31,8 @@ using System.Collections.Generic;
using VNLib.Hashing;
using VNLib.Net.Http;
-using VNLib.Utils.Async;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
-using VNLib.Utils.Extensions;
using VNLib.Data.Caching;
using VNLib.Data.Caching.Extensions;
using VNLib.Hashing.IdentityUtility;
@@ -49,21 +47,21 @@ using VNLib.Plugins.Essentials.Extensions;
using VNLib.Plugins.Extensions.Loading.Routing;
using VNLib.Data.Caching.ObjectCache.Server.Distribution;
-namespace VNLib.Data.Caching.ObjectCache.Server
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
[ConfigurationName("connect_endpoint")]
- internal sealed class ConnectEndpoint : ResourceEndpointBase, IAsyncBackgroundWork
+ internal sealed class ConnectEndpoint : ResourceEndpointBase
{
private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
- private readonly CacheNodeConfiguration NodeConfiguration;
+ private readonly NodeConfig NodeConfiguration;
private readonly ICacheEventQueueManager PubSubManager;
private readonly IPeerMonitor Peers;
private readonly BlobCacheListener Store;
- private readonly CacheAuthKeyStore KeyStore;
private readonly bool VerifyIp;
private readonly string AudienceLocalServerId;
@@ -94,8 +92,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server
InitPathAndLog(path, plugin.Log);
- KeyStore = new(plugin);
-
//Check for ip-verification flag
VerifyIp = config.TryGetValue("verify_ip", out JsonElement vIpEl) && vIpEl.GetBoolean();
@@ -103,7 +99,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
PubSubManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>();
//Get node configuration
- NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>().Config;
+ NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>();
//Get peer monitor
Peers = plugin.GetOrCreateSingleton<CachePeerMonitor>();
@@ -119,9 +115,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server
* know client tokens belong to us when singed by the same key
*/
AudienceLocalServerId = Guid.NewGuid().ToString("N");
-
- //Schedule the queue worker to be run
- _ = plugin.ObserveWork(this, 100);
}
@@ -142,7 +135,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
* received the messages properly
*/
- protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity)
+ protected override VfReturnType Get(HttpEntity entity)
{
//Parse jwt from authoriation
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
@@ -161,22 +154,18 @@ namespace VNLib.Data.Caching.ObjectCache.Server
{
bool verified = false;
- //Get the client public key certificate to verify the client's message
- using(ReadOnlyJsonWebKey cert = await KeyStore.GetClientPublicKeyAsync())
+ //verify signature for client
+ if (NodeConfiguration.KeyStore.VerifyJwt(jwt))
{
- //verify signature for client
- if (jwt.VerifyFromJwk(cert))
- {
- verified = true;
- }
- //May be signed by a cache server
- else
- {
- //Set peer and verified flag since the another cache server signed the request
- isPeer = verified = NodeConfiguration.VerifyCache(jwt);
- }
+ verified = true;
+ }
+ //May be signed by a cache server
+ else
+ {
+ //Set peer and verified flag since the another cache server signed the request
+ isPeer = verified = NodeConfiguration.KeyStore.VerifyCachePeer(jwt);
}
-
+
//Check flag
if (!verified)
{
@@ -204,7 +193,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//Verified, now we can create an auth message with a short expiration
using JsonWebToken auth = new();
- auth.WriteHeader(NodeConfiguration.GetJwtHeader());
+ auth.WriteHeader(NodeConfiguration.KeyStore.GetJwtHeader());
auth.InitPayloadClaim()
.AddClaim("aud", AudienceLocalServerId)
.AddClaim("exp", entity.RequestedTimeUtc.Add(AuthTokenExpiration).ToUnixTimeSeconds())
@@ -223,14 +212,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server
.CommitClaims();
//Sign the auth message from our private key
- NodeConfiguration.SignJwt(auth);
+ NodeConfiguration.KeyStore.SignJwt(auth);
//Close response
entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer);
return VfReturnType.VirtualSkip;
}
- protected override async ValueTask<VfReturnType> WebsocketRequestedAsync(HttpEntity entity)
+ protected override VfReturnType WebsocketRequested(HttpEntity entity)
{
//Parse jwt from authorization
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
@@ -251,13 +240,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server
}
string? nodeId = null;
- ICachePeerAdvertisment? discoveryAd = null;
+ ICacheNodeAdvertisment? discoveryAd = null;
//Parse jwt
using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
{
//verify signature against the cache public key, since this server must have signed it
- if (!NodeConfiguration.VerifyCache(jwt))
+ if (!NodeConfiguration.KeyStore.VerifyCachePeer(jwt))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
@@ -313,7 +302,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
if (isPeer)
{
//Verify token signature against a fellow cache public key
- if (!NodeConfiguration.VerifyUpgradeToken(clientSignature, jwtAuth))
+ if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
@@ -325,16 +314,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//Verify the node advertisement header and publish it
if (!string.IsNullOrWhiteSpace(discoveryHeader))
{
- discoveryAd = NodeConfiguration.VerifyPeerAdvertisment(discoveryHeader);
+ discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(discoveryHeader);
}
}
else
{
//Not a peer, so verify against the client's public key
- using ReadOnlyJsonWebKey clientPub = await KeyStore.GetClientPublicKeyAsync();
-
- //Verify token signature
- if (!FBMDataCacheExtensions.VerifyUpgradeToken(clientSignature, jwtAuth, clientPub))
+ if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
@@ -426,7 +412,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
if (!string.IsNullOrWhiteSpace(state.NodeId))
{
//Get the event queue for the current node
- AsyncQueue<ChangeEvent> queue = PubSubManager.Subscribe(state);
+ IPeerEventQueue queue = PubSubManager.Subscribe(state);
try
{
@@ -470,44 +456,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
Log.Debug("Server websocket exited");
}
-
-
- //Background worker to process event queue items
- async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
- {
- const int accumulatorSize = 64;
-
- try
- {
- //Accumulator for events
- ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize];
- int ptr = 0;
-
- //Listen for changes
- while (true)
- {
- //Wait for next event
- accumulator[ptr++] = await Store.EventQueue.DequeueAsync(exitToken);
-
- //try to accumulate more events until we can't anymore
- while (Store.EventQueue.TryDequeue(out ChangeEvent? ev) && ptr < accumulatorSize)
- {
- accumulator[ptr++] = ev;
- }
-
- //Publish all events to subscribers
- PubSubManager.PublishMultiple(accumulator.AsSpan(0, ptr));
-
- //Reset pointer
- ptr = 0;
- }
- }
- catch (OperationCanceledException)
- {
- //Normal exit
- pluginLog.Debug("Change queue listener worker exited");
- }
- }
+
private class WsUserState : ICachePeer
{
@@ -516,7 +465,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
public int MaxMessageSize { get; init; }
public int MaxResponseBufferSize { get; init; }
public string? NodeId { get; init; }
- public ICachePeerAdvertisment? Advertisment { get; init; }
+ public ICacheNodeAdvertisment? Advertisment { get; init; }
public override string ToString()
{
diff --git a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
index 670d624..90ffca0 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
@@ -23,8 +23,8 @@
*/
using System;
-using System.Linq;
using System.Net;
+using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
@@ -76,14 +76,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
using(JsonWebToken jwt = JsonWebToken.Parse(authToken))
{
//try to verify against cache node first
- if (!Config.Config.VerifyCache(jwt))
+ if (!Config.KeyStore.VerifyCachePeer(jwt))
{
//failed...
//try to verify against client key
- using ReadOnlyJsonWebKey clientPub = await Config.KeyStore.GetClientPublicKeyAsync();
-
- if (!jwt.VerifyFromJwk(clientPub))
+ if (!Config.KeyStore.VerifyJwt(jwt))
{
//invalid token
entity.CloseResponse(HttpStatusCode.Unauthorized);
@@ -97,7 +95,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
//Valid key, get peer list to send to client
- ICachePeerAdvertisment[] peers = PeerMonitor.GetAllPeers()
+ ICacheNodeAdvertisment[] peers = PeerMonitor.GetAllPeers()
.Where(static p => p.Advertisment != null)
.Select(static p => p.Advertisment!)
.ToArray();
@@ -106,7 +104,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
using JsonWebToken response = new();
//set header from cache config
- response.WriteHeader(Config.Config.GetJwtHeader());
+ response.WriteHeader(Config.KeyStore.GetJwtHeader());
response.InitPayloadClaim()
.AddClaim("iss", Config.Config.NodeId)
@@ -119,7 +117,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
.CommitClaims();
//Sign the response
- Config.Config.SignJwt(response);
+ Config.KeyStore.SignJwt(response);
//Send response to client
entity.CloseResponse(HttpStatusCode.OK, Net.Http.ContentType.Text, response.DataBuffer);
diff --git a/plugins/ObjectCacheServer/src/ICacheEventQueueManager.cs b/plugins/ObjectCacheServer/src/ICacheEventQueueManager.cs
index 6b07000..20edf0b 100644
--- a/plugins/ObjectCacheServer/src/ICacheEventQueueManager.cs
+++ b/plugins/ObjectCacheServer/src/ICacheEventQueueManager.cs
@@ -24,9 +24,6 @@
using System;
-using VNLib.Utils.Async;
-
-
namespace VNLib.Data.Caching.ObjectCache.Server
{
/// <summary>
@@ -49,14 +46,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server
/// <summary>
/// Attatches a subscriber that will receive all published changes
/// </summary>
- /// <param name="nodeId">The id of the node to get the queue for</param>
+ /// <param name="peer">The peer node that wishes to subscribe for events</param>
/// <returns>The initilaizes event queue for the single subscriber</returns>
- AsyncQueue<ChangeEvent> Subscribe(ICachePeer peer);
+ IPeerEventQueue Subscribe(ICachePeer peer);
/// <summary>
/// Detatches a subscriber from the event queue
/// </summary>
- /// <param name="nodeId">The id of the nede to detach</param>
+ /// <param name="peer">The peer to unsubscribe from events</param>
void Unsubscribe(ICachePeer peer);
/// <summary>
diff --git a/plugins/ObjectCacheServer/src/ICachePeer.cs b/plugins/ObjectCacheServer/src/ICachePeer.cs
index d374400..97b406f 100644
--- a/plugins/ObjectCacheServer/src/ICachePeer.cs
+++ b/plugins/ObjectCacheServer/src/ICachePeer.cs
@@ -39,6 +39,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server
/// <summary>
/// An optional signed advertisment message for other peers
/// </summary>
- ICachePeerAdvertisment? Advertisment { get; }
+ ICacheNodeAdvertisment? Advertisment { get; }
}
}
diff --git a/plugins/ObjectCacheServer/src/IPeerEventQueue.cs b/plugins/ObjectCacheServer/src/IPeerEventQueue.cs
new file mode 100644
index 0000000..63dbd3f
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/IPeerEventQueue.cs
@@ -0,0 +1,50 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: CacheEventQueueManager.cs
+*
+* CacheEventQueueManager.cs is part of ObjectCacheServer which is
+* part of the larger VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Threading;
+using System.Threading.Tasks;
+
+
+namespace VNLib.Data.Caching.ObjectCache.Server
+{
+ /// <summary>
+ /// Represents a queue of events for a specific peer node
+ /// </summary>
+ internal interface IPeerEventQueue
+ {
+ /// <summary>
+ /// Dequeues an event from the queue asynchronously
+ /// </summary>
+ /// <param name="cancellation">A token to cancel the operation</param>
+ /// <returns>The value task that represents the wait</returns>
+ ValueTask<ChangeEvent> DequeueAsync(CancellationToken cancellation);
+
+ /// <summary>
+ /// Attemts to dequeue an event from the queue
+ /// </summary>
+ /// <param name="change">The change event that was dequeued if possible</param>
+ /// <returns>True if the event was dequeued</returns>
+ bool TryDequeue(out ChangeEvent change);
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/NodeConfig.cs b/plugins/ObjectCacheServer/src/NodeConfig.cs
index 614f0d6..81b8a32 100644
--- a/plugins/ObjectCacheServer/src/NodeConfig.cs
+++ b/plugins/ObjectCacheServer/src/NodeConfig.cs
@@ -3,9 +3,9 @@
*
* Library: VNLib
* Package: ObjectCacheServer
-* File: ObjectCacheServerEntry.cs
+* File: NodeConfig.cs
*
-* ObjectCacheServerEntry.cs is part of ObjectCacheServer which is part of the larger
+* NodeConfig.cs is part of ObjectCacheServer which is part of the larger
* VNLib collection of libraries and utilities.
*
* ObjectCacheServer is free software: you can redistribute it and/or modify
@@ -25,63 +25,48 @@
using System;
using System.Net;
using System.Linq;
-using System.Net.Http;
using System.Text.Json;
-using System.Threading;
-using System.Net.Sockets;
using System.Threading.Tasks;
-using System.Collections.Generic;
-using System.Security.Cryptography;
using VNLib.Plugins;
using VNLib.Utils;
using VNLib.Utils.Logging;
-using VNLib.Utils.Extensions;
-using VNLib.Hashing;
-using VNLib.Hashing.IdentityUtility;
using VNLib.Data.Caching.Extensions;
using VNLib.Plugins.Extensions.Loading;
-
+using VNLib.Data.Caching.ObjectCache.Server.Endpoints;
namespace VNLib.Data.Caching.ObjectCache.Server
{
[ConfigurationName("cluster")]
- internal sealed class NodeConfig : VnDisposeable, IAsyncConfigurable, IAsyncBackgroundWork, IBrokerHeartbeatNotifier
+ internal sealed class NodeConfig : VnDisposeable
{
const string CacheConfigTemplate =
@"
Cluster Configuration:
- Broker Address: {ba}
- Heartbeat Timeout: {hb}
Node Id: {id}
TlsEndabled: {tls},
Cache Endpoint: {ep}
";
public CacheNodeConfiguration Config { get; }
- public CacheAuthKeyStore KeyStore { get; }
-
- private readonly ManualResetEventSlim hearbeatHandle;
- private readonly TimeSpan _hearbeatTimeout;
- private string? _authToken;
+ public CacheAuthKeyStore KeyStore { get; }
public NodeConfig(PluginBase plugin, IConfigScope config)
{
//Server id is just dns name for now
string nodeId = Dns.GetHostName();
- Config = new();
- //Get the heartbeat interval
- TimeSpan heartBeatDelayMs = config["heartbeat_timeout_sec"].GetTimeSpan(TimeParseType.Seconds);
-
- string brokerAddr = config["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required key 'broker_address' for config 'cluster'");
+ Config = new();
//Get the port of the primary webserver
int port;
bool usingTls;
{
- JsonElement firstHost = plugin.HostConfig.GetProperty("virtual_hosts").EnumerateArray().First();
+ //Get the port number of the first virtual host
+ JsonElement firstHost = plugin.HostConfig.GetProperty("virtual_hosts")
+ .EnumerateArray()
+ .First();
port = firstHost.GetProperty("interface")
.GetProperty("port")
@@ -91,147 +76,51 @@ namespace VNLib.Data.Caching.ObjectCache.Server
usingTls = firstHost.TryGetProperty("ssl", out _);
}
- //Get the cache endpoint config
- IConfigScope cacheEpConfig = plugin.GetConfigForType<ConnectEndpoint>();
-
//The endpoint to advertise to cache clients that allows cache connections
- UriBuilder endpoint = new(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, nodeId, port, cacheEpConfig["path"].GetString());
+ Uri cacheEndpoint = GetEndpointUri<ConnectEndpoint>(plugin, usingTls, port, nodeId);
+
+ //Init key store
+ KeyStore = new(plugin);
//Setup cache node config
- Config.WithCacheEndpoint(endpoint.Uri)
+ Config.WithCacheEndpoint(cacheEndpoint)
.WithNodeId(nodeId)
- .WithTls(usingTls)
- .WithBroker(new(brokerAddr));
+ .WithAuthenticator(KeyStore)
+ .WithTls(usingTls);
//Check if advertising is enabled
if(config.TryGetValue("advertise", out JsonElement adEl) && adEl.GetBoolean())
{
- Config.EnableAdvertisment(true, "");
- }
-
- //Init key store
- KeyStore = new(plugin);
-
- //Init heartbeat handle unsiganled waiting for first heartbeat
- hearbeatHandle = new(false);
-
- //Schedule heartbeat
- _ = plugin.ObserveWork(this, 500);
+ //Get the the broadcast endpoint
+ Uri discoveryEndpoint = GetEndpointUri<PeerDiscoveryEndpoint>(plugin, usingTls, port, nodeId);
+ //Enable advertising
+ Config.EnableAdvertisment(discoveryEndpoint);
+ }
+
+
//log the config
plugin.Log.Information(CacheConfigTemplate,
- brokerAddr,
- heartBeatDelayMs,
nodeId,
usingTls,
- endpoint.Uri);
+ cacheEndpoint
+ );
}
- async Task IAsyncConfigurable.ConfigureServiceAsync(PluginBase plugin)
+ private static Uri GetEndpointUri<T>(PluginBase plugin, bool usingTls, int port, string hostName) where T: IEndpoint
{
- //Get cache private key for signing from the key store
- ReadOnlyJsonWebKey signingKey = await KeyStore.GetCachePrivateAsync();
-
- Config.WithSigningKey(signingKey);
-
- //Get broker public key for verifying from the key store
- ReadOnlyJsonWebKey brokerKey = await KeyStore.GetBrokerPublicAsync();
+ //Get the cache endpoint config
+ IConfigScope cacheEpConfig = plugin.GetConfigForType<T>();
- Config.WithBrokerVerificationKey(brokerKey);
+ //The endpoint to advertise to cache clients that allows cache connections
+ return new UriBuilder(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, hostName, port, cacheEpConfig["path"].GetString()).Uri;
}
+
protected override void Free()
{
- //Dispose the heartbeat handle
- hearbeatHandle.Dispose();
-
//cleanup keys
- Config.SigningKey?.Dispose();
- Config.VerificationKey?.Dispose();
- Config.BrokerVerificationKey?.Dispose();
- }
-
- ///<inheritdoc/>
- public void HearbeatReceived()
- {
- //Set the heartbeat handle as received
- hearbeatHandle.Set();
+
}
-
- ///<inheritdoc/>
- public string? GetAuthToken() => _authToken;
-
- ///<inheritdoc/>
- public Uri GetBrokerAddress() => Config.DiscoveryEndpoint!;
-
- ///<inheritdoc/>
- public ReadOnlyJsonWebKey GetBrokerPublicKey() => Config.BrokerVerificationKey!;
-
-
- /*
- * Worker loop for registering with the broker and monitoring hearbeat requests
- */
- async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
- {
- //Listen in loop
- while (true)
- {
- try
- {
- //Regen the auth token before registering
- _authToken = RandomHash.GetRandomBase32(32);
-
- pluginLog.Information("Registering with cache broker server with id {id}", Config.NodeId);
-
- //Register with the broker and pass the current auth token
- await Config.RegisterWithBrokerAsync(_authToken);
-
- //Enter heartbeat loop
- while (true)
- {
- //Wait for the heartbeat timeout
- await Task.Delay(_hearbeatTimeout, exitToken);
-
- //Confrim the hearbeat was received within the timeout period
- if (!hearbeatHandle.IsSet)
- {
- //If the heartbeat handle is not set, the heartbeat was not received, reg-register
- pluginLog.Information("Broker missed hearbeat request");
-
- //not received, break out of the heartbeat loop to re-register
- break;
- }
-
- //Reset the handle and continue the heartbeat loop
- hearbeatHandle.Reset();
- }
-
- //Add random delay to prevent all nodes from re-registering at the same time
- await Task.Delay(RandomNumberGenerator.GetInt32(1000, 5000), exitToken);
- }
- catch (OperationCanceledException)
- {
- pluginLog.Debug("Registration loop exited on unload");
- break;
- }
- catch (TimeoutException)
- {
- pluginLog.Warn("Failed to connect to cache broker server within the specified timeout period");
- }
- catch (HttpRequestException re) when (re.InnerException is SocketException)
- {
- pluginLog.Warn("Cache broker is unavailable or network is unavailable");
- }
- catch(HttpRequestException re) when (re.StatusCode.HasValue)
- {
- pluginLog.Warn("Failed to register with cache broker server, received status code {code}", re.StatusCode);
- }
- catch (Exception ex)
- {
- pluginLog.Warn("Exception occured in registraion loop: {ex}", ex!.Message);
- }
- }
- }
-
}
}
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj
index 0c53095..e8d6291 100644
--- a/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj
+++ b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj
@@ -37,8 +37,8 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\..\..\core\lib\Plugins.PluginBase\src\VNLib.Plugins.PluginBase.csproj" />
+ <ProjectReference Include="..\..\..\..\Extensions\lib\VNLib.Plugins.Extensions.Loading\src\VNLib.Plugins.Extensions.Loading.csproj" />
<ProjectReference Include="..\..\..\lib\VNLib.Data.Caching.Extensions\src\VNLib.Data.Caching.Extensions.csproj" />
<ProjectReference Include="..\..\..\lib\VNLib.Data.Caching.ObjectCache\src\VNLib.Data.Caching.ObjectCache.csproj" />
- <ProjectReference Include="..\..\CacheBroker\src\CacheBroker.csproj" />
</ItemGroup>
</Project>
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
index c1a6ad2..5d9a50b 100644
--- a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
+++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
@@ -23,80 +23,20 @@
*/
using System;
-using System.IO;
-using System.Net;
-using System.Linq;
-using System.Net.Http;
using System.Threading;
-using System.Net.Sockets;
-using System.Threading.Tasks;
using System.Collections.Generic;
-using System.Security.Cryptography;
using VNLib.Plugins;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
using VNLib.Utils.Memory.Diagnostics;
-using VNLib.Hashing.IdentityUtility;
-using VNLib.Data.Caching.Extensions;
-using static VNLib.Data.Caching.Constants;
-using VNLib.Net.Messaging.FBM;
-using VNLib.Net.Messaging.FBM.Client;
-using VNLib.Plugins.Cache.Broker.Endpoints;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Extensions.Loading.Routing;
using VNLib.Data.Caching.ObjectCache.Server.Endpoints;
+
namespace VNLib.Data.Caching.ObjectCache.Server
{
- sealed record class CacheAuthKeyStore(PluginBase Plugin)
- {
- public Task<ReadOnlyJsonWebKey> GetCachePublicAsync()
- {
- return Plugin.TryGetSecretAsync("cache_private_key").ToJsonWebKey(true);
- }
-
- public Task<ReadOnlyJsonWebKey> GetCachePrivateAsync()
- {
- return Plugin.TryGetSecretAsync("cache_private_key").ToJsonWebKey(true);
- }
-
- public Task<ReadOnlyJsonWebKey> GetBrokerPublicAsync()
- {
- return Plugin.TryGetSecretAsync("broker_public_key").ToJsonWebKey(true);
- }
-
- public Task<ReadOnlyJsonWebKey> GetClientPublicKeyAsync()
- {
- return Plugin.TryGetSecretAsync("client_public_key").ToJsonWebKey(true);
- }
- }
-
- internal interface IBrokerHeartbeatNotifier
- {
- /// <summary>
- /// Called when the heartbeat endpoint receives a heartbeat from the broker
- /// </summary>
- void HearbeatReceived();
-
- /// <summary>
- /// Gets the current auth token sent to the broker, which is expected to be sent back in the heartbeat
- /// </summary>
- /// <returns>The heartbeat auth token if set</returns>
- string? GetAuthToken();
-
- /// <summary>
- /// Gets the address of the broker server
- /// </summary>
- /// <returns>The full address of the broker server to connect to</returns>
- Uri GetBrokerAddress();
-
- /// <summary>
- /// Gets the public key of the broker server
- /// </summary>
- /// <returns>The broker's public key</returns>
- ReadOnlyJsonWebKey GetBrokerPublicKey();
- }
public sealed class ObjectCacheServerEntry : PluginBase
{
@@ -136,26 +76,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server
{
try
{
- //Setup Node config
- NodeConfig nodeConf = this.GetOrCreateSingleton<NodeConfig>();
-
//Init connect endpoint
- ConnectEndpoint endpoint = this.Route<ConnectEndpoint>();
-
- //Route the broker endpoint
- this.Route<BrokerHeartBeatEndpoint>();
+ this.Route<ConnectEndpoint>();
//Setup discovery endpoint
if(this.HasConfigForType<PeerDiscoveryEndpoint>())
{
this.Route<PeerDiscoveryEndpoint>();
- }
-
- ulong maxByteSize = ((ulong)endpoint.CacheConfig.MaxCacheEntries * (ulong)endpoint.CacheConfig.BucketCount * (ulong)endpoint.CacheConfig.MaxMessageSize);
-
- //Log max memory usage
- Log.Debug("Maxium memory consumption {mx}Mb", maxByteSize / (ulong)(1024 * 1000));
-
+ }
Log.Information("Plugin loaded");
}