diff options
8 files changed, 525 insertions, 266 deletions
diff --git a/VNLib.Data.Caching.Extensions/BrokerRegistrationRequest.cs b/VNLib.Data.Caching.Extensions/BrokerRegistrationRequest.cs new file mode 100644 index 0000000..e874520 --- /dev/null +++ b/VNLib.Data.Caching.Extensions/BrokerRegistrationRequest.cs @@ -0,0 +1,102 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Extensions +* File: BrokerRegistrationRequest.cs +* +* BrokerRegistrationRequest.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published +* by the Free Software Foundation, either version 2 of the License, +* or (at your option) any later version. +* +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +* General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with VNLib.Data.Caching.Extensions. If not, see http://www.gnu.org/licenses/. +*/ + +using System.Security.Cryptography; + +using VNLib.Utils; + + +namespace VNLib.Data.Caching.Extensions +{ + /// <summary> + /// A broker registration request message in a fluent api + /// format. This message may be disposed when no longer in use + /// </summary> + public sealed class BrokerRegistrationRequest : VnDisposeable + { + /// <summary> + /// The cache server node id + /// </summary> + public string? NodeId { get; private set; } + /// <summary> + /// The broker server's address + /// </summary> + public Uri? BrokerAddress { get; private set; } + /// <summary> + /// The security token used by the broker server to + /// authenticate during heartbeat connections + /// </summary> + public string? HeartbeatToken { get; private set; } + /// <summary> + /// The address for remote clients to use to + /// connect to this server + /// </summary> + public string? RegistrationAddress { get; private set; } + /// <summary> + /// The token signature algorithm + /// </summary> + public ECDsa SiginingAlg { get; } + + public BrokerRegistrationRequest() + { + SiginingAlg = ECDsa.Create(FBMDataCacheExtensions.CacheCurve); + } + + public BrokerRegistrationRequest WithPrivateKey(ReadOnlySpan<byte> privateKey) + { + SiginingAlg.ImportPkcs8PrivateKey(privateKey, out _); + return this; + } + + public BrokerRegistrationRequest WithBroker(Uri brokerUri) + { + BrokerAddress = brokerUri; + return this; + } + + public BrokerRegistrationRequest WithRegistrationAddress(string address) + { + RegistrationAddress = address; + return this; + } + + public BrokerRegistrationRequest WithHeartbeatToken(string token) + { + HeartbeatToken = token; + return this; + } + + public BrokerRegistrationRequest WithNodeId(string nodeId) + { + NodeId = nodeId; + return this; + } + + ///<inheritdoc/> + protected override void Free() + { + SiginingAlg.Dispose(); + } + } +} diff --git a/VNLib.Data.Caching.Extensions/ClientCacheConfiguration.cs b/VNLib.Data.Caching.Extensions/ClientCacheConfiguration.cs new file mode 100644 index 0000000..96f54a7 --- /dev/null +++ b/VNLib.Data.Caching.Extensions/ClientCacheConfiguration.cs @@ -0,0 +1,122 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Extensions +* File: ClientCacheConfiguration.cs +* +* ClientCacheConfiguration.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published +* by the Free Software Foundation, either version 2 of the License, +* or (at your option) any later version. +* +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +* General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with VNLib.Data.Caching.Extensions. If not, see http://www.gnu.org/licenses/. +*/ + +using System.Security.Cryptography; + +using VNLib.Hashing; +using VNLib.Net.Messaging.FBM.Client; + +namespace VNLib.Data.Caching.Extensions +{ + /// <summary> + /// A fluent api configuration object for configuring a <see cref="FBMClient"/> + /// to connect to cache servers. + /// </summary> + public sealed class ClientCacheConfiguration + { + internal ECDsa SigningKey { get; init; } + internal ECDsa VerificationKey { get; init; } + internal string ServerChallenge { get; init; } + internal string? NodeId { get; set; } + internal Uri? BrokerAddress { get; set; } + internal bool UseTls { get; set; } + internal ActiveServer[]? CacheServers { get; set; } + + public ClientCacheConfiguration() + { + //Init the algorithms + SigningKey = ECDsa.Create(FBMDataCacheExtensions.CacheCurve); + VerificationKey = ECDsa.Create(FBMDataCacheExtensions.CacheCurve); + ServerChallenge = RandomHash.GetRandomBase32(24); + } + + /// <summary> + /// Imports the private key used to sign messages + /// </summary> + /// <param name="pkcs8PrivKey">The pkcs8 encoded private key to sign messages</param> + /// <returns>Chainable fluent object</returns> + /// <exception cref="ArgumentException"></exception> + /// <exception cref="CryptographicException"></exception> + public ClientCacheConfiguration ImportSigningKey(ReadOnlySpan<byte> pkcs8PrivKey) + { + SigningKey.ImportPkcs8PrivateKey(pkcs8PrivKey, out _); + return this; + } + + /// <summary> + /// Imports the public key used to verify messages from the remote server + /// </summary> + /// <param name="spkiPublicKey">The subject-public-key-info formatted cache public key</param> + /// <returns>Chainable fluent object</returns> + /// <exception cref="ArgumentException"></exception> + /// <exception cref="CryptographicException"></exception> + public ClientCacheConfiguration ImportVerificationKey(ReadOnlySpan<byte> spkiPublicKey) + { + VerificationKey.ImportSubjectPublicKeyInfo(spkiPublicKey, out _); + return this; + } + + /// <summary> + /// Specifies if all connections should be using TLS + /// </summary> + /// <param name="useTls">A value that indicates if connections should use TLS</param> + /// <returns>Chainable fluent object</returns> + public ClientCacheConfiguration WithTls(bool useTls) + { + UseTls = useTls; + return this; + } + /// <summary> + /// Specifies the broker address to discover cache nodes from + /// </summary> + /// <param name="brokerAddress">The address of the server broker</param> + /// <returns>Chainable fluent object</returns> + /// <exception cref="ArgumentNullException"></exception> + public ClientCacheConfiguration WithBroker(Uri brokerAddress) + { + this.BrokerAddress = brokerAddress ?? throw new ArgumentNullException(nameof(brokerAddress)); + return this; + } + + /// <summary> + /// Specifies the current server's cluster node id. If this + /// is a server connection attempting to listen for changes on the + /// remote server, this id must be set and unique + /// </summary> + /// <param name="nodeId">The cluster node id of the current server</param> + /// <returns>Chainable fluent object</returns> + /// <exception cref="ArgumentNullException"></exception> + public ClientCacheConfiguration WithNodeId(string nodeId) + { + this.NodeId = nodeId ?? throw new ArgumentNullException(nameof(nodeId)); + return this; + } + + ~ClientCacheConfiguration() + { + SigningKey.Clear(); + VerificationKey.Clear(); + } + } +} diff --git a/VNLib.Data.Caching.Extensions/FBMDataCacheExtensions.cs b/VNLib.Data.Caching.Extensions/FBMDataCacheExtensions.cs index ae088fd..3cf7832 100644 --- a/VNLib.Data.Caching.Extensions/FBMDataCacheExtensions.cs +++ b/VNLib.Data.Caching.Extensions/FBMDataCacheExtensions.cs @@ -31,19 +31,19 @@ using System.Security.Cryptography; using System.Runtime.CompilerServices; using RestSharp; - using VNLib.Utils.Memory; using VNLib.Utils.Logging; using VNLib.Utils.Extensions; -using VNLib.Hashing; using VNLib.Hashing.IdentityUtility; using VNLib.Net.Http; using VNLib.Net.Rest.Client; using VNLib.Net.Messaging.FBM.Client; using VNLib.Net.Messaging.FBM; + namespace VNLib.Data.Caching.Extensions { + /// <summary> /// Provides extension methods for FBM data caching using /// cache servers and brokers @@ -63,7 +63,13 @@ namespace VNLib.Data.Caching.Extensions { { "alg", "ES384" }, //Must match alg name { "typ", "JWT"} - }; + }; + + + /// <summary> + /// The raw JWT message header + /// </summary> + public static ReadOnlyMemory<byte> JwtMessageHeader { get; } = JsonSerializer.SerializeToUtf8Bytes(BrokerJwtHeader); private static readonly RestClientPool ClientPool = new(2,new RestClientOptions() { @@ -113,32 +119,7 @@ namespace VNLib.Data.Caching.Extensions DebugLog = debugLog }; } - - - private class CacheConnectionConfig - { - public ECDsa ClientAlg { get; init; } - public ECDsa BrokerAlg { get; init; } - public string ServerChallenge { get; init; } - public string? NodeId { get; set; } - public Uri? BrokerAddress { get; set; } - public bool useTls { get; set; } - public ActiveServer[]? BrokerServers { get; set; } - - public CacheConnectionConfig() - { - //Init the algorithms - ClientAlg = ECDsa.Create(CacheCurve); - BrokerAlg = ECDsa.Create(CacheCurve); - ServerChallenge = RandomHash.GetRandomBase32(24); - } - - ~CacheConnectionConfig() - { - ClientAlg.Clear(); - BrokerAlg.Clear(); - } - } + /// <summary> /// Contacts the cache broker to get a list of active servers to connect to @@ -167,28 +148,28 @@ namespace VNLib.Data.Caching.Extensions /// Contacts the cache broker to get a list of active servers to connect to /// </summary> /// <param name="brokerAddress">The broker server to connec to</param> - /// <param name="clientAlg">The signature algorithm used to sign messages to the broker</param> - /// <param name="brokerAlg">The signature used to verify broker messages</param> + /// <param name="signingAlg">The signature algorithm used to sign messages to the broker</param> + /// <param name="verificationAlg">The signature used to verify broker messages</param> /// <param name="cancellationToken">A token to cancel the operationS</param> /// <returns>The list of active servers</returns> /// <exception cref="SecurityException"></exception> /// <exception cref="ArgumentNullException"></exception> - public static async Task<ActiveServer[]?> ListServersAsync(Uri brokerAddress, ECDsa clientAlg, ECDsa brokerAlg, CancellationToken cancellationToken = default) + public static async Task<ActiveServer[]?> ListServersAsync(Uri brokerAddress, ECDsa signingAlg, ECDsa verificationAlg, CancellationToken cancellationToken = default) { _ = brokerAddress ?? throw new ArgumentNullException(nameof(brokerAddress)); - _ = clientAlg ?? throw new ArgumentNullException(nameof(clientAlg)); - _ = brokerAlg ?? throw new ArgumentNullException(nameof(brokerAlg)); + _ = signingAlg ?? throw new ArgumentNullException(nameof(signingAlg)); + _ = verificationAlg ?? throw new ArgumentNullException(nameof(verificationAlg)); string jwtBody; //Build request jwt using (JsonWebToken requestJwt = new()) { - requestJwt.WriteHeader(BrokerJwtHeader); + requestJwt.WriteHeader(JwtMessageHeader.Span); requestJwt.InitPayloadClaim() .AddClaim("iat", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()) .CommitClaims(); //sign the jwt - requestJwt.Sign(clientAlg, in CacheJwtAlgorithm, 512); + requestJwt.Sign(signingAlg, in CacheJwtAlgorithm, 512); //Compile the jwt jwtBody = requestJwt.Compile(); } @@ -208,7 +189,7 @@ namespace VNLib.Data.Caching.Extensions //Response is jwt using JsonWebToken responseJwt = JsonWebToken.ParseRaw(response.RawBytes); //Verify the jwt - if (!responseJwt.Verify(brokerAlg, in CacheJwtAlgorithm)) + if (!responseJwt.Verify(verificationAlg, in CacheJwtAlgorithm)) { throw new SecurityException("Failed to verify the broker's challenge, cannot continue"); } @@ -216,116 +197,44 @@ namespace VNLib.Data.Caching.Extensions return doc.RootElement.GetProperty("servers").Deserialize<ActiveServer[]>(); } - /// <summary> - /// Configures a connection to the remote cache server at the specified location - /// with proper authentication. - /// </summary> - /// <param name="client"></param> - /// <param name="serverUri">The server's address</param> - /// <param name="signingKey">The pks8 format EC private key uesd to sign the message</param> - /// <param name="challenge">A challenge to send to the server</param> - /// <param name="nodeId">A token used to identify the current server's event queue on the remote server</param> - /// <param name="token">A token to cancel the connection operation</param> - /// <param name="useTls">Enables the secure websocket protocol</param> - /// <returns>A Task that completes when the connection has been established</returns> - /// <exception cref="ArgumentNullException"></exception> - public static Task ConnectAsync(this FBMClient client, string serverUri, ReadOnlyMemory<byte> signingKey, string challenge, string? nodeId, bool useTls, CancellationToken token = default) - { - //Sign the jwt - using ECDsa sigAlg = ECDsa.Create(CacheCurve); - //Import the signing key - sigAlg.ImportPkcs8PrivateKey(signingKey.Span, out _); - //Return without await because the alg is used to sign before this method returns and can be discarded - return ConnectAsync(client, serverUri, sigAlg, challenge, nodeId, useTls, token); - } - - private static Task ConnectAsync(FBMClient client, string serverUri, ECDsa sigAlg, string challenge, string? nodeId, bool useTls, CancellationToken token = default) - { - _ = serverUri ?? throw new ArgumentNullException(nameof(serverUri)); - _ = challenge ?? throw new ArgumentNullException(nameof(challenge)); - //build ws uri - UriBuilder uriBuilder = new(serverUri) - { - Scheme = useTls ? "wss://" : "ws://" - }; - string jwtMessage; - //Init jwt for connecting to server - using (JsonWebToken jwt = new()) - { - jwt.WriteHeader(BrokerJwtHeader); - //Init claim - JwtPayload claim = jwt.InitPayloadClaim(); - claim.AddClaim("challenge", challenge); - if (!string.IsNullOrWhiteSpace(nodeId)) - { - /* - * The unique node id so the other nodes know to load the - * proper event queue for the current server - */ - claim.AddClaim("server_id", nodeId); - } - claim.CommitClaims(); - - //Sign jwt - jwt.Sign(sigAlg, in CacheJwtAlgorithm, 512); - - //Compile to string - jwtMessage = jwt.Compile(); - } - //Set jwt as authorization header - client.ClientSocket.Headers[HttpRequestHeader.Authorization] = jwtMessage; - //Connect async - return client.ConnectAsync(uriBuilder.Uri, token); - } /// <summary> /// Registers the current server as active with the specified broker /// </summary> - /// <param name="brokerAddress">The address of the broker to register with</param> - /// <param name="signingKey">The private key used to sign the message</param> - /// <param name="serverAddress">The local address of the current server used for discovery</param> - /// <param name="nodeId">The unique id to identify this server (for event queues)</param> - /// <param name="keepAliveToken">A unique security token used by the broker to authenticate itself</param> - /// <returns>A task that resolves when a successful registration is completed, raises exceptions otherwise</returns> - public static async Task ResgisterWithBrokerAsync(Uri brokerAddress, ReadOnlyMemory<byte> signingKey, string serverAddress, string nodeId, string keepAliveToken) + /// <param name="registration">The registration request</param> + public static async Task ResgisterWithBrokerAsync(BrokerRegistrationRequest registration) { - _ = brokerAddress ?? throw new ArgumentNullException(nameof(brokerAddress)); - _ = serverAddress ?? throw new ArgumentNullException(nameof(serverAddress)); - _ = keepAliveToken ?? throw new ArgumentNullException(nameof(keepAliveToken)); - _ = nodeId ?? throw new ArgumentNullException(nameof(nodeId)); + _ = registration.HeartbeatToken ?? throw new ArgumentException("Missing required heartbeat access token"); + _ = registration.NodeId ?? throw new ArgumentException("Missing required cache server NodeId"); + _ = registration.BrokerAddress ?? throw new ArgumentException("Broker server address has not been configured"); + _ = registration.RegistrationAddress ?? throw new ArgumentException("Missing required registration address", nameof(registration)); string requestData; //Create the jwt for signed registration message using (JsonWebToken jwt = new()) { //Shared jwt header - jwt.WriteHeader(BrokerJwtHeader); + jwt.WriteHeader(JwtMessageHeader.Span); //build jwt claim jwt.InitPayloadClaim() - .AddClaim("address", serverAddress) - .AddClaim("server_id", nodeId) - .AddClaim("token", keepAliveToken) + .AddClaim("address", registration.RegistrationAddress) + .AddClaim("sub", registration.NodeId) + .AddClaim("token", registration.HeartbeatToken) .CommitClaims(); //Sign the jwt - using (ECDsa sigAlg = ECDsa.Create(CacheCurve)) - { - //Import the signing key - sigAlg.ImportPkcs8PrivateKey(signingKey.Span, out _); - - jwt.Sign(sigAlg, in CacheJwtAlgorithm, 512); - } + jwt.Sign(registration.SiginingAlg, in CacheJwtAlgorithm, 512); //Compile and save requestData = jwt.Compile(); } //Create reg request message - RestRequest regRequest = new(brokerAddress); + RestRequest regRequest = new(registration.BrokerAddress); regRequest.AddStringBody(requestData, DataFormat.None); regRequest.AddHeader("Content-Type", "text/plain"); //Rent client - using ClientContract client = ClientPool.Lease(); + using ClientContract cc = ClientPool.Lease(); //Exec the regitration request - RestResponse response = await client.Resource.ExecutePutAsync(regRequest); + RestResponse response = await cc.Resource.ExecutePutAsync(regRequest); if(!response.IsSuccessful) { throw response.ErrorException!; @@ -333,77 +242,15 @@ namespace VNLib.Data.Caching.Extensions } - private static readonly ConditionalWeakTable<FBMClient, CacheConnectionConfig> ClientCacheConfig = new(); - - /// <summary> - /// Imports the client signature algorithim's private key from its pkcs8 binary representation - /// </summary> - /// <param name="client"></param> - /// <param name="pkcs8PrivateKey">Pkcs8 format private key</param> - /// <returns>Chainable fluent object</returns> - /// <exception cref="ArgumentException"></exception> - /// <exception cref="CryptographicException"></exception> - public static FBMClient ImportClientPrivateKey(this FBMClient client, ReadOnlySpan<byte> pkcs8PrivateKey) - { - CacheConnectionConfig conf = ClientCacheConfig.GetOrCreateValue(client); - conf.ClientAlg.ImportPkcs8PrivateKey(pkcs8PrivateKey, out _); - return client; - } - /// <summary> - /// Imports the public key used to verify broker server messages - /// </summary> - /// <param name="client"></param> - /// <param name="spkiPublicKey">The subject-public-key-info formatted broker public key</param> - /// <returns>Chainable fluent object</returns> - /// <exception cref="ArgumentException"></exception> - /// <exception cref="CryptographicException"></exception> - public static FBMClient ImportBrokerPublicKey(this FBMClient client, ReadOnlySpan<byte> spkiPublicKey) - { - CacheConnectionConfig conf = ClientCacheConfig.GetOrCreateValue(client); - conf.BrokerAlg.ImportSubjectPublicKeyInfo(spkiPublicKey, out _); - return client; - } - /// <summary> - /// Specifies if all connections should be using TLS - /// </summary> - /// <param name="client"></param> - /// <param name="useTls">A value that indicates if connections should use TLS</param> - /// <returns>Chainable fluent object</returns> - public static FBMClient UseTls(this FBMClient client, bool useTls) - { - CacheConnectionConfig conf = ClientCacheConfig.GetOrCreateValue(client); - conf.useTls = useTls; - return client; - } - /// <summary> - /// Specifies the broker address to discover cache nodes from - /// </summary> - /// <param name="client"></param> - /// <param name="brokerAddress">The address of the server broker</param> - /// <returns>Chainable fluent object</returns> - /// <exception cref="ArgumentNullException"></exception> - public static FBMClient UseBroker(this FBMClient client, Uri brokerAddress) - { - CacheConnectionConfig conf = ClientCacheConfig.GetOrCreateValue(client); - conf.BrokerAddress = brokerAddress ?? throw new ArgumentNullException(nameof(brokerAddress)); - return client; - } + private static readonly ConditionalWeakTable<FBMClient, ClientCacheConfiguration> ClientCacheConfig = new(); /// <summary> - /// Specifies the current server's cluster node id. If this - /// is a server connection attempting to listen for changes on the - /// remote server, this id must be set and unique + /// Allows for configuration of an <see cref="FBMClient"/> + /// for a connection to a cache server /// </summary> /// <param name="client"></param> - /// <param name="nodeId">The cluster node id of the current server</param> - /// <returns>Chainable fluent object</returns> - /// <exception cref="ArgumentNullException"></exception> - public static FBMClient SetNodeId(this FBMClient client, string nodeId) - { - CacheConnectionConfig conf = ClientCacheConfig.GetOrCreateValue(client); - conf.NodeId = nodeId ?? throw new ArgumentNullException(nameof(nodeId)); - return client; - } + /// <returns>A fluent api configuration builder for the current client</returns> + public static ClientCacheConfiguration GetCacheConfiguration(this FBMClient client) => ClientCacheConfig.GetOrCreateValue(client); /// <summary> /// Discovers cache nodes in the broker configured for the current client. @@ -411,57 +258,41 @@ namespace VNLib.Data.Caching.Extensions /// <param name="client"></param> /// <param name="token">A token to cancel the discovery</param> /// <returns>A task the resolves the list of active servers on the broker server</returns> - public static Task<ActiveServer[]?> DiscoverNodesAsync(this FBMClientWorkerBase client, CancellationToken token = default) - { - return client.Client.DiscoverNodesAsync(token); - } + public static Task<ActiveServer[]?> DiscoverCacheNodesAsync(this FBMClientWorkerBase client, CancellationToken token = default) => client.Client.DiscoverCacheNodesAsync(token); /// <summary> /// Discovers cache nodes in the broker configured for the current client. /// </summary> /// <param name="client"></param> /// <param name="token">A token to cancel the discovery </param> /// <returns>A task the resolves the list of active servers on the broker server</returns> - public static async Task<ActiveServer[]?> DiscoverNodesAsync(this FBMClient client, CancellationToken token = default) + public static async Task<ActiveServer[]?> DiscoverCacheNodesAsync(this FBMClient client, CancellationToken token = default) { - CacheConnectionConfig conf = ClientCacheConfig.GetOrCreateValue(client); + ClientCacheConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); //List servers async - ActiveServer[]? servers = await ListServersAsync(conf.BrokerAddress!, conf.ClientAlg, conf.BrokerAlg, token); - conf.BrokerServers = servers; + ActiveServer[]? servers = await ListServersAsync(conf.BrokerAddress!, conf.SigningKey, conf.VerificationKey, token); + conf.CacheServers = servers; return servers; } /// <summary> - /// Connects the client to a remote cache server + /// Waits for the client to disconnect from the server while observing + /// the cancellation token. If the token is cancelled, the connection is + /// closed cleanly if possible /// </summary> /// <param name="client"></param> - /// <param name="server">The server to connect to</param> - /// <param name="token">A token to cancel the connection and/or wait operation</param> - /// <returns>A task that resolves when cancelled or when the connection is lost to the server</returns> - /// <exception cref="OperationCanceledException"></exception> - public static Task ConnectAndWaitForExitAsync(this FBMClientWorkerBase client, ActiveServer server, CancellationToken token = default) + /// <param name="token">A token to cancel the connection to the server</param> + /// <returns>A task that complets when the connecion has been closed successfully</returns> + public static async Task WaitForExitAsync(this FBMClient client, CancellationToken token = default) { - return client.Client.ConnectAndWaitForExitAsync(server, token); - } - - /// <summary> - /// Connects the client to a remote cache server - /// </summary> - /// <param name="client"></param> - /// <param name="server">The server to connect to</param> - /// <param name="token">A token to cancel the connection and/or wait operation</param> - /// <returns>A task that resolves when cancelled or when the connection is lost to the server</returns> - /// <exception cref="OperationCanceledException"></exception> - public static async Task ConnectAndWaitForExitAsync(this FBMClient client, ActiveServer server, CancellationToken token = default) - { - CacheConnectionConfig conf = ClientCacheConfig.GetOrCreateValue(client); - //Connect to server (no server id because client not replication server) - await ConnectAsync(client, server.HostName!, conf.ClientAlg, conf.ServerChallenge, conf.NodeId, conf.useTls, token); + client.LogDebug("Waiting for cache client to exit"); //Get task for cancellation Task cancellation = token.WaitHandle.WaitAsync(); //Task for status handle Task run = client.ConnectionStatusHandle.WaitAsync(); //Wait for cancellation or _ = await Task.WhenAny(cancellation, run); + + client.LogDebug("Disconnecting the cache client"); //Normal try to disconnect the socket await client.DisconnectAsync(CancellationToken.None); //Notify if cancelled @@ -469,6 +300,185 @@ namespace VNLib.Data.Caching.Extensions } /// <summary> + /// Connects to a random server from the servers discovered during a cache server discovery + /// </summary> + /// <param name="client"></param> + /// <param name="cancellation">A token to cancel the operation</param> + /// <returns>The server that the connection was made with</returns> + /// <exception cref="ArgumentException"></exception> + public static async Task<ActiveServer> ConnectToRandomCacheAsync(this FBMClient client, CancellationToken cancellation = default) + { + //Get stored config + ClientCacheConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); + //Select random + ActiveServer? randomServer = conf.CacheServers?.SelectRandom(); + _ = randomServer ?? throw new ArgumentException("No servers detected, cannot connect"); + await ConnectToCacheAsync(client, randomServer, cancellation); + return randomServer; + } + + /// <summary> + /// Connects to the specified server on the configured cache client + /// </summary> + /// <param name="client"></param> + /// <param name="server">The server to connect to</param> + /// <param name="token">A token to cancel the operation</param> + /// <returns>A task that resolves when the client is connected to the cache server</returns> + /// <exception cref="FBMException"></exception> + /// <exception cref="FBMServerNegiationException"></exception> + /// <exception cref="ArgumentException"></exception> + /// <exception cref="ArgumentNullException"></exception> + /// <exception cref="SecurityException"></exception> + /// <exception cref="ObjectDisposedException"></exception> + public static Task ConnectToCacheAsync(this FBMClient client, ActiveServer server, CancellationToken token = default) + { + //Get stored config + ClientCacheConfiguration conf = ClientCacheConfig.GetOrCreateValue(client); + //Connect to server (no server id because client not replication server) + return ConnectToCacheAsync(client, conf, server, token); + } + + private static void LogDebug(this FBMClient client, string message) + { + client.Config.DebugLog.Debug("{debug}: {data}","[CACHE]", message); + } + + private static async Task ConnectToCacheAsync(FBMClient client, ClientCacheConfiguration request, ActiveServer server, CancellationToken token = default) + { + //Construct server uri + Uri serverUri = new(server.HostName!); + + //build ws uri + UriBuilder uriBuilder = new(serverUri) + { + Scheme = request.UseTls ? "wss://" : "ws://" + }; + + string jwtMessage; + //Init jwt for connecting to server + using (JsonWebToken jwt = new()) + { + jwt.WriteHeader(JwtMessageHeader.Span); + //Init claim + JwtPayload claim = jwt.InitPayloadClaim(); + claim.AddClaim("chl", request.ServerChallenge); + if (!string.IsNullOrWhiteSpace(request.NodeId)) + { + /* + * The unique node id so the other nodes know to load the + * proper event queue for the current server + */ + claim.AddClaim("sub", request.NodeId); + } + claim.CommitClaims(); + + //Sign jwt + jwt.Sign(request.SigningKey, in CacheJwtAlgorithm, 512); + + //Compile to string + jwtMessage = jwt.Compile(); + } + + RestRequest negotation = new(serverUri, Method.Get); + //Set the jwt auth header for negotiation + negotation.AddHeader("Authorization", jwtMessage); + + client.LogDebug("Negotiating with cache server"); + + //rent client + using (ClientContract clientContract = ClientPool.Lease()) + { + //Execute the request + RestResponse response = await clientContract.Resource.ExecuteAsync(negotation, token); + //Check verify the response + + if (!response.IsSuccessful) + { + throw response.ErrorException!; + } + + if (response.Content == null) + { + throw new FBMServerNegiationException("Failed to negotiate with the server, no response"); + } + + //Raw content + string authToken = response.Content; + + //Parse the jwt + using JsonWebToken jwt = JsonWebToken.Parse(authToken); + + //Verify the jwt + if (!jwt.Verify(request.VerificationKey, in CacheJwtAlgorithm)) + { + throw new SecurityException("Failed to verify the broker's negotiation message, cannot continue"); + } + + //Confirm the server's buffer configuration + ValidateServerNegotation(client, request.ServerChallenge, jwt); + + client.LogDebug("Server negotiation validated, connecting to server"); + + //The client authorization header is the exact response + client.ClientSocket.Headers[HttpRequestHeader.Authorization] = authToken; + } + + //Connect async + await client.ConnectAsync(uriBuilder.Uri, token); + } + + private static void ValidateServerNegotation(FBMClient client, string challenge, JsonWebToken jwt) + { + try + { + //Get the response message to verify the challenge, and client arguments + using JsonDocument doc = jwt.GetPayload(); + + IReadOnlyDictionary<string, JsonElement> args = doc.RootElement + .EnumerateObject() + .ToDictionary(static k => k.Name, static v => v.Value); + + //get the challenge response + string challengeResponse = args["chl"].GetString()!; + + //Check the challenge response + if (challenge.Equals(challengeResponse, StringComparison.Ordinal)) + { + throw new FBMServerNegiationException("Failed to negotiate with the server, challenge response does not match"); + } + + //Get the negiation values + uint recvBufSize = args[FBMClient.REQ_RECV_BUF_QUERY_ARG].GetUInt32(); + uint headerBufSize= args[FBMClient.REQ_HEAD_BUF_QUERY_ARG].GetUInt32(); + uint maxMessSize= args[FBMClient.REQ_MAX_MESS_QUERY_ARG].GetUInt32(); + + //Verify the values + if (client.Config.RecvBufferSize > recvBufSize) + { + throw new FBMServerNegiationException("Failed to negotiate with the server, the server's recv buffer size is too small"); + } + + if (client.Config.MaxHeaderBufferSize > headerBufSize) + { + throw new FBMServerNegiationException("Failed to negotiate with the server, the server's header buffer size is too small"); + } + + if (client.Config.MaxMessageSize > maxMessSize) + { + throw new FBMServerNegiationException("Failed to negotiate with the server, the server's max message size is too small"); + } + } + catch (FBMException) + { + throw; + } + catch (Exception ex) + { + throw new FBMServerNegiationException("Negotiation with the server failed", ex); + } + } + + /// <summary> /// Selects a random server from a collection of active servers /// </summary> /// <param name="servers"></param> diff --git a/VNLib.Data.Caching.Extensions/FBMServerNegiationException.cs b/VNLib.Data.Caching.Extensions/FBMServerNegiationException.cs new file mode 100644 index 0000000..c7e7727 --- /dev/null +++ b/VNLib.Data.Caching.Extensions/FBMServerNegiationException.cs @@ -0,0 +1,43 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching.Extensions +* File: FBMServerNegiationException.cs +* +* FBMServerNegiationException.cs is part of VNLib.Data.Caching.Extensions which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published +* by the Free Software Foundation, either version 2 of the License, +* or (at your option) any later version. +* +* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +* General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with VNLib.Data.Caching.Extensions. If not, see http://www.gnu.org/licenses/. +*/ + +using VNLib.Net.Messaging.FBM; + + +namespace VNLib.Data.Caching.Extensions +{ + /// <summary> + /// Represents an exception that is raised because of a client-server + /// negotiation failure. + /// </summary> + public class FBMServerNegiationException : FBMException + { + public FBMServerNegiationException() + {} + public FBMServerNegiationException(string message) : base(message) + {} + public FBMServerNegiationException(string message, Exception innerException) : base(message, innerException) + {} + } +} diff --git a/VNLib.Data.Caching.Extensions/VNLib.Data.Caching.Extensions.csproj b/VNLib.Data.Caching.Extensions/VNLib.Data.Caching.Extensions.csproj index fae5e3b..3f15b6e 100644 --- a/VNLib.Data.Caching.Extensions/VNLib.Data.Caching.Extensions.csproj +++ b/VNLib.Data.Caching.Extensions/VNLib.Data.Caching.Extensions.csproj @@ -4,31 +4,16 @@ <TargetFramework>net6.0</TargetFramework> <ImplicitUsings>enable</ImplicitUsings> <Nullable>enable</Nullable> - <PlatformTarget>x64</PlatformTarget> <GenerateDocumentationFile>True</GenerateDocumentationFile> - <Version>1.0.0.1</Version> + <Version>1.0.1.1</Version> <Authors>Vaughn Nugent</Authors> <Copyright>Copyright © 2022 Vaughn Nugent</Copyright> - <PackageProjectUrl>www.vaughnnugent.com/resources</PackageProjectUrl> - <Platforms>AnyCPU;x64</Platforms> + <PackageProjectUrl>https://www.vaughnnugent.com/resources</PackageProjectUrl> </PropertyGroup> - <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'"> - <CheckForOverflowUnderflow>True</CheckForOverflowUnderflow> - </PropertyGroup> - - <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'"> - <CheckForOverflowUnderflow>True</CheckForOverflowUnderflow> - </PropertyGroup> - - <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'"> - <CheckForOverflowUnderflow>True</CheckForOverflowUnderflow> - <Deterministic>False</Deterministic> - </PropertyGroup> - - <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'"> - <CheckForOverflowUnderflow>True</CheckForOverflowUnderflow> - <Deterministic>False</Deterministic> + <PropertyGroup> + <CopyLocalLockFileAssemblies>true</CopyLocalLockFileAssemblies> + <AnalysisLevel>latest-all</AnalysisLevel> </PropertyGroup> <ItemGroup> @@ -44,7 +29,7 @@ </ItemGroup> <ItemGroup> - <ProjectReference Include="..\..\..\VNLib\Hashing\VNLib.Hashing.Portable.csproj" /> + <ProjectReference Include="..\..\..\VNLib\Hashing\src\VNLib.Hashing.Portable.csproj" /> <ProjectReference Include="..\..\..\VNLib\Http\VNLib.Net.Http.csproj" /> <ProjectReference Include="..\..\..\VNLib\VNLib.Net.Messaging.FBM\src\VNLib.Net.Messaging.FBM.csproj" /> <ProjectReference Include="..\..\VNLib.Net.Rest.Client\VNLib.Net.Rest.Client.csproj" /> diff --git a/VNLib.Data.Caching.Global/VNLib.Data.Caching.Global.csproj b/VNLib.Data.Caching.Global/VNLib.Data.Caching.Global.csproj index 0ec3c36..a23b199 100644 --- a/VNLib.Data.Caching.Global/VNLib.Data.Caching.Global.csproj +++ b/VNLib.Data.Caching.Global/VNLib.Data.Caching.Global.csproj @@ -6,9 +6,19 @@ <Nullable>enable</Nullable> <Authors>Vaughn Nugent</Authors> <Copyright>Copyright © 2022 Vaughn Nugent</Copyright> - <Platforms>AnyCPU;ARM32;x64</Platforms> - <PlatformTarget>x64</PlatformTarget> <GenerateDocumentationFile>True</GenerateDocumentationFile> + <PlatformTarget>AnyCPU</PlatformTarget> + <PackageProjectUrl>https://www.vaughnnugent.com/resources</PackageProjectUrl> + <AnalysisLevel>latest-all</AnalysisLevel> + <Version>1.0.1.1</Version> + </PropertyGroup> + + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'"> + <Deterministic>False</Deterministic> + </PropertyGroup> + + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'"> + <Deterministic>False</Deterministic> </PropertyGroup> <ItemGroup> diff --git a/VNLib.Data.Caching.ObjectCache/VNLib.Data.Caching.ObjectCache.csproj b/VNLib.Data.Caching.ObjectCache/VNLib.Data.Caching.ObjectCache.csproj index 389cabd..91e261c 100644 --- a/VNLib.Data.Caching.ObjectCache/VNLib.Data.Caching.ObjectCache.csproj +++ b/VNLib.Data.Caching.ObjectCache/VNLib.Data.Caching.ObjectCache.csproj @@ -2,30 +2,15 @@ <PropertyGroup> <TargetFramework>net6.0</TargetFramework> - <Platforms>AnyCPU;x64</Platforms> + <Authors>Vaughn Nugent</Authors> <Copyright>Copyright © 2022 Vaughn Nugent</Copyright> <Nullable>enable</Nullable> <GenerateDocumentationFile>True</GenerateDocumentationFile> - <PackageProjectUrl>www.vaughnnugent.com/resources</PackageProjectUrl> + <PackageProjectUrl>https://www.vaughnnugent.com/resources</PackageProjectUrl> <AssemblyVersion>1.0.0.1</AssemblyVersion> - </PropertyGroup> - - <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'"> - <DocumentationFile></DocumentationFile> - <CheckForOverflowUnderflow>False</CheckForOverflowUnderflow> - </PropertyGroup> - - <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'"> - <CheckForOverflowUnderflow>False</CheckForOverflowUnderflow> - </PropertyGroup> - - <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'"> - <CheckForOverflowUnderflow>False</CheckForOverflowUnderflow> - </PropertyGroup> - - <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'"> - <CheckForOverflowUnderflow>False</CheckForOverflowUnderflow> + <Version>1.0.1.1</Version> + <AnalysisLevel>latest-all</AnalysisLevel> </PropertyGroup> <ItemGroup> diff --git a/VNLib.Data.Caching/src/VNLib.Data.Caching.csproj b/VNLib.Data.Caching/src/VNLib.Data.Caching.csproj index cc09c1f..f4d9269 100644 --- a/VNLib.Data.Caching/src/VNLib.Data.Caching.csproj +++ b/VNLib.Data.Caching/src/VNLib.Data.Caching.csproj @@ -2,17 +2,18 @@ <PropertyGroup> <TargetFramework>net6.0</TargetFramework> - <Platforms>AnyCPU;x64</Platforms> <Authors>Vaughn Nugent</Authors> <Copyright>Copyright © 2022 Vaughn Nugent</Copyright> - <Version>1.0.0.1</Version> + <Version>1.0.1.1</Version> <GenerateDocumentationFile>True</GenerateDocumentationFile> - <PlatformTarget>x64</PlatformTarget> <Nullable>enable</Nullable> </PropertyGroup> - <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'"> + <PropertyGroup> <DocumentationFile></DocumentationFile> + <CopyLocalLockFileAssemblies>true</CopyLocalLockFileAssemblies> + <PackageProjectUrl>https://www.vaughnnugent.com/resources</PackageProjectUrl> + <AnalysisLevel>latest-all</AnalysisLevel> </PropertyGroup> <ItemGroup> @@ -27,6 +28,7 @@ </ItemGroup> <ItemGroup> + <ProjectReference Include="..\..\..\..\VNLib\Http\VNLib.Net.Http.csproj" /> <ProjectReference Include="..\..\..\..\VNLib\Utils\src\VNLib.Utils.csproj" /> <ProjectReference Include="..\..\..\..\VNLib\VNLib.Net.Messaging.FBM\src\VNLib.Net.Messaging.FBM.csproj" /> </ItemGroup> |