aboutsummaryrefslogtreecommitdiff
path: root/plugins/VNLib.Data.Caching.Providers.VNCache/src/Clustering/ClusterNodeIndex.cs
blob: e9dcbc506d66540e1ed56022b6c89210d6d72da7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
/*
* Copyright (c) 2024 Vaughn Nugent
* 
* Library: VNLib
* Package: VNLib.Data.Caching.Providers.VNCache
* File: ClusterNodeIndex.cs 
*
* ClusterNodeIndex.cs is part of VNLib.Data.Caching.Providers.VNCache which is part of the larger 
* VNLib collection of libraries and utilities.
*
* VNLib.Data.Caching.Providers.VNCache is free software: you can redistribute it and/or modify 
* it under the terms of the GNU Affero General Public License as 
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* VNLib.Data.Caching.Providers.VNCache is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program.  If not, see https://www.gnu.org/licenses/.
*/

using System;
using System.Threading;
using System.Text.Json;
using System.Reflection;
using System.Threading.Tasks;

using VNLib.Utils.Logging;
using VNLib.Utils.Resources;
using VNLib.Data.Caching.Extensions;
using VNLib.Data.Caching.Extensions.Clustering;
using VNLib.Plugins.Extensions.Loading.Events;

namespace VNLib.Data.Caching.Providers.VNCache.Clustering
{
    internal static class ClusterNodeIndex
    {
        /*
         * Shared identifier used both as the name of the cross-plugin semaphore 
         * and as the key of the app domain data slot that holds the shared index
         */
        const string APP_DOMAIN_KEY = "vnlib.data.caching.providers.vncache";

        /*
         * Safely determines if an instance of a node index already exists in the app domain.
         * If so, it returns a handler that wraps that instance; otherwise it creates a new 
         * index instance and stores it in the app domain.
         */

        /// <summary>
        /// Gets a cluster node index for the supplied cluster manager. On Windows the 
        /// index is shared through the app domain data store; on all other platforms 
        /// a new local index is created on every call.
        /// </summary>
        /// <param name="cluster">The cluster manager a newly created local index reads from</param>
        /// <returns>
        /// A new local index, or a remote wrapper around the index object already 
        /// stored in the app domain
        /// </returns>
        /// <exception cref="TimeoutException">
        /// The shared semaphore could not be acquired within 500 milliseconds
        /// </exception>
        public static IClusterNodeIndex CreateIndex(VNCacheClusterManager cluster)
        {
            /* TEMPORARY: 
             * Named semaphores are only supported on Windows. There they allow synchronized 
             * communication between plugins, but that is not possible on Linux. This will be 
             * replaced with a more robust solution in the future. Until then, non-Windows 
             * platforms simply get separate index instances.
             * 
             * Remember that while plugins live in the same app-domain, they do not share an 
             * assembly load context. Unless the default ALC contains the desired types, types 
             * won't unify, so a workaround is needed to avoid interprocess-style communication 
             * inside a single process.
             */

            if (!OperatingSystem.IsWindows())
            {
                //No cross-plugin synchronization available, every caller gets its own index
                return new LocalHandler(cluster);
            }

            //Named semaphore guards the app domain slot so only one index is created per app domain
            using Semaphore appDomainLock = new (1, 1, APP_DOMAIN_KEY, out _);

            bool acquired = appDomainLock.WaitOne(500);
            if (!acquired)
            {
                throw new TimeoutException("Failed to access the Cluster index shared semaphore");
            }

            try
            {
                //Look for an index another plugin has already published
                object? existing = AppDomain.CurrentDomain.GetData(APP_DOMAIN_KEY);

                if (existing is not null)
                {
                    //Wrap the published index instead of building another one
                    return new RemoteHandler(existing);
                }

                //First caller: create the index and publish it for later callers
                LocalHandler created = new (cluster);
                AppDomain.CurrentDomain.SetData(APP_DOMAIN_KEY, created);
                return created;
            }
            finally
            {
                appDomainLock.Release();
            }
        }
   
        /*
         * So, a bit of explanation. 
         * 
         * Plugins don't share types. Each plugin will load this package into its own ALC, which will
         * cause n instances of the cluster index manager. That can cause unnecessary http traffic 
         * from building the cluster index multiple times. In an attempt to avoid this, I try to share 
         * a single cluster index instance across all plugins in the same app domain. 
         * 
         * To do this, a local handler instance is loaded into whichever plugin acquires the named 
         * semaphore first, and then the instance is stored in the app domain global storage pool. 
         * If it's found, then other plugins will use the remote handler to access the index.
         * 
         * The remote handler attempts to use reflection to get function delegates and call the local 
         * handler functions via reflection. 
         * 
         * Unless VNLib.Core supports a new way to safely share types across ALCs, this is my solution.
         */

        /// <summary>
        /// Index implementation backed directly by a <see cref="VNCacheClusterManager"/>. 
        /// It selects nodes from the manager's discovered node collection and tracks 
        /// the most recently started discovery operation.
        /// </summary>
        sealed class LocalHandler(VNCacheClusterManager cluster) : IClusterNodeIndex, IIntervalScheduleable
        {
            //The discovery task most recently started by OnIntervalAsync
            private Task _currentUpdate = Task.CompletedTask;

            ///<inheritdoc/>
            public CacheNodeAdvertisment? GetNextNode()
            {
                //Get all discovered nodes
                CacheNodeAdvertisment[] ads = cluster.DiscoveredNodes.GetAllNodes();
                //Just get a random node from the collection for now
                return ads.Length > 0 ? ads.SelectRandom() : null;
            }

            /// <summary>
            /// Waits for the most recently started discovery operation to complete
            /// </summary>
            /// <param name="cancellationToken">A token to cancel the wait (not the discovery itself)</param>
            /// <returns>A task that completes when the current discovery task completes or the token is cancelled</returns>
            public Task WaitForDiscoveryAsync(CancellationToken cancellationToken)
            {
                /*
                 * Honor the caller's token so a wait can be abandoned. Only the wait is 
                 * cancelled, the underlying discovery task keeps running. When the token
                 * cannot be cancelled, WaitAsync hands back the original task.
                 */
                return _currentUpdate.WaitAsync(cancellationToken);
            }

            ///<inheritdoc/>
            public Task OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken)
            {
                /*
                 * Start discovery and store the task without awaiting it, so the 
                 * interval callback returns immediately. Waiters observe completion
                 * (and any failure) through WaitForDiscoveryAsync.
                 */
                _currentUpdate = cluster.DiscoverNodesAsync(cancellationToken);
                return Task.CompletedTask;
            }

            /*
             * Non-public method called via reflection by remote instances 
             * of the index. The node is passed as JSON text.
             */
            internal string? SerializeNextNode()
            {
                CacheNodeAdvertisment? nextNode = GetNextNode();
                return nextNode is null ? null : JsonSerializer.Serialize(nextNode);
            }
        }

        /// <summary>
        /// Index implementation that forwards calls to an index object supplied as a 
        /// plain <see cref="object"/>. Its methods are bound to delegates by name, and 
        /// node data is exchanged as JSON text.
        /// </summary>
        sealed class RemoteHandler(object RemoteIndex) : IClusterNodeIndex
        {
            //Delegate bound to the wrapped object's non-public node serializer
            private readonly Func<string?> _serializeNextNode = ManagedLibrary.GetMethod<Func<string?>>(
                RemoteIndex, 
                nameof(LocalHandler.SerializeNextNode), 
                BindingFlags.NonPublic
            );

            //Delegate bound to the wrapped object's public discovery wait method
            private readonly Func<CancellationToken, Task> _waitForDiscovery = ManagedLibrary.GetMethod<Func<CancellationToken, Task>>(
                RemoteIndex, 
                nameof(WaitForDiscoveryAsync), 
                BindingFlags.Public
            );

            ///<inheritdoc/>
            public CacheNodeAdvertisment? GetNextNode()
            {
                //Ask the wrapped index for its next node as JSON
                string? json = _serializeNextNode();

                if (json is null)
                {
                    //The wrapped index had no node to offer
                    return null;
                }

                //Rebuild the advertisement using this context's own type
                return JsonSerializer.Deserialize<CacheNodeAdvertisment>(json);
            }

            ///<inheritdoc/>
            public Task WaitForDiscoveryAsync(CancellationToken cancellationToken)
            {
                return _waitForDiscovery(cancellationToken);
            }
        }
    }
}