aboutsummaryrefslogtreecommitdiff
path: root/Libs/VNLib.Plugins.Essentials.Sessions.Runtime
diff options
context:
space:
mode:
authorLibravatar vman <public@vaughnnugent.com>2022-10-30 02:28:12 -0400
committerLibravatar vman <public@vaughnnugent.com>2022-10-30 02:28:12 -0400
commita8510fb835dcc5e1142d700164ce5a4bd44e1a25 (patch)
tree28caab320f777a384cb6883b68dd999cdc8c0a3f /Libs/VNLib.Plugins.Essentials.Sessions.Runtime
Add project files.
Diffstat (limited to 'Libs/VNLib.Plugins.Essentials.Sessions.Runtime')
-rw-r--r--Libs/VNLib.Plugins.Essentials.Sessions.Runtime/IRuntimeSessionProvider.cs26
-rw-r--r--Libs/VNLib.Plugins.Essentials.Sessions.Runtime/ISessionIdFactory.cs17
-rw-r--r--Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VNLib.Plugins.Essentials.Sessions.Runtime.csproj40
-rw-r--r--Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VnCacheClient.cs162
4 files changed, 245 insertions, 0 deletions
diff --git a/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/IRuntimeSessionProvider.cs b/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/IRuntimeSessionProvider.cs
new file mode 100644
index 0000000..9941e6a
--- /dev/null
+++ b/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/IRuntimeSessionProvider.cs
@@ -0,0 +1,26 @@
+
+using VNLib.Net.Http;
+using VNLib.Utils.Logging;
+
+namespace VNLib.Plugins.Essentials.Sessions
+{
+ /// <summary>
+ /// Represents a dynamically loadable type that an provide sessions to http connections
+ /// </summary>
+ public interface IRuntimeSessionProvider : ISessionProvider
+ {
+ /// <summary>
+ /// Called immediatly after the plugin is loaded into the appdomain
+ /// </summary>
+ /// <param name="plugin">The plugin instance that is loading the module</param>
+ /// <param name="localizedLog">The localized log provider for the provider</param>
+ void Load(PluginBase plugin, ILogProvider localizedLog);
+
+ /// <summary>
+ /// Determines if the provider can return a session for the connection
+ /// </summary>
+ /// <param name="entity">The entity to process</param>
+ /// <returns>A value indicating if this provider should be called to load a session for</returns>
+ bool CanProcess(IHttpEvent entity);
+ }
+}
diff --git a/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/ISessionIdFactory.cs b/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/ISessionIdFactory.cs
new file mode 100644
index 0000000..1e88e5c
--- /dev/null
+++ b/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/ISessionIdFactory.cs
@@ -0,0 +1,17 @@
+using System.Diagnostics.CodeAnalysis;
+
+using VNLib.Net.Http;
+
+namespace VNLib.Plugins.Essentials.Sessions
+{
+ public interface ISessionIdFactory
+ {
+ /// <summary>
+ /// Attempts to recover a session-id from the connection
+ /// </summary>
+ /// <param name="entity">The connection to process</param>
+ /// <param name="sessionId"></param>
+ /// <returns></returns>
+ bool TryGetSessionId(IHttpEvent entity, [NotNullWhen(true)] out string? sessionId);
+ }
+}
diff --git a/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VNLib.Plugins.Essentials.Sessions.Runtime.csproj b/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VNLib.Plugins.Essentials.Sessions.Runtime.csproj
new file mode 100644
index 0000000..5924b93
--- /dev/null
+++ b/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VNLib.Plugins.Essentials.Sessions.Runtime.csproj
@@ -0,0 +1,40 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+ <PropertyGroup>
+ <TargetFramework>net6.0</TargetFramework>
+ <ImplicitUsings>enable</ImplicitUsings>
+ <Nullable>enable</Nullable>
+ <Platforms>AnyCPU;x64</Platforms>
+ <Authors>Vaughn Nugent</Authors>
+ <Copyright>Copyright © 2022 Vaughn Nugent</Copyright>
+ <Version>1.0.0.1</Version>
+ <PlatformTarget>x64</PlatformTarget>
+ <SignAssembly>False</SignAssembly>
+ <PackageProjectUrl>www.vaughnnugent.com/resources</PackageProjectUrl>
+ </PropertyGroup>
+
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
+ <Deterministic>True</Deterministic>
+ </PropertyGroup>
+
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
+ <Deterministic>True</Deterministic>
+ </PropertyGroup>
+
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
+ <Deterministic>False</Deterministic>
+ </PropertyGroup>
+
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
+ <Deterministic>False</Deterministic>
+ </PropertyGroup>
+
+ <ItemGroup>
+ <ProjectReference Include="..\..\..\..\VNLib\Essentials\VNLib.Plugins.Essentials.csproj" />
+ <ProjectReference Include="..\..\..\..\VNLib\Http\VNLib.Net.Http.csproj" />
+ <ProjectReference Include="..\..\..\DataCaching\VNLib.Data.Caching.Extensions\VNLib.Data.Caching.Extensions.csproj" />
+ <ProjectReference Include="..\..\..\Extensions\VNLib.Plugins.Extensions.Loading\VNLib.Plugins.Extensions.Loading.csproj" />
+ <ProjectReference Include="..\..\..\PluginBase\VNLib.Plugins.PluginBase.csproj" />
+ </ItemGroup>
+
+</Project>
diff --git a/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VnCacheClient.cs b/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VnCacheClient.cs
new file mode 100644
index 0000000..a6979ef
--- /dev/null
+++ b/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VnCacheClient.cs
@@ -0,0 +1,162 @@
+using System;
+using System.Text.Json;
+using System.Net.Sockets;
+using System.Net.WebSockets;
+using System.Security.Cryptography;
+
+using VNLib.Utils;
+using VNLib.Utils.Memory;
+using VNLib.Utils.Logging;
+using VNLib.Utils.Extensions;
+using VNLib.Data.Caching.Extensions;
+using VNLib.Net.Messaging.FBM.Client;
+using VNLib.Plugins.Extensions.Loading;
+
+namespace VNLib.Plugins.Essentials.Sessions
+{
+ /// <summary>
+ /// A wrapper to simplify a cache client object
+ /// </summary>
+ public sealed class VnCacheClient : OpenResourceHandle<FBMClient?>
+ {
+ FBMClient? _client;
+
+ /// <summary>
+ /// The wrapped client
+ /// </summary>
+ public override FBMClient? Resource => _client;
+
+
+ private TimeSpan RetryInterval;
+
+ private readonly ILogProvider? DebugLog;
+ private readonly IUnmangedHeap? ClientHeap;
+
+ /// <summary>
+ /// Initializes an emtpy client wrapper that still requires
+ /// configuration loading
+ /// </summary>
+ /// <param name="debugLog">An optional debugging log</param>
+ /// <param name="heap">An optional <see cref="IUnmangedHeap"/> for <see cref="FBMClient"/> buffers</param>
+ public VnCacheClient(ILogProvider? debugLog, IUnmangedHeap? heap = null)
+ {
+ DebugLog = debugLog;
+ //Default to 10 seconds
+ RetryInterval = TimeSpan.FromSeconds(10);
+
+ ClientHeap = heap;
+ }
+
+ protected override void Free()
+ {
+ _client?.Dispose();
+ _client = null;
+ }
+
+ /// <summary>
+ /// Loads required configuration variables from the config store and
+ /// intializes the interal client
+ /// </summary>
+ /// <param name="config">A dictionary of configuration varables</param>
+ /// <exception cref="KeyNotFoundException"></exception>
+ public async Task LoadConfigAsync(PluginBase pbase, IReadOnlyDictionary<string, JsonElement> config)
+ {
+ int maxMessageSize = config["max_message_size"].GetInt32();
+ string? brokerAddress = config["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required configuration variable broker_address");
+
+ //Get keys async
+ Task<string?> clientPrivTask = pbase.TryGetSecretAsync("client_private_key");
+ Task<string?> brokerPubTask = pbase.TryGetSecretAsync("broker_public_key");
+
+ //Wait for all tasks to complete
+ string?[] keys = await Task.WhenAll(clientPrivTask, brokerPubTask);
+
+ byte[] privKey = Convert.FromBase64String(keys[0] ?? throw new KeyNotFoundException("Missing required secret client_private_key"));
+ byte[] brokerPub = Convert.FromBase64String(keys[1] ?? throw new KeyNotFoundException("Missing required secret broker_public_key"));
+
+ RetryInterval = config["retry_interval_sec"].GetTimeSpan(TimeParseType.Seconds);
+
+ Uri brokerUri = new(brokerAddress);
+
+ //Init the client with default settings
+ FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(ClientHeap ?? Memory.Shared, maxMessageSize, DebugLog);
+
+ _client = new(conf);
+ //Add the configuration
+ _client.UseBroker(brokerUri)
+ .ImportBrokerPublicKey(brokerPub)
+ .ImportClientPrivateKey(privKey)
+ .UseTls(brokerUri.Scheme == Uri.UriSchemeHttps);
+
+ //Zero the key memory
+ Memory.InitializeBlock(privKey.AsSpan());
+ Memory.InitializeBlock(brokerPub.AsSpan());
+ }
+
+ /// <summary>
+ /// Discovers nodes in the configured cluster and connects to a random node
+ /// </summary>
+ /// <param name="Log">A <see cref="ILogProvider"/> to write log events to</param>
+ /// <param name="cancellationToken">A token to cancel the operation</param>
+ /// <returns>A task that completes when the operation has been cancelled or an unrecoverable error occured</returns>
+ /// <exception cref="InvalidOperationException"></exception>
+ /// <exception cref="OperationCanceledException"></exception>
+ public async Task RunAsync(ILogProvider Log, CancellationToken cancellationToken)
+ {
+ _ = Resource ?? throw new InvalidOperationException("Client configuration not loaded, cannot connect to cache servers");
+
+ while (true)
+ {
+ //Load the server list
+ ActiveServer[]? servers;
+ while (true)
+ {
+ try
+ {
+ Log.Debug("Discovering cluster nodes in broker");
+ //Get server list
+ servers = await Resource.DiscoverNodesAsync(cancellationToken);
+ break;
+ }
+ catch (HttpRequestException re) when (re.InnerException is SocketException)
+ {
+ Log.Warn("Broker server is unreachable");
+ }
+ catch (Exception ex)
+ {
+ Log.Warn("Failed to get server list from broker, reason {r}", ex.Message);
+ }
+ //Gen random ms delay
+ int randomMsDelay = RandomNumberGenerator.GetInt32(1000, 2000);
+ await Task.Delay(randomMsDelay, cancellationToken);
+ }
+ if (servers?.Length == 0)
+ {
+ Log.Warn("No cluster nodes found, retrying");
+ await Task.Delay(RetryInterval, cancellationToken);
+ continue;
+ }
+ //select random server from the list of servers
+ ActiveServer selected = servers!.SelectRandom();
+ try
+ {
+ Log.Debug("Connecting to server {server}", selected.ServerId);
+ //Try to connect to server
+ await Resource.ConnectAndWaitForExitAsync(selected, cancellationToken);
+ Log.Debug("Cache server disconnected");
+ }
+ catch (WebSocketException wse)
+ {
+ Log.Warn("Failed to connect to cache server {reason}", wse.Message);
+ continue;
+ }
+ catch (HttpRequestException he) when (he.InnerException is SocketException)
+ {
+ Log.Debug("Failed to connect to recommended server {server}", selected.ServerId);
+ //Continue next loop
+ continue;
+ }
+ }
+ }
+ }
+}