aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs65
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ApiModel/DiscoveryRequest.cs (renamed from lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs)34
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ApiModel/GetConfigRequest.cs42
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ApiModel/ICacheConnectionRequest.cs (renamed from lib/VNLib.Data.Caching.Extensions/src/ICacheNodeAdvertisment.cs)29
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ApiModel/NegotationRequest.cs41
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ApiModel/ServiceEndpoints.cs176
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs (renamed from lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs)24
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheDiscoveryFailureException.cs44
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeAdvertisment.cs96
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs (renamed from lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs)26
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/Clustering/ICacheDiscoveryErrorHandler.cs (renamed from lib/VNLib.Data.Caching.Extensions/src/ICacheDiscoveryErrorHandler.cs)4
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryCollection.cs (renamed from lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryCollection.cs)6
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs (renamed from lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryEnumerator.cs)7
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs (renamed from lib/VNLib.Data.Caching.Extensions/src/NodeDiscoveryCollection.cs)75
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs413
-rw-r--r--lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs6
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs4
-rw-r--r--lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs6
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/RemoteCacheOperator.cs11
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs56
-rw-r--r--lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs59
-rw-r--r--plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs17
-rw-r--r--plugins/ObjectCacheServer/src/CacheEventQueueManager.cs6
-rw-r--r--plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs25
-rw-r--r--plugins/ObjectCacheServer/src/CacheStore.cs27
-rw-r--r--plugins/ObjectCacheServer/src/CacheSystemUtil.cs2
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs (renamed from plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs)158
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs (renamed from plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs)32
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/ICachePeerAdapter.cs (renamed from plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs)10
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/IPeerMonitor.cs (renamed from plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs)5
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs268
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/KnownPeerList.cs100
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs161
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs71
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs29
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs111
-rw-r--r--plugins/ObjectCacheServer/src/ICachePeer.cs4
-rw-r--r--plugins/ObjectCacheServer/src/NodeConfig.cs63
-rw-r--r--plugins/ObjectCacheServer/src/ObjectCacheServer.csproj5
-rw-r--r--plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs8
40 files changed, 1519 insertions, 807 deletions
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs
new file mode 100644
index 0000000..99acfd5
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/CacheSiteAdapter.cs
@@ -0,0 +1,65 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: CacheSiteAdapter.cs
+*
+* CacheSiteAdapter.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System.Net;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+using RestSharp;
+
+using VNLib.Net.Rest.Client;
+using VNLib.Net.Rest.Client.Construction;
+
+namespace VNLib.Data.Caching.Extensions.ApiModel
+{
+ /// <summary>
+ /// A site adapter for cache REST api requests
+ /// </summary>
+ internal sealed class CacheSiteAdapter : RestSiteAdapterBase
+ {
+ protected override RestClientPool Pool { get; }
+
+ public CacheSiteAdapter(int maxClients)
+ {
+ //Configure connection pool
+ Pool = new(maxClients, new RestClientOptions()
+ {
+ MaxTimeout = 10 * 1000,
+ FollowRedirects = false,
+ Encoding = Encoding.UTF8,
+ AutomaticDecompression = DecompressionMethods.All,
+ ThrowOnAnyError = true
+ });
+ }
+
+ public override void OnResponse(RestResponse response)
+ { }
+
+ public override Task WaitAsync(CancellationToken cancellation = default)
+ {
+ return Task.CompletedTask;
+ }
+ }
+}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/DiscoveryRequest.cs
index 3020376..22b11aa 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/ActiveServer.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/DiscoveryRequest.cs
@@ -3,9 +3,9 @@
*
* Library: VNLib
* Package: VNLib.Data.Caching.Extensions
-* File: ActiveServer.cs
+* File: DiscoveryRequest.cs
*
-* ActiveServer.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* DiscoveryRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
* VNLib collection of libraries and utilities.
*
* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
@@ -23,29 +23,19 @@
*/
using System;
-using System.Text.Json.Serialization;
+using VNLib.Data.Caching.Extensions.Clustering;
-namespace VNLib.Data.Caching.Extensions
+namespace VNLib.Data.Caching.Extensions.ApiModel
{
- public class ActiveServer : ICacheNodeAdvertisment
+ /// <summary>
+ /// A request message for a discovery request
+ /// </summary>
+ /// <param name="DiscoveryUrl">The discovery endpoint to connec to</param>
+ /// <param name="Config">The local client configuration</param>
+ internal record class DiscoveryRequest(Uri DiscoveryUrl, CacheClientConfiguration Config)
+ : ICacheConnectionRequest
{
- [JsonPropertyName("address")]
- public string? HostName { get; set; }
-
- public string? ServerId { get; set; }
- [JsonPropertyName("ip_address")]
- public string? Ip { get; set; }
-
- public Uri ConnectEndpoint { get; }
-
- public Uri? DiscoveryEndpoint { get; }
-
- [JsonPropertyName("server_id")]
- public string NodeId { get; }
-
- ///<inheritdoc/>
- public override int GetHashCode() => ServerId!.GetHashCode(StringComparison.OrdinalIgnoreCase);
///<inheritdoc/>
- public override bool Equals(object? obj) => obj is ActiveServer s && GetHashCode() == s.GetHashCode();
+ public string? Challenge { get; set; }
}
}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ApiModel/GetConfigRequest.cs b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/GetConfigRequest.cs
new file mode 100644
index 0000000..43cef4c
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/GetConfigRequest.cs
@@ -0,0 +1,42 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: GetConfigRequest.cs
+*
+* GetConfigRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using VNLib.Data.Caching.Extensions.Clustering;
+
+namespace VNLib.Data.Caching.Extensions.ApiModel
+{
+ /// <summary>
+ /// A request to get the cache configuration from a cache server's
+ /// well-known configuration endpoint
+ /// </summary>
+ /// <param name="WellKnownEp">The well-known configuration endpoint url</param>
+ /// <param name="Config">The client cache configuration</param>
+ internal record class GetConfigRequest(Uri WellKnownEp, CacheClientConfiguration Config)
+ : ICacheConnectionRequest
+ {
+ ///<inheritdoc/>
+ public string? Challenge { get; set; }
+ }
+}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ICacheNodeAdvertisment.cs b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/ICacheConnectionRequest.cs
index fc29955..5fb7ba7 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/ICacheNodeAdvertisment.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/ICacheConnectionRequest.cs
@@ -3,9 +3,9 @@
*
* Library: VNLib
* Package: VNLib.Data.Caching.Extensions
-* File: ICacheNodeAdvertisment.cs
+* File: ICacheConnectionRequest.cs
*
-* ICacheNodeAdvertisment.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* ICacheConnectionRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
* VNLib collection of libraries and utilities.
*
* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
@@ -22,29 +22,26 @@
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
-using System;
+using VNLib.Data.Caching.Extensions.Clustering;
-
-namespace VNLib.Data.Caching.Extensions
+namespace VNLib.Data.Caching.Extensions.ApiModel
{
/// <summary>
- /// Represents a node that can be advertised to clients
+ /// Represents a request to connect to a cache server.
/// </summary>
- public interface ICacheNodeAdvertisment
+ internal interface ICacheConnectionRequest
{
/// <summary>
- /// The endpoint for clients to connect to to access the cache
- /// </summary>
- Uri ConnectEndpoint { get; }
-
- /// <summary>
- /// Gets the address for clients to connect to to discover other discovertable nodes
+ /// The <see cref="CacheClientConfiguration"/> used to configure, authenticate, and
+ /// verify messages sent to and received from cache servers.
/// </summary>
- Uri? DiscoveryEndpoint { get; }
+ CacheClientConfiguration Config { get; }
/// <summary>
- /// Gets the unique identifier for this node
+ /// An optional challenge string to be used during the authentication
+ /// process. When set, is sent in the request JWT, and is expected to
+ /// be returned in the response JWT.
/// </summary>
- string NodeId { get; }
+ string? Challenge { get; set; }
}
}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ApiModel/NegotationRequest.cs b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/NegotationRequest.cs
new file mode 100644
index 0000000..5842586
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/NegotationRequest.cs
@@ -0,0 +1,41 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: NegotationRequest.cs
+*
+* NegotationRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using VNLib.Data.Caching.Extensions.Clustering;
+
+namespace VNLib.Data.Caching.Extensions.ApiModel
+{
+ /// <summary>
+ /// A request to negotiate a new connection with a cache server
+ /// </summary>
+ /// <param name="ConnectUrl">The cache endpoint uri to connec to</param>
+ /// <param name="Config">The client cache configuration</param>
+ internal record class NegotationRequest(Uri ConnectUrl, CacheClientConfiguration Config)
+ : ICacheConnectionRequest
+ {
+ ///<inheritdoc/>
+ public string? Challenge { get; set; }
+ }
+}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ApiModel/ServiceEndpoints.cs b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/ServiceEndpoints.cs
new file mode 100644
index 0000000..c0de4a3
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/ApiModel/ServiceEndpoints.cs
@@ -0,0 +1,176 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: ServiceEndpoints.cs
+*
+* ServiceEndpoints.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Security;
+using System.Text.Json;
+
+using RestSharp;
+
+using VNLib.Net.Http;
+using VNLib.Hashing;
+using VNLib.Hashing.IdentityUtility;
+using VNLib.Net.Rest.Client.Construction;
+using ContentType = VNLib.Net.Http.ContentType;
+using VNLib.Data.Caching.Extensions.Clustering;
+
+namespace VNLib.Data.Caching.Extensions.ApiModel
+{
+ /*
+ * Defines the cache endpoints, builds and routes request messages to the
+ * server enpoints. In effect defines the client api for cache services.
+ *
+ * This class also define methods for authentication and message verification
+ */
+
+ internal static class ServiceEndpoints
+ {
+ private static readonly TimeSpan MaxTimeDisparity = TimeSpan.FromSeconds(10);
+
+ /// <summary>
+ /// Get the endpoint definition for cache services for the site adapter
+ /// </summary>
+ internal static IRestEndpointDefinition Definition { get; } = new EndpointBuilder();
+
+ private class EndpointBuilder : IRestEndpointDefinition
+ {
+ ///<inheritdoc/>
+ public void BuildRequest(IRestSiteAdapter site, IRestEndpointBuilder builder)
+ {
+ //Define cache service endpoints/requests
+
+ builder.WithEndpoint<DiscoveryRequest>()
+ .WithUrl(e => e.DiscoveryUrl)
+ .WithMethod(Method.Get)
+ //Accept text response (it should be a jwt)
+ .WithHeader("Accept", HttpHelpers.GetContentTypeString(ContentType.Text))
+ .WithHeader("Authorization", BuildDiscoveryAuthToken)
+ //Verify jwt responses
+ .OnResponse(VerifyJwtResponse);
+
+ builder.WithEndpoint<NegotationRequest>()
+ .WithUrl(e => e.ConnectUrl)
+ .WithMethod(Method.Get)
+ //Accept text response (its should be a jwt)
+ .WithHeader("Accept", HttpHelpers.GetContentTypeString(ContentType.Text))
+ .WithHeader("Authorization", BuildDiscoveryAuthToken)
+ //Verify jwt responses
+ .OnResponse(VerifyJwtResponse);
+
+ //Well known endpoint does not require authentication
+ builder.WithEndpoint<GetConfigRequest>()
+ .WithUrl(gc => gc.WellKnownEp)
+ .WithMethod(Method.Get)
+ //Responses should be a signed jwt
+ .WithHeader("Accept", HttpHelpers.GetContentTypeString(ContentType.Text))
+ //Verify jwt responses
+ .OnResponse(VerifyJwtResponse);
+ }
+ }
+
+
+ private static string BuildDiscoveryAuthToken(ICacheConnectionRequest request)
+ {
+ request.Challenge = RandomHash.GetRandomBase32(24);
+
+ //Build request jwt
+ using JsonWebToken jwt = new();
+ jwt.WriteHeader(request.Config.AuthManager.GetJwtHeader());
+
+ //See if the supplied config is for a cache node
+ CacheNodeConfiguration? cnc = request.Config as CacheNodeConfiguration;
+
+ //Init claim
+ JwtPayload claim = jwt.InitPayloadClaim();
+
+ claim.AddClaim("chl", request.Challenge)
+ .AddClaim("iat", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
+
+ if (!string.IsNullOrWhiteSpace(cnc?.NodeId))
+ {
+ /*
+ * The unique node id so the other nodes know to load the
+ * proper event queue for the current server
+ */
+ claim.AddClaim("sub", cnc.NodeId);
+ }
+
+ claim.CommitClaims();
+
+ //sign the jwt
+ request.Config.AuthManager.SignJwt(jwt);
+
+ //Compile the jwt
+ return jwt.Compile();
+ }
+
+ private static void VerifyJwtResponse(ICacheConnectionRequest req, RestResponse response)
+ {
+ byte[] data = response.RawBytes ?? throw new ArgumentException("Server response was empty, cannot continue");
+
+ //If node config then set the is-node flag
+ bool isNode = req.Config is CacheNodeConfiguration;
+
+ //Response is jwt
+ using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data);
+
+ //Verify the jwt
+ if (!req.Config.AuthManager.VerifyJwt(responseJwt, isNode))
+ {
+ throw new SecurityException("Failed to verify the discovery server's challenge, cannot continue");
+ }
+
+ //get payload as a document
+ using JsonDocument doc = responseJwt.GetPayload();
+
+ //Verify iat times
+ long iatSec = doc.RootElement.GetProperty("iat").GetInt64();
+
+ //Get dto
+ DateTimeOffset iat = DateTimeOffset.FromUnixTimeSeconds(iatSec);
+
+ DateTimeOffset now = DateTimeOffset.UtcNow;
+
+ //Verify iat is not before or after the current time with the disparity
+ if (iat.Add(MaxTimeDisparity) < now || iat.Subtract(MaxTimeDisparity) > now)
+ {
+ throw new SecurityException("Server returned a request that has expired. Please check your system clock");
+ }
+
+ //If a challenge is set, verify it
+ if (req.Challenge != null)
+ {
+ //Verify challenge
+ string challenge = doc.RootElement.GetProperty("chl").GetString()
+ ?? throw new SecurityException("Server did not return a challenge");
+
+ if (!challenge.Equals(req.Challenge, StringComparison.Ordinal))
+ {
+ throw new SecurityException("Server returned an invalid challenge");
+ }
+ }
+ }
+
+ }
+}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs
index 9229c89..1c1997e 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/CacheClientConfiguration.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheClientConfiguration.cs
@@ -22,11 +22,14 @@
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
+using System;
using System.Linq;
using System.Collections.Generic;
-namespace VNLib.Data.Caching.Extensions
+
+namespace VNLib.Data.Caching.Extensions.Clustering
{
+
/// <summary>
/// A fluent api configuration object for configuring a <see cref="FBMClient"/>
/// to connect to cache servers.
@@ -53,7 +56,7 @@ namespace VNLib.Data.Caching.Extensions
/// </summary>
public bool UseTls { get; private set; }
- internal ICacheNodeAdvertisment[]? InitialPeers { get; set; }
+ internal Uri[]? WellKnownNodes { get; set; }
/// <summary>
/// Specifies the JWT authentication manager to use for signing and verifying JWTs
@@ -73,7 +76,7 @@ namespace VNLib.Data.Caching.Extensions
public CacheClientConfiguration WithTls(bool useTls)
{
UseTls = useTls;
- return this;
+ return this;
}
/// <summary>
@@ -81,9 +84,20 @@ namespace VNLib.Data.Caching.Extensions
/// </summary>
/// <param name="peers">The collection of servers to discover peers from and connect to</param>
/// <returns>Chainable fluent object</returns>
- public CacheClientConfiguration WithInitialPeers(IEnumerable<ICacheNodeAdvertisment> peers)
+ public CacheClientConfiguration WithInitialPeers(IEnumerable<Uri> peers)
{
- InitialPeers = peers.ToArray();
+ //Check null
+ _ = peers ?? throw new ArgumentNullException(nameof(peers));
+
+ //Store peer array
+ WellKnownNodes = peers.ToArray();
+
+ if (WellKnownNodes.Any(p => !p.IsAbsoluteUri))
+ {
+ WellKnownNodes = null;
+ throw new ArgumentException("All discoverable node uris must be in absolute form");
+ }
+
return this;
}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheDiscoveryFailureException.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheDiscoveryFailureException.cs
new file mode 100644
index 0000000..84d611e
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheDiscoveryFailureException.cs
@@ -0,0 +1,44 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: CacheDiscoveryFailureException.cs
+*
+* CacheDiscoveryFailureException.cs is part of VNLib.Data.Caching.Extensions which is
+* part of the larger VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+
+
+namespace VNLib.Data.Caching.Extensions.Clustering
+{
+ /// <summary>
+ /// Raised when an error occurs during cache discovery
+ /// </summary>
+ public class CacheDiscoveryFailureException : Exception
+ {
+ public CacheDiscoveryFailureException(string message) : base(message)
+ { }
+
+ public CacheDiscoveryFailureException(string message, Exception innerException) : base(message, innerException)
+ { }
+
+ public CacheDiscoveryFailureException()
+ { }
+ }
+}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeAdvertisment.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeAdvertisment.cs
new file mode 100644
index 0000000..e81d506
--- /dev/null
+++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeAdvertisment.cs
@@ -0,0 +1,96 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: VNLib.Data.Caching.Extensions
+* File: CacheNodeAdvertisment.cs
+*
+* CacheNodeAdvertisment.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Text.Json.Serialization;
+
+namespace VNLib.Data.Caching.Extensions.Clustering
+{
+ /// <summary>
+ /// Represents a node that can be advertised to clients
+ /// </summary>
+ public class CacheNodeAdvertisment : IEquatable<CacheNodeAdvertisment>
+ {
+ /// <summary>
+ /// The endpoint for clients to connect to to access the cache
+ /// </summary>
+ [JsonIgnore]
+ public Uri? ConnectEndpoint { get; set; }
+
+ /// <summary>
+ /// Gets the address for clients to connect to to discover other discovertable nodes
+ /// </summary>
+ [JsonIgnore]
+ public Uri? DiscoveryEndpoint { get; set; }
+
+ /// <summary>
+ /// Gets the unique identifier for this node
+ /// </summary>
+ [JsonPropertyName("iss")]
+ public string NodeId { get; set; }
+
+ [JsonPropertyName("url")]
+ public string? url
+ {
+ get => ConnectEndpoint?.ToString();
+ set => ConnectEndpoint = value == null ? null : new Uri(value);
+ }
+
+ [JsonPropertyName("dis")]
+ public string? dis
+ {
+ get => DiscoveryEndpoint?.ToString();
+ set => DiscoveryEndpoint = value == null ? null : new Uri(value);
+ }
+
+ /// <summary>
+ /// Determines if the given node is equal to this node, by comparing the node ids
+ /// </summary>
+ /// <param name="obj">The other node advertisment to compare</param>
+ /// <returns>True if the nodes are equal, false otherwise</returns>
+ public override bool Equals(object? obj) => obj is CacheNodeAdvertisment ad && Equals(ad);
+
+ /// <summary>
+ /// Gets the hash code for this node, based on the node id
+ /// </summary>
+ /// <returns>The instance hash-code</returns>
+ public override int GetHashCode() => string.GetHashCode(NodeId, StringComparison.OrdinalIgnoreCase);
+
+ /// <summary>
+ /// Determines if the given node is equal to this node, by comparing the node ids
+ /// </summary>
+ /// <param name="other">The other node advertisment to compare</param>
+ /// <returns>True if the nodes are equal, false otherwise</returns>
+ public bool Equals(CacheNodeAdvertisment? other) => string.Equals(NodeId, other?.NodeId, StringComparison.OrdinalIgnoreCase);
+
+ /// <summary>
+ /// Formats a string representation of this node
+ /// </summary>
+ /// <returns>The formatted information string</returns>
+ public override string ToString()
+ {
+ return $"NodeId: {NodeId} Connect: {url}, Discover?: {dis}";
+ }
+ }
+}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs
index 29a763c..6b7ab48 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/CacheNodeConfiguration.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/CacheNodeConfiguration.cs
@@ -24,13 +24,13 @@
using System;
-namespace VNLib.Data.Caching.Extensions
+namespace VNLib.Data.Caching.Extensions.Clustering
{
/// <summary>
/// A cache configuration for cache servers (nodes)
/// </summary>
- public class CacheNodeConfiguration: CacheClientConfiguration, ICacheNodeAdvertisment
+ public class CacheNodeConfiguration : CacheClientConfiguration
{
/// <summary>
/// The address for clients to connect to
@@ -49,6 +49,22 @@ namespace VNLib.Data.Caching.Extensions
public Uri? DiscoveryEndpoint { get; private set; }
/// <summary>
+ /// Gets the configuration for this node as an advertisment
+ /// </summary>
+ public CacheNodeAdvertisment Advertisment
+ {
+ get
+ {
+ return new CacheNodeAdvertisment()
+ {
+ DiscoveryEndpoint = DiscoveryEndpoint,
+ ConnectEndpoint = ConnectEndpoint,
+ NodeId = NodeId
+ };
+ }
+ }
+
+ /// <summary>
/// Sets the full address of our cache endpoint for clients to connect to
/// </summary>
/// <param name="connectUri">The uri clients will attempt to connect to</param>
@@ -80,9 +96,13 @@ namespace VNLib.Data.Caching.Extensions
/// <param name="nodeId">The cluster node id of the current server</param>
/// <returns>Chainable fluent object</returns>
/// <exception cref="ArgumentNullException"></exception>
- public CacheClientConfiguration WithNodeId(string nodeId)
+ public CacheNodeConfiguration WithNodeId(string nodeId)
{
NodeId = nodeId ?? throw new ArgumentNullException(nameof(nodeId));
+
+ //Update the node id in the node collection
+ (NodeCollection as NodeDiscoveryCollection)!.SetSelfId(nodeId);
+
return this;
}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ICacheDiscoveryErrorHandler.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/ICacheDiscoveryErrorHandler.cs
index 3493d48..984ce3d 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/ICacheDiscoveryErrorHandler.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/ICacheDiscoveryErrorHandler.cs
@@ -24,7 +24,7 @@
using System;
-namespace VNLib.Data.Caching.Extensions
+namespace VNLib.Data.Caching.Extensions.Clustering
{
/// <summary>
/// Represents an type that will handle errors that occur during the discovery process
@@ -36,6 +36,6 @@ namespace VNLib.Data.Caching.Extensions
/// </summary>
/// <param name="errorNode">The node that the error occured on</param>
/// <param name="ex">The exception that caused the invocation</param>
- void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex);
+ void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex);
}
}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryCollection.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryCollection.cs
index 9adebdc..1f4d154 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryCollection.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryCollection.cs
@@ -25,7 +25,7 @@
using System.Collections.Generic;
-namespace VNLib.Data.Caching.Extensions
+namespace VNLib.Data.Caching.Extensions.Clustering
{
/// <summary>
/// Represents a collection of discovered nodes
@@ -45,13 +45,13 @@ namespace VNLib.Data.Caching.Extensions
/// </summary>
/// <param name="initialPeers">An initial collection of peers to add to the enumeration</param>
/// <returns>An enumerator that simplifies discovery of unique nodes</returns>
- INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICacheNodeAdvertisment> initialPeers);
+ INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<CacheNodeAdvertisment> initialPeers);
/// <summary>
/// Gets a snapshot of all discovered nodes in the current collection.
/// </summary>
/// <returns>The current collection of notes</returns>
- ICacheNodeAdvertisment[] GetAllNodes();
+ CacheNodeAdvertisment[] GetAllNodes();
/// <summary>
/// Completes a discovery process and updates the collection with the results
diff --git a/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryEnumerator.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs
index f6d5f40..677088a 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/INodeDiscoveryEnumerator.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/INodeDiscoveryEnumerator.cs
@@ -25,18 +25,17 @@
using System.Collections.Generic;
-
-namespace VNLib.Data.Caching.Extensions
+namespace VNLib.Data.Caching.Extensions.Clustering
{
/// <summary>
/// A custom enumerator for the node discovery process
/// </summary>
- public interface INodeDiscoveryEnumerator : IEnumerator<ICacheNodeAdvertisment>
+ public interface INodeDiscoveryEnumerator : IEnumerator<CacheNodeAdvertisment>
{
/// <summary>
/// Adds the specified peer to the collection of discovered peers
/// </summary>
/// <param name="discoveredPeers">The peer collection</param>
- void OnPeerDiscoveryComplete(IEnumerable<ICacheNodeAdvertisment> discoveredPeers);
+ void OnPeerDiscoveryComplete(IEnumerable<CacheNodeAdvertisment> discoveredPeers);
}
}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/NodeDiscoveryCollection.cs b/lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs
index 305f5de..b0e53e1 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/NodeDiscoveryCollection.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs
@@ -3,9 +3,9 @@
*
* Library: VNLib
* Package: VNLib.Data.Caching.Extensions
-* File: INodeDiscoveryCollection.cs
+* File: NodeDiscoveryCollection.cs
*
-* INodeDiscoveryCollection.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
+* NodeDiscoveryCollection.cs is part of VNLib.Data.Caching.Extensions which is part of the larger
* VNLib collection of libraries and utilities.
*
* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify
@@ -24,17 +24,18 @@
using System;
using System.Linq;
-using System.Collections.Generic;
using System.Collections;
+using System.Collections.Generic;
-namespace VNLib.Data.Caching.Extensions
+namespace VNLib.Data.Caching.Extensions.Clustering
{
/// <summary>
/// Represents a collection of available cache nodes from a discovery process
/// </summary>
public sealed class NodeDiscoveryCollection : INodeDiscoveryCollection
{
- private LinkedList<ICacheNodeAdvertisment> _peers;
+ private string? _selfId;
+ private LinkedList<CacheNodeAdvertisment> _peers;
/// <summary>
/// Initializes a new empty <see cref="NodeDiscoveryCollection"/>
@@ -44,17 +45,39 @@ namespace VNLib.Data.Caching.Extensions
_peers = new();
}
+ /// <summary>
+ /// Manually adds nodes to the collection that were not discovered through the discovery process
+ /// </summary>
+ /// <param name="nodes">The nodes to add</param>
+ public void AddManualNodes(IEnumerable<CacheNodeAdvertisment> nodes)
+ {
+ //Get only the nodes that are not already in the collection
+ IEnumerable<CacheNodeAdvertisment> newPeers = nodes.Except(_peers);
+
+ //Add them to the end of the collection
+ foreach (CacheNodeAdvertisment peer in newPeers)
+ {
+ _peers.AddLast(peer);
+ }
+ }
+
+ /// <summary>
+ /// Sets the id of the current node, so it can be excluded from discovery
+ /// </summary>
+ /// <param name="selfId">The id of the current node to exclude</param>
+ public void SetSelfId(string? selfId) => _selfId = selfId;
+
///<inheritdoc/>
public INodeDiscoveryEnumerator BeginDiscovery()
{
- return new NodeEnumerator(new());
+ return new NodeEnumerator(new(), _selfId);
}
///<inheritdoc/>
- public INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<ICacheNodeAdvertisment> initialPeers)
+ public INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<CacheNodeAdvertisment> initialPeers)
{
//Init new enumerator with the initial peers
- return new NodeEnumerator(new(initialPeers));
+ return new NodeEnumerator(new(initialPeers), _selfId);
}
///<inheritdoc/>
@@ -67,38 +90,54 @@ namespace VNLib.Data.Caching.Extensions
}
///<inheritdoc/>
- public ICacheNodeAdvertisment[] GetAllNodes()
+ public CacheNodeAdvertisment[] GetAllNodes()
{
//Capture all current peers
return _peers.ToArray();
}
- private sealed record class NodeEnumerator(LinkedList<ICacheNodeAdvertisment> Peers) : INodeDiscoveryEnumerator
+ private sealed record class NodeEnumerator(LinkedList<CacheNodeAdvertisment> Peers, string? SelfNodeId) : INodeDiscoveryEnumerator
{
+ private bool isInit;
+
//Keep track of the current node in the collection so we can move down the list
- private LinkedListNode<ICacheNodeAdvertisment>? _currentNode = Peers.First;
+ private LinkedListNode<CacheNodeAdvertisment>? _currentNode;
- public ICacheNodeAdvertisment Current => _currentNode?.Value;
+ public CacheNodeAdvertisment Current => _currentNode?.Value;
object IEnumerator.Current => _currentNode?.Value;
///<inheritdoc/>
public bool MoveNext()
{
- //Move to the next peer in the collection
- _currentNode = _currentNode?.Next;
+ if (!isInit)
+ {
+ _currentNode = Peers.First;
+ isInit = true;
+ }
+ else
+ {
+ //Move to the next peer in the collection
+ _currentNode = _currentNode?.Next;
+ }
return _currentNode?.Value != null;
}
///<inheritdoc/>
- public void OnPeerDiscoveryComplete(IEnumerable<ICacheNodeAdvertisment> discoveredPeers)
+ public void OnPeerDiscoveryComplete(IEnumerable<CacheNodeAdvertisment> discoveredPeers)
{
- //Get only the peers from the discovery that are not already in the collection
- IEnumerable<ICacheNodeAdvertisment> newPeers = discoveredPeers.Except(Peers);
+ //Get only the peers from the discovery that are not already in the collection, or ourselves
+ IEnumerable<CacheNodeAdvertisment> newPeers = discoveredPeers.Except(Peers);
+
+ if (!string.IsNullOrWhiteSpace(SelfNodeId))
+ {
+ //remove ourselves from the list
+ newPeers = newPeers.Where(p => !SelfNodeId.Equals(p.NodeId, StringComparison.OrdinalIgnoreCase));
+ }
//Add them to the end of the collection
- foreach (ICacheNodeAdvertisment ad in newPeers)
+ foreach (CacheNodeAdvertisment ad in newPeers)
{
Peers.AddLast(ad);
}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
index 634b6de..d53431a 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/FBMDataCacheExtensions.cs
@@ -24,7 +24,6 @@
using System;
using System.Net;
-using System.Text;
using System.Linq;
using System.Security;
using System.Text.Json;
@@ -32,26 +31,24 @@ using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Security.Cryptography;
-using System.Text.Json.Serialization;
using System.Runtime.CompilerServices;
using RestSharp;
-using VNLib.Net.Http;
using VNLib.Hashing;
using VNLib.Hashing.IdentityUtility;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
using VNLib.Utils.Extensions;
-using VNLib.Net.Rest.Client;
using VNLib.Net.Messaging.FBM;
using VNLib.Net.Messaging.FBM.Client;
-using ContentType = VNLib.Net.Http.ContentType;
-
+using VNLib.Net.Rest.Client.Construction;
+using VNLib.Data.Caching.Extensions.ApiModel;
+using VNLib.Data.Caching.Extensions.Clustering;
namespace VNLib.Data.Caching.Extensions
{
-
+
/// <summary>
/// Provides extension methods for FBM data caching using
/// cache servers and brokers
@@ -78,14 +75,18 @@ namespace VNLib.Data.Caching.Extensions
/// </summary>
public const string X_NODE_DISCOVERY_HEADER = "X-Cache-Node-Discovery";
- private static readonly RestClientPool ClientPool = new(2,new RestClientOptions()
+ /*
+ * Lazy to defer errors for debuggong
+ */
+ private static readonly Lazy<CacheSiteAdapter> SiteAdapter = new(() => ConfigureAdapter(2));
+
+ private static CacheSiteAdapter ConfigureAdapter(int maxClients)
{
- MaxTimeout = 10 * 1000,
- FollowRedirects = false,
- Encoding = Encoding.UTF8,
- AutomaticDecompression = DecompressionMethods.All,
- ThrowOnAnyError = true,
- });
+ CacheSiteAdapter adapter = new(maxClients);
+ //Configure the site endpoints
+ adapter.BuildEndpoints(ServiceEndpoints.Definition);
+ return adapter;
+ }
private static readonly ConditionalWeakTable<FBMClient, CacheClientConfiguration> ClientCacheConfig = new();
@@ -138,29 +139,85 @@ namespace VNLib.Data.Caching.Extensions
{
client.Config.DebugLog?.Debug("{debug}: {data}", "[CACHE]", message);
}
-
/// <summary>
- /// Discovers ALL possible cache nodes itteritivley from the current collection of initial peers.
+ /// Discovers ALL possible cache nodes itteritivley, first by collecting the configuration
+ /// from the initial peers.
/// This will make connections to all discoverable servers
/// </summary>
/// <param name="config"></param>
/// <param name="cancellation">A token to cancel the operation</param>
/// <returns></returns>
/// <exception cref="ArgumentException"></exception>
+ /// <exception cref="CacheDiscoveryFailureException"></exception>
public static async Task DiscoverNodesAsync(this CacheClientConfiguration config, CancellationToken cancellation)
{
//Make sure at least one node defined
- if(config?.InitialPeers == null || config.InitialPeers.Length == 0)
+ if(config?.WellKnownNodes == null || config.WellKnownNodes.Length == 0)
+ {
+ throw new ArgumentException("There must be at least one cache node defined in the client configuration");
+ }
+
+ //Get the initial advertisments that arent null
+ CacheNodeAdvertisment[] initialPeers = await ResolveWellKnownAsync(config, cancellation);
+
+ if (initialPeers.Length == 0)
+ {
+ throw new CacheDiscoveryFailureException("There must be at least one available cache node to continue discovery");
+ }
+
+ await DiscoverNodesAsync(config, initialPeers, cancellation);
+ }
+
+ /// <summary>
+ /// Resolves the initial well-known cache nodes into their advertisments
+ /// </summary>
+ /// <param name="config"></param>
+ /// <param name="cancellation">A token to cancel the operation</param>
+ /// <returns>An array of resolved nodes</returns>
+ public static async Task<CacheNodeAdvertisment[]> ResolveWellKnownAsync(this CacheClientConfiguration config, CancellationToken cancellation)
+ {
+ _ = config?.WellKnownNodes ?? throw new ArgumentNullException(nameof(config));
+
+ Task<CacheNodeAdvertisment?>[] initialAdds = new Task<CacheNodeAdvertisment?>[config.WellKnownNodes.Length];
+
+ //Discover initial advertisments from well-known addresses
+ for (int i = 0; i < config.WellKnownNodes.Length; i++)
{
- throw new ArgumentException("There must be at least one cache server defined in the client configuration");
+ initialAdds[i] = DiscoverNodeConfigAsync(config.WellKnownNodes[i], config, cancellation);
}
+ //Wait for all initial adds to complete
+ await Task.WhenAll(initialAdds);
+
+ //Get the initial advertisments that arent null
+ return initialAdds.Select(static x => x.Result!).Where(static s => s != null).ToArray();
+ }
+
+ /// <summary>
+ /// Discovers ALL possible cache nodes itteritivley from the current collection of initial peers.
+ /// This will make connections to all discoverable servers and update the client configuration, with all
+ /// discovered peers
+ /// </summary>
+ /// <param name="config"></param>
+ /// <param name="initialPeers">Accepts an array of initial peers to override the endpoint discovery process</param>
+ /// <param name="cancellation">A token to cancel the operation</param>
+ /// <returns>A task that completes when all nodes have been discovered</returns>
+ /// <exception cref="ArgumentException"></exception>
+ /// <exception cref="CacheDiscoveryFailureException"></exception>
+ public static async Task DiscoverNodesAsync(this CacheClientConfiguration config, CacheNodeAdvertisment[] initialPeers, CancellationToken cancellation)
+ {
+ //Make sure at least one node defined
+ if (initialPeers == null || initialPeers.Length == 0)
+ {
+ throw new ArgumentException("There must be at least one initial peer");
+ }
+
//Get the discovery enumerator with the initial peers
- INodeDiscoveryEnumerator enumerator = config.NodeCollection.BeginDiscovery(config.InitialPeers);
+ INodeDiscoveryEnumerator enumerator = config.NodeCollection.BeginDiscovery(initialPeers);
//Start the discovery process
- await DiscoverNodesAsync(enumerator, config.AuthManager, config.ErrorHandler, cancellation);
+ await DiscoverNodesAsync(enumerator, config, config.ErrorHandler, cancellation);
//Commit nodes
config.NodeCollection.CompleteDiscovery(enumerator);
@@ -168,7 +225,7 @@ namespace VNLib.Data.Caching.Extensions
private static async Task DiscoverNodesAsync(
INodeDiscoveryEnumerator enumerator,
- ICacheAuthManager auth,
+ CacheClientConfiguration config,
ICacheDiscoveryErrorHandler? errHandler,
CancellationToken cancellation
)
@@ -184,17 +241,17 @@ namespace VNLib.Data.Caching.Extensions
}
/*
- * We are allowed to save nodes that do not have a discovery endpoint, but we cannot discover nodes from them
- * we can only use them as cache
+ * We are allowed to save nodes that do not have a discovery endpoint, but we cannot
+ * discover nodes from them we can only use them as cache
*/
- //add a random delay to avoid spamming the server
- await Task.Delay((int)Random.Shared.NextInt64(50, 500), cancellation);
+ //add a random delay to avoid spamming servers
+ await Task.Delay((int)Random.Shared.NextInt64(100, 500), cancellation);
try
{
//Discover nodes from the current node
- ICacheNodeAdvertisment[]? nodes = await GetCacheNodesAsync(enumerator.Current, auth, cancellation);
+ CacheNodeAdvertisment[]? nodes = await GetCacheNodesAsync(enumerator.Current, config, cancellation);
if (nodes != null)
{
@@ -208,78 +265,77 @@ namespace VNLib.Data.Caching.Extensions
//Handle the error
errHandler.OnDiscoveryError(enumerator.Current, ex);
}
+ catch(Exception ex)
+ {
+ throw new CacheDiscoveryFailureException($"Failed to discovery peer node {enumerator.Current?.NodeId}, cannot continue", ex);
+ }
+ }
+ }
+
+ private static async Task<CacheNodeAdvertisment?> DiscoverNodeConfigAsync(Uri serverUri, CacheClientConfiguration config, CancellationToken cancellation)
+ {
+ try
+ {
+ GetConfigRequest req = new (serverUri, config);
+
+ //Site adapter verifies response messages so we dont need to check on the response
+ byte[] data = await SiteAdapter.Value.ExecuteAsync(req, cancellation).AsBytes()
+ ?? throw new CacheDiscoveryFailureException($"No data returned from desired cache node");
+
+ //Response is jwt
+ using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data);
+
+ //The entire payload is just the single serialzed advertisment
+ using JsonDocument doc = responseJwt.GetPayload();
+
+ return doc.RootElement.GetProperty("sub").Deserialize<CacheNodeAdvertisment>();
+ }
+ //Bypass cdfe when error handler is null
+ catch(CacheDiscoveryFailureException) when(config.ErrorHandler == null)
+ {
+ throw;
+ }
+ //Catch exceptions when an error handler is defined
+ catch (Exception ex) when (config.ErrorHandler != null)
+ {
+ //Handle the error
+ config.ErrorHandler.OnDiscoveryError(null!, ex);
+ return null;
+ }
+ catch (Exception ex)
+ {
+ throw new CacheDiscoveryFailureException("Failed to discover node configuration", ex);
}
}
/// <summary>
- /// Contacts the cache broker to get a list of active servers to connect to
+ /// Contacts the given server's discovery endpoint to discover a list of available
+ /// servers we can connect to
/// </summary>
/// <param name="advert">An advertisment of a server to discover other nodes from</param>
/// <param name="cancellationToken">A token to cancel the operationS</param>
- /// <param name="auth">The authentication manager</param>
+ /// <param name="config">The cache configuration object</param>
/// <returns>The list of active servers</returns>
/// <exception cref="SecurityException"></exception>
/// <exception cref="ArgumentException"></exception>
/// <exception cref="ArgumentNullException"></exception>
- public static async Task<ICacheNodeAdvertisment[]?> GetCacheNodesAsync(ICacheNodeAdvertisment advert, ICacheAuthManager auth, CancellationToken cancellationToken = default)
+ public static async Task<CacheNodeAdvertisment[]?> GetCacheNodesAsync(CacheNodeAdvertisment advert, CacheClientConfiguration config, CancellationToken cancellationToken = default)
{
_ = advert ?? throw new ArgumentNullException(nameof(advert));
- _ = auth ?? throw new ArgumentNullException(nameof(auth));
+ _ = config ?? throw new ArgumentNullException(nameof(config));
_ = advert.DiscoveryEndpoint ?? throw new ArgumentException("Advertisment does not expose an advertisment endpoint");
- string jwtBody;
-
- //Build request jwt
- using (JsonWebToken requestJwt = new())
- {
- requestJwt.WriteHeader(auth.GetJwtHeader());
- requestJwt.InitPayloadClaim()
- .AddClaim("iat", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds())
- .AddClaim("nonce", RandomHash.GetRandomBase32(16))
- .CommitClaims();
-
- //sign the jwt
- auth.SignJwt(requestJwt);
-
- //Compile the jwt
- jwtBody = requestJwt.Compile();
- }
-
- //New list request
- RestRequest listRequest = new(advert.DiscoveryEndpoint, Method.Post);
-
- //Add the jwt as a string to the request body
- listRequest.AddStringBody(jwtBody, DataFormat.None);
- listRequest.AddHeader("Accept", HttpHelpers.GetContentTypeString(ContentType.Text));
- listRequest.AddHeader("Content-Type", HttpHelpers.GetContentTypeString(ContentType.Text));
-
- byte[] data;
-
- //Rent client
- using (ClientContract client = ClientPool.Lease())
- {
- //Exec list request
- RestResponse response = await client.Resource.ExecuteAsync(listRequest, cancellationToken);
-
- if (!response.IsSuccessful)
- {
- throw response.ErrorException!;
- }
+ DiscoveryRequest req = new (advert.DiscoveryEndpoint, config);
- data = response.RawBytes ?? throw new InvalidOperationException("No data returned from broker");
- }
+ //Site adapter verifies response messages so we dont need to check on the response
+ byte[] data = await SiteAdapter.Value.ExecuteAsync(req, cancellationToken).AsBytes()
+ ?? throw new InvalidOperationException($"No data returned from node {advert.NodeId}");
//Response is jwt
using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data);
- //Verify the jwt
- if (!auth.VerifyJwt(responseJwt))
- {
- throw new SecurityException("Failed to verify the broker's challenge, cannot continue");
- }
-
using JsonDocument doc = responseJwt.GetPayload();
- return doc.RootElement.GetProperty("peers").Deserialize<Advertisment[]>();
+ return doc.RootElement.GetProperty("peers").Deserialize<CacheNodeAdvertisment[]>();
}
/// <summary>
@@ -313,7 +369,6 @@ namespace VNLib.Data.Caching.Extensions
ClientCacheConfig.AddOrUpdate(client, nodeConfig);
return nodeConfig;
}
-
/// <summary>
/// Waits for the client to disconnect from the server while observing
@@ -322,6 +377,8 @@ namespace VNLib.Data.Caching.Extensions
/// </summary>
/// <param name="client"></param>
/// <param name="token">A token to cancel the connection to the server</param>
+ /// <exception cref="TaskCanceledException"></exception>
+ /// <exception cref="ObjectDisposedException"></exception>
/// <returns>A task that complets when the connecion has been closed successfully</returns>
public static async Task WaitForExitAsync(this FBMClient client, CancellationToken token = default)
{
@@ -341,8 +398,26 @@ namespace VNLib.Data.Caching.Extensions
//Normal try to disconnect the socket
await client.DisconnectAsync(CancellationToken.None);
- //Notify if cancelled
- token.ThrowIfCancellationRequested();
+ //If the cancellation is completed, throw a task cancelled exception
+ if (cancellation.IsCompleted)
+ {
+ throw new TaskCanceledException("The client disconnected because the connection was cancelled");
+ }
+ }
+
+ /// <summary>
+ /// Discovers all available nodes for the current client config
+ /// </summary>
+ /// <param name="client"></param>
+ /// <param name="cancellation">A token to cancel the operation</param>
+ /// <returns>A task that completes when all nodes have been discovered</returns>
+ public static Task DiscoverAvailableNodesAsync(this FBMClient client, CancellationToken cancellation = default)
+ {
+ //Get stored config
+ CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
+
+ //Discover all nodes
+ return conf.DiscoverNodesAsync(cancellation);
}
/// <summary>
@@ -357,16 +432,16 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="SecurityException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
- public static async Task<ICacheNodeAdvertisment> ConnectToRandomCacheAsync(this FBMClient client, CancellationToken cancellation = default)
+ public static async Task<CacheNodeAdvertisment> ConnectToRandomCacheAsync(this FBMClient client, CancellationToken cancellation = default)
{
//Get stored config
CacheClientConfiguration conf = ClientCacheConfig.GetOrCreateValue(client);
//Get all available nodes, or at least the initial peers
- ICacheNodeAdvertisment[]? adverts = conf.NodeCollection.GetAllNodes() ?? conf.InitialPeers ?? throw new ArgumentException("No cache nodes discovered, cannot connect");
+ CacheNodeAdvertisment[]? adverts = conf.NodeCollection.GetAllNodes() ?? throw new ArgumentException("No cache nodes discovered, cannot connect");
//Select random node from all available nodes
- ICacheNodeAdvertisment randomServer = adverts.SelectRandom();
+ CacheNodeAdvertisment randomServer = adverts.SelectRandom();
//Connect to the random server
await ConnectToCacheAsync(client, randomServer, cancellation);
@@ -388,7 +463,7 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="SecurityException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
- public static Task ConnectToCacheAsync(this FBMClient client, ICacheNodeAdvertisment server, CancellationToken token = default)
+ public static Task ConnectToCacheAsync(this FBMClient client, CacheNodeAdvertisment server, CancellationToken token = default)
{
_ = client ?? throw new ArgumentNullException(nameof(client));
_ = server ?? throw new ArgumentNullException(nameof(server));
@@ -414,7 +489,7 @@ namespace VNLib.Data.Caching.Extensions
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="SecurityException"></exception>
/// <exception cref="ObjectDisposedException"></exception>
- public static Task ConnectToCacheAsync(this FBMClient client, ICacheNodeAdvertisment server, CacheClientConfiguration explicitConfig, CancellationToken token = default)
+ public static Task ConnectToCacheAsync(this FBMClient client, CacheNodeAdvertisment server, CacheClientConfiguration explicitConfig, CancellationToken token = default)
{
_ = client ?? throw new ArgumentNullException(nameof(client));
_ = server ?? throw new ArgumentNullException(nameof(server));
@@ -427,50 +502,10 @@ namespace VNLib.Data.Caching.Extensions
private static async Task ConnectToCacheAsync(
FBMClient client,
CacheClientConfiguration config,
- ICacheNodeAdvertisment server,
+ CacheNodeAdvertisment server,
CancellationToken token = default
)
{
- //build ws uri from the connect endpoint
- UriBuilder uriBuilder = new(server.ConnectEndpoint)
- {
- Scheme = config.UseTls ? "wss://" : "ws://"
- };
-
- string challenge = RandomHash.GetRandomBase32(24);
-
- //See if the supplied config is for a cache node
- CacheNodeConfiguration? cnc = config as CacheNodeConfiguration;
-
- string jwtMessage;
- //Init jwt for connecting to server
- using (JsonWebToken jwt = new())
- {
- jwt.WriteHeader(config.AuthManager.GetJwtHeader());
-
- //Init claim
- JwtPayload claim = jwt.InitPayloadClaim();
-
- claim.AddClaim("chl", challenge);
-
- if (!string.IsNullOrWhiteSpace(cnc?.NodeId))
- {
- /*
- * The unique node id so the other nodes know to load the
- * proper event queue for the current server
- */
- claim.AddClaim("sub", cnc.NodeId);
- }
-
- claim.CommitClaims();
-
- //Sign jwt
- config.AuthManager.SignJwt(jwt);
-
- //Compile to string
- jwtMessage = jwt.Compile();
- }
-
/*
* During a server negiation, the client makes an intial get request to the cache endpoint
* and passes some client negiation terms as a signed message to the server. The server then
@@ -479,52 +514,34 @@ namespace VNLib.Data.Caching.Extensions
* The response from the server is essentailly the 'access token'
*/
- RestRequest negotation = new(server.ConnectEndpoint, Method.Get);
- //Set the jwt auth header for negotiation
- negotation.AddHeader("Authorization", jwtMessage);
- negotation.AddHeader("Accept", HttpHelpers.GetContentTypeString(ContentType.Text));
-
client.LogDebug("Negotiating with cache server");
-
- string authToken;
-
- //rent client
- using (ClientContract clientContract = ClientPool.Lease())
- {
- //Execute the request
- RestResponse response = await clientContract.Resource.ExecuteGetAsync(negotation, token);
- response.ThrowIfError();
-
- if (response.Content == null)
- {
- throw new FBMServerNegiationException("Failed to negotiate with the server, no response");
- }
-
- //Raw content
- authToken = response.Content;
- }
+ //Create a new connection negotiation
+ NegotationRequest req = new(server.ConnectEndpoint, config);
- //Parse the jwt
- using (JsonWebToken jwt = JsonWebToken.Parse(authToken))
- {
- //Verify the jwt
- if (!config.AuthManager.VerifyJwt(jwt))
- {
- throw new SecurityException("Failed to verify the cache server's negotiation message, cannot continue");
- }
+ //Exec negotiation
+ RestResponse response = await SiteAdapter.Value.ExecuteAsync(req, token);
+ /*
+ * JWT will already be veified by the endpoint adapter, so we
+ * just need to validate the server's buffer configuration
+ */
+ using (JsonWebToken jwt = JsonWebToken.ParseRaw(response.RawBytes))
+ {
//Confirm the server's buffer configuration
- ValidateServerNegotation(client, challenge, jwt);
+ ValidateServerNegotation(client, jwt);
}
client.LogDebug("Server negotiation validated, connecting to server");
//The client authorization header is the exact response
- client.ClientSocket.Headers[HttpRequestHeader.Authorization] = authToken;
+ client.ClientSocket.Headers[HttpRequestHeader.Authorization] = response.Content!;
+
+ //See if the supplied config is for a cache node
+ CacheNodeConfiguration? cnc = config as CacheNodeConfiguration;
//Compute the signature of the upgrade token
- client.ClientSocket.Headers[X_UPGRADE_SIG_HEADER] = config.AuthManager.GetBase64UpgradeSingature(authToken);
+ client.ClientSocket.Headers[X_UPGRADE_SIG_HEADER] = config.AuthManager.GetBase64UpgradeSignature(response.Content, cnc != null);
//Check to see if adversize self is enabled
if (cnc?.BroadcastAdverisment == true)
@@ -533,11 +550,17 @@ namespace VNLib.Data.Caching.Extensions
client.ClientSocket.Headers[X_NODE_DISCOVERY_HEADER] = GetAdvertismentHeader(cnc);
}
+ //build ws uri from the connect endpoint
+ UriBuilder uriBuilder = new(server.ConnectEndpoint)
+ {
+ Scheme = config.UseTls ? "wss://" : "ws://"
+ };
+
//Connect async
await client.ConnectAsync(uriBuilder.Uri, token);
}
- private static void ValidateServerNegotation(FBMClient client, string challenge, JsonWebToken jwt)
+ private static void ValidateServerNegotation(FBMClient client, JsonWebToken jwt)
{
try
{
@@ -548,15 +571,6 @@ namespace VNLib.Data.Caching.Extensions
.EnumerateObject()
.ToDictionary(static k => k.Name, static v => v.Value);
- //get the challenge response
- string challengeResponse = args["chl"].GetString()!;
-
- //Check the challenge response
- if (!challenge.Equals(challengeResponse, StringComparison.Ordinal))
- {
- throw new FBMServerNegiationException("Failed to negotiate with the server, challenge response does not match");
- }
-
//Get the negiation values
uint recvBufSize = args[FBMClient.REQ_RECV_BUF_QUERY_ARG].GetUInt32();
uint headerBufSize = args[FBMClient.REQ_HEAD_BUF_QUERY_ARG].GetUInt32();
@@ -593,7 +607,7 @@ namespace VNLib.Data.Caching.Extensions
* compute a signature of the upgrade token and send it to the server to prove we hold the private key.
*/
- private static string GetBase64UpgradeSingature(this ICacheAuthManager man, string? token)
+ private static string GetBase64UpgradeSignature(this ICacheAuthManager man, string? token, bool isPeer)
{
//Compute hash of the token
byte[] hash = ManagedHash.ComputeHash(token, HashAlg.SHA256);
@@ -604,19 +618,7 @@ namespace VNLib.Data.Caching.Extensions
//Return the base64 string
return Convert.ToBase64String(sig);
}
-
- /// <summary>
- /// Verifies the signed auth token against the given verification key
- /// </summary>
- /// <param name="signature">The base64 signature of the token</param>
- /// <param name="token">The raw token to compute the hash of</param>
- /// <param name="nodeConfig">The node configuration</param>
- /// <returns>True if the singature matches, false otherwise</returns>
- /// <exception cref="CryptographicException"></exception>
- public static bool VerifyUpgradeToken(this CacheClientConfiguration nodeConfig, string signature, string token)
- {
- return VerifyUpgradeToken(nodeConfig.AuthManager, signature, token);
- }
+
/// <summary>
/// Verifies the signed auth token against the given verification key
@@ -624,10 +626,11 @@ namespace VNLib.Data.Caching.Extensions
/// <param name="man"></param>
/// <param name="signature">The base64 signature of the token</param>
/// <param name="token">The raw token to compute the hash of</param>
+ /// <param name="isPeer">A value that indicates if the connection is from a peer node</param>
/// <returns>True if the singature matches, false otherwise</returns>
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="CryptographicException"></exception>
- public static bool VerifyUpgradeToken(this ICacheAuthManager man, string signature, string token)
+ public static bool VerifyUpgradeToken(this ICacheAuthManager man, string signature, string token, bool isPeer)
{
_ = man ?? throw new ArgumentNullException(nameof(man));
@@ -637,7 +640,7 @@ namespace VNLib.Data.Caching.Extensions
//decode the signature
byte[] sig = Convert.FromBase64String(signature);
- return man.VerifyMessageHash(hash, HashAlg.SHA256, sig);
+ return man.VerifyMessageHash(hash, HashAlg.SHA256, sig, isPeer);
}
private static string GetAdvertismentHeader(CacheNodeConfiguration nodeConfiguration)
@@ -675,58 +678,30 @@ namespace VNLib.Data.Caching.Extensions
/// <param name="message">The advertisment message to verify</param>
/// <returns>The advertisment message if successfully verified, or null otherwise</returns>
/// <exception cref="FormatException"></exception>
- public static ICacheNodeAdvertisment? VerifyPeerAdvertisment(this ICacheAuthManager config, string message)
+ public static CacheNodeAdvertisment? VerifyPeerAdvertisment(this ICacheAuthManager config, string message)
{
using JsonWebToken jwt = JsonWebToken.Parse(message);
//Verify the signature
- if (!config.VerifyJwt(jwt))
+ if (!config.VerifyJwt(jwt, true))
{
return null;
}
//Get the payload
- return jwt.GetPayload<Advertisment>();
- }
-
+ return jwt.GetPayload<CacheNodeAdvertisment>();
+ }
/// <summary>
/// Selects a random server from a collection of active servers
/// </summary>
/// <param name="servers"></param>
/// <returns>A server selected at random</returns>
- public static ICacheNodeAdvertisment SelectRandom(this ICollection<ICacheNodeAdvertisment> servers)
+ public static CacheNodeAdvertisment SelectRandom(this ICollection<CacheNodeAdvertisment> servers)
{
//select random server
int randServer = RandomNumberGenerator.GetInt32(0, servers.Count);
return servers.ElementAt(randServer);
}
-
-
- private class Advertisment : ICacheNodeAdvertisment
- {
- [JsonIgnore]
- public Uri? ConnectEndpoint { get; set; }
-
- [JsonIgnore]
- public Uri? DiscoveryEndpoint { get; set; }
-
- [JsonPropertyName("iss")]
- public string NodeId { get; set; }
-
- [JsonPropertyName("url")]
- public string? url
- {
- get => ConnectEndpoint?.ToString();
- set => ConnectEndpoint = value == null ? null : new Uri(value);
- }
-
- [JsonPropertyName("dis")]
- public string? dis
- {
- get => DiscoveryEndpoint?.ToString();
- set => DiscoveryEndpoint = value == null ? null : new Uri(value);
- }
- }
}
}
diff --git a/lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs b/lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs
index e3ab868..32ae142 100644
--- a/lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs
+++ b/lib/VNLib.Data.Caching.Extensions/src/ICacheAuthManager.cs
@@ -53,8 +53,9 @@ namespace VNLib.Data.Caching.Extensions
/// Verifies the given JWT
/// </summary>
/// <param name="jwt">The message to verify authenticity</param>
+ /// <param name="isPeer">A value indicating if the message is from a known node</param>
/// <returns>True of the JWT could be verified, false otherwise</returns>
- bool VerifyJwt(JsonWebToken jwt);
+ bool VerifyJwt(JsonWebToken jwt, bool isPeer);
/// <summary>
/// Signs the given message hash
@@ -70,7 +71,8 @@ namespace VNLib.Data.Caching.Extensions
/// <param name="hash">The message hash to compare</param>
/// <param name="alg">The algorithm used to produce the message hash</param>
/// <param name="signature">The message signature to verify the message against</param>
+ /// <param name="isPeer">A value indicating if the message is from a known node</param>
/// <returns>True of the signature could be verified</returns>
- bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature);
+ bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature, bool isPeer);
}
}
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs
index f77587b..5e28dde 100644
--- a/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/BlobCache.cs
@@ -187,9 +187,9 @@ namespace VNLib.Data.Caching.ObjectCache
{
//remove the entry and bypass the disposal
bool result = base.Remove(objectId);
-#if DEBUG
+
Debug.Assert(result == true);
-#endif
+
return true;
}
diff --git a/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs b/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs
index a02605e..f650cc3 100644
--- a/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs
+++ b/lib/VNLib.Data.Caching.ObjectCache/src/CacheEntry.cs
@@ -24,6 +24,7 @@
using System;
using System.Buffers;
+using System.Diagnostics;
using System.Buffers.Binary;
using System.Runtime.CompilerServices;
@@ -204,10 +205,9 @@ namespace VNLib.Data.Caching
//Get the data segment
Span<byte> segment = GetDataSegment();
-#if DEBUG
//Test segment length is equivalent to the requested data length
- System.Diagnostics.Debug.Assert(segment.Length == data.Length);
-#endif
+ Debug.Assert(segment.Length == data.Length);
+
//Copy data segment
data.CopyTo(segment);
}
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/RemoteCacheOperator.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/RemoteCacheOperator.cs
index 1f0742d..f40f746 100644
--- a/lib/VNLib.Plugins.Extensions.VNCache/src/RemoteCacheOperator.cs
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/RemoteCacheOperator.cs
@@ -26,9 +26,9 @@ using System;
using System.Threading;
using System.Threading.Tasks;
+using VNLib.Utils.Logging;
using VNLib.Data.Caching;
using VNLib.Plugins.Extensions.Loading;
-using VNLib.Utils.Logging;
namespace VNLib.Plugins.Extensions.VNCache
{
@@ -41,7 +41,7 @@ namespace VNLib.Plugins.Extensions.VNCache
/// The background work method must be sheduled for the cache client to be
/// connected to the backing store
/// </remarks>
- public sealed class RemoteCacheOperator : IAsyncBackgroundWork, IAsyncConfigurable
+ public sealed class RemoteCacheOperator : IAsyncBackgroundWork
{
private readonly VnCacheClient _client;
private CancellationTokenSource? _tokenSource;
@@ -78,11 +78,6 @@ namespace VNLib.Plugins.Extensions.VNCache
/// Cancels the background cache client listener
/// </summary>
public void CancelListener() => _tokenSource?.Cancel();
-
- ///<inheritdoc/>
- public Task ConfigureServiceAsync(PluginBase plugin)
- {
- return _client.ConfigureServiceAsync(plugin);
- }
+
}
} \ No newline at end of file
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
index f4f059b..aa3d88f 100644
--- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClient.cs
@@ -40,7 +40,7 @@ using VNLib.Data.Caching.Extensions;
using VNLib.Data.Caching.ObjectCache;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Extensions.Loading;
-
+using VNLib.Data.Caching.Extensions.Clustering;
namespace VNLib.Plugins.Extensions.VNCache
{
@@ -56,7 +56,7 @@ namespace VNLib.Plugins.Extensions.VNCache
/// A base class that manages
/// </summary>
[ConfigurationName(VNCacheExtensions.CACHE_CONFIG_KEY)]
- internal class VnCacheClient : IGlobalCacheProvider, IAsyncBackgroundWork, IAsyncConfigurable
+ internal class VnCacheClient : IGlobalCacheProvider, IAsyncBackgroundWork
{
private readonly VnCacheClientConfig _config;
@@ -70,12 +70,17 @@ namespace VNLib.Plugins.Extensions.VNCache
/// </summary>
public bool IsConnected { get; private set; }
- public VnCacheClient(PluginBase pbase, IConfigScope config)
+ public VnCacheClient(PluginBase plugin, IConfigScope config)
:this(
- config.Deserialze<VnCacheClientConfig>(),
- pbase.IsDebug() ? pbase.Log : null
+ config.Deserialze<VnCacheClientConfig>(),
+ plugin.IsDebug() ? plugin.Log : null
)
- {}
+ {
+ //Set authenticator and error handler
+ Client.GetCacheConfiguration()
+ .WithAuthenticator(new AuthManager(plugin))
+ .WithErrorHandler(new DiscoveryErrHAndler(plugin.Log));
+ }
public VnCacheClient(VnCacheClientConfig config, ILogProvider? debugLog)
{
@@ -92,17 +97,7 @@ namespace VNLib.Plugins.Extensions.VNCache
//Add the configuration to the client
Client.GetCacheConfiguration()
.WithTls(config.UseTls)
- .WithInitialPeers(config.InitialNodes!);
- }
-
- public Task ConfigureServiceAsync(PluginBase plugin)
- {
- //Set authenticator
- Client.GetCacheConfiguration()
- .WithAuthenticator(new AuthManager(plugin))
- .WithErrorHandler(new DiscoveryErrHAndler(plugin.Log));
-
- return Task.CompletedTask;
+ .WithInitialPeers(config.GetInitialNodeUris());
}
/*
@@ -115,21 +110,25 @@ namespace VNLib.Plugins.Extensions.VNCache
{
while (true)
{
- //Load the server list
- ICacheNodeAdvertisment[]? servers;
+ //Discover nodes in the network
+
while (true)
{
try
{
- pluginLog.Debug("Discovering cluster nodes in broker");
+ pluginLog.Debug("Discovering cluster nodes in network");
//Get server list
- servers = await Client.DiscoverCacheNodesAsync(exitToken);
+ await Client.DiscoverAvailableNodesAsync(exitToken);
break;
}
catch (HttpRequestException re) when (re.InnerException is SocketException)
{
pluginLog.Warn("Broker server is unreachable");
}
+ catch(CacheDiscoveryFailureException ce)
+ {
+ pluginLog.Warn("Failed to discover cache servers, reason {r}", ce);
+ }
catch (Exception ex)
{
pluginLog.Warn("Failed to get server list from broker, reason {r}", ex.Message);
@@ -140,19 +139,12 @@ namespace VNLib.Plugins.Extensions.VNCache
await Task.Delay(randomMsDelay, exitToken);
}
- if (servers?.Length == 0)
- {
- pluginLog.Warn("No cluster nodes found, retrying");
- await Task.Delay(_config.RetryInterval, exitToken);
- continue;
- }
-
try
{
pluginLog.Debug("Connecting to random cache server");
//Connect to a random server
- ICacheNodeAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken);
+ CacheNodeAdvertisment selected = await Client.ConnectToRandomCacheAsync(exitToken);
pluginLog.Debug("Connected to cache server {s}", selected.NodeId);
//Set connection status flag
@@ -304,13 +296,13 @@ namespace VNLib.Plugins.Extensions.VNCache
}
///<inheritdoc/>
- public bool VerifyJwt(JsonWebToken jwt)
+ public bool VerifyJwt(JsonWebToken jwt, bool isPeer)
{
return jwt.VerifyFromJwk(_verKey.Value);
}
///<inheritdoc/>
- public bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature)
+ public bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature, bool isPeer)
{
//try to get the rsa alg for the signing key
using RSA? rsa = _verKey.Value.GetRSAPublicKey();
@@ -332,7 +324,7 @@ namespace VNLib.Plugins.Extensions.VNCache
private sealed record class DiscoveryErrHAndler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
{
- public void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex)
+ public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
{
Logger.Error("Failed to discover nodes from server {s} cause:\n{err}", errorNode.NodeId, ex);
}
diff --git a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs
index 64d3e07..73fb70f 100644
--- a/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs
+++ b/lib/VNLib.Plugins.Extensions.VNCache/src/VnCacheClientConfig.cs
@@ -23,9 +23,9 @@
*/
using System;
+using System.Linq;
using System.Text.Json.Serialization;
-using VNLib.Data.Caching.Extensions;
using VNLib.Plugins.Extensions.Loading;
namespace VNLib.Plugins.Extensions.VNCache
@@ -40,7 +40,7 @@ namespace VNLib.Plugins.Extensions.VNCache
/// cache server. This value will be negotiated with the server
/// during a connection upgrade
/// </summary>
- [JsonPropertyName("max_message_size")]
+ [JsonPropertyName("max_object_size")]
public int? MaxMessageSize { get; set; }
/// <summary>
@@ -82,7 +82,18 @@ namespace VNLib.Plugins.Extensions.VNCache
/// The initial peers to connect to
/// </summary>
[JsonPropertyName("initial_nodes")]
- public InitialNode[]? InitialNodes { get; set; }
+ public string[]? InitialNodes { get; set; }
+
+ /// <summary>
+ /// Gets the initial nodes as a collection of URIs
+ /// </summary>
+ /// <returns>The nodes as a collection of URIs</returns>
+ /// <exception cref="InvalidOperationException"></exception>
+ public Uri[] GetInitialNodeUris()
+ {
+ _ = InitialNodes ?? throw new InvalidOperationException("Initial nodes have not been set");
+ return InitialNodes.Select(x => new Uri(x)).ToArray();
+ }
void IOnConfigValidation.Validate()
{
@@ -108,37 +119,21 @@ namespace VNLib.Plugins.Extensions.VNCache
throw new ArgumentException("You must specify at least one initial peer", "initial_peers");
}
- foreach (InitialNode peer in InitialNodes)
- {
- _ = peer.ConnectEndpoint ?? throw new ArgumentException("You must specify a connect endpoint for each initial node", "initial_nodes");
- _ = peer.NodeId ?? throw new ArgumentException("You must specify a node id for each initial node", "initial_nodes");
- }
- }
-
- public sealed record class InitialNode : ICacheNodeAdvertisment
- {
- [JsonIgnore]
- public Uri ConnectEndpoint { get; private set; }
-
- [JsonIgnore]
- public Uri? DiscoveryEndpoint { get; private set; }
-
- [JsonPropertyName("node_id")]
- public string? NodeId { get; set; }
-
- [JsonPropertyName("connect_endpoint")]
- public string? ConnectEndpointString
- {
- get => ConnectEndpoint.ToString();
- set => ConnectEndpoint = new Uri(value!);
- }
-
- [JsonPropertyName("discovery_endpoint")]
- public string? DiscoveryEndpointString
+ //Validate initial nodes
+ foreach (Uri peer in GetInitialNodeUris())
{
- get => DiscoveryEndpoint?.ToString();
- set => DiscoveryEndpoint = value == null ? null : new Uri(value);
+ if (!peer.IsAbsoluteUri)
+ {
+ throw new ArgumentException("You must specify an absolute URI for each initial node", "initial_nodes");
+ }
+
+ //Verify http connection
+ if(peer.Scheme != Uri.UriSchemeHttp || peer.Scheme != Uri.UriSchemeHttps)
+ {
+ throw new ArgumentException("You must specify an HTTP or HTTPS URI for each initial node", "initial_nodes");
+ }
}
}
+
}
} \ No newline at end of file
diff --git a/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs b/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs
index 6725fbe..5fc700b 100644
--- a/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs
+++ b/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs
@@ -32,7 +32,6 @@ using VNLib.Hashing.IdentityUtility;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.Extensions;
-
namespace VNLib.Data.Caching.ObjectCache.Server
{
sealed record class CacheAuthKeyStore : ICacheAuthManager
@@ -59,9 +58,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server
}
///<inheritdoc/>
- public bool VerifyJwt(JsonWebToken jwt)
+ public bool VerifyJwt(JsonWebToken jwt, bool isPeer)
{
- return jwt.VerifyFromJwk(_clientPub.Value);
+ //Verify from peer server or client
+ return isPeer ? jwt.VerifyFromJwk(_cachePriv.Value) : jwt.VerifyFromJwk(_clientPub.Value);
}
/// <summary>
@@ -85,7 +85,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
}
//try to get the ecdsa alg for the signing key
- using ECDsa? ecdsa = _cachePriv.Value.GetECDsaPublicKey();
+ using ECDsa? ecdsa = _cachePriv.Value.GetECDsaPrivateKey();
if (ecdsa != null)
{
return ecdsa.SignHash(hash);
@@ -95,17 +95,20 @@ namespace VNLib.Data.Caching.ObjectCache.Server
}
///<inheritdoc/>
- public bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature)
+ public bool VerifyMessageHash(ReadOnlySpan<byte> hash, HashAlg alg, ReadOnlySpan<byte> signature, bool isPeer)
{
+ //Determine the key to verify against
+ ReadOnlyJsonWebKey jwk = isPeer ? _cachePriv.Value : _clientPub.Value;
+
//try to get the rsa alg for the signing key
- using RSA? rsa = _clientPub.Value.GetRSAPublicKey();
+ using RSA? rsa = jwk.GetRSAPublicKey();
if (rsa != null)
{
return rsa.VerifyHash(hash, signature, alg.GetAlgName(), RSASignaturePadding.Pkcs1);
}
//try to get the ecdsa alg for the signing key
- using ECDsa? ecdsa = _clientPub.Value.GetECDsaPublicKey();
+ using ECDsa? ecdsa = jwk.GetECDsaPublicKey();
if (ecdsa != null)
{
return ecdsa.VerifyHash(hash, signature);
diff --git a/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs b/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs
index 049069e..3827121 100644
--- a/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs
+++ b/plugins/ObjectCacheServer/src/CacheEventQueueManager.cs
@@ -104,7 +104,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
}
//Return the node's queue
- return nq.Queue;
+ return nq;
}
///<inheritdoc/>
@@ -183,8 +183,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//Interval to purge stale subscribers
Task IIntervalScheduleable.OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken)
{
- //Purge
+ log.Debug("Purging stale peer event queues");
+
PurgeStaleSubscribers();
+
return Task.CompletedTask;
}
diff --git a/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs b/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs
index 52b6abf..9c7388e 100644
--- a/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs
+++ b/plugins/ObjectCacheServer/src/CacheListenerPubQueue.cs
@@ -25,13 +25,13 @@
using System;
using System.Threading;
using System.Threading.Tasks;
+using System.Threading.Channels;
using VNLib.Utils.Async;
using VNLib.Utils.Logging;
using VNLib.Plugins;
using VNLib.Plugins.Extensions.Loading;
-
namespace VNLib.Data.Caching.ObjectCache.Server
{
internal sealed class CacheListenerPubQueue : ICacheListenerEventQueue, IAsyncBackgroundWork
@@ -44,10 +44,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server
{
_queueManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>();
_logProvider = plugin.Log;
- _listenerQueue = new AsyncQueue<ChangeEvent>(false, true);
-
- //Register processing worker
- _ = plugin.ObserveWork(this, 500);
+ _listenerQueue = new AsyncQueue<ChangeEvent>(new BoundedChannelOptions(10000)
+ {
+ AllowSynchronousContinuations = true,
+ FullMode = BoundedChannelFullMode.DropOldest,
+ SingleReader = true,
+ SingleWriter = false,
+ });
}
///<inheritdoc/>
@@ -59,25 +62,25 @@ namespace VNLib.Data.Caching.ObjectCache.Server
{
//Accumulator for events
ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize];
- int ptr = 0;
+ int index = 0;
//Listen for changes
while (true)
{
//Wait for next event
- accumulator[ptr++] = await _listenerQueue.DequeueAsync(exitToken);
+ accumulator[index++] = await _listenerQueue.DequeueAsync(exitToken);
//try to accumulate more events until we can't anymore
- while (_listenerQueue.TryDequeue(out ChangeEvent? ev) && ptr < accumulatorSize)
+ while (_listenerQueue.TryDequeue(out ChangeEvent? ev) && index < accumulatorSize)
{
- accumulator[ptr++] = ev;
+ accumulator[index++] = ev;
}
//Publish all events to subscribers
- _queueManager.PublishMultiple(accumulator.AsSpan(0, ptr));
+ _queueManager.PublishMultiple(accumulator.AsSpan(0, index));
//Reset pointer
- ptr = 0;
+ index = 0;
}
}
catch (OperationCanceledException)
diff --git a/plugins/ObjectCacheServer/src/CacheStore.cs b/plugins/ObjectCacheServer/src/CacheStore.cs
index e7a7c63..c1d47f6 100644
--- a/plugins/ObjectCacheServer/src/CacheStore.cs
+++ b/plugins/ObjectCacheServer/src/CacheStore.cs
@@ -30,7 +30,6 @@ using VNLib.Utils.Logging;
using VNLib.Plugins;
using VNLib.Plugins.Extensions.Loading;
-
namespace VNLib.Data.Caching.ObjectCache.Server
{
[ConfigurationName("cache")]
@@ -62,6 +61,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server
private static BlobCacheListener InitializeCache(ObjectCacheServerEntry plugin, IConfigScope config)
{
+ const string CacheConfigTemplate =
+@"
+Cache Configuration:
+ Max memory: {max} Mb
+ Buckets: {bc}
+ Entries per-bucket: {mc}
+";
+
//Deserialize the cache config
CacheConfiguration cacheConf = config.Deserialze<CacheConfiguration>();
@@ -74,22 +81,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server
if (cacheConf.MaxCacheEntries < 200)
{
plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache");
- }
-
- plugin.Log.Verbose("Creating cache store with {bc} buckets, with {mc} items/bucket", cacheConf.BucketCount, cacheConf.MaxCacheEntries);
+ }
//calculate the max memory usage
ulong maxByteSize = ((ulong)cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize);
- //Log max memory usage
- plugin.Log.Debug("Maxium memory consumption {mx}Mb", maxByteSize / (ulong)(1024 * 1000));
-
- //Load the blob cache table system
- IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, plugin.CacheHeap, cacheConf);
+ //Log the cache config
+ plugin.Log.Information(CacheConfigTemplate,
+ maxByteSize / (ulong)(1024 * 1000),
+ cacheConf.BucketCount,
+ cacheConf.MaxCacheEntries
+ );
//Get the event listener
ICacheListenerEventQueue queue = plugin.GetOrCreateSingleton<CacheListenerPubQueue>();
+ //Load the blob cache table system
+ IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, plugin.CacheHeap, cacheConf);
+
//Endpoint only allows for a single reader
return new(bc, queue, plugin.Log, plugin.CacheHeap);
}
diff --git a/plugins/ObjectCacheServer/src/CacheSystemUtil.cs b/plugins/ObjectCacheServer/src/CacheSystemUtil.cs
index 669b84f..f8aedae 100644
--- a/plugins/ObjectCacheServer/src/CacheSystemUtil.cs
+++ b/plugins/ObjectCacheServer/src/CacheSystemUtil.cs
@@ -70,7 +70,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
else
{
//Default type
- table = GetInternalBlobCache(heap, cacheConf, pCManager);
+ table = GetInternalBlobCache(heap, cacheConf, pCManager);
}
//Initialize the subsystem from the cache table
diff --git a/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
index a55e8e2..ffdd4f4 100644
--- a/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
@@ -3,10 +3,10 @@
*
* Library: VNLib
* Package: ObjectCacheServer
-* File: ObjectCacheServerEntry.cs
+* File: CacheNodeReplicationMaanger.cs
*
-* ObjectCacheServerEntry.cs is part of ObjectCacheServer which is part of the larger
-* VNLib collection of libraries and utilities.
+* CacheNodeReplicationMaanger.cs is part of ObjectCacheServer which is part
+* of the larger VNLib collection of libraries and utilities.
*
* ObjectCacheServer is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
@@ -35,52 +35,91 @@ using static VNLib.Data.Caching.Constants;
using VNLib.Net.Messaging.FBM;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Extensions.Loading;
+using VNLib.Data.Caching.Extensions.Clustering;
-namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
+
+ /*
+ * This class is responsible for replicating the cache with other nodes.
+ *
+ * It does this by connecting to other nodes and listening for change events.
+ * When a change event occurs, it takes action against the local cache store,
+ * to keep it consistent with the other nodes.
+ *
+ * Change events are only handled first-hand, meaning that events do not
+ * propagate to other nodes, they must be connected individually to each node
+ * and listen for changes.
+ */
+
internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork
{
+ private const string LOG_SCOPE_NAME = "REPL";
+
private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10);
+ private const int MAX_MESSAGE_SIZE = 12 * 1024;
- private readonly NodeConfig NodeConfig;
- private readonly ICachePeerAdapter PeerAdapter;
- private readonly ICacheStore CacheStore;
- private readonly FBMClientConfig ClientConfig;
- private readonly PluginBase Plugin;
+ private readonly PluginBase _plugin;
+ private readonly ILogProvider _log;
+ private readonly NodeConfig _nodeConfig;
+ private readonly ICacheStore _cacheStore;
+ private readonly ICachePeerAdapter _peerAdapter;
+ private readonly FBMClientConfig _replicationClientConfig;
+
+ private readonly bool _isDebug;
- private CacheNodeConfiguration CacheConfig => NodeConfig.Config;
+ private int _openConnections;
public CacheNodeReplicationMaanger(PluginBase plugin)
{
//Load the node config
- NodeConfig = plugin.GetOrCreateSingleton<NodeConfig>();
-
- //Get peer adapter
- PeerAdapter = plugin.GetOrCreateSingleton<PeerDiscoveryManager>();
-
- CacheStore = plugin.GetOrCreateSingleton<CacheStore>();
+ _nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>();
+ _cacheStore = plugin.GetOrCreateSingleton<CacheStore>();
+ _peerAdapter = plugin.GetOrCreateSingleton<PeerDiscoveryManager>();
+
+ //Init fbm config with fixed message size
+ _replicationClientConfig = FBMDataCacheExtensions.GetDefaultConfig(
+ (plugin as ObjectCacheServerEntry)!.CacheHeap,
+ MAX_MESSAGE_SIZE,
+ debugLog: plugin.IsDebug() ? plugin.Log : null
+ );
+
+ _plugin = plugin;
+ _isDebug = plugin.IsDebug();
+ _log = plugin.Log.CreateScope(LOG_SCOPE_NAME);
}
public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
{
- pluginLog.Information("[REPL] Initializing node replication worker");
+ _log.Information("Initializing node replication worker");
try
{
while (true)
{
//Get all new peers
- ICacheNodeAdvertisment[] peers = PeerAdapter.GetNewPeers();
+ CacheNodeAdvertisment[] peers = _peerAdapter.GetNewPeers();
- if (peers.Length == 0)
+ if (peers.Length == 0 && _isDebug)
{
- pluginLog.Verbose("[REPL] No new peers to connect to");
+ _log.Verbose("No new peers to connect to");
}
- //Connect to each peer as a background task
- foreach (ICacheNodeAdvertisment peer in peers)
+ //Make sure we don't exceed the max connections
+ if(_openConnections >= _nodeConfig.MaxPeerConnections)
{
- _ = Plugin.ObserveWork(() => OnNewPeerDoWorkAsync(peer, pluginLog, exitToken));
+ if (_isDebug)
+ {
+ _log.Verbose("Max peer connections reached, waiting for a connection to close");
+ }
+ }
+ else
+ {
+ //Connect to each peer as a background task
+ foreach (CacheNodeAdvertisment peer in peers)
+ {
+ _ = _plugin.ObserveWork(() => OnNewPeerDoWorkAsync(peer, _log, exitToken));
+ }
}
//Wait for a new peers
@@ -93,7 +132,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
}
catch
{
- pluginLog.Error("[REPL] Node replication worker exited with an error");
+ _log.Error("Node replication worker exited with an error");
throw;
}
finally
@@ -101,25 +140,27 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
}
- pluginLog.Information("[REPL] Node replication worker exited");
+ _log.Information("Node replication worker exited");
}
- private async Task OnNewPeerDoWorkAsync(ICacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken)
+ private async Task OnNewPeerDoWorkAsync(CacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken)
{
_ = newPeer ?? throw new ArgumentNullException(nameof(newPeer));
//Setup client
- FBMClient client = new(ClientConfig);
+ FBMClient client = new(_replicationClientConfig);
//Add peer to monitor
- PeerAdapter.OnPeerListenerAttached(newPeer);
+ _peerAdapter.OnPeerListenerAttached(newPeer);
+
+ Interlocked.Increment(ref _openConnections);
try
{
log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId);
//Connect to the server
- await client.ConnectToCacheAsync(newPeer, CacheConfig, exitToken);
+ await client.ConnectToCacheAsync(newPeer, _nodeConfig.Config, exitToken);
log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId);
@@ -176,16 +217,21 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
}
finally
{
+ Interlocked.Decrement(ref _openConnections);
+
client.Dispose();
//Notify monitor of disconnect
- PeerAdapter.OnPeerListenerDetatched(newPeer);
+ _peerAdapter.OnPeerListenerDetatched(newPeer);
}
}
//Wroker task callback method
private async Task ReplicationWorkerDoWorkAsync(FBMClient client, ILogProvider log, CancellationToken exitToken)
{
+ //Reusable request message
+ using FBMRequest request = new(client.Config);
+
//Listen for changes
while (true)
{
@@ -197,53 +243,47 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
switch (changedObject.Status)
{
case ResponseCodes.NotFound:
- log.Warn("Server cache not properly configured, worker exiting");
+ log.Error("Server cache not properly configured, worker exiting");
return;
case "deleted":
//Delete the object from the store
- await CacheStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None);
+ await _cacheStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None);
break;
case "modified":
//Reload the record from the store
- await UpdateRecordAsync(client, log, changedObject.CurrentId, changedObject.NewId, exitToken);
+ await UpdateRecordAsync(client, request, log, changedObject.CurrentId, changedObject.NewId, exitToken);
break;
}
+
+ //Reset request message
+ request.Reset();
}
}
- private async Task UpdateRecordAsync(FBMClient client, ILogProvider log, string objectId, string newId, CancellationToken cancellation)
+ private async Task UpdateRecordAsync(FBMClient client, FBMRequest modRequest, ILogProvider log, string objectId, string newId, CancellationToken cancellation)
{
- //Get request message
- FBMRequest modRequest = client.RentRequest();
- try
- {
- //Set action as get/create
- modRequest.WriteHeader(HeaderCommand.Action, Actions.Get);
- //Set session-id header
- modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId);
+ //Set action as get/create
+ modRequest.WriteHeader(HeaderCommand.Action, Actions.Get);
+ //Set session-id header
+ modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId);
- //Make request
- using FBMResponse response = await client.SendAsync(modRequest, GetItemTimeout, cancellation);
+ //Make request
+ using FBMResponse response = await client.SendAsync(modRequest, GetItemTimeout, cancellation);
- response.ThrowIfNotSet();
+ response.ThrowIfNotSet();
- //Check response code
- string status = response.Headers.First(static s => s.Header == HeaderCommand.Status).Value.ToString();
+ //Check response code
+ string status = response.Headers.First(static s => s.Header == HeaderCommand.Status).Value.ToString();
- if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal))
- {
- //Update the record
- await CacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation);
- log.Debug("Updated object {id}", objectId);
- }
- else
- {
- log.Warn("Object {id} was missing on the remote server", objectId);
- }
+ if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal))
+ {
+ //Update the record
+ await _cacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation);
+ log.Debug("Updated object {id}", objectId);
}
- finally
+ else
{
- client.ReturnRequest(modRequest);
+ log.Warn("Object {id} was missing on the remote server", objectId);
}
}
}
diff --git a/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs
index f191c9d..c49a54b 100644
--- a/plugins/ObjectCacheServer/src/Distribution/CachePeerMonitor.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs
@@ -23,21 +23,38 @@
*/
using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
using System.Collections.Generic;
+using VNLib.Utils;
+using VNLib.Utils.Extensions;
using VNLib.Plugins;
-namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
- internal sealed class CachePeerMonitor : IPeerMonitor
+ internal sealed class CachePeerMonitor : VnDisposeable, IPeerMonitor
{
private readonly LinkedList<ICachePeer> peers = new();
+ private readonly ManualResetEvent newPeerTrigger = new (false);
public CachePeerMonitor(PluginBase plugin)
{ }
+ /// <summary>
+ /// Waits for new peers to connect to the server
+ /// </summary>
+ /// <returns>A task that complets when a new peer has connected</returns>
+ public async Task WaitForChangeAsync()
+ {
+ await newPeerTrigger.WaitAsync();
+
+ //Reset the trigger for next call
+ newPeerTrigger.Reset();
+ }
+
///<inheritdoc/>
public IEnumerable<ICachePeer> GetAllPeers()
{
@@ -55,6 +72,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
{
peers.AddLast(peer);
}
+
+ //Trigger monitor when change occurs
+ if(peer.Advertisment != null)
+ {
+ newPeerTrigger.Set();
+ }
}
///<inheritdoc/>
@@ -66,5 +89,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
peers.Remove(peer);
}
}
+
+ protected override void Free()
+ {
+ newPeerTrigger.Dispose();
+ }
}
}
diff --git a/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs b/plugins/ObjectCacheServer/src/Clustering/ICachePeerAdapter.cs
index c3fb022..dd426f6 100644
--- a/plugins/ObjectCacheServer/src/Distribution/ICachePeerAdapter.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/ICachePeerAdapter.cs
@@ -22,9 +22,9 @@
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
-using VNLib.Data.Caching.Extensions;
+using VNLib.Data.Caching.Extensions.Clustering;
-namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
internal interface ICachePeerAdapter
{
@@ -32,18 +32,18 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
/// Gets the peers that have been discovered but not yet connected to
/// </summary>
/// <returns>A collection of peers that have not been connected to yet</returns>
- ICacheNodeAdvertisment[] GetNewPeers();
+ CacheNodeAdvertisment[] GetNewPeers();
/// <summary>
/// Called when a peer has been connected to
/// </summary>
/// <param name="peer">The peer that has been connected</param>
- void OnPeerListenerAttached(ICacheNodeAdvertisment peer);
+ void OnPeerListenerAttached(CacheNodeAdvertisment peer);
/// <summary>
/// Called when a peer has been disconnected from
/// </summary>
/// <param name="peer">The disconnected peer</param>
- void OnPeerListenerDetatched(ICacheNodeAdvertisment peer);
+ void OnPeerListenerDetatched(CacheNodeAdvertisment peer);
}
}
diff --git a/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs b/plugins/ObjectCacheServer/src/Clustering/IPeerMonitor.cs
index 028171f..d8f358f 100644
--- a/plugins/ObjectCacheServer/src/Distribution/IPeerMonitor.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/IPeerMonitor.cs
@@ -24,7 +24,7 @@
using System.Collections.Generic;
-namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
+namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
/// <summary>
/// Represents a monitor for peer cache servers to advertise their presence
@@ -33,7 +33,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
internal interface IPeerMonitor
{
/// <summary>
- /// Notifies the monitor that a peer has connected to the cluster
+ /// Notifies the monitor that a remote peer has connected to the current node for
+ /// replication
/// </summary>
/// <param name="peer">The peer that connected</param>
void OnPeerConnected(ICachePeer peer);
diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
new file mode 100644
index 0000000..f132cab
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
@@ -0,0 +1,268 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: PeerDiscoveryManager.cs
+*
+* PeerDiscoveryManager.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Linq;
+using System.Net.Http;
+using System.Threading;
+using System.Net.Sockets;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+
+using VNLib.Utils.Logging;
+using VNLib.Data.Caching.Extensions;
+using VNLib.Plugins;
+using VNLib.Plugins.Extensions.Loading;
+using VNLib.Data.Caching.Extensions.Clustering;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
+{
+
+ /*
+ * This class is responsible for resolving and discovering peer nodes in the cluster network.
+ */
+
+ internal sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter
+ {
+ private const string LOG_SCOPE_NAME = "DISC";
+ private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(15);
+
+ private readonly List<CacheNodeAdvertisment> _connectedPeers;
+ private readonly NodeConfig _config;
+ private readonly CachePeerMonitor _monitor;
+ private readonly bool IsDebug;
+ private readonly ILogProvider _log;
+
+ public PeerDiscoveryManager(PluginBase plugin)
+ {
+ //Get config
+ _config = plugin.GetOrCreateSingleton<NodeConfig>();
+
+ //Get the known peers array from config, its allowed to be null for master nodes
+ IConfigScope? config = plugin.TryGetConfig("known_peers");
+ string[] kownPeers = config?.Deserialze<string[]>() ?? Array.Empty<string>();
+
+ //Add known peers to the monitor
+ _config.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s)));
+
+ plugin.Log.Information("Inital peer nodes: {nodes}", kownPeers);
+
+ //Get the peer monitor
+ _monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
+
+ _connectedPeers = new();
+
+ //Create scoped logger
+ _log = plugin.Log.CreateScope(LOG_SCOPE_NAME);
+
+ //Setup discovery error handler
+ _config.Config.WithErrorHandler(new ErrorHandler(_log));
+
+ IsDebug = plugin.IsDebug();
+ }
+
+ async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
+ {
+ /*
+ * This loop uses the peer monitor to keep track of all connected peers, then gets
+ * all the advertising peers (including known peers) and resolves all nodes across
+ * the network.
+ */
+
+ //Start the change listener
+ Task watcher = WatchForPeersAsync(exitToken);
+
+ _log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", InitialDelay);
+
+ try
+ {
+ //Wait for the initial delay
+ await Task.Delay(InitialDelay, exitToken);
+
+ _log.Debug("Begining discovery loop");
+
+ /*
+ * To avoid connecting to ourself, we add ourselves to the connected list
+ * and it should never get removed. This is because the monitor will never
+ * report our own advertisment.
+ */
+ _connectedPeers.Add(_config.Config.Advertisment);
+
+ while (true)
+ {
+ try
+ {
+ if (IsDebug)
+ {
+ _log.Debug("Begining node discovery");
+ }
+
+ //Resolve all known peers
+ CacheNodeAdvertisment[] wellKnown = await _config.Config.ResolveWellKnownAsync(exitToken);
+
+ //Use the monitor to get the initial peers
+ IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds();
+
+ //Combine well-known with new connected peers
+ CacheNodeAdvertisment[] allAds = ads.Union(wellKnown).ToArray();
+
+ if (allAds.Length > 0)
+ {
+ //Discover all kown nodes
+ await _config.Config.DiscoverNodesAsync(allAds, exitToken);
+ }
+
+ //Log the discovered nodes if verbose logging is enabled
+ if (IsDebug)
+ {
+ CacheNodeAdvertisment[] found = _config.Config.NodeCollection.GetAllNodes();
+
+ _log.Debug("Discovered {count} nodes\n\t{nodes}", found.Length, found.Select(static s => s.NodeId));
+ }
+ }
+ catch(OperationCanceledException)
+ {
+ throw;
+ }
+ catch (Exception ex)
+ {
+ _log.Error(ex, "Failed to discover new peer nodes");
+ }
+
+ //Delay the next discovery
+ await Task.Delay(_config.DiscoveryInterval, exitToken);
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ //Normal exit
+ _log.Information("Node discovery worker exiting");
+ }
+ finally
+ {
+
+ }
+
+ //Wait for the watcher to exit
+ await watcher.ConfigureAwait(false);
+ }
+
+ private IEnumerable<CacheNodeAdvertisment> GetMonitorAds()
+ {
+ return _monitor.GetAllPeers()
+ .Where(static p => p.Advertisment != null)
+ //Without us
+ .Where(n => n.NodeId != _config.Config.NodeId)
+ .Select(static p => p.Advertisment!);
+ }
+
+ //Wait for new peers and update the collection
+ private async Task WatchForPeersAsync(CancellationToken cancellation)
+ {
+ try
+ {
+ _log.Debug("Discovery worker waiting for new peers to connect");
+
+ while (true)
+ {
+
+ //Wait for changes, then get new peers
+ await _monitor.WaitForChangeAsync().WaitAsync(cancellation);
+
+ _log.Verbose("New peers connected");
+
+ //Use the monitor to get the initial peers
+ IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds();
+
+ ((NodeDiscoveryCollection)_config.Config.NodeCollection).AddManualNodes(ads);
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ //Normal ext
+ _log.Debug("Connected peer listener exited");
+ }
+ }
+
+
+ ///<inheritdoc/>
+ public CacheNodeAdvertisment[] GetNewPeers()
+ {
+ lock (_connectedPeers)
+ {
+ //Get all discovered peers
+ CacheNodeAdvertisment[] peers = _config.Config.NodeCollection.GetAllNodes();
+
+ //Get the difference between the discovered peers and the connected peers
+ return peers.Except(_connectedPeers).ToArray();
+ }
+ }
+
+ ///<inheritdoc/>
+ public void OnPeerListenerAttached(CacheNodeAdvertisment peer)
+ {
+ lock (_connectedPeers)
+ {
+ //Add to connected peers
+ _connectedPeers.Add(peer);
+ }
+ }
+
+ ///<inheritdoc/>
+ public void OnPeerListenerDetatched(CacheNodeAdvertisment peer)
+ {
+ //remove from connected peers
+ lock (_connectedPeers)
+ {
+ _connectedPeers.Remove(peer);
+ }
+ }
+
+
+ private sealed record class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
+ {
+ public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
+ {
+
+ if (ex is HttpRequestException hre)
+ {
+ if (hre.InnerException is SocketException se)
+ {
+ //traisnport failed
+ Logger.Warn("Failed to connect to server {serv} because {err}", errorNode, se.Message);
+ }
+ else
+ {
+ Logger.Error("Failed to connect to node {n}\n{err}", errorNode, hre);
+ }
+ }
+ else
+ {
+ Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex);
+ }
+
+ }
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Distribution/KnownPeerList.cs b/plugins/ObjectCacheServer/src/Distribution/KnownPeerList.cs
deleted file mode 100644
index 74df81f..0000000
--- a/plugins/ObjectCacheServer/src/Distribution/KnownPeerList.cs
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
-* Copyright (c) 2023 Vaughn Nugent
-*
-* Library: VNLib
-* Package: ObjectCacheServer
-* File: KnownPeerList.cs
-*
-* KnownPeerList.cs is part of ObjectCacheServer which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* ObjectCacheServer is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* ObjectCacheServer is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Linq;
-using System.Collections.Generic;
-using System.Text.Json.Serialization;
-
-using VNLib.Plugins;
-using VNLib.Data.Caching.Extensions;
-using VNLib.Plugins.Extensions.Loading;
-
-
-namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
-{
- [ConfigurationName("known_peers")]
- internal sealed class KnownPeerList
- {
- private readonly List<KnownPeer> _peers;
-
- public KnownPeerList(PluginBase plugin, IConfigScope config)
- {
- //Deserialze the known peers into an array
- KnownPeer[] peers = config.Deserialze<KnownPeer[]>();
-
- foreach (KnownPeer peer in peers)
- {
- //Validate the peer
- peer.Validate();
- }
-
- _peers = peers?.ToList() ?? new();
- }
-
- public IEnumerable<ICacheNodeAdvertisment> GetPeers()
- {
- return _peers;
- }
-
- private sealed class KnownPeer : ICacheNodeAdvertisment
- {
- public Uri? ConnectEndpoint { get; set; }
- public Uri? DiscoveryEndpoint { get; set; }
-
- [JsonPropertyName("node_id")]
- public string NodeId { get; set; }
-
- [JsonPropertyName("connect_url")]
- public string? ConnectEpPath
- {
- get => ConnectEndpoint?.ToString() ?? string.Empty;
- set => ConnectEndpoint = new Uri(value ?? string.Empty);
- }
-
- [JsonPropertyName("discovery_url")]
- public string? DiscoveryEpPath
- {
- get => DiscoveryEndpoint?.ToString() ?? string.Empty;
- set => DiscoveryEndpoint = new Uri(value ?? string.Empty);
- }
-
- public void Validate()
- {
- if (string.IsNullOrWhiteSpace(NodeId))
- {
- throw new ArgumentException("Node ID cannot be null or whitespace");
- }
- if (ConnectEndpoint is null)
- {
- throw new ArgumentException("Connect endpoint cannot be null");
- }
- if (DiscoveryEndpoint is null)
- {
- throw new ArgumentException("Discovery endpoint cannot be null");
- }
- }
- }
- }
-}
diff --git a/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs
deleted file mode 100644
index 26ec565..0000000
--- a/plugins/ObjectCacheServer/src/Distribution/PeerDiscoveryManager.cs
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
-* Copyright (c) 2023 Vaughn Nugent
-*
-* Library: VNLib
-* Package: ObjectCacheServer
-* File: PeerDiscoveryManager.cs
-*
-* PeerDiscoveryManager.cs is part of ObjectCacheServer which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* ObjectCacheServer is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* ObjectCacheServer is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Linq;
-using System.Threading;
-using System.Threading.Tasks;
-using System.Collections.Generic;
-
-using VNLib.Plugins;
-using VNLib.Utils.Logging;
-using VNLib.Data.Caching.Extensions;
-using VNLib.Plugins.Extensions.Loading;
-
-
-namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
-{
-
- sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter
- {
- private readonly List<ICacheNodeAdvertisment> _connectedPeers;
- private readonly NodeConfig _config;
- private readonly IPeerMonitor _monitor;
- private readonly KnownPeerList _knownPeers;
-
- public PeerDiscoveryManager(PluginBase plugin)
- {
- //Get config
- _config = plugin.GetOrCreateSingleton<NodeConfig>();
-
- //Get the peer monitor
- _monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
-
- //Get the known peer list
- _knownPeers = plugin.GetOrCreateSingleton<KnownPeerList>();
-
- _connectedPeers = new();
-
- //Setup discovery error handler
- _config.Config.WithErrorHandler(new ErrorHandler(plugin.Log));
- }
-
- async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
- {
- /*
- * This loop uses the peer monitor to keep track of all connected peers, then gets
- * all the advertising peers (including known peers) and resolves all nodes across
- * the network.
- */
-
- pluginLog.Information("Node discovery worker started");
-
- try
- {
- while (true)
- {
- try
- {
- //Use the monitor to get the initial peers
- IEnumerable<ICacheNodeAdvertisment> ads = _monitor.GetAllPeers()
- .Where(static p => p.Advertisment != null)
- .Select(static p => p.Advertisment!);
-
- //Add known peers to the initial list
- ads = ads.Union(_knownPeers.GetPeers());
-
- //Set initial peers
- _config.Config.WithInitialPeers(ads);
-
- //Discover all nodes
- await _config.Config.DiscoverNodesAsync(exitToken);
- }
- catch(OperationCanceledException)
- {
- throw;
- }
- catch (Exception ex)
- {
- pluginLog.Error(ex, "Failed to discover new peer nodes");
- }
-
- //Delay the next discovery
- await Task.Delay(_config.DiscoveryInterval, exitToken);
- }
- }
- catch (OperationCanceledException)
- {
- //Normal exit
- pluginLog.Information("Node discovery worker exiting");
- }
- finally
- {
-
- }
- }
-
-
- ///<inheritdoc/>
- public ICacheNodeAdvertisment[] GetNewPeers()
- {
- lock (_connectedPeers)
- {
- //Get all discovered peers
- ICacheNodeAdvertisment[] peers = _config.Config.NodeCollection.GetAllNodes();
-
- //Get the difference between the discovered peers and the connected peers
- return peers.Except(_connectedPeers).ToArray();
- }
- }
-
- ///<inheritdoc/>
- public void OnPeerListenerAttached(ICacheNodeAdvertisment peer)
- {
- lock (_connectedPeers)
- {
- //Add to connected peers
- _connectedPeers.Add(peer);
- }
- }
-
- ///<inheritdoc/>
- public void OnPeerListenerDetatched(ICacheNodeAdvertisment peer)
- {
- //remove from connected peers
- lock (_connectedPeers)
- {
- _connectedPeers.Remove(peer);
- }
- }
-
-
- private sealed record class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
- {
- public void OnDiscoveryError(ICacheNodeAdvertisment errorNode, Exception ex)
- {
- Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex);
- }
- }
- }
-}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
index 8352635..5e794f8 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
@@ -46,7 +46,7 @@ using VNLib.Plugins.Essentials.Endpoints;
using VNLib.Plugins.Essentials.Extensions;
using VNLib.Plugins.Extensions.Loading.Routing;
using VNLib.Data.Caching.ObjectCache.Server.Distribution;
-
+using VNLib.Data.Caching.Extensions.Clustering;
namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
@@ -54,6 +54,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
[ConfigurationName("connect_endpoint")]
internal sealed class ConnectEndpoint : ResourceEndpointBase
{
+ private const string LOG_SCOPE_NAME = "CONEP";
+
private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
@@ -90,7 +92,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
string? path = config["path"].GetString();
- InitPathAndLog(path, plugin.Log);
+ InitPathAndLog(path, plugin.Log.CreateScope(LOG_SCOPE_NAME));
//Check for ip-verification flag
VerifyIp = config.TryGetValue("verify_ip", out JsonElement vIpEl) && vIpEl.GetBoolean();
@@ -152,28 +154,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
// Parse jwt
using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
{
- bool verified = false;
-
//verify signature for client
- if (NodeConfiguration.KeyStore.VerifyJwt(jwt))
+ if (NodeConfiguration.KeyStore.VerifyJwt(jwt, false))
{
- verified = true;
+ //Validated
}
//May be signed by a cache server
- else
+ else if(NodeConfiguration.KeyStore.VerifyJwt(jwt, true))
{
//Set peer and verified flag since the another cache server signed the request
- isPeer = verified = NodeConfiguration.KeyStore.VerifyCachePeer(jwt);
+ isPeer = true;
}
-
- //Check flag
- if (!verified)
+ else
{
Log.Information("Client signature verification failed");
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
}
-
+
//Recover json body
using JsonDocument doc = jwt.GetPayload();
@@ -196,6 +194,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
auth.WriteHeader(NodeConfiguration.KeyStore.GetJwtHeader());
auth.InitPayloadClaim()
.AddClaim("aud", AudienceLocalServerId)
+ .AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds())
.AddClaim("exp", entity.RequestedTimeUtc.Add(AuthTokenExpiration).ToUnixTimeSeconds())
.AddClaim("nonce", RandomHash.GetRandomBase32(8))
.AddClaim("chl", challenge!)
@@ -240,7 +239,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
string? nodeId = null;
- ICacheNodeAdvertisment? discoveryAd = null;
+ CacheNodeAdvertisment? discoveryAd = null;
//Parse jwt
using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
@@ -299,15 +298,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
//Verify the signature the client included of the auth token
- if (isPeer)
+ //Verify token signature against a fellow cache public key
+ if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth, isPeer))
{
- //Verify token signature against a fellow cache public key
- if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth))
- {
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
+ }
+ if (isPeer)
+ {
//Try to get the node advertisement header
string? discoveryHeader = entity.Server.Headers[FBMDataCacheExtensions.X_NODE_DISCOVERY_HEADER];
@@ -317,15 +316,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(discoveryHeader);
}
}
- else
- {
- //Not a peer, so verify against the client's public key
- if (!NodeConfiguration.KeyStore.VerifyUpgradeToken(clientSignature, jwtAuth))
- {
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
- }
}
try
@@ -389,12 +379,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
//Notify peers of new connection
Peers.OnPeerConnected(state);
- //Inc connected count
- Interlocked.Increment(ref _connectedClients);
-
//Register plugin exit token to cancel the connected socket
CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll);
+ //Inc connected count
+ Interlocked.Increment(ref _connectedClients);
+
try
{
//Init listener args from request
@@ -442,14 +432,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
Log.Debug(ex);
}
- finally
- {
- //Dec connected count
- Interlocked.Decrement(ref _connectedClients);
- //Unregister the
- reg.Unregister();
- }
+ //Dec connected count
+ Interlocked.Decrement(ref _connectedClients);
+
+ //Unregister the token
+ reg.Unregister();
//Notify monitor of disconnect
Peers.OnPeerDisconnected(state);
@@ -465,12 +453,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
public int MaxMessageSize { get; init; }
public int MaxResponseBufferSize { get; init; }
public string? NodeId { get; init; }
- public ICacheNodeAdvertisment? Advertisment { get; init; }
+ public CacheNodeAdvertisment? Advertisment { get; init; }
public override string ToString()
{
return
- $"{nameof(RecvBufferSize)}:{RecvBufferSize}, {nameof(MaxHeaderBufferSize)}: {MaxHeaderBufferSize}, {nameof(MaxMessageSize)}:{MaxMessageSize}, {nameof(MaxResponseBufferSize)}:{MaxResponseBufferSize}";
+ $"{nameof(RecvBufferSize)}:{RecvBufferSize}, {nameof(MaxHeaderBufferSize)}: {MaxHeaderBufferSize}, " +
+ $"{nameof(MaxMessageSize)}:{MaxMessageSize}, {nameof(MaxResponseBufferSize)}:{MaxResponseBufferSize}";
}
}
}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
index 90ffca0..77d59dd 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
@@ -26,7 +26,6 @@ using System;
using System.Net;
using System.Linq;
using System.Text.Json;
-using System.Threading.Tasks;
using VNLib.Hashing;
using VNLib.Hashing.IdentityUtility;
@@ -35,17 +34,25 @@ using VNLib.Plugins.Essentials;
using VNLib.Plugins.Essentials.Endpoints;
using VNLib.Plugins.Essentials.Extensions;
using VNLib.Plugins.Extensions.Loading;
-using VNLib.Data.Caching.Extensions;
using VNLib.Data.Caching.ObjectCache.Server.Distribution;
+using VNLib.Data.Caching.Extensions.Clustering;
namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
[ConfigurationName("discovery_endpoint")]
- internal sealed class PeerDiscoveryEndpoint : UnprotectedWebEndpoint
+ internal sealed class PeerDiscoveryEndpoint : ResourceEndpointBase
{
private readonly IPeerMonitor PeerMonitor;
private readonly NodeConfig Config;
+ //Loosen up protection settings
+ ///<inheritdoc/>
+ protected override ProtectionSettings EndpointProtectionSettings { get; } = new()
+ {
+ DisableBrowsersOnly = true,
+ DisableSessionsRequired = true
+ };
+
public PeerDiscoveryEndpoint(PluginBase plugin, IConfigScope config)
{
string? path = config["path"].GetString();
@@ -59,7 +66,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
Config = plugin.GetOrCreateSingleton<NodeConfig>();
}
- protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity)
+ protected override VfReturnType Get(HttpEntity entity)
{
//Get auth token
string? authToken = entity.Server.Headers[HttpRequestHeader.Authorization];
@@ -71,17 +78,18 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
string subject = string.Empty;
+ string challenge = string.Empty;
//Parse auth token
using(JsonWebToken jwt = JsonWebToken.Parse(authToken))
{
//try to verify against cache node first
- if (!Config.KeyStore.VerifyCachePeer(jwt))
+ if (!Config.KeyStore.VerifyJwt(jwt, true))
{
//failed...
//try to verify against client key
- if (!Config.KeyStore.VerifyJwt(jwt))
+ if (!Config.KeyStore.VerifyJwt(jwt, false))
{
//invalid token
entity.CloseResponse(HttpStatusCode.Unauthorized);
@@ -90,12 +98,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
using JsonDocument payload = jwt.GetPayload();
-
+
+ //Get client info to pass back
subject = payload.RootElement.GetProperty("sub").GetString() ?? string.Empty;
+ challenge = payload.RootElement.GetProperty("chl").GetString() ?? string.Empty;
}
//Valid key, get peer list to send to client
- ICacheNodeAdvertisment[] peers = PeerMonitor.GetAllPeers()
+ CacheNodeAdvertisment[] peers = PeerMonitor.GetAllPeers()
.Where(static p => p.Advertisment != null)
.Select(static p => p.Advertisment!)
.ToArray();
@@ -113,7 +123,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
.AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds())
//Send all peers as a json array
.AddClaim("peers", peers)
- .AddClaim("nonce", RandomHash.GetRandomBase32(24))
+ //Send the challenge back
+ .AddClaim("chl", challenge)
.CommitClaims();
//Sign the response
diff --git a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs
new file mode 100644
index 0000000..99c7f19
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs
@@ -0,0 +1,111 @@
+/*
+* Copyright (c) 2023 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: WellKnownEndpoint.cs
+*
+* WellKnownEndpoint.cs is part of ObjectCacheServer which is part of the larger
+* VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Net;
+using System.Text.Json;
+
+using VNLib.Data.Caching.Extensions;
+using VNLib.Data.Caching.Extensions.Clustering;
+using VNLib.Hashing;
+using VNLib.Hashing.IdentityUtility;
+using VNLib.Plugins;
+using VNLib.Plugins.Essentials;
+using VNLib.Plugins.Essentials.Endpoints;
+using VNLib.Plugins.Essentials.Extensions;
+using VNLib.Plugins.Extensions.Loading;
+
+namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
+{
+
+ /*
+ * The well-known endpoint is used to advertise the node's existence to
+ * the network. Clients need to know the endpoint layout to be able to
+ * connect and discover other nodes.
+ */
+
+ [ConfigurationName("well_known", Required = false)]
+ internal sealed class WellKnownEndpoint : ResourceEndpointBase
+ {
+ //Default path for the well known endpoint
+ const string DefaultPath = "/.well-known/vncache";
+
+ //Store serialized advertisment
+ private readonly CacheNodeAdvertisment _advertisment;
+ private readonly ICacheAuthManager _keyStore;
+
+ //Loosen up security requirements
+ protected override ProtectionSettings EndpointProtectionSettings { get; } = new()
+ {
+ DisableBrowsersOnly = true,
+ DisableSessionsRequired = true,
+ };
+
+ public WellKnownEndpoint(PluginBase plugin):this(plugin, null)
+ { }
+
+ public WellKnownEndpoint(PluginBase plugin, IConfigScope? config)
+ {
+ //Get the node config
+ NodeConfig nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>();
+
+ //serialize the config, discovery may not be enabled
+ _advertisment = nodeConfig.Config.Advertisment;
+ _keyStore = nodeConfig.KeyStore;
+
+ //Default to the well known path
+ string path = DefaultPath;
+
+ //See if the user configured a path
+ if(config != null && config.TryGetValue("path", out JsonElement pathEl))
+ {
+ path = pathEl.GetString() ?? DefaultPath;
+ }
+
+ InitPathAndLog(path, plugin.Log);
+ }
+
+ protected override VfReturnType Get(HttpEntity entity)
+ {
+ string entropy = RandomHash.GetRandomBase32(16);
+
+ //Create jwt signed for the client
+ using JsonWebToken jwt = new();
+
+ jwt.WriteHeader(_keyStore.GetJwtHeader());
+ //Write the advertisment as the message body
+ jwt.InitPayloadClaim()
+ .AddClaim("sub", _advertisment)
+ .AddClaim("chl", entropy)
+ .AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds())
+ .CommitClaims();
+
+ _keyStore.SignJwt(jwt);
+
+ //Return the advertisment jwt
+ entity.CloseResponse(HttpStatusCode.OK, Net.Http.ContentType.Text, jwt.DataBuffer);
+ return VfReturnType.VirtualSkip;
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/ICachePeer.cs b/plugins/ObjectCacheServer/src/ICachePeer.cs
index 97b406f..897655a 100644
--- a/plugins/ObjectCacheServer/src/ICachePeer.cs
+++ b/plugins/ObjectCacheServer/src/ICachePeer.cs
@@ -22,7 +22,7 @@
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
-using VNLib.Data.Caching.Extensions;
+using VNLib.Data.Caching.Extensions.Clustering;
namespace VNLib.Data.Caching.ObjectCache.Server
{
@@ -39,6 +39,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server
/// <summary>
/// An optional signed advertisment message for other peers
/// </summary>
- ICacheNodeAdvertisment? Advertisment { get; }
+ CacheNodeAdvertisment? Advertisment { get; }
}
}
diff --git a/plugins/ObjectCacheServer/src/NodeConfig.cs b/plugins/ObjectCacheServer/src/NodeConfig.cs
index 81b8a32..a6c5be9 100644
--- a/plugins/ObjectCacheServer/src/NodeConfig.cs
+++ b/plugins/ObjectCacheServer/src/NodeConfig.cs
@@ -26,36 +26,43 @@ using System;
using System.Net;
using System.Linq;
using System.Text.Json;
-using System.Threading.Tasks;
using VNLib.Plugins;
-using VNLib.Utils;
using VNLib.Utils.Logging;
-using VNLib.Data.Caching.Extensions;
+using VNLib.Utils.Extensions;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.ObjectCache.Server.Endpoints;
+using VNLib.Data.Caching.Extensions.Clustering;
namespace VNLib.Data.Caching.ObjectCache.Server
{
[ConfigurationName("cluster")]
- internal sealed class NodeConfig : VnDisposeable
+ internal sealed class NodeConfig
{
const string CacheConfigTemplate =
@"
- Cluster Configuration:
- Node Id: {id}
- TlsEndabled: {tls},
- Cache Endpoint: {ep}
+Cluster Configuration:
+ Node Id: {id}
+ TlsEndabled: {tls}
+ Cache Endpoint: {ep}
+ Discovery Endpoint: {dep}
+ Discovery Interval: {di}
+ Max Peers: {mpc}
";
public CacheNodeConfiguration Config { get; }
public CacheAuthKeyStore KeyStore { get; }
+ public TimeSpan DiscoveryInterval { get; }
+
+ /// <summary>
+ /// The maximum number of peer connections to allow
+ /// </summary>
+ public uint MaxPeerConnections { get; } = 10;
+
public NodeConfig(PluginBase plugin, IConfigScope config)
- {
- //Server id is just dns name for now
- string nodeId = Dns.GetHostName();
+ {
Config = new();
@@ -75,9 +82,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//If the ssl element is present, ssl is enabled for the server
usingTls = firstHost.TryGetProperty("ssl", out _);
}
+ string hostname = Dns.GetHostName();
+
+ //Server id is just dns name for now
+ string nodeId = $"{hostname}:{port}";
//The endpoint to advertise to cache clients that allows cache connections
- Uri cacheEndpoint = GetEndpointUri<ConnectEndpoint>(plugin, usingTls, port, nodeId);
+ Uri cacheEndpoint = GetEndpointUri<ConnectEndpoint>(plugin, usingTls, port, hostname);
//Init key store
KeyStore = new(plugin);
@@ -89,21 +100,32 @@ namespace VNLib.Data.Caching.ObjectCache.Server
.WithTls(usingTls);
//Check if advertising is enabled
- if(config.TryGetValue("advertise", out JsonElement adEl) && adEl.GetBoolean())
+ if(plugin.HasConfigForType<PeerDiscoveryEndpoint>())
{
//Get the the broadcast endpoint
- Uri discoveryEndpoint = GetEndpointUri<PeerDiscoveryEndpoint>(plugin, usingTls, port, nodeId);
+ Uri discoveryEndpoint = GetEndpointUri<PeerDiscoveryEndpoint>(plugin, usingTls, port, hostname);
//Enable advertising
Config.EnableAdvertisment(discoveryEndpoint);
}
-
-
+
+
+ DiscoveryInterval = config["discovery_interval_sec"].GetTimeSpan(TimeParseType.Seconds);
+
+ //Get the max peer connections
+ if(config.TryGetValue("max_peers", out JsonElement maxPeerEl))
+ {
+ MaxPeerConnections = maxPeerEl.GetUInt32();
+ }
+
//log the config
plugin.Log.Information(CacheConfigTemplate,
nodeId,
usingTls,
- cacheEndpoint
+ cacheEndpoint,
+ Config.DiscoveryEndpoint,
+ DiscoveryInterval,
+ MaxPeerConnections
);
}
@@ -115,12 +137,5 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//The endpoint to advertise to cache clients that allows cache connections
return new UriBuilder(usingTls ? Uri.UriSchemeHttps : Uri.UriSchemeHttp, hostName, port, cacheEpConfig["path"].GetString()).Uri;
}
-
-
- protected override void Free()
- {
- //cleanup keys
-
- }
}
}
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj
index e8d6291..5273b64 100644
--- a/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj
+++ b/plugins/ObjectCacheServer/src/ObjectCacheServer.csproj
@@ -41,4 +41,9 @@
<ProjectReference Include="..\..\..\lib\VNLib.Data.Caching.Extensions\src\VNLib.Data.Caching.Extensions.csproj" />
<ProjectReference Include="..\..\..\lib\VNLib.Data.Caching.ObjectCache\src\VNLib.Data.Caching.ObjectCache.csproj" />
</ItemGroup>
+
+ <Target Condition="'$(BuildingInsideVisualStudio)' == true" Name="PostBuild" AfterTargets="PostBuildEvent">
+ <Exec Command="start xcopy &quot;$(TargetDir)&quot; &quot;F:\downloads\cache-test\plugins\$(TargetName)&quot; /E /Y /R" />
+ </Target>
+
</Project>
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
index 5d9a50b..1ddf49b 100644
--- a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
+++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
@@ -33,7 +33,7 @@ using VNLib.Utils.Memory.Diagnostics;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Extensions.Loading.Routing;
using VNLib.Data.Caching.ObjectCache.Server.Endpoints;
-
+using VNLib.Data.Caching.ObjectCache.Server.Distribution;
namespace VNLib.Data.Caching.ObjectCache.Server
{
@@ -76,9 +76,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server
{
try
{
+ //Route well-known endpoint
+ this.Route<WellKnownEndpoint>();
+
//Init connect endpoint
this.Route<ConnectEndpoint>();
+ //We must initialize the replication manager
+ _ = this.GetOrCreateSingleton<CacheNodeReplicationMaanger>();
+
//Setup discovery endpoint
if(this.HasConfigForType<PeerDiscoveryEndpoint>())
{