aboutsummaryrefslogtreecommitdiff
path: root/lib/VNLib.Data.Caching.Extensions/src/VNCacheClusterManager.cs
blob: f68968d0a96d8f57ccc22c10f804c7e428c5cdaf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
/*
* Copyright (c) 2024 Vaughn Nugent
* 
* Library: VNLib
* Package: VNLib.Data.Caching.Extensions
* File: VNCacheClusterManager.cs 
*
* VNCacheClusterManager.cs is part of VNLib.Data.Caching.Extensions 
* which is part of the larger VNLib collection of libraries and utilities.
*
* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify 
* it under the terms of the GNU Affero General Public License as 
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program.  If not, see https://www.gnu.org/licenses/.
*/

using System;
using System.Linq;
using System.Security;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;

using RestSharp;

using VNLib.Hashing.IdentityUtility;
using VNLib.Net.Rest.Client.Construction;
using VNLib.Data.Caching.Extensions.ApiModel;
using VNLib.Data.Caching.Extensions.Clustering;

namespace VNLib.Data.Caching.Extensions
{

    /// <summary>
    /// A VNCache cluster client discovery manager. Used to simplify the discovery 
    /// of cache nodes
    /// </summary>
    /// <param name="config">The client configuration instance</param>
    public class VNCacheClusterManager(CacheClientConfiguration config)
    {
        /// <summary>
        /// The internal collection of discovered nodes
        /// </summary>
        protected NodeDiscoveryCollection NodeCollection { get; } = GetNodeCollection(config);

        /// <summary>
        /// Gets the collection of discovered nodes within the manager
        /// </summary>
        public INodeDiscoveryCollection DiscoveredNodes => NodeCollection;

        /// <summary>
        /// The underlying <see cref="CacheClientConfiguration"/> instance
        /// </summary>
        public CacheClientConfiguration Config => config;

        /// <summary>
        /// Adds an array of nodes manually to the collection of discovered cluster nodes
        /// </summary>
        /// <param name="nodes">The node advertisments to add</param>
        public void AddManualNodes(params CacheNodeAdvertisment[] nodes) => AddManualNodes(nodes.AsEnumerable());

        /// <summary>
        /// Adds a sequence of nodes manually to the collection of discovered cluster nodes
        /// </summary>
        /// <param name="nodes">The node advertisments to add</param>
        public void AddManualNodes(IEnumerable<CacheNodeAdvertisment> nodes) => NodeCollection.AddManualNodes(nodes);

        /// <summary>
        /// Removes an array of nodes manually from the collection of discovered cluster nodes
        /// </summary>
        /// <param name="nodes">The node advertisments to remove</param>
        public void RemoveManualNodes(params CacheNodeAdvertisment[] nodes) => RemoveManualNodes(nodes.AsEnumerable());

        /// <summary>
        /// Removes a sequence of nodes manually from the collection of discovered cluster nodes
        /// </summary>
        /// <param name="nodes">The node advertisments to remove</param>
        public void RemoveManualNodes(IEnumerable<CacheNodeAdvertisment> nodes) => NodeCollection.RemoveManualNodes(nodes);

        /// <summary>
        /// Resolves the initial well-known cache nodes into their advertisments. All 
        /// well-known nodes from the configuration are queried concurrently.
        /// </summary>
        /// <param name="cancellation">A token to cancel the operation</param>
        /// <returns>
        /// An array of resolved nodes. Nodes whose resolution failed and was reported to the 
        /// configured error handler are omitted, so the array may be empty.
        /// </returns>
        /// <exception cref="ArgumentException">No well-known nodes are defined in the configuration</exception>
        /// <exception cref="CacheDiscoveryFailureException">A node failed to resolve and no error handler is configured</exception>
        /// <exception cref="OperationCanceledException">The <paramref name="cancellation"/> token was cancelled</exception>
        public async Task<CacheNodeAdvertisment[]> ResolveWellKnownAsync(CancellationToken cancellation)
        {
            //Make sure at least one node defined
            if (config?.WellKnownNodes == null || config.WellKnownNodes.Length == 0)
            {
                throw new ArgumentException("There must be at least one cache node defined in the client configuration");
            }

            Task<CacheNodeAdvertisment?>[] initialAdds = new Task<CacheNodeAdvertisment?>[config.WellKnownNodes.Length];

            //Discover initial advertisments from well-known addresses concurrently
            for (int i = 0; i < config.WellKnownNodes.Length; i++)
            {
                initialAdds[i] = DiscoverNodeConfigAsync(config.WellKnownNodes[i], cancellation);
            }

            //WhenAll returns the results in the same order as the input tasks
            CacheNodeAdvertisment?[] results = await Task.WhenAll(initialAdds);

            //A null result means the error handler absorbed that node's failure, drop those
            return results.OfType<CacheNodeAdvertisment>().ToArray();
        }

        /// <summary>
        /// Discovers ALL possible cache nodes iteratively, first by collecting the configuration
        /// from the initial peers.
        /// This will make connections to all discoverable servers
        /// </summary>
        /// <param name="cancellation">A token to cancel the operation</param>
        /// <returns>A task that completes when all nodes have been discovered</returns>
        /// <exception cref="ArgumentException">No well-known nodes are defined in the configuration</exception>
        /// <exception cref="CacheDiscoveryFailureException">No well-known node could be resolved, or discovery failed without an error handler</exception>
        /// <exception cref="OperationCanceledException">The <paramref name="cancellation"/> token was cancelled</exception>
        /// <remarks>
        /// This method simply combines the <see cref="ResolveWellKnownAsync"/> and 
        /// <see cref="DiscoverNodesAsync(CacheNodeAdvertisment[], CancellationToken)"/>
        /// methods into a single operation
        /// </remarks>
        public async Task DiscoverNodesAsync(CancellationToken cancellation)
        {
            //Make sure at least one node defined
            if (config?.WellKnownNodes == null || config.WellKnownNodes.Length == 0)
            {
                throw new ArgumentException("There must be at least one cache node defined in the client configuration");
            }

            /*
             * Connect to well-known nodes from the client configuration to discover the 
             * cluster's layout.
             */
            CacheNodeAdvertisment[] initialPeers = await ResolveWellKnownAsync(cancellation);

            if (initialPeers.Length == 0)
            {
                throw new CacheDiscoveryFailureException("There must be at least one available cache node to continue discovery");
            }

            await DiscoverNodesAsync(initialPeers, cancellation);
        }

        /// <summary>
        /// Discovers ALL possible cache nodes iteratively from the current collection of initial peers.
        /// This will make connections to all discoverable servers and update the client configuration, with all 
        /// discovered peers
        /// </summary>
        /// <param name="initialPeers">Accepts an array of initial peers to override the endpoint discovery process</param>
        /// <param name="cancellation">A token to cancel the operation</param>
        /// <returns>A task that completes when all nodes have been discovered</returns>
        /// <exception cref="ArgumentNullException"><paramref name="initialPeers"/> is null</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="initialPeers"/> is empty</exception>
        /// <exception cref="CacheDiscoveryFailureException">A peer failed discovery and no error handler is configured</exception>
        /// <exception cref="OperationCanceledException">The <paramref name="cancellation"/> token was cancelled</exception>
        public async Task DiscoverNodesAsync(CacheNodeAdvertisment[] initialPeers, CancellationToken cancellation)
        {
            //Make sure at least one node defined
            ArgumentNullException.ThrowIfNull(initialPeers);
            ArgumentOutOfRangeException.ThrowIfZero(initialPeers.Length);

            //Get the discovery enumerator with the initial peers
            using INodeDiscoveryEnumerator enumerator = NodeCollection.BeginDiscovery(initialPeers);

            //Start the discovery process
            await DiscoverNodesAsync(enumerator, config, config.ErrorHandler, cancellation);

            //Commit discovered nodes to stored node collection (only reached when discovery ran to completion)
            NodeCollection.CompleteDiscovery(enumerator);
        }

        /// <summary>
        /// Walks the discovery enumerator, asking each node that exposes a discovery endpoint 
        /// for its list of peers and feeding the results back into the enumerator.
        /// </summary>
        /// <param name="enumerator">The discovery enumerator to advance</param>
        /// <param name="config">The client configuration used to build discovery requests</param>
        /// <param name="errHandler">An optional handler that absorbs per-node failures; when null failures abort discovery</param>
        /// <param name="cancellation">A token to cancel the operation</param>
        /// <exception cref="CacheDiscoveryFailureException">A node failed and <paramref name="errHandler"/> is null</exception>
        /// <exception cref="OperationCanceledException">The <paramref name="cancellation"/> token was cancelled</exception>
        private static async Task DiscoverNodesAsync(
            INodeDiscoveryEnumerator enumerator,
            CacheClientConfiguration config,
            ICacheDiscoveryErrorHandler? errHandler,
            CancellationToken cancellation
        )
        {
            //Loop through servers
            while (enumerator.MoveNext())
            {
                /*
                 * We are allowed to save nodes that do not have a discovery endpoint, but we cannot 
                 * discover nodes from them we can only use them as cache
                 */
                if (enumerator.Current.DiscoveryEndpoint == null)
                {
                    //Skip this node
                    continue;
                }

                //add a random delay (100-499 ms) to avoid spamming servers
                await Task.Delay(Random.Shared.Next(100, 500), cancellation);

                try
                {
                    //Discover nodes from the current node
                    CacheNodeAdvertisment[]? nodes = await GetCacheNodesAsync(enumerator.Current, config, cancellation);

                    if (nodes != null)
                    {
                        //Add nodes to the collection
                        enumerator.OnPeerDiscoveryComplete(nodes);
                    }
                }
                /*
                 * Caller-requested cancellation is not a node failure: let it propagate rather 
                 * than handing it to the error handler or wrapping it in a discovery exception.
                 * Timeouts that did not come from our token still flow to the handlers below.
                 */
                catch (OperationCanceledException) when (cancellation.IsCancellationRequested)
                {
                    throw;
                }
                //Catch exceptions when an error handler is defined
                catch (Exception ex) when (errHandler != null)
                {
                    //Handle the error and continue with the next node
                    errHandler.OnDiscoveryError(enumerator.Current, ex);
                }
                catch (Exception ex)
                {
                    throw new CacheDiscoveryFailureException($"Failed to discovery peer node {enumerator.Current?.NodeId}, cannot continue", ex);
                }
            }
        }

        /// <summary>
        /// Contacts the given server's discovery endpoint to discover a list of available 
        /// servers we can connect to
        /// </summary>
        /// <param name="advert">An advertisment of a server to discover other nodes from</param>
        /// <param name="config">The cache configuration object</param>
        /// <param name="cancellationToken">A token to cancel the operation</param>
        /// <returns>The list of active servers read from the "peers" claim of the signed response</returns>
        /// <exception cref="SecurityException"></exception>
        /// <exception cref="ArgumentException"></exception>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="advert"/>, <paramref name="config"/> or the advertisment's discovery endpoint is null
        /// </exception>
        /// <exception cref="InvalidOperationException">The node returned no data</exception>
        public static async Task<CacheNodeAdvertisment[]?> GetCacheNodesAsync(CacheNodeAdvertisment advert, CacheClientConfiguration config, CancellationToken cancellationToken = default)
        {
            ArgumentNullException.ThrowIfNull(advert);
            ArgumentNullException.ThrowIfNull(config);
            ArgumentNullException.ThrowIfNull(advert.DiscoveryEndpoint, nameof(advert.DiscoveryEndpoint));

            DiscoveryRequest req = new (advert.DiscoveryEndpoint, config);

            //Site adapter verifies response messages so we dont need to check on the response
            byte[] data = await CacheSiteAdapter.Instance.ExecuteAsync(req, cancellationToken).AsBytes()
                ?? throw new InvalidOperationException($"No data returned from node {advert.NodeId}");

            //Response is jwt
            using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data);

            using JsonDocument doc = responseJwt.GetPayload();
            return doc.RootElement.GetProperty("peers").Deserialize<CacheNodeAdvertisment[]>();
        }

        /*
         * This method will connect to a given well-known (cache config endpoint) and discover the 
         * server's configuration (endpoint config) 
         * 
         * This function exists so clients only need a single endpoint to connect to, and the server
         * will return its signed configuration data (including cluster network information)
         * 
         * Returns null only when a failure was reported to the configured error handler.
         */
        private async Task<CacheNodeAdvertisment?> DiscoverNodeConfigAsync(Uri serverUri, CancellationToken cancellation)
        {
            try
            {
                GetConfigRequest req = new (serverUri, config);

                //Site adapter verifies response messages so we dont need to check on the response
                byte[] data = await CacheSiteAdapter.Instance.ExecuteAsync(req, cancellation).AsBytes()
                        ?? throw new CacheDiscoveryFailureException($"No data returned from desired cache node");

                //Response is jwt
                using JsonWebToken responseJwt = JsonWebToken.ParseRaw(data);

                //The "sub" claim of the payload is the single serialized advertisment
                using JsonDocument doc =  responseJwt.GetPayload();

                return doc.RootElement.GetProperty("sub").Deserialize<CacheNodeAdvertisment>();
            }
            //Caller-requested cancellation must propagate, not be reported as a node failure
            catch (OperationCanceledException) when (cancellation.IsCancellationRequested)
            {
                throw;
            }
            //Bypass cdfe when error handler is null (avoid nesting)
            catch (CacheDiscoveryFailureException) when (config.ErrorHandler == null)
            {
                throw;
            }
            //Catch exceptions when an error handler is defined
            catch (Exception ex) when (config.ErrorHandler != null)
            {
                //Handle the error
                config.ErrorHandler.OnDiscoveryError(serverUri, ex);
                return null;
            }
            catch (Exception ex)
            {
                throw new CacheDiscoveryFailureException("Failed to discover node configuration", ex);
            }
        }

        /*
         * Builds the backing node collection. When the configuration is a node configuration,
         * its NodeIdRef is passed to the collection; otherwise null is passed.
         */
        private static NodeDiscoveryCollection GetNodeCollection(CacheClientConfiguration config)
        {
            return config is CacheNodeConfiguration cnc ? new (cnc.NodeIdRef!) : new (null);
        }
    }
}