diff options
author | vman <public@vaughnnugent.com> | 2022-10-30 02:28:12 -0400 |
---|---|---|
committer | vman <public@vaughnnugent.com> | 2022-10-30 02:28:12 -0400 |
commit | a8510fb835dcc5e1142d700164ce5a4bd44e1a25 (patch) | |
tree | 28caab320f777a384cb6883b68dd999cdc8c0a3f /Libs/VNLib.Plugins.Essentials.Sessions.Runtime |
Add project files.
Diffstat (limited to 'Libs/VNLib.Plugins.Essentials.Sessions.Runtime')
4 files changed, 245 insertions, 0 deletions
diff --git a/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/IRuntimeSessionProvider.cs b/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/IRuntimeSessionProvider.cs new file mode 100644 index 0000000..9941e6a --- /dev/null +++ b/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/IRuntimeSessionProvider.cs @@ -0,0 +1,26 @@ + +using VNLib.Net.Http; +using VNLib.Utils.Logging; + +namespace VNLib.Plugins.Essentials.Sessions +{ + /// <summary> + /// Represents a dynamically loadable type that an provide sessions to http connections + /// </summary> + public interface IRuntimeSessionProvider : ISessionProvider + { + /// <summary> + /// Called immediatly after the plugin is loaded into the appdomain + /// </summary> + /// <param name="plugin">The plugin instance that is loading the module</param> + /// <param name="localizedLog">The localized log provider for the provider</param> + void Load(PluginBase plugin, ILogProvider localizedLog); + + /// <summary> + /// Determines if the provider can return a session for the connection + /// </summary> + /// <param name="entity">The entity to process</param> + /// <returns>A value indicating if this provider should be called to load a session for</returns> + bool CanProcess(IHttpEvent entity); + } +} diff --git a/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/ISessionIdFactory.cs b/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/ISessionIdFactory.cs new file mode 100644 index 0000000..1e88e5c --- /dev/null +++ b/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/ISessionIdFactory.cs @@ -0,0 +1,17 @@ +using System.Diagnostics.CodeAnalysis; + +using VNLib.Net.Http; + +namespace VNLib.Plugins.Essentials.Sessions +{ + public interface ISessionIdFactory + { + /// <summary> + /// Attempts to recover a session-id from the connection + /// </summary> + /// <param name="entity">The connection to process</param> + /// <param name="sessionId"></param> + /// <returns></returns> + bool TryGetSessionId(IHttpEvent entity, [NotNullWhen(true)] out string? sessionId); + } +} diff --git a/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VNLib.Plugins.Essentials.Sessions.Runtime.csproj b/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VNLib.Plugins.Essentials.Sessions.Runtime.csproj new file mode 100644 index 0000000..5924b93 --- /dev/null +++ b/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VNLib.Plugins.Essentials.Sessions.Runtime.csproj @@ -0,0 +1,40 @@ +<Project Sdk="Microsoft.NET.Sdk"> + + <PropertyGroup> + <TargetFramework>net6.0</TargetFramework> + <ImplicitUsings>enable</ImplicitUsings> + <Nullable>enable</Nullable> + <Platforms>AnyCPU;x64</Platforms> + <Authors>Vaughn Nugent</Authors> + <Copyright>Copyright © 2022 Vaughn Nugent</Copyright> + <Version>1.0.0.1</Version> + <PlatformTarget>x64</PlatformTarget> + <SignAssembly>False</SignAssembly> + <PackageProjectUrl>www.vaughnnugent.com/resources</PackageProjectUrl> + </PropertyGroup> + + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'"> + <Deterministic>True</Deterministic> + </PropertyGroup> + + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'"> + <Deterministic>True</Deterministic> + </PropertyGroup> + + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'"> + <Deterministic>False</Deterministic> + </PropertyGroup> + + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'"> + <Deterministic>False</Deterministic> + </PropertyGroup> + + <ItemGroup> + <ProjectReference Include="..\..\..\..\VNLib\Essentials\VNLib.Plugins.Essentials.csproj" /> + <ProjectReference Include="..\..\..\..\VNLib\Http\VNLib.Net.Http.csproj" /> + <ProjectReference Include="..\..\..\DataCaching\VNLib.Data.Caching.Extensions\VNLib.Data.Caching.Extensions.csproj" /> + <ProjectReference Include="..\..\..\Extensions\VNLib.Plugins.Extensions.Loading\VNLib.Plugins.Extensions.Loading.csproj" /> + <ProjectReference Include="..\..\..\PluginBase\VNLib.Plugins.PluginBase.csproj" /> + </ItemGroup> + +</Project> diff --git a/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VnCacheClient.cs b/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VnCacheClient.cs new file mode 100644 index 0000000..a6979ef --- /dev/null +++ b/Libs/VNLib.Plugins.Essentials.Sessions.Runtime/VnCacheClient.cs @@ -0,0 +1,162 @@ +using System; +using System.Text.Json; +using System.Net.Sockets; +using System.Net.WebSockets; +using System.Security.Cryptography; + +using VNLib.Utils; +using VNLib.Utils.Memory; +using VNLib.Utils.Logging; +using VNLib.Utils.Extensions; +using VNLib.Data.Caching.Extensions; +using VNLib.Net.Messaging.FBM.Client; +using VNLib.Plugins.Extensions.Loading; + +namespace VNLib.Plugins.Essentials.Sessions +{ + /// <summary> + /// A wrapper to simplify a cache client object + /// </summary> + public sealed class VnCacheClient : OpenResourceHandle<FBMClient?> + { + FBMClient? _client; + + /// <summary> + /// The wrapped client + /// </summary> + public override FBMClient? Resource => _client; + + + private TimeSpan RetryInterval; + + private readonly ILogProvider? DebugLog; + private readonly IUnmangedHeap? ClientHeap; + + /// <summary> + /// Initializes an emtpy client wrapper that still requires + /// configuration loading + /// </summary> + /// <param name="debugLog">An optional debugging log</param> + /// <param name="heap">An optional <see cref="IUnmangedHeap"/> for <see cref="FBMClient"/> buffers</param> + public VnCacheClient(ILogProvider? debugLog, IUnmangedHeap? heap = null) + { + DebugLog = debugLog; + //Default to 10 seconds + RetryInterval = TimeSpan.FromSeconds(10); + + ClientHeap = heap; + } + + protected override void Free() + { + _client?.Dispose(); + _client = null; + } + + /// <summary> + /// Loads required configuration variables from the config store and + /// intializes the interal client + /// </summary> + /// <param name="config">A dictionary of configuration varables</param> + /// <exception cref="KeyNotFoundException"></exception> + public async Task LoadConfigAsync(PluginBase pbase, IReadOnlyDictionary<string, JsonElement> config) + { + int maxMessageSize = config["max_message_size"].GetInt32(); + string? brokerAddress = config["broker_address"].GetString() ?? throw new KeyNotFoundException("Missing required configuration variable broker_address"); + + //Get keys async + Task<string?> clientPrivTask = pbase.TryGetSecretAsync("client_private_key"); + Task<string?> brokerPubTask = pbase.TryGetSecretAsync("broker_public_key"); + + //Wait for all tasks to complete + string?[] keys = await Task.WhenAll(clientPrivTask, brokerPubTask); + + byte[] privKey = Convert.FromBase64String(keys[0] ?? throw new KeyNotFoundException("Missing required secret client_private_key")); + byte[] brokerPub = Convert.FromBase64String(keys[1] ?? throw new KeyNotFoundException("Missing required secret broker_public_key")); + + RetryInterval = config["retry_interval_sec"].GetTimeSpan(TimeParseType.Seconds); + + Uri brokerUri = new(brokerAddress); + + //Init the client with default settings + FBMClientConfig conf = FBMDataCacheExtensions.GetDefaultConfig(ClientHeap ?? Memory.Shared, maxMessageSize, DebugLog); + + _client = new(conf); + //Add the configuration + _client.UseBroker(brokerUri) + .ImportBrokerPublicKey(brokerPub) + .ImportClientPrivateKey(privKey) + .UseTls(brokerUri.Scheme == Uri.UriSchemeHttps); + + //Zero the key memory + Memory.InitializeBlock(privKey.AsSpan()); + Memory.InitializeBlock(brokerPub.AsSpan()); + } + + /// <summary> + /// Discovers nodes in the configured cluster and connects to a random node + /// </summary> + /// <param name="Log">A <see cref="ILogProvider"/> to write log events to</param> + /// <param name="cancellationToken">A token to cancel the operation</param> + /// <returns>A task that completes when the operation has been cancelled or an unrecoverable error occured</returns> + /// <exception cref="InvalidOperationException"></exception> + /// <exception cref="OperationCanceledException"></exception> + public async Task RunAsync(ILogProvider Log, CancellationToken cancellationToken) + { + _ = Resource ?? throw new InvalidOperationException("Client configuration not loaded, cannot connect to cache servers"); + + while (true) + { + //Load the server list + ActiveServer[]? servers; + while (true) + { + try + { + Log.Debug("Discovering cluster nodes in broker"); + //Get server list + servers = await Resource.DiscoverNodesAsync(cancellationToken); + break; + } + catch (HttpRequestException re) when (re.InnerException is SocketException) + { + Log.Warn("Broker server is unreachable"); + } + catch (Exception ex) + { + Log.Warn("Failed to get server list from broker, reason {r}", ex.Message); + } + //Gen random ms delay + int randomMsDelay = RandomNumberGenerator.GetInt32(1000, 2000); + await Task.Delay(randomMsDelay, cancellationToken); + } + if (servers?.Length == 0) + { + Log.Warn("No cluster nodes found, retrying"); + await Task.Delay(RetryInterval, cancellationToken); + continue; + } + //select random server from the list of servers + ActiveServer selected = servers!.SelectRandom(); + try + { + Log.Debug("Connecting to server {server}", selected.ServerId); + //Try to connect to server + await Resource.ConnectAndWaitForExitAsync(selected, cancellationToken); + Log.Debug("Cache server disconnected"); + } + catch (WebSocketException wse) + { + Log.Warn("Failed to connect to cache server {reason}", wse.Message); + continue; + } + catch (HttpRequestException he) when (he.InnerException is SocketException) + { + Log.Debug("Failed to connect to recommended server {server}", selected.ServerId); + //Continue next loop + continue; + } + } + } + } +} |