aboutsummaryrefslogtreecommitdiff
path: root/plugins/VNLib.Data.Caching.Providers.Redis/src/RedisClientCacheEntry.cs
blob: 30d936ca52ee2cb0ab4ae7809e2637c09e0849e4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
/*
* Copyright (c) 2023 Vaughn Nugent
* 
* Library: VNLib
* Package: VNLib.Data.Caching.Providers.Redis
* File: RedisClientCacheEntry.cs 
*
* RedisClientCacheEntry.cs is part of VNLib.Data.Caching.Providers.Redis 
* which is part of the larger VNLib collection of libraries and utilities.
*
* VNLib.Data.Caching.Providers.Redis is free software: you can redistribute it and/or modify 
* it under the terms of the GNU Affero General Public License as 
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* VNLib.Data.Caching.Providers.Redis is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program.  If not, see https://www.gnu.org/licenses/.
*/


using System;
using System.Buffers;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

using StackExchange.Redis;

using VNLib.Utils;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
using VNLib.Utils.Extensions;
using VNLib.Plugins;
using VNLib.Plugins.Extensions.Loading;

namespace VNLib.Data.Caching.Providers.Redis
{
    /*
     * This package exports an IGlobalCacheProvider that is intended to be packaged by 
     * application distributors that want to use Redis as a global cache for their 
     * application.
     * 
     * The IGlobalCacheProvider primarily performs get/set operations on raw memory 
     * where possible. Custom serializers are allowed to be used for object serialziation.
     * 
     * The interface also requires that implementations provide a fallback serialization 
     * method. For now, this is a JSON serializer. But will likely have more complex 
     * decision making where possible, such as protobufs
     */

    [ExternService]
    [ConfigurationName("cache")]
    public sealed class RedisClientCacheEntry : IGlobalCacheProvider
    {
        private const int InitialWriterBufferSize = 4096;
       
        private readonly JsonCacheObjectSerializer _fallbackSerializer;
        private readonly IUnmangedHeap _defaultHeap;
        private readonly Task OnLoadTask;
      

        private ConnectionMultiplexer? _redis;
        private IDatabase? _database;

        public RedisClientCacheEntry(PluginBase plugin, IConfigScope config)
        {
            _fallbackSerializer = new();
            _defaultHeap = MemoryUtil.Shared;

            ILogProvider redisLog = plugin.Log.CreateScope("REDIS");

            //Allow a raw connection string to be used
            if(config.ContainsKey("connection_string"))
            {
                string connectionString = config.GetRequiredProperty("connection_string", el => el.GetString()!);

                //Store load task so it can be awaited by the host
                OnLoadTask = Task.Run(async () =>
                {

                    if(connectionString.Contains("password=[SECRET]", StringComparison.OrdinalIgnoreCase))
                    {
                        //Load the password from the secret store and replace the placeholder with the found secret
                        using ISecretResult password = await plugin.GetSecretAsync("redis_password");
                        connectionString = connectionString.Replace("password=[SECRET]", $"password={password.Result}", StringComparison.OrdinalIgnoreCase);
                    }

                    redisLog.Information("Connecting to Redis server...");

                    //Connect to the server
                    _redis = await ConnectionMultiplexer.ConnectAsync(connectionString);

                    _database = _redis.GetDatabase();

                    redisLog.Information("Successfully connected to Redis server");
                });
            }
            else
            {
                ConfigurationOptions options = GetOptionsFromConfig(config);

                //Store load task so it can be awaited by the host
                OnLoadTask = Task.Run(async () =>
                {
                    //Retrieve the password last
                    using ISecretResult password = await plugin.GetSecretAsync("redis_password");
                    options.Password = password.Result.ToString();

                    redisLog.Information("Connecting to Redis server...");

                    //Connect to the server
                    _redis = await ConnectionMultiplexer.ConnectAsync(options);

                    _database = _redis.GetDatabase();

                    redisLog.Information("Successfully connected to Redis server");
                });
            }
        }

        private static ConfigurationOptions GetOptionsFromConfig(IConfigScope config)
        {
            //Try go get the hostname
            string? hostname = config.GetRequiredProperty("url", p => p.GetString()!);
            Uri serverUri = new(hostname, UriKind.RelativeOrAbsolute);

            ConfigurationOptions options = new()
            {
                Ssl = serverUri.Scheme == "rediss://",
            };

            //Add the host and port
            options.EndPoints.Add(serverUri.DnsSafeHost, serverUri.Port);

            //Get optional values
            if (config.TryGetValue("user", out JsonElement user))
            {
                options.User = user.GetString();
            }

            if (config.TryGetValue("keepalive_sec", out JsonElement keepaliveSec))
            {
                options.KeepAlive = keepaliveSec.GetInt32();
            }

            if (config.TryGetValue("timeout_ms", out JsonElement timeoutMs))
            {
                options.SyncTimeout = timeoutMs.GetInt32();
            }

            if (config.TryGetValue("connect_timeout_ms", out JsonElement connectTimeoutMs))
            {
                options.ConnectTimeout = connectTimeoutMs.GetInt32();
            }

            if (config.TryGetValue("abort_on_connect_fail", out JsonElement abortOnConnectFail))
            {
                options.AbortOnConnectFail = abortOnConnectFail.GetBoolean();
            }

            if (config.TryGetValue("allow_admin", out JsonElement allowAdmin))
            {
                options.AllowAdmin = allowAdmin.GetBoolean();
            }

            if (config.TryGetValue("connect_retry", out JsonElement connectRetry))
            {
                options.ConnectRetry = connectRetry.GetInt32();
            }

            if (config.TryGetValue("connect_timeout", out JsonElement connectTimeout))
            {
                options.ConnectTimeout = connectTimeout.GetInt32();
            }

            if (config.TryGetValue("default_database", out JsonElement defaultDatabase))
            {
                options.DefaultDatabase = defaultDatabase.GetInt32();
            }

            if (config.TryGetValue("keep_alive", out JsonElement keepAlive))
            {
                options.KeepAlive = keepAlive.GetInt32();
            }

            if (config.TryGetValue("name", out JsonElement name))
            {
                options.ClientName = name.GetString();
            }

            return options;
        }

        ///<inheritdoc/>
        public bool IsConnected => _redis?.IsConnected == true;

        //Called by the host to wait for the cache to be loaded
        public Task InitAsync() => OnLoadTask;

        ///<inheritdoc/>
        public Task AddOrUpdateAsync<T>(string key, string? newKey, T value, CancellationToken cancellation) => AddOrUpdateAsync(key, newKey, value, _fallbackSerializer, cancellation);

        ///<inheritdoc/>
        public async Task AddOrUpdateAsync<T>(string key, string? newKey, T value, ICacheObjectSerializer serialzer, CancellationToken cancellation)
        {
            _ = key ?? throw new ArgumentNullException(nameof(key));
            _ = serialzer ?? throw new ArgumentNullException(nameof(serialzer));

            //Alloc update buffer
            using AddOrUpdateBuffer buffer = new(_defaultHeap, InitialWriterBufferSize, false);

            //Serialize the object
            serialzer.Serialize(value, buffer);

            //Update object data
            await _database.StringSetAsync(key, buffer.GetWrittenData());

            if (!string.IsNullOrWhiteSpace(newKey))
            {
               //also update the key
                await _database.KeyRenameAsync(key, newKey);
            }
        }

        ///<inheritdoc/>
        public async Task AddOrUpdateAsync<T>(string key, string? newKey, ObjectDataReader<T> callback, T state, CancellationToken cancellation)
        {
            /*
             * Because the redis database only allows ReadonlyMemory when 
             * updating keys, we must copy the object data into a temporary
             * heap buffer and then copy it into the database.
             */

            int length = 0;

            //Create a copy buffer and copy the object data into it
            using IMemoryOwner<byte> buffer = AllocAndCopy(callback, state, _defaultHeap, ref length);

            //Set the value at the old key
            await _database.StringSetAsync(key, buffer.Memory[..length]);

            //If required also update the key
            if (!string.IsNullOrWhiteSpace(newKey))
            {
                await _database.KeyRenameAsync(key, newKey);
            }
            
            static IMemoryOwner<byte> AllocAndCopy(ObjectDataReader<T> callback, T state, IUnmangedHeap heap, ref int length)
            {
                //Get the buffer from the callback
                ReadOnlySpan<byte> data = callback(state);
                length = data.Length;

                //Alloc the buffer on the desired heap
                MemoryManager<byte> buffer = heap.DirectAlloc<byte>(length, false);

                //Copy object data to the buffer
                data.CopyTo(buffer.GetSpan());

                return buffer;
            }
        }

        ///<inheritdoc/>
        public async Task<bool> DeleteAsync(string key, CancellationToken cancellation)
        {
            RedisValue value = await _database.StringGetDeleteAsync(key);
            return value.IsNull == false;   //Should only be null if the key did not exist
        }

        ///<inheritdoc/>
        public Task<T?> GetAsync<T>(string key, CancellationToken cancellation) => GetAsync<T>(key, _fallbackSerializer, cancellation);

        ///<inheritdoc/>
        public async Task<T?> GetAsync<T>(string key, ICacheObjectDeserializer deserializer, CancellationToken cancellation)
        {
            _ = deserializer ?? throw new ArgumentNullException(nameof(deserializer));

            //Try to get the value from the cache
            RedisValue value = await _database.StringGetAsync(key);

            //If the value is found, set the raw data
            if (value.IsNull)
            {
                return default;
            }

            return deserializer.Deserialize<T>(((ReadOnlyMemory<byte>)value).Span);
        }

        ///<inheritdoc/>
        public async Task GetAsync<T>(string key, ObjectDataSet<T> callback, T state, CancellationToken cancellation)
        {
            _ = callback ?? throw new ArgumentNullException(nameof(callback));

            //Try to get the value from the cache
            RedisValue value = await _database.StringGetAsync(key);

            //If the value is found, set the raw data
            if (!value.IsNull)
            {
                //Invoke callback with object data
                callback(state, ((ReadOnlyMemory<byte>)value).Span);
            }
        }

        ///<inheritdoc/>
        public object GetUnderlyingStore()
        {
            return _database == null ? throw new InvalidOperationException("The cache store is not available") : _database;
        }

        private sealed class AddOrUpdateBuffer: VnDisposeable, IBufferWriter<byte>
        {
            private readonly MemoryHandle<byte> _handle;
            private readonly MemoryManager<byte> _manager;

            private int _position;

            public AddOrUpdateBuffer(IUnmangedHeap heap, int initialSize, bool zero)
            {
                _handle = heap.Alloc<byte>(CalNewSize(initialSize), zero);
                //Create memory manager around the memhandle that does not own the handle
                _manager = _handle.ToMemoryManager(false);
            }

            public void Advance(int count)
            {
                if(count < 0)
                {
                    throw new ArgumentOutOfRangeException(nameof(count));
                }
                _position += count;
            }

            ///<inheritdoc/>
            public Memory<byte> GetMemory(int sizeHint = 0)
            {
                nint newSize = CalNewSize(sizeHint);

                //Resize if needed
                _handle.ResizeIfSmaller(newSize);

                //Return the memory
                return _manager.Memory.Slice(_position, sizeHint);
            }
            
            nint CalNewSize(int size) => MemoryUtil.NearestPage(size + _position);

            ///<inheritdoc/>
            public Span<byte> GetSpan(int sizeHint = 0)
            {
                nint newSize = CalNewSize(sizeHint);

                //Resize if needed
                _handle.ResizeIfSmaller(newSize);

                //Return the memory
                return _handle.AsSpan(_position);
            }

            /// <summary>
            /// Gets the written data
            /// </summary>
            /// <returns>The memory segment pointing to the data that was written by the serializer</returns>
            public ReadOnlyMemory<byte> GetWrittenData() => _manager.Memory[.._position];

            protected override void Free()
            {
                //Free the handle, dont need to free memory manager
                _handle.Dispose();
            }
        }
    }
}