aboutsummaryrefslogtreecommitdiff
path: root/lib/VNLib.Data.Caching.Extensions/src/Clustering/NodeDiscoveryCollection.cs
blob: 16b96a313346fd3904a1c770709c1592ff7f5655 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/*
* Copyright (c) 2024 Vaughn Nugent
* 
* Library: VNLib
* Package: VNLib.Data.Caching.Extensions
* File: NodeDiscoveryCollection.cs 
*
* NodeDiscoveryCollection.cs is part of VNLib.Data.Caching.Extensions which is part 
* of the larger VNLib collection of libraries and utilities.
*
* VNLib.Data.Caching.Extensions is free software: you can redistribute it and/or modify 
* it under the terms of the GNU Affero General Public License as 
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* VNLib.Data.Caching.Extensions is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program.  If not, see https://www.gnu.org/licenses/.
*/

using System;
using System.Linq;
using System.Collections;
using System.Collections.Generic;
using System.Runtime.CompilerServices;

using VNLib.Utils.Extensions;

namespace VNLib.Data.Caching.Extensions.Clustering
{
    /// <summary>
    /// Represents a collection of available cache nodes from a discovery process
    /// </summary>
    public sealed class NodeDiscoveryCollection(StrongBox<string?>? selfId) : INodeDiscoveryCollection
    {
        private LinkedList<CacheNodeAdvertisment> _peers = new();

        /// <summary>
        /// Manually adds nodes to the collection that were not discovered through the discovery process
        /// </summary>
        /// <param name="nodes">The nodes to add</param>
        public void AddManualNodes(IEnumerable<CacheNodeAdvertisment> nodes)
        {
            //Get only the nodes that are not already in the collection
            IEnumerable<CacheNodeAdvertisment> newPeers = nodes.Except(_peers);

            //Add them to the end of the collection
            foreach (CacheNodeAdvertisment peer in newPeers)
            {
                _peers.AddLast(peer);
            }
        }

        /// <summary>
        /// Removes a vector of nodes from the internal collection
        /// </summary>
        /// <param name="nodes">The vector containg nodes to remove from the collection</param>
        public void RemoveManualNodes(IEnumerable<CacheNodeAdvertisment> nodes) => nodes.ForEach(n => _peers.Remove(n));

        ///<inheritdoc/>
        public INodeDiscoveryEnumerator BeginDiscovery() => new NodeEnumerator(new(), selfId?.Value);

        ///<inheritdoc/>
        public INodeDiscoveryEnumerator BeginDiscovery(IEnumerable<CacheNodeAdvertisment> initialPeers)
        {
            ArgumentNullException.ThrowIfNull(initialPeers);

            //Init new enumerator with the initial peers
            return new NodeEnumerator(new(initialPeers), selfId?.Value);
        }

        ///<inheritdoc/>
        public void CompleteDiscovery(INodeDiscoveryEnumerator enumerator)
        {
            ArgumentNullException.ThrowIfNull(enumerator);  

            //Capture all nodes from the enumerator and store them as our current peers
            _peers = (enumerator as NodeEnumerator)!.Peers;
        }

        ///<inheritdoc/>
        public CacheNodeAdvertisment[] GetAllNodes() => _peers.ToArray();

        private sealed record class NodeEnumerator(LinkedList<CacheNodeAdvertisment> Peers, string? SelfNodeId) : INodeDiscoveryEnumerator
        {
            private bool isInit;

            //Keep track of the current node in the collection so we can move down the list
            private LinkedListNode<CacheNodeAdvertisment>? _currentNode;

            public CacheNodeAdvertisment Current => _currentNode?.Value;
            object IEnumerator.Current => _currentNode?.Value;


            ///<inheritdoc/>
            public bool MoveNext()
            {
                if (!isInit)
                {
                    _currentNode = Peers.First;
                    isInit = true;
                }
                else
                {
                    //Move to the next peer in the collection
                    _currentNode = _currentNode?.Next;
                }

                return _currentNode?.Value != null;
            }

            ///<inheritdoc/>
            public void OnPeerDiscoveryComplete(IEnumerable<CacheNodeAdvertisment> discoveredPeers)
            {
                //Get only the peers from the discovery that are not already in the collection, or ourselves
                IEnumerable<CacheNodeAdvertisment> newPeers = discoveredPeers.Except(Peers);

                if (!string.IsNullOrWhiteSpace(SelfNodeId))
                {
                    //remove ourselves from the list
                    newPeers = newPeers.Where(p => !SelfNodeId.Equals(p.NodeId, StringComparison.OrdinalIgnoreCase));
                }

                //Add them to the end of the collection 
                foreach (CacheNodeAdvertisment ad in newPeers)
                {
                    Peers.AddLast(ad);
                }
            }

            public void Reset()
            {
                //Go to the first node
                _currentNode = Peers.First;
            }

            public void Dispose()
            { }
        }
    }
}