diff options
27 files changed, 3208 insertions, 0 deletions
diff --git a/VNLib.Data.Caching.Extensions/ActiveServer.cs b/VNLib.Data.Caching.Extensions/ActiveServer.cs new file mode 100644 index 0000000..e4d73c1 --- /dev/null +++ b/VNLib.Data.Caching.Extensions/ActiveServer.cs @@ -0,0 +1,14 @@ +using System.Text.Json.Serialization; + +namespace VNLib.Data.Caching.Extensions +{ + public class ActiveServer + { + [JsonPropertyName("address")] + public string? HostName { get; set; } + [JsonPropertyName("server_id")] + public string? ServerId { get; set; } + [JsonPropertyName("ip_address")] + public string? Ip { get; set; } + } +} diff --git a/VNLib.Data.Caching.Extensions/FBMDataCacheExtensions.cs b/VNLib.Data.Caching.Extensions/FBMDataCacheExtensions.cs new file mode 100644 index 0000000..6533a0b --- /dev/null +++ b/VNLib.Data.Caching.Extensions/FBMDataCacheExtensions.cs @@ -0,0 +1,459 @@ +using System; +using System.Net; +using System.Text; +using System.Security; +using System.Text.Json; +using System.Security.Cryptography; +using System.Runtime.CompilerServices; + +using RestSharp; + +using VNLib.Utils.Memory; +using VNLib.Utils.Logging; +using VNLib.Utils.Extensions; +using VNLib.Hashing; +using VNLib.Hashing.IdentityUtility; +using VNLib.Net.Http; +using VNLib.Net.Rest.Client; +using VNLib.Net.Messaging.FBM.Client; +using VNLib.Net.Messaging.FBM; + +namespace VNLib.Data.Caching.Extensions +{ + /// <summary> + /// Provides extension methods for FBM data caching using + /// cache servers and brokers + /// </summary> + public static class FBMDataCacheExtensions + { + /// <summary> + /// The websocket sub-protocol to use when connecting to cache servers + /// </summary> + public const string CACHE_WS_SUB_PROCOL = "object-cache"; + /// <summary> + /// The default cache message header size + /// </summary> + public const int MAX_FBM_MESSAGE_HEADER_SIZE = 1024; + + private static readonly IReadOnlyDictionary<string, string> BrokerJwtHeader = new Dictionary<string, string>() + { + { "alg", "ES384" }, //Must match alg name + { "typ", "JWT"} + }; + + private static readonly RestClientPool ClientPool = new(2,new RestClientOptions() + { + MaxTimeout = 10 * 1000, + FollowRedirects = false, + Encoding = Encoding.UTF8, + AutomaticDecompression = DecompressionMethods.All, + ThrowOnAnyError = true, + }); + + /// <summary> + /// The default hashing algorithm used to sign an verify connection + /// tokens + /// </summary> + public static readonly HashAlgorithmName CacheJwtAlgorithm = HashAlgorithmName.SHA384; + + //using the es384 algorithm for signing (friendlyname is secp384r1) + /// <summary> + /// The default ECCurve used by the connection library + /// </summary> + public static readonly ECCurve CacheCurve = ECCurve.CreateFromFriendlyName("secp384r1"); + + /// <summary> + /// Gets a <see cref="FBMClientConfig"/> preconfigured object caching + /// protocl + /// </summary> + /// <param name="heap">The client buffer heap</param> + /// <param name="maxMessageSize">The maxium message size (in bytes)</param> + /// <param name="debugLog">An optional debug log</param> + /// <returns>A preconfigured <see cref="FBMClientConfig"/> for object caching</returns> + public static FBMClientConfig GetDefaultConfig(IUnmangedHeap heap, int maxMessageSize, ILogProvider? debugLog = null) + { + return new() + { + BufferHeap = heap, + MaxMessageSize = maxMessageSize * 2, + RecvBufferSize = maxMessageSize, + MessageBufferSize = maxMessageSize, + + MaxHeaderBufferSize = MAX_FBM_MESSAGE_HEADER_SIZE, + SubProtocol = CACHE_WS_SUB_PROCOL, + + HeaderEncoding = Helpers.DefaultEncoding, + + KeepAliveInterval = TimeSpan.FromSeconds(30), + + DebugLog = debugLog + }; + } + + + private class CacheConnectionConfig + { + public ECDsa ClientAlg { get; init; } + public ECDsa BrokerAlg { get; init; } + public string ServerChallenge { get; init; } + public string? NodeId { get; set; } + public Uri? BrokerAddress { get; set; } + public bool useTls { get; set; } + public ActiveServer[]? BrokerServers { get; set; } + + public CacheConnectionConfig() + { + //Init the algorithms + ClientAlg = ECDsa.Create(CacheCurve); + BrokerAlg = ECDsa.Create(CacheCurve); + ServerChallenge = RandomHash.GetRandomBase32(24); + } + + ~CacheConnectionConfig() + { + ClientAlg.Clear(); + BrokerAlg.Clear(); + } + } + + /// <summary> + /// Contacts the cache broker to get a list of active servers to connect to + /// </summary> + /// <param name="brokerAddress">The broker server to connec to</param> + /// <param name="clientPrivKey">The private key used to sign messages sent to the broker</param> + /// <param name="brokerPubKey">The broker public key used to verify broker messages</param> + /// <param name="cancellationToken">A token to cancel the operationS</param> + /// <returns>The list of active servers</returns> + /// <exception cref="SecurityException"></exception> + /// <exception cref="ArgumentNullException"></exception> + public static async Task<ActiveServer[]?> ListServersAsync(Uri brokerAddress, ReadOnlyMemory<byte> clientPrivKey, ReadOnlyMemory<byte> brokerPubKey, CancellationToken cancellationToken = default) + { + using ECDsa client = ECDsa.Create(CacheCurve); + using ECDsa broker = ECDsa.Create(CacheCurve); + + //Import client private key + client.ImportPkcs8PrivateKey(clientPrivKey.Span, out _); + //Broker public key to verify broker messages + broker.ImportSubjectPublicKeyInfo(brokerPubKey.Span, out _); + + return await ListServersAsync(brokerAddress, client, broker, cancellationToken); + } + + /// <summary> + /// Contacts the cache broker to get a list of active servers to connect to + /// </summary> + /// <param name="brokerAddress">The broker server to connec to</param> + /// <param name="clientAlg">The signature algorithm used to sign messages to the broker</param> + /// <param name="brokerAlg">The signature used to verify broker messages</param> + /// <param name="cancellationToken">A token to cancel the operationS</param> + /// <returns>The list of active servers</returns> + /// <exception cref="SecurityException"></exception> + /// <exception cref="ArgumentNullException"></exception> + public static async Task<ActiveServer[]?> ListServersAsync(Uri brokerAddress, ECDsa clientAlg, ECDsa brokerAlg, CancellationToken cancellationToken = default) + { + _ = brokerAddress ?? throw new ArgumentNullException(nameof(brokerAddress)); + _ = clientAlg ?? throw new ArgumentNullException(nameof(clientAlg)); + _ = brokerAlg ?? throw new ArgumentNullException(nameof(brokerAlg)); + + string jwtBody; + //Build request jwt + using (JsonWebToken requestJwt = new()) + { + requestJwt.WriteHeader(BrokerJwtHeader); + requestJwt.InitPayloadClaim() + .AddClaim("iat", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()) + .CommitClaims(); + //sign the jwt + requestJwt.Sign(clientAlg, in CacheJwtAlgorithm, 512); + //Compile the jwt + jwtBody = requestJwt.Compile(); + } + //New list request + RestRequest listRequest = new(brokerAddress, Method.Post); + //Add the jwt as a string to the request body + listRequest.AddStringBody(jwtBody, DataFormat.None); + listRequest.AddHeader("Content-Type", HttpHelpers.GetContentTypeString(ContentType.Text)); + //Rent client + using ClientContract client = ClientPool.Lease(); + //Exec list request + RestResponse response = await client.Resource.ExecuteAsync(listRequest, cancellationToken); + if (!response.IsSuccessful) + { + throw response.ErrorException!; + } + //Response is jwt + using JsonWebToken responseJwt = JsonWebToken.ParseRaw(response.RawBytes); + //Verify the jwt + if (!responseJwt.Verify(brokerAlg, in CacheJwtAlgorithm)) + { + throw new SecurityException("Failed to verify the broker's challenge, cannot continue"); + } + using JsonDocument doc = responseJwt.GetPayload(); + return doc.RootElement.GetProperty("servers").Deserialize<ActiveServer[]>(); + } + + /// <summary> + /// Configures a connection to the remote cache server at the specified location + /// with proper authentication. + /// </summary> + /// <param name="client"></param> + /// <param name="serverUri">The server's address</param> + /// <param name="signingKey">The pks8 format EC private key uesd to sign the message</param> + /// <param name="challenge">A challenge to send to the server</param> + /// <param name="nodeId">A token used to identify the current server's event queue on the remote server</param> + /// <param name="token">A token to cancel the connection operation</param> + /// <param name="useTls">Enables the secure websocket protocol</param> + /// <returns>A Task that completes when the connection has been established</returns> + /// <exception cref="ArgumentNullException"></exception> + public static Task ConnectAsync(this FBMClient client, string serverUri, ReadOnlyMemory<byte> signingKey, string challenge, string? nodeId, bool useTls, CancellationToken token = default) + { + //Sign the jwt + using ECDsa sigAlg = ECDsa.Create(CacheCurve); + //Import the signing key + sigAlg.ImportPkcs8PrivateKey(signingKey.Span, out _); + //Return without await because the alg is used to sign before this method returns and can be discarded + return ConnectAsync(client, serverUri, sigAlg, challenge, nodeId, useTls, token); + } + + private static Task ConnectAsync(FBMClient client, string serverUri, ECDsa sigAlg, string challenge, string? nodeId, bool useTls, CancellationToken token = default) + { + _ = serverUri ?? throw new ArgumentNullException(nameof(serverUri)); + _ = challenge ?? throw new ArgumentNullException(nameof(challenge)); + //build ws uri + UriBuilder uriBuilder = new(serverUri) + { + Scheme = useTls ? "wss://" : "ws://" + }; + string jwtMessage; + //Init jwt for connecting to server + using (JsonWebToken jwt = new()) + { + jwt.WriteHeader(BrokerJwtHeader); + //Init claim + JwtPayload claim = jwt.InitPayloadClaim(); + claim.AddClaim("challenge", challenge); + if (!string.IsNullOrWhiteSpace(nodeId)) + { + /* + * The unique node id so the other nodes know to load the + * proper event queue for the current server + */ + claim.AddClaim("server_id", nodeId); + } + claim.CommitClaims(); + + //Sign jwt + jwt.Sign(sigAlg, in CacheJwtAlgorithm, 512); + + //Compile to string + jwtMessage = jwt.Compile(); + } + //Set jwt as authorization header + client.ClientSocket.Headers[HttpRequestHeader.Authorization] = jwtMessage; + //Connect async + return client.ConnectAsync(uriBuilder.Uri, token); + } + + /// <summary> + /// Registers the current server as active with the specified broker + /// </summary> + /// <param name="brokerAddress">The address of the broker to register with</param> + /// <param name="signingKey">The private key used to sign the message</param> + /// <param name="serverAddress">The local address of the current server used for discovery</param> + /// <param name="nodeId">The unique id to identify this server (for event queues)</param> + /// <param name="keepAliveToken">A unique security token used by the broker to authenticate itself</param> + /// <returns>A task that resolves when a successful registration is completed, raises exceptions otherwise</returns> + public static async Task ResgisterWithBrokerAsync(Uri brokerAddress, ReadOnlyMemory<byte> signingKey, string serverAddress, string nodeId, string keepAliveToken) + { + _ = brokerAddress ?? throw new ArgumentNullException(nameof(brokerAddress)); + _ = serverAddress ?? throw new ArgumentNullException(nameof(serverAddress)); + _ = keepAliveToken ?? throw new ArgumentNullException(nameof(keepAliveToken)); + _ = nodeId ?? throw new ArgumentNullException(nameof(nodeId)); + + string requestData; + //Create the jwt for signed registration message + using (JsonWebToken jwt = new()) + { + //Shared jwt header + jwt.WriteHeader(BrokerJwtHeader); + //build jwt claim + jwt.InitPayloadClaim() + .AddClaim("address", serverAddress) + .AddClaim("server_id", nodeId) + .AddClaim("token", keepAliveToken) + .CommitClaims(); + + //Sign the jwt + using (ECDsa sigAlg = ECDsa.Create(CacheCurve)) + { + //Import the signing key + sigAlg.ImportPkcs8PrivateKey(signingKey.Span, out _); + + jwt.Sign(sigAlg, in CacheJwtAlgorithm, 512); + } + //Compile and save + requestData = jwt.Compile(); + } + //Create reg request message + RestRequest regRequest = new(brokerAddress); + regRequest.AddStringBody(requestData, DataFormat.None); + regRequest.AddHeader("Content-Type", "text/plain"); + //Rent client + using ClientContract client = ClientPool.Lease(); + //Exec the regitration request + RestResponse response = await client.Resource.ExecutePutAsync(regRequest); + if(!response.IsSuccessful) + { + throw response.ErrorException!; + } + } + + + private static readonly ConditionalWeakTable<FBMClient, CacheConnectionConfig> ClientCacheConfig = new(); + + /// <summary> + /// Imports the client signature algorithim's private key from its pkcs8 binary representation + /// </summary> + /// <param name="client"></param> + /// <param name="pkcs8PrivateKey">Pkcs8 format private key</param> + /// <returns>Chainable fluent object</returns> + /// <exception cref="ArgumentException"></exception> + /// <exception cref="CryptographicException"></exception> + public static FBMClient ImportClientPrivateKey(this FBMClient client, ReadOnlySpan<byte> pkcs8PrivateKey) + { + CacheConnectionConfig conf = ClientCacheConfig.GetOrCreateValue(client); + conf.ClientAlg.ImportPkcs8PrivateKey(pkcs8PrivateKey, out _); + return client; + } + /// <summary> + /// Imports the public key used to verify broker server messages + /// </summary> + /// <param name="client"></param> + /// <param name="spkiPublicKey">The subject-public-key-info formatted broker public key</param> + /// <returns>Chainable fluent object</returns> + /// <exception cref="ArgumentException"></exception> + /// <exception cref="CryptographicException"></exception> + public static FBMClient ImportBrokerPublicKey(this FBMClient client, ReadOnlySpan<byte> spkiPublicKey) + { + CacheConnectionConfig conf = ClientCacheConfig.GetOrCreateValue(client); + conf.BrokerAlg.ImportSubjectPublicKeyInfo(spkiPublicKey, out _); + return client; + } + /// <summary> + /// Specifies if all connections should be using TLS + /// </summary> + /// <param name="client"></param> + /// <param name="useTls">A value that indicates if connections should use TLS</param> + /// <returns>Chainable fluent object</returns> + public static FBMClient UseTls(this FBMClient client, bool useTls) + { + CacheConnectionConfig conf = ClientCacheConfig.GetOrCreateValue(client); + conf.useTls = useTls; + return client; + } + /// <summary> + /// Specifies the broker address to discover cache nodes from + /// </summary> + /// <param name="client"></param> + /// <param name="brokerAddress">The address of the server broker</param> + /// <returns>Chainable fluent object</returns> + /// <exception cref="ArgumentNullException"></exception> + public static FBMClient UseBroker(this FBMClient client, Uri brokerAddress) + { + CacheConnectionConfig conf = ClientCacheConfig.GetOrCreateValue(client); + conf.BrokerAddress = brokerAddress ?? throw new ArgumentNullException(nameof(brokerAddress)); + return client; + } + + /// <summary> + /// Specifies the current server's cluster node id. If this + /// is a server connection attempting to listen for changes on the + /// remote server, this id must be set and unique + /// </summary> + /// <param name="client"></param> + /// <param name="nodeId">The cluster node id of the current server</param> + /// <returns>Chainable fluent object</returns> + /// <exception cref="ArgumentNullException"></exception> + public static FBMClient SetNodeId(this FBMClient client, string nodeId) + { + CacheConnectionConfig conf = ClientCacheConfig.GetOrCreateValue(client); + conf.NodeId = nodeId ?? throw new ArgumentNullException(nameof(nodeId)); + return client; + } + + /// <summary> + /// Discovers cache nodes in the broker configured for the current client. + /// </summary> + /// <param name="client"></param> + /// <param name="token">A token to cancel the discovery</param> + /// <returns>A task the resolves the list of active servers on the broker server</returns> + public static Task<ActiveServer[]?> DiscoverNodesAsync(this FBMClientWorkerBase client, CancellationToken token = default) + { + return client.Client.DiscoverNodesAsync(token); + } + /// <summary> + /// Discovers cache nodes in the broker configured for the current client. + /// </summary> + /// <param name="client"></param> + /// <param name="token">A token to cancel the discovery </param> + /// <returns>A task the resolves the list of active servers on the broker server</returns> + public static async Task<ActiveServer[]?> DiscoverNodesAsync(this FBMClient client, CancellationToken token = default) + { + CacheConnectionConfig conf = ClientCacheConfig.GetOrCreateValue(client); + //List servers async + ActiveServer[]? servers = await ListServersAsync(conf.BrokerAddress!, conf.ClientAlg, conf.BrokerAlg, token); + conf.BrokerServers = servers; + return servers; + } + + /// <summary> + /// Connects the client to a remote cache server + /// </summary> + /// <param name="client"></param> + /// <param name="server">The server to connect to</param> + /// <param name="token">A token to cancel the connection and/or wait operation</param> + /// <returns>A task that resolves when cancelled or when the connection is lost to the server</returns> + /// <exception cref="OperationCanceledException"></exception> + public static Task ConnectAndWaitForExitAsync(this FBMClientWorkerBase client, ActiveServer server, CancellationToken token = default) + { + return client.Client.ConnectAndWaitForExitAsync(server, token); + } + + /// <summary> + /// Connects the client to a remote cache server + /// </summary> + /// <param name="client"></param> + /// <param name="server">The server to connect to</param> + /// <param name="token">A token to cancel the connection and/or wait operation</param> + /// <returns>A task that resolves when cancelled or when the connection is lost to the server</returns> + /// <exception cref="OperationCanceledException"></exception> + public static async Task ConnectAndWaitForExitAsync(this FBMClient client, ActiveServer server, CancellationToken token = default) + { + CacheConnectionConfig conf = ClientCacheConfig.GetOrCreateValue(client); + //Connect to server (no server id because client not replication server) + await ConnectAsync(client, server.HostName!, conf.ClientAlg, conf.ServerChallenge, conf.NodeId, conf.useTls, token); + //Get task for cancellation + Task cancellation = token.WaitHandle.WaitAsync(); + //Task for status handle + Task run = client.ConnectionStatusHandle.WaitAsync(); + //Wait for cancellation or + _ = await Task.WhenAny(cancellation, run); + //Normal try to disconnect the socket + await client.DisconnectAsync(CancellationToken.None); + //Notify if cancelled + token.ThrowIfCancellationRequested(); + } + + /// <summary> + /// Selects a random server from a collection of active servers + /// </summary> + /// <param name="servers"></param> + /// <returns>A server selected at random</returns> + public static ActiveServer SelectRandom(this ICollection<ActiveServer> servers) + { + //select random server + int randServer = RandomNumberGenerator.GetInt32(0, servers.Count); + return servers.ElementAt(randServer); + } + } +} diff --git a/VNLib.Data.Caching.Extensions/VNLib.Data.Caching.Extensions.csproj b/VNLib.Data.Caching.Extensions/VNLib.Data.Caching.Extensions.csproj new file mode 100644 index 0000000..28bf83e --- /dev/null +++ b/VNLib.Data.Caching.Extensions/VNLib.Data.Caching.Extensions.csproj @@ -0,0 +1,53 @@ +<Project Sdk="Microsoft.NET.Sdk"> + + <PropertyGroup> + <TargetFramework>net6.0</TargetFramework> + <ImplicitUsings>enable</ImplicitUsings> + <Nullable>enable</Nullable> + <PlatformTarget>x64</PlatformTarget> + <GenerateDocumentationFile>True</GenerateDocumentationFile> + <Version>1.0.0.1</Version> + <Authors>Vaughn Nugent</Authors> + <Copyright>Copyright © 2022 Vaughn Nugent</Copyright> + <PackageProjectUrl>www.vaughnnugent.com/resources</PackageProjectUrl> + <Platforms>AnyCPU;x64</Platforms> + </PropertyGroup> + + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'"> + <CheckForOverflowUnderflow>True</CheckForOverflowUnderflow> + </PropertyGroup> + + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'"> + <CheckForOverflowUnderflow>True</CheckForOverflowUnderflow> + </PropertyGroup> + + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'"> + <CheckForOverflowUnderflow>True</CheckForOverflowUnderflow> + <Deterministic>False</Deterministic> + </PropertyGroup> + + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'"> + <CheckForOverflowUnderflow>True</CheckForOverflowUnderflow> + <Deterministic>False</Deterministic> + </PropertyGroup> + + <ItemGroup> + <PackageReference Include="ErrorProne.NET.CoreAnalyzers" Version="0.1.2"> + <PrivateAssets>all</PrivateAssets> + <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> + </PackageReference> + <PackageReference Include="ErrorProne.NET.Structs" Version="0.1.2"> + <PrivateAssets>all</PrivateAssets> + <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> + </PackageReference> + <PackageReference Include="RestSharp" Version="108.0.2" /> + </ItemGroup> + + <ItemGroup> + <ProjectReference Include="..\..\..\VNLib\Hashing\VNLib.Hashing.Portable.csproj" /> + <ProjectReference Include="..\..\..\VNLib\Http\VNLib.Net.Http.csproj" /> + <ProjectReference Include="..\..\VNLib.Net.Rest.Client\VNLib.Net.Rest.Client.csproj" /> + <ProjectReference Include="..\VNLib.Net.Messaging.FBM\src\VNLib.Net.Messaging.FBM.csproj" /> + </ItemGroup> + +</Project> diff --git a/VNLib.Data.Caching.Global/Exceptions/CacheNotLoadedException.cs b/VNLib.Data.Caching.Global/Exceptions/CacheNotLoadedException.cs new file mode 100644 index 0000000..c3a3800 --- /dev/null +++ b/VNLib.Data.Caching.Global/Exceptions/CacheNotLoadedException.cs @@ -0,0 +1,14 @@ +namespace VNLib.Data.Caching.Global.Exceptions +{ + public class CacheNotLoadedException : GlobalCacheException + { + public CacheNotLoadedException() + { } + + public CacheNotLoadedException(string? message) : base(message) + { } + + public CacheNotLoadedException(string? message, Exception? innerException) : base(message, innerException) + { } + } +}
\ No newline at end of file diff --git a/VNLib.Data.Caching.Global/Exceptions/GlobalCacheException.cs b/VNLib.Data.Caching.Global/Exceptions/GlobalCacheException.cs new file mode 100644 index 0000000..89305a5 --- /dev/null +++ b/VNLib.Data.Caching.Global/Exceptions/GlobalCacheException.cs @@ -0,0 +1,12 @@ +namespace VNLib.Data.Caching.Global.Exceptions +{ + public class GlobalCacheException : Exception + { + public GlobalCacheException() + { } + public GlobalCacheException(string? message) : base(message) + { } + public GlobalCacheException(string? message, Exception? innerException) : base(message, innerException) + { } + } +}
\ No newline at end of file diff --git a/VNLib.Data.Caching.Global/GlobalDataCache.cs b/VNLib.Data.Caching.Global/GlobalDataCache.cs new file mode 100644 index 0000000..2c60ae2 --- /dev/null +++ b/VNLib.Data.Caching.Global/GlobalDataCache.cs @@ -0,0 +1,145 @@ +using VNLib.Data.Caching.Global.Exceptions; + +namespace VNLib.Data.Caching.Global +{ + /// <summary> + /// A static library for caching data in-process or a remote data + /// cache + /// </summary> + public static class GlobalDataCache + { + + private static IGlobalCacheProvider? CacheProvider; + private static CancellationTokenRegistration _reg; + + private static readonly object CacheLock = new(); + private static readonly Dictionary<string, WeakReference<object>> LocalCache = new(); + + /// <summary> + /// Gets a value that indicates if global cache is available + /// </summary> + public static bool IsAvailable => CacheProvider != null && CacheProvider.IsConnected; + + /// <summary> + /// Sets the backing cache provider for the process-wide global cache + /// </summary> + /// <param name="cacheProvider">The cache provider instance</param> + /// <param name="statusToken">A token that represents the store's validity</param> + public static void SetProvider(IGlobalCacheProvider cacheProvider, CancellationToken statusToken) + { + CacheProvider = cacheProvider ?? throw new ArgumentNullException(nameof(cacheProvider)); + //Remove cache provider when cache provider is no longer valid + _reg = statusToken.Register(Cleanup); + } + + private static void Cleanup() + { + CacheProvider = null; + //Clear local cache + lock (CacheLock) + { + LocalCache.Clear(); + } + _reg.Dispose(); + } + + /// <summary> + /// Asynchronously gets a value from the global cache provider + /// </summary> + /// <typeparam name="T"></typeparam> + /// <param name="key">The key identifying the object to recover from cache</param> + /// <returns>The value if found, or null if it does not exist in the store</returns> + /// <exception cref="GlobalCacheException"></exception> + /// <exception cref="CacheNotLoadedException"></exception> + public static async Task<T?> GetAsync<T>(string key) where T: class + { + //Check local cache first + lock (CacheLock) + { + if (LocalCache.TryGetValue(key, out WeakReference<object>? wr)) + { + //Value is found + if(wr.TryGetTarget(out object? value)) + { + //Value exists and is loaded to local cache + return (T)value; + } + //Value has been collected + else + { + //Remove the key from the table + LocalCache.Remove(key); + } + } + } + //get ref to local cache provider + IGlobalCacheProvider? prov = CacheProvider; + if(prov == null) + { + throw new CacheNotLoadedException("Global cache provider was not found"); + } + //get the value from the store + T? val = await prov.GetAsync<T>(key); + //Only store the value if it was successfully found + if (val != null) + { + //Store in local cache + lock (CacheLock) + { + LocalCache[key] = new WeakReference<object>(val); + } + } + return val; + } + + /// <summary> + /// Asynchronously sets (or updates) a cached value in the global cache + /// </summary> + /// <typeparam name="T"></typeparam> + /// <param name="key">The key identifying the object to recover from cache</param> + /// <param name="value">The value to set at the given key</param> + /// <returns>A task that completes when the update operation has compelted</returns> + /// <exception cref="GlobalCacheException"></exception> + /// <exception cref="CacheNotLoadedException"></exception> + public static async Task SetAsync<T>(string key, T value) where T : class + { + //Local record is stale, allow it to be loaded from cache next call to get + lock (CacheLock) + { + LocalCache.Remove(key); + } + //get ref to local cache provider + IGlobalCacheProvider? prov = CacheProvider; + if (prov == null) + { + throw new CacheNotLoadedException("Global cache provider was not found"); + } + //set the value in the store + await prov.SetAsync<T>(key, value); + } + + /// <summary> + /// Asynchronously deletes an item from cache by its key + /// </summary> + /// <param name="key"></param> + /// <returns>A task that completes when the delete operation has compelted</returns> + /// <exception cref="GlobalCacheException"></exception> + /// <exception cref="CacheNotLoadedException"></exception> + public static async Task DeleteAsync(string key) + { + //Delete from local cache + lock (CacheLock) + { + LocalCache.Remove(key); + } + //get ref to local cache provider + IGlobalCacheProvider? prov = CacheProvider; + if (prov == null) + { + throw new CacheNotLoadedException("Global cache provider was not found"); + } + //Delete value from store + await prov.DeleteAsync(key); + } + } +}
\ No newline at end of file diff --git a/VNLib.Data.Caching.Global/IGlobalCacheProvider.cs b/VNLib.Data.Caching.Global/IGlobalCacheProvider.cs new file mode 100644 index 0000000..12b4b0b --- /dev/null +++ b/VNLib.Data.Caching.Global/IGlobalCacheProvider.cs @@ -0,0 +1,39 @@ + +namespace VNLib.Data.Caching.Global +{ + /// <summary> + /// An interface that a cache provoider must impelement to provide data caching to the + /// <see cref="GlobalDataCache"/> environment + /// </summary> + public interface IGlobalCacheProvider + { + /// <summary> + /// Gets a value that indicates if the cache provider is currently available + /// </summary> + public bool IsConnected { get; } + + /// <summary> + /// Asynchronously gets a value from the backing cache store + /// </summary> + /// <typeparam name="T"></typeparam> + /// <param name="key">The key identifying the object to recover from cache</param> + /// <returns>The value if found, or null if it does not exist in the store</returns> + Task<T?> GetAsync<T>(string key); + + /// <summary> + /// Asynchronously sets (or updates) a cached value in the backing cache store + /// </summary> + /// <typeparam name="T"></typeparam> + /// <param name="key">The key identifying the object to recover from cache</param> + /// <param name="value">The value to set at the given key</param> + /// <returns>A task that completes when the update operation has compelted</returns> + Task SetAsync<T>(string key, T value); + + /// <summary> + /// Asynchronously deletes an item from cache by its key + /// </summary> + /// <param name="key">The key identifying the item to delete</param> + /// <returns>A task that completes when the delete operation has compelted</returns> + Task DeleteAsync(string key); + } +}
\ No newline at end of file diff --git a/VNLib.Data.Caching.Global/VNLib.Data.Caching.Global.csproj b/VNLib.Data.Caching.Global/VNLib.Data.Caching.Global.csproj new file mode 100644 index 0000000..0ec3c36 --- /dev/null +++ b/VNLib.Data.Caching.Global/VNLib.Data.Caching.Global.csproj @@ -0,0 +1,25 @@ +<Project Sdk="Microsoft.NET.Sdk"> + + <PropertyGroup> + <TargetFramework>net6.0</TargetFramework> + <ImplicitUsings>enable</ImplicitUsings> + <Nullable>enable</Nullable> + <Authors>Vaughn Nugent</Authors> + <Copyright>Copyright © 2022 Vaughn Nugent</Copyright> + <Platforms>AnyCPU;ARM32;x64</Platforms> + <PlatformTarget>x64</PlatformTarget> + <GenerateDocumentationFile>True</GenerateDocumentationFile> + </PropertyGroup> + + <ItemGroup> + <PackageReference Include="ErrorProne.NET.CoreAnalyzers" Version="0.1.2"> + <PrivateAssets>all</PrivateAssets> + <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> + </PackageReference> + <PackageReference Include="ErrorProne.NET.Structs" Version="0.1.2"> + <PrivateAssets>all</PrivateAssets> + <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> + </PackageReference> + </ItemGroup> + +</Project> diff --git a/VNLib.Data.Caching.ObjectCache/ChangeEvent.cs b/VNLib.Data.Caching.ObjectCache/ChangeEvent.cs new file mode 100644 index 0000000..b61b4c2 --- /dev/null +++ b/VNLib.Data.Caching.ObjectCache/ChangeEvent.cs @@ -0,0 +1,19 @@ + +namespace VNLib.Data.Caching.ObjectCache +{ + /// <summary> + /// An event object that is passed when change events occur + /// </summary> + public class ChangeEvent + { + public readonly string CurrentId; + public readonly string? AlternateId; + public readonly bool Deleted; + internal ChangeEvent(string id, string? alternate, bool deleted) + { + CurrentId = id; + AlternateId = alternate; + Deleted = deleted; + } + } +} diff --git a/VNLib.Data.Caching.ObjectCache/ObjectCacheStore.cs b/VNLib.Data.Caching.ObjectCache/ObjectCacheStore.cs new file mode 100644 index 0000000..3fd4999 --- /dev/null +++ b/VNLib.Data.Caching.ObjectCache/ObjectCacheStore.cs @@ -0,0 +1,237 @@ +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Utils.IO; +using VNLib.Utils.Async; +using VNLib.Utils.Memory; +using VNLib.Utils.Logging; +using VNLib.Utils.Extensions; +using VNLib.Net.Messaging.FBM.Server; +using static VNLib.Data.Caching.Constants; + +namespace VNLib.Data.Caching.ObjectCache +{ + public delegate ReadOnlySpan<byte> GetBodyDataCallback<T>(T state); + + /// <summary> + /// A <see cref="FBMListener"/> implementation of a <see cref="CacheListener"/> + /// </summary> + public class ObjectCacheStore : CacheListener, IDisposable + { + private readonly SemaphoreSlim StoreLock; + private bool disposedValue; + + ///<inheritdoc/> + protected override ILogProvider Log { get; } + + /// <summary> + /// A queue that stores update and delete events + /// </summary> + public AsyncQueue<ChangeEvent> EventQueue { get; } + + /// <summary> + /// Initialzies a new <see cref="ObjectCacheStore"/> + /// </summary> + /// <param name="dir">The <see cref="DirectoryInfo"/> to store blob files to</param> + /// <param name="cacheMax"></param> + /// <param name="log"></param> + /// <param name="heap"></param> + /// <param name="singleReader">A value that indicates if a single thread is processing events</param> + public ObjectCacheStore(DirectoryInfo dir, int cacheMax, ILogProvider log, IUnmangedHeap heap, bool singleReader) + { + Log = log; + //We can use a single writer and single reader in this context + EventQueue = new(true, singleReader); + InitCache(dir, cacheMax, heap); + InitListener(heap); + StoreLock = new(1,1); + } + + ///<inheritdoc/> + protected override async Task ProcessAsync(FBMContext context, object? userState, CancellationToken cancellationToken) + { + try + { + //Get the action header + string action = context.Method(); + //Optional newid header + string? alternateId = context.NewObjectId(); + + switch (action) + { + case Actions.Get: + { + //Get the object-id header + string objectId = context.ObjectId(); + //Take lock on store + using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(cancellationToken: cancellationToken); + if (Cache!.TryGetValue(objectId, out MemoryHandle<byte>? data)) + { + //Set the status code and write the buffered data to the response buffer + context.CloseResponse(ResponseCodes.Okay); + //Copy data to response buffer + context.Response.WriteBody(data.Span); + } + else + { + context.CloseResponse(ResponseCodes.NotFound); + } + } + break; + case Actions.AddOrUpdate: + { + //Get the object-id header + string objectId = context.ObjectId(); + //Add/update a blob async + await AddOrUpdateBlobAsync(objectId, alternateId, static context => context.Request.BodyData, context); + //Notify update the event bus + await EventQueue.EnqueueAsync(new(objectId, alternateId, false), cancellationToken); + //Set status code + context.CloseResponse(ResponseCodes.Okay); + } + break; + case Actions.Delete: + { + //Get the object-id header + string objectId = context.ObjectId(); + + if (await DeleteItemAsync(objectId)) + { + //Notify deleted + await EventQueue.EnqueueAsync(new(objectId, null, true), cancellationToken); + //Set status header + context.CloseResponse(ResponseCodes.Okay); + } + else + { + //Set status header + context.CloseResponse(ResponseCodes.NotFound); + } + } + break; + // event queue dequeue request + case Actions.Dequeue: + { + //If no event bus is registered, then this is not a legal command + if (userState is not AsyncQueue<ChangeEvent> eventBus) + { + context.CloseResponse(ResponseCodes.NotFound); + break; + } + //Wait for a new message to process + ChangeEvent ev = await eventBus.DequeueAsync(cancellationToken); + if (ev.Deleted) + { + context.CloseResponse("deleted"); + context.Response.WriteHeader(ObjectId, ev.CurrentId); + } + else + { + //Changed + context.CloseResponse("modified"); + context.Response.WriteHeader(ObjectId, ev.CurrentId); + //Set old id if an old id is set + if (ev.CurrentId != null) + { + context.Response.WriteHeader(NewObjectId, ev.AlternateId); + } + } + } + break; + } + } + catch (OperationCanceledException) + { + throw; + } + catch(Exception ex) + { + //Log error and set error status code + Log.Error(ex); + context.CloseResponse(ResponseCodes.Error); + } + } + + /// <summary> + /// Asynchronously deletes a previously stored item + /// </summary> + /// <param name="id">The id of the object to delete</param> + /// <returns>A task that completes when the item has been deleted</returns> + public async Task<bool> DeleteItemAsync(string id) + { + using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(); + return Cache!.Remove(id); + } + + /// <summary> + /// Asynchronously adds or updates an object in the store and optionally update's its id + /// </summary> + /// <param name="objectId">The current (or old) id of the object</param> + /// <param name="alternateId">An optional id to update the blob to</param> + /// <param name="bodyData">A callback that returns the data for the blob</param> + /// <param name="state">The state parameter to pass to the data callback</param> + /// <returns></returns> + public async Task AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state) + { + MemoryHandle<byte>? blob; + //See if new/alt session id was specified + if (string.IsNullOrWhiteSpace(alternateId)) + { + //Take lock on store + using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(); + //See if blob exists + if (!Cache!.TryGetValue(objectId, out blob)) + { + //If not, create new blob and add to store + blob = Heap.AllocAndCopy(bodyData(state)); + Cache.Add(objectId, blob); + } + else + { + //Reset the buffer state + blob.WriteAndResize(bodyData(state)); + } + } + //Need to change the id of the record + else + { + //Take lock on store + using SemSlimReleaser rel = await StoreLock.GetReleaserAsync(); + //Try to change the blob key + if (!Cache!.TryChangeKey(objectId, alternateId, out blob)) + { + //Blob not found, create new blob + blob = Heap.AllocAndCopy(bodyData(state)); + Cache.Add(alternateId, blob); + } + else + { + //Reset the buffer state + blob.WriteAndResize(bodyData(state)); + } + } + } + + ///<inheritdoc/> + protected virtual void Dispose(bool disposing) + { + if (!disposedValue) + { + if (disposing) + { + Cache?.Clear(); + } + disposedValue = true; + } + } + ///<inheritdoc/> + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + } +} diff --git a/VNLib.Data.Caching.ObjectCache/VNLib.Data.Caching.ObjectCache.csproj b/VNLib.Data.Caching.ObjectCache/VNLib.Data.Caching.ObjectCache.csproj new file mode 100644 index 0000000..389cabd --- /dev/null +++ b/VNLib.Data.Caching.ObjectCache/VNLib.Data.Caching.ObjectCache.csproj @@ -0,0 +1,47 @@ +<Project Sdk="Microsoft.NET.Sdk"> + + <PropertyGroup> + <TargetFramework>net6.0</TargetFramework> + <Platforms>AnyCPU;x64</Platforms> + <Authors>Vaughn Nugent</Authors> + <Copyright>Copyright © 2022 Vaughn Nugent</Copyright> + <Nullable>enable</Nullable> + <GenerateDocumentationFile>True</GenerateDocumentationFile> + <PackageProjectUrl>www.vaughnnugent.com/resources</PackageProjectUrl> + <AssemblyVersion>1.0.0.1</AssemblyVersion> + </PropertyGroup> + + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'"> + <DocumentationFile></DocumentationFile> + <CheckForOverflowUnderflow>False</CheckForOverflowUnderflow> + </PropertyGroup> + + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'"> + <CheckForOverflowUnderflow>False</CheckForOverflowUnderflow> + </PropertyGroup> + + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'"> + <CheckForOverflowUnderflow>False</CheckForOverflowUnderflow> + </PropertyGroup> + + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'"> + <CheckForOverflowUnderflow>False</CheckForOverflowUnderflow> + </PropertyGroup> + + <ItemGroup> + <PackageReference Include="ErrorProne.NET.CoreAnalyzers" Version="0.1.2"> + <PrivateAssets>all</PrivateAssets> + <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> + </PackageReference> + <PackageReference Include="ErrorProne.NET.Structs" Version="0.1.2"> + <PrivateAssets>all</PrivateAssets> + <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> + </PackageReference> + </ItemGroup> + + <ItemGroup> + <ProjectReference Include="..\..\..\VNLib\Utils\src\VNLib.Utils.csproj" /> + <ProjectReference Include="..\VNLib.Data.Caching\src\VNLib.Data.Caching.csproj" /> + </ItemGroup> + +</Project> diff --git a/VNLib.Data.Caching/.gitattributes b/VNLib.Data.Caching/.gitattributes new file mode 100644 index 0000000..1ff0c42 --- /dev/null +++ b/VNLib.Data.Caching/.gitattributes @@ -0,0 +1,63 @@ +############################################################################### +# Set default behavior to automatically normalize line endings. +############################################################################### +* text=auto + +############################################################################### +# Set default behavior for command prompt diff. +# +# This is need for earlier builds of msysgit that does not have it on by +# default for csharp files. +# Note: This is only used by command line +############################################################################### +#*.cs diff=csharp + +############################################################################### +# Set the merge driver for project and solution files +# +# Merging from the command prompt will add diff markers to the files if there +# are conflicts (Merging from VS is not affected by the settings below, in VS +# the diff markers are never inserted). Diff markers may cause the following +# file extensions to fail to load in VS. An alternative would be to treat +# these files as binary and thus will always conflict and require user +# intervention with every merge. To do so, just uncomment the entries below +############################################################################### +#*.sln merge=binary +#*.csproj merge=binary +#*.vbproj merge=binary +#*.vcxproj merge=binary +#*.vcproj merge=binary +#*.dbproj merge=binary +#*.fsproj merge=binary +#*.lsproj merge=binary +#*.wixproj merge=binary +#*.modelproj merge=binary +#*.sqlproj merge=binary +#*.wwaproj merge=binary + +############################################################################### +# behavior for image files +# +# image files are treated as binary by default. +############################################################################### +#*.jpg binary +#*.png binary +#*.gif binary + +############################################################################### +# diff behavior for common document formats +# +# Convert binary document formats to text before diffing them. This feature +# is only available from the command line. Turn it on by uncommenting the +# entries below. +############################################################################### +#*.doc diff=astextplain +#*.DOC diff=astextplain +#*.docx diff=astextplain +#*.DOCX diff=astextplain +#*.dot diff=astextplain +#*.DOT diff=astextplain +#*.pdf diff=astextplain +#*.PDF diff=astextplain +#*.rtf diff=astextplain +#*.RTF diff=astextplain diff --git a/VNLib.Data.Caching/.gitignore b/VNLib.Data.Caching/.gitignore new file mode 100644 index 0000000..63641d6 --- /dev/null +++ b/VNLib.Data.Caching/.gitignore @@ -0,0 +1,476 @@ +# Created by https://www.toptal.com/developers/gitignore/api/c,c++,visualstudio +# Edit at https://www.toptal.com/developers/gitignore?templates=c,c++,visualstudio + +### C ### +# Prerequisites +*.d + +# Object files +*.o +*.ko +*.obj +*.elf + +# Linker output +*.ilk +*.map +*.exp + +# Precompiled Headers +*.gch +*.pch + +# Libraries +*.lib +*.a +*.la +*.lo + +# Shared objects (inc. Windows DLLs) +*.dll +*.so +*.so.* +*.dylib + +# Executables +*.exe +*.out +*.app +*.i*86 +*.x86_64 +*.hex + +# Debug files +*.dSYM/ +*.su +*.idb +*.pdb + +# Kernel Module Compile Results +*.mod* +*.cmd +.tmp_versions/ +modules.order +Module.symvers +Mkfile.old +dkms.conf + +### C++ ### +# Prerequisites + +# Compiled Object files +*.slo + +# Precompiled Headers + +# Compiled Dynamic libraries + +# Fortran module files +*.mod +*.smod + +# Compiled Static libraries +*.lai + +# Executables + +### VisualStudio ### +## Ignore Visual Studio temporary files, build results, and +## files generated by popular Visual Studio add-ons. +## +## Get latest from https://github.com/github/gitignore/blob/main/VisualStudio.gitignore + +# User-specific files +*.rsuser +*.suo +*.user +*.userosscache +*.sln.docstates + +# User-specific files (MonoDevelop/Xamarin Studio) +*.userprefs + +# Mono auto generated files +mono_crash.* + +# Build results +[Dd]ebug/ +[Dd]ebugPublic/ +[Rr]elease/ +[Rr]eleases/ +x64/ +x86/ +[Ww][Ii][Nn]32/ +[Aa][Rr][Mm]/ +[Aa][Rr][Mm]64/ +bld/ +[Bb]in/ +[Oo]bj/ +[Ll]og/ +[Ll]ogs/ + +# Visual Studio 2015/2017 cache/options directory +.vs/ +# Uncomment if you have tasks that create the project's static files in wwwroot +#wwwroot/ + +# Visual Studio 2017 auto generated files +Generated\ Files/ + +# MSTest test Results +[Tt]est[Rr]esult*/ +[Bb]uild[Ll]og.* + +# NUnit +*.VisualState.xml +TestResult.xml +nunit-*.xml + +# Build Results of an ATL Project +[Dd]ebugPS/ +[Rr]eleasePS/ +dlldata.c + +# Benchmark Results +BenchmarkDotNet.Artifacts/ + +# .NET Core +project.lock.json +project.fragment.lock.json +artifacts/ + +# ASP.NET Scaffolding +ScaffoldingReadMe.txt + +# StyleCop +StyleCopReport.xml + +# Files built by Visual Studio +*_i.c +*_p.c +*_h.h +*.meta +*.iobj +*.ipdb +*.pgc +*.pgd +*.rsp +*.sbr +*.tlb +*.tli +*.tlh +*.tmp +*.tmp_proj +*_wpftmp.csproj +*.log +*.tlog +*.vspscc +*.vssscc +.builds +*.pidb +*.svclog +*.scc + +# Chutzpah Test files +_Chutzpah* + +# Visual C++ cache files +ipch/ +*.aps +*.ncb +*.opendb +*.opensdf +*.sdf +*.cachefile +*.VC.db +*.VC.VC.opendb + +# Visual Studio profiler +*.psess +*.vsp +*.vspx +*.sap + +# Visual Studio Trace Files +*.e2e + +# TFS 2012 Local Workspace +$tf/ + +# Guidance Automation Toolkit +*.gpState + +# ReSharper is a .NET coding add-in +_ReSharper*/ +*.[Rr]e[Ss]harper +*.DotSettings.user + +# TeamCity is a build add-in +_TeamCity* + +# DotCover is a Code Coverage Tool +*.dotCover + +# AxoCover is a Code Coverage Tool +.axoCover/* +!.axoCover/settings.json + +# Coverlet is a free, cross platform Code Coverage Tool +coverage*.json +coverage*.xml +coverage*.info + +# Visual Studio code coverage results +*.coverage +*.coveragexml + +# NCrunch +_NCrunch_* +.*crunch*.local.xml +nCrunchTemp_* + +# MightyMoose +*.mm.* +AutoTest.Net/ + +# Web workbench (sass) +.sass-cache/ + +# Installshield output folder +[Ee]xpress/ + +# DocProject is a documentation generator add-in +DocProject/buildhelp/ +DocProject/Help/*.HxT +DocProject/Help/*.HxC +DocProject/Help/*.hhc +DocProject/Help/*.hhk +DocProject/Help/*.hhp +DocProject/Help/Html2 +DocProject/Help/html + +# Click-Once directory +publish/ + +# Publish Web Output +*.[Pp]ublish.xml +*.azurePubxml +# Note: Comment the next line if you want to checkin your web deploy settings, +# but database connection strings (with potential passwords) will be unencrypted +*.pubxml +*.publishproj + +# Microsoft Azure Web App publish settings. Comment the next line if you want to +# checkin your Azure Web App publish settings, but sensitive information contained +# in these scripts will be unencrypted +PublishScripts/ + +# NuGet Packages +*.nupkg +# NuGet Symbol Packages +*.snupkg +# The packages folder can be ignored because of Package Restore +**/[Pp]ackages/* +# except build/, which is used as an MSBuild target. +!**/[Pp]ackages/build/ +# Uncomment if necessary however generally it will be regenerated when needed +#!**/[Pp]ackages/repositories.config +# NuGet v3's project.json files produces more ignorable files +*.nuget.props +*.nuget.targets + +# Microsoft Azure Build Output +csx/ +*.build.csdef + +# Microsoft Azure Emulator +ecf/ +rcf/ + +# Windows Store app package directories and files +AppPackages/ +BundleArtifacts/ +Package.StoreAssociation.xml +_pkginfo.txt +*.appx +*.appxbundle +*.appxupload + +# Visual Studio cache files +# files ending in .cache can be ignored +*.[Cc]ache +# but keep track of directories ending in .cache +!?*.[Cc]ache/ + +# Others +ClientBin/ +~$* +*~ +*.dbmdl +*.dbproj.schemaview +*.jfm +*.pfx +*.publishsettings +orleans.codegen.cs + +# Including strong name files can present a security risk +# (https://github.com/github/gitignore/pull/2483#issue-259490424) +#*.snk + +# Since there are multiple workflows, uncomment next line to ignore bower_components +# (https://github.com/github/gitignore/pull/1529#issuecomment-104372622) +#bower_components/ + +# RIA/Silverlight projects +Generated_Code/ + +# Backup & report files from converting an old project file +# to a newer Visual Studio version. Backup files are not needed, +# because we have git ;-) +_UpgradeReport_Files/ +Backup*/ +UpgradeLog*.XML +UpgradeLog*.htm +ServiceFabricBackup/ +*.rptproj.bak + +# SQL Server files +*.mdf +*.ldf +*.ndf + +# Business Intelligence projects +*.rdl.data +*.bim.layout +*.bim_*.settings +*.rptproj.rsuser +*- [Bb]ackup.rdl +*- [Bb]ackup ([0-9]).rdl +*- [Bb]ackup ([0-9][0-9]).rdl + +# Microsoft Fakes +FakesAssemblies/ + +# GhostDoc plugin setting file +*.GhostDoc.xml + +# Node.js Tools for Visual Studio +.ntvs_analysis.dat +node_modules/ + +# Visual Studio 6 build log +*.plg + +# Visual Studio 6 workspace options file +*.opt + +# Visual Studio 6 auto-generated workspace file (contains which files were open etc.) +*.vbw + +# Visual Studio 6 auto-generated project file (contains which files were open etc.) +*.vbp + +# Visual Studio 6 workspace and project file (working project files containing files to include in project) +*.dsw +*.dsp + +# Visual Studio 6 technical files + +# Visual Studio LightSwitch build output +**/*.HTMLClient/GeneratedArtifacts +**/*.DesktopClient/GeneratedArtifacts +**/*.DesktopClient/ModelManifest.xml +**/*.Server/GeneratedArtifacts +**/*.Server/ModelManifest.xml +_Pvt_Extensions + +# Paket dependency manager +.paket/paket.exe +paket-files/ + +# FAKE - F# Make +.fake/ + +# CodeRush personal settings +.cr/personal + +# Python Tools for Visual Studio (PTVS) +__pycache__/ +*.pyc + +# Cake - Uncomment if you are using it +# tools/** +# !tools/packages.config + +# Tabs Studio +*.tss + +# Telerik's JustMock configuration file +*.jmconfig + +# BizTalk build output +*.btp.cs +*.btm.cs +*.odx.cs +*.xsd.cs + +# OpenCover UI analysis results +OpenCover/ + +# Azure Stream Analytics local run output +ASALocalRun/ + +# MSBuild Binary and Structured Log +*.binlog + +# NVidia Nsight GPU debugger configuration file +*.nvuser + +# MFractors (Xamarin productivity tool) working folder +.mfractor/ + +# Local History for Visual Studio +.localhistory/ + +# Visual Studio History (VSHistory) files +.vshistory/ + +# BeatPulse healthcheck temp database +healthchecksdb + +# Backup folder for Package Reference Convert tool in Visual Studio 2017 +MigrationBackup/ + +# Ionide (cross platform F# VS Code tools) working folder +.ionide/ + +# Fody - auto-generated XML schema +FodyWeavers.xsd + +# VS Code files for those working on multiple tools +.vscode/* +!.vscode/settings.json +!.vscode/tasks.json +!.vscode/launch.json +!.vscode/extensions.json +*.code-workspace + +# Local History for Visual Studio Code +.history/ + +# Windows Installer files from build outputs +*.cab +*.msi +*.msix +*.msm +*.msp + +# JetBrains Rider +*.sln.iml + +### VisualStudio Patch ### +# Additional files built by Visual Studio + +# End of https://www.toptal.com/developers/gitignore/api/c,c++,visualstudio + +*.json
\ No newline at end of file diff --git a/VNLib.Data.Caching/LICENSE.txt b/VNLib.Data.Caching/LICENSE.txt new file mode 100644 index 0000000..d159169 --- /dev/null +++ b/VNLib.Data.Caching/LICENSE.txt @@ -0,0 +1,339 @@ + GNU GENERAL PUBLIC LICENSE + Version 2, June 1991 + + Copyright (C) 1989, 1991 Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + + Preamble + + The licenses for most software are designed to take away your +freedom to share and change it. By contrast, the GNU General Public +License is intended to guarantee your freedom to share and change free +software--to make sure the software is free for all its users. This +General Public License applies to most of the Free Software +Foundation's software and to any other program whose authors commit to +using it. (Some other Free Software Foundation software is covered by +the GNU Lesser General Public License instead.) You can apply it to +your programs, too. + + When we speak of free software, we are referring to freedom, not +price. Our General Public Licenses are designed to make sure that you +have the freedom to distribute copies of free software (and charge for +this service if you wish), that you receive source code or can get it +if you want it, that you can change the software or use pieces of it +in new free programs; and that you know you can do these things. + + To protect your rights, we need to make restrictions that forbid +anyone to deny you these rights or to ask you to surrender the rights. +These restrictions translate to certain responsibilities for you if you +distribute copies of the software, or if you modify it. + + For example, if you distribute copies of such a program, whether +gratis or for a fee, you must give the recipients all the rights that +you have. You must make sure that they, too, receive or can get the +source code. And you must show them these terms so they know their +rights. + + We protect your rights with two steps: (1) copyright the software, and +(2) offer you this license which gives you legal permission to copy, +distribute and/or modify the software. + + Also, for each author's protection and ours, we want to make certain +that everyone understands that there is no warranty for this free +software. If the software is modified by someone else and passed on, we +want its recipients to know that what they have is not the original, so +that any problems introduced by others will not reflect on the original +authors' reputations. + + Finally, any free program is threatened constantly by software +patents. We wish to avoid the danger that redistributors of a free +program will individually obtain patent licenses, in effect making the +program proprietary. To prevent this, we have made it clear that any +patent must be licensed for everyone's free use or not licensed at all. + + The precise terms and conditions for copying, distribution and +modification follow. + + GNU GENERAL PUBLIC LICENSE + TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION + + 0. This License applies to any program or other work which contains +a notice placed by the copyright holder saying it may be distributed +under the terms of this General Public License. The "Program", below, +refers to any such program or work, and a "work based on the Program" +means either the Program or any derivative work under copyright law: +that is to say, a work containing the Program or a portion of it, +either verbatim or with modifications and/or translated into another +language. (Hereinafter, translation is included without limitation in +the term "modification".) Each licensee is addressed as "you". + +Activities other than copying, distribution and modification are not +covered by this License; they are outside its scope. The act of +running the Program is not restricted, and the output from the Program +is covered only if its contents constitute a work based on the +Program (independent of having been made by running the Program). +Whether that is true depends on what the Program does. + + 1. You may copy and distribute verbatim copies of the Program's +source code as you receive it, in any medium, provided that you +conspicuously and appropriately publish on each copy an appropriate +copyright notice and disclaimer of warranty; keep intact all the +notices that refer to this License and to the absence of any warranty; +and give any other recipients of the Program a copy of this License +along with the Program. + +You may charge a fee for the physical act of transferring a copy, and +you may at your option offer warranty protection in exchange for a fee. + + 2. You may modify your copy or copies of the Program or any portion +of it, thus forming a work based on the Program, and copy and +distribute such modifications or work under the terms of Section 1 +above, provided that you also meet all of these conditions: + + a) You must cause the modified files to carry prominent notices + stating that you changed the files and the date of any change. + + b) You must cause any work that you distribute or publish, that in + whole or in part contains or is derived from the Program or any + part thereof, to be licensed as a whole at no charge to all third + parties under the terms of this License. + + c) If the modified program normally reads commands interactively + when run, you must cause it, when started running for such + interactive use in the most ordinary way, to print or display an + announcement including an appropriate copyright notice and a + notice that there is no warranty (or else, saying that you provide + a warranty) and that users may redistribute the program under + these conditions, and telling the user how to view a copy of this + License. (Exception: if the Program itself is interactive but + does not normally print such an announcement, your work based on + the Program is not required to print an announcement.) + +These requirements apply to the modified work as a whole. If +identifiable sections of that work are not derived from the Program, +and can be reasonably considered independent and separate works in +themselves, then this License, and its terms, do not apply to those +sections when you distribute them as separate works. But when you +distribute the same sections as part of a whole which is a work based +on the Program, the distribution of the whole must be on the terms of +this License, whose permissions for other licensees extend to the +entire whole, and thus to each and every part regardless of who wrote it. + +Thus, it is not the intent of this section to claim rights or contest +your rights to work written entirely by you; rather, the intent is to +exercise the right to control the distribution of derivative or +collective works based on the Program. + +In addition, mere aggregation of another work not based on the Program +with the Program (or with a work based on the Program) on a volume of +a storage or distribution medium does not bring the other work under +the scope of this License. + + 3. You may copy and distribute the Program (or a work based on it, +under Section 2) in object code or executable form under the terms of +Sections 1 and 2 above provided that you also do one of the following: + + a) Accompany it with the complete corresponding machine-readable + source code, which must be distributed under the terms of Sections + 1 and 2 above on a medium customarily used for software interchange; or, + + b) Accompany it with a written offer, valid for at least three + years, to give any third party, for a charge no more than your + cost of physically performing source distribution, a complete + machine-readable copy of the corresponding source code, to be + distributed under the terms of Sections 1 and 2 above on a medium + customarily used for software interchange; or, + + c) Accompany it with the information you received as to the offer + to distribute corresponding source code. (This alternative is + allowed only for noncommercial distribution and only if you + received the program in object code or executable form with such + an offer, in accord with Subsection b above.) + +The source code for a work means the preferred form of the work for +making modifications to it. For an executable work, complete source +code means all the source code for all modules it contains, plus any +associated interface definition files, plus the scripts used to +control compilation and installation of the executable. However, as a +special exception, the source code distributed need not include +anything that is normally distributed (in either source or binary +form) with the major components (compiler, kernel, and so on) of the +operating system on which the executable runs, unless that component +itself accompanies the executable. + +If distribution of executable or object code is made by offering +access to copy from a designated place, then offering equivalent +access to copy the source code from the same place counts as +distribution of the source code, even though third parties are not +compelled to copy the source along with the object code. + + 4. You may not copy, modify, sublicense, or distribute the Program +except as expressly provided under this License. Any attempt +otherwise to copy, modify, sublicense or distribute the Program is +void, and will automatically terminate your rights under this License. +However, parties who have received copies, or rights, from you under +this License will not have their licenses terminated so long as such +parties remain in full compliance. + + 5. You are not required to accept this License, since you have not +signed it. However, nothing else grants you permission to modify or +distribute the Program or its derivative works. These actions are +prohibited by law if you do not accept this License. Therefore, by +modifying or distributing the Program (or any work based on the +Program), you indicate your acceptance of this License to do so, and +all its terms and conditions for copying, distributing or modifying +the Program or works based on it. + + 6. Each time you redistribute the Program (or any work based on the +Program), the recipient automatically receives a license from the +original licensor to copy, distribute or modify the Program subject to +these terms and conditions. You may not impose any further +restrictions on the recipients' exercise of the rights granted herein. +You are not responsible for enforcing compliance by third parties to +this License. + + 7. If, as a consequence of a court judgment or allegation of patent +infringement or for any other reason (not limited to patent issues), +conditions are imposed on you (whether by court order, agreement or +otherwise) that contradict the conditions of this License, they do not +excuse you from the conditions of this License. If you cannot +distribute so as to satisfy simultaneously your obligations under this +License and any other pertinent obligations, then as a consequence you +may not distribute the Program at all. For example, if a patent +license would not permit royalty-free redistribution of the Program by +all those who receive copies directly or indirectly through you, then +the only way you could satisfy both it and this License would be to +refrain entirely from distribution of the Program. + +If any portion of this section is held invalid or unenforceable under +any particular circumstance, the balance of the section is intended to +apply and the section as a whole is intended to apply in other +circumstances. + +It is not the purpose of this section to induce you to infringe any +patents or other property right claims or to contest validity of any +such claims; this section has the sole purpose of protecting the +integrity of the free software distribution system, which is +implemented by public license practices. Many people have made +generous contributions to the wide range of software distributed +through that system in reliance on consistent application of that +system; it is up to the author/donor to decide if he or she is willing +to distribute software through any other system and a licensee cannot +impose that choice. + +This section is intended to make thoroughly clear what is believed to +be a consequence of the rest of this License. + + 8. If the distribution and/or use of the Program is restricted in +certain countries either by patents or by copyrighted interfaces, the +original copyright holder who places the Program under this License +may add an explicit geographical distribution limitation excluding +those countries, so that distribution is permitted only in or among +countries not thus excluded. In such case, this License incorporates +the limitation as if written in the body of this License. + + 9. The Free Software Foundation may publish revised and/or new versions +of the General Public License from time to time. Such new versions will +be similar in spirit to the present version, but may differ in detail to +address new problems or concerns. + +Each version is given a distinguishing version number. If the Program +specifies a version number of this License which applies to it and "any +later version", you have the option of following the terms and conditions +either of that version or of any later version published by the Free +Software Foundation. If the Program does not specify a version number of +this License, you may choose any version ever published by the Free Software +Foundation. + + 10. If you wish to incorporate parts of the Program into other free +programs whose distribution conditions are different, write to the author +to ask for permission. For software which is copyrighted by the Free +Software Foundation, write to the Free Software Foundation; we sometimes +make exceptions for this. Our decision will be guided by the two goals +of preserving the free status of all derivatives of our free software and +of promoting the sharing and reuse of software generally. + + NO WARRANTY + + 11. BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY +FOR THE PROGRAM, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN +OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES +PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED +OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS +TO THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU. SHOULD THE +PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL NECESSARY SERVICING, +REPAIR OR CORRECTION. + + 12. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING +WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR +REDISTRIBUTE THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, +INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING +OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED +TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY +YOU OR THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER +PROGRAMS), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE +POSSIBILITY OF SUCH DAMAGES. + + END OF TERMS AND CONDITIONS + + How to Apply These Terms to Your New Programs + + If you develop a new program, and you want it to be of the greatest +possible use to the public, the best way to achieve this is to make it +free software which everyone can redistribute and change under these terms. + + To do so, attach the following notices to the program. It is safest +to attach them to the start of each source file to most effectively +convey the exclusion of warranty; and each file should have at least +the "copyright" line and a pointer to where the full notice is found. + + <one line to give the program's name and a brief idea of what it does.> + Copyright (C) <year> <name of author> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +Also add information on how to contact you by electronic and paper mail. + +If the program is interactive, make it output a short notice like this +when it starts in an interactive mode: + + Gnomovision version 69, Copyright (C) year name of author + Gnomovision comes with ABSOLUTELY NO WARRANTY; for details type `show w'. + This is free software, and you are welcome to redistribute it + under certain conditions; type `show c' for details. + +The hypothetical commands `show w' and `show c' should show the appropriate +parts of the General Public License. Of course, the commands you use may +be called something other than `show w' and `show c'; they could even be +mouse-clicks or menu items--whatever suits your program. + +You should also get your employer (if you work as a programmer) or your +school, if any, to sign a "copyright disclaimer" for the program, if +necessary. Here is a sample; alter the names: + + Yoyodyne, Inc., hereby disclaims all copyright interest in the program + `Gnomovision' (which makes passes at compilers) written by James Hacker. + + <signature of Ty Coon>, 1 April 1989 + Ty Coon, President of Vice + +This General Public License does not permit incorporating your program into +proprietary programs. If your program is a subroutine library, you may +consider it more useful to permit linking proprietary applications with the +library. If this is what you want to do, use the GNU Lesser General +Public License instead of this License. diff --git a/VNLib.Data.Caching/readme.md b/VNLib.Data.Caching/readme.md new file mode 100644 index 0000000..dabb9a5 --- /dev/null +++ b/VNLib.Data.Caching/readme.md @@ -0,0 +1 @@ +# VNLib.Data.Caching
\ No newline at end of file diff --git a/VNLib.Data.Caching/src/BlobCache.cs b/VNLib.Data.Caching/src/BlobCache.cs new file mode 100644 index 0000000..89818be --- /dev/null +++ b/VNLib.Data.Caching/src/BlobCache.cs @@ -0,0 +1,118 @@ +using System; +using System.IO; +using System.Linq; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; + +using VNLib.Utils.IO; +using VNLib.Utils.Logging; +using VNLib.Utils.Memory; +using VNLib.Utils.Memory.Caching; + + +#nullable enable + +namespace VNLib.Data.Caching +{ + /// <summary> + /// A general purpose binary data storage + /// </summary> + public class BlobCache : LRUCache<string, MemoryHandle<byte>> + { + readonly IUnmangedHeap Heap; + readonly DirectoryInfo SwapDir; + readonly ILogProvider Log; + ///<inheritdoc/> + public override bool IsReadOnly { get; } + ///<inheritdoc/> + protected override int MaxCapacity { get; } + + + /// <summary> + /// Initializes a new <see cref="BlobCache"/> store + /// </summary> + /// <param name="swapDir">The <see cref="IsolatedStorageDirectory"/> to swap blob data to when cache</param> + /// <param name="maxCapacity">The maximum number of items to keep in memory</param> + /// <param name="log">A <see cref="ILogProvider"/> to write log data to</param> + /// <param name="heap">A <see cref="IUnmangedHeap"/> to allocate buffers and store <see cref="BlobItem"/> data in memory</param> + public BlobCache(DirectoryInfo swapDir, int maxCapacity, ILogProvider log, IUnmangedHeap heap) + :base(StringComparer.Ordinal) + { + IsReadOnly = false; + MaxCapacity = maxCapacity; + SwapDir = swapDir; + //Update the lookup table size + LookupTable.EnsureCapacity(maxCapacity); + //Set default heap if not specified + Heap = heap; + Log = log; + } + ///<inheritdoc/> + protected override bool CacheMiss(string key, [NotNullWhen(true)] out MemoryHandle<byte>? value) + { + value = null; + return false; + } + ///<inheritdoc/> + protected override void Evicted(KeyValuePair<string, MemoryHandle<byte>> evicted) + { + //Dispose the blob + evicted.Value.Dispose(); + } + /// <summary> + /// If the <see cref="BlobItem"/> is found in the store, changes the key + /// that referrences the blob. + /// </summary> + /// <param name="currentKey">The key that currently referrences the blob in the store</param> + /// <param name="newKey">The new key that will referrence the blob</param> + /// <param name="blob">The <see cref="BlobItem"/> if its found in the store</param> + /// <returns>True if the record was found and the key was changes</returns> + public bool TryChangeKey(string currentKey, string newKey, [NotNullWhen(true)] out MemoryHandle<byte>? blob) + { + if (LookupTable.Remove(currentKey, out LinkedListNode<KeyValuePair<string, MemoryHandle<byte>>>? node)) + { + //Remove the node from the ll + List.Remove(node); + //Update the node kvp + blob = node.Value.Value; + node.Value = new KeyValuePair<string, MemoryHandle<byte>>(newKey, blob); + //Add to end of list + List.AddLast(node); + //Re-add to lookup table with new key + LookupTable.Add(newKey, node); + return true; + } + blob = null; + return false; + } + /// <summary> + /// Removes the <see cref="BlobItem"/> from the store without disposing the blobl + /// </summary> + /// <param name="key">The key that referrences the <see cref="BlobItem"/> in the store</param> + /// <returns>A value indicating if the blob was removed</returns> + public override bool Remove(string key) + { + //Remove the item from the lookup table and if it exists, remove the node from the list + if (LookupTable.Remove(key, out LinkedListNode<KeyValuePair<string, MemoryHandle<byte>>>? node)) + { + //Remove the new from the list + List.Remove(node); + //dispose the buffer + node.Value.Value.Dispose(); + return true; + } + return false; + } + /// <summary> + /// Removes and disposes all blobl elements in cache (or in the backing store) + /// </summary> + public override void Clear() + { + foreach (MemoryHandle<byte> blob in List.Select(kp => kp.Value)) + { + blob.Dispose(); + } + base.Clear(); + } + } +} diff --git a/VNLib.Data.Caching/src/BlobItem.cs b/VNLib.Data.Caching/src/BlobItem.cs new file mode 100644 index 0000000..a5630e9 --- /dev/null +++ b/VNLib.Data.Caching/src/BlobItem.cs @@ -0,0 +1,185 @@ +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +using VNLib.Utils; +using VNLib.Utils.IO; +using VNLib.Utils.Memory; +using VNLib.Utils.Logging; +using VNLib.Utils.Extensions; + +#nullable enable + +namespace VNLib.Data.Caching +{ + /// <summary> + /// A general purpose binary storage item + /// </summary> + public class BlobItem //: VnDisposeable + { + /* + private static readonly JoinableTaskContext JTX = new(); + private static readonly Semaphore CentralSwapLock = new(Environment.ProcessorCount, Environment.ProcessorCount); + + private readonly VnMemoryStream _loadedData; + private bool _loaded; + + /// <summary> + /// The time the blob was last modified + /// </summary> + public DateTimeOffset LastAccessed { get; private set; } + + + /// <summary> + /// Gets the current size of the file (in bytes) as an atomic operation + /// </summary> + public int FileSize => (int)_loadedData.Length; + /// <summary> + /// The operation synchronization lock + /// </summary> + public AsyncReaderWriterLock OpLock { get; } + /// <summary> + /// Initializes a new <see cref="BlobItem"/> + /// </summary> + /// <param name="heap">The heap to allocate buffers from</param> + internal BlobItem(IUnmangedHeap heap) + { + _loadedData = new(heap); + OpLock = new AsyncReaderWriterLock(JTX); + _loaded = true; + LastAccessed = DateTimeOffset.UtcNow; + } + ///<inheritdoc/> + protected override void Free() + { + _loadedData.Dispose(); + OpLock.Dispose(); + } + + /// <summary> + /// Reads data from the internal buffer and copies it to the specified buffer. + /// Use the <see cref="FileSize"/> property to obtain the size of the internal buffer + /// </summary> + /// <param name="buffer">The buffer to copy data to</param> + /// <returns>When completed, the number of bytes copied to the buffer</returns> + public int Read(Span<byte> buffer) + { + //Make sure the blob has been swapped back into memory + if (!_loaded) + { + throw new InvalidOperationException("The blob was not loaded from the disk"); + } + //Read all data from the buffer and write it to the output buffer + _loadedData.AsSpan().CopyTo(buffer); + //Update last-accessed + LastAccessed = DateTimeOffset.UtcNow; + return (int)_loadedData.Length; + } + /// <summary> + /// Overwrites the internal buffer with the contents of the supplied buffer + /// </summary> + /// <param name="buffer">The buffer containing data to store within the blob</param> + /// <returns>A <see cref="ValueTask"/> that completes when write access has been granted and copied</returns> + /// <exception cref="InvalidOperationException"></exception> + public void Write(ReadOnlySpan<byte> buffer) + { + //Make sure the blob has been swapped back into memory + if (!_loaded) + { + throw new InvalidOperationException("The blob was not loaded from the disk"); + } + //Reset the buffer + _loadedData.SetLength(buffer.Length); + _loadedData.Seek(0, SeekOrigin.Begin); + _loadedData.Write(buffer); + LastAccessed = DateTimeOffset.UtcNow; + } + + /// <summary> + /// Writes the contents of the memory buffer to its designated file on the disk + /// </summary> + /// <param name="heap">The heap to allocate buffers from</param> + /// <param name="swapDir">The <see cref="IsolatedStorageDirectory"/> that stores the file</param> + /// <param name="filename">The name of the file to write data do</param> + /// <param name="log">A log to write errors to</param> + /// <returns>A task that completes when the swap to disk is complete</returns> + internal async Task SwapToDiskAsync(IUnmangedHeap heap, DirectoryInfo swapDir, string filename, ILogProvider log) + { + try + { + //Wait for write lock + await using (AsyncReaderWriterLock.Releaser releaser = await OpLock.WriteLockAsync()) + { + //Enter swap lock + await CentralSwapLock; + try + { + //Open swap file data stream + await using FileStream swapFile = swapDir.OpenFile(filename, FileMode.OpenOrCreate, FileAccess.ReadWrite, bufferSize: 8128); + //reset swap file + swapFile.SetLength(0); + //Seek loaded-data back to 0 before writing + _loadedData.Seek(0, SeekOrigin.Begin); + //Write loaded data to disk + await _loadedData.CopyToAsync(swapFile, 8128, heap); + } + finally + { + CentralSwapLock.Release(); + } + //Release memory held by stream + _loadedData.SetLength(0); + //Clear loaded flag + _loaded = false; + LastAccessed = DateTimeOffset.UtcNow; + } + log.Debug("Blob {name} swapped to disk", filename); + } + catch(Exception ex) + { + log.Error(ex, "Blob swap to disk error"); + } + } + /// <summary> + /// Reads the contents of the blob into a memory buffer from its designated file on disk + /// </summary> + /// <param name="heap">The heap to allocate buffers from</param> + /// <param name="swapDir">The <see cref="IsolatedStorageDirectory"/> that stores the file</param> + /// <param name="filename">The name of the file to write the blob data to</param> + /// <param name="log">A log to write errors to</param> + /// <returns>A task that completes when the swap from disk is complete</returns> + internal async Task SwapFromDiskAsync(IUnmangedHeap heap, DirectoryInfo swapDir, string filename, ILogProvider log) + { + try + { + //Wait for write lock + await using (AsyncReaderWriterLock.Releaser releaser = await OpLock.WriteLockAsync()) + { + //Enter swap lock + await CentralSwapLock; + try + { + //Open swap file data stream + await using FileStream swapFile = swapDir.OpenFile(filename, FileMode.OpenOrCreate, FileAccess.Read, bufferSize:8128); + //Copy from disk to memory + await swapFile.CopyToAsync(_loadedData, 8128, heap); + } + finally + { + CentralSwapLock.Release(); + } + //Set loaded flag + _loaded = true; + LastAccessed = DateTimeOffset.UtcNow; + } + log.Debug("Blob {name} swapped from disk", filename); + } + catch(Exception ex) + { + log.Error(ex, "Blob swap from disk error"); + } + } + */ + } +} diff --git a/VNLib.Data.Caching/src/CacheListener.cs b/VNLib.Data.Caching/src/CacheListener.cs new file mode 100644 index 0000000..18e9684 --- /dev/null +++ b/VNLib.Data.Caching/src/CacheListener.cs @@ -0,0 +1,40 @@ +using System; +using System.IO; + +using VNLib.Utils.Memory; +using VNLib.Net.Messaging.FBM.Server; + +namespace VNLib.Data.Caching +{ + /// <summary> + /// A base implementation of a memory/disk LRU data cache FBM listener + /// </summary> + public abstract class CacheListener : FBMListenerBase + { + /// <summary> + /// The directory swap files will be stored + /// </summary> + public DirectoryInfo? Directory { get; private set; } + /// <summary> + /// The Cache store to access data blobs + /// </summary> + protected BlobCache? Cache { get; private set; } + /// <summary> + /// The <see cref="IUnmangedHeap"/> to allocate buffers from + /// </summary> + protected IUnmangedHeap? Heap { get; private set; } + + /// <summary> + /// Initializes the <see cref="Cache"/> data store + /// </summary> + /// <param name="dir">The directory to swap cache records to</param> + /// <param name="cacheSize">The size of the LRU cache</param> + /// <param name="heap">The heap to allocate buffers from</param> + protected void InitCache(DirectoryInfo dir, int cacheSize, IUnmangedHeap heap) + { + Heap = heap; + Cache = new(dir, cacheSize, Log, Heap); + Directory = dir; + } + } +} diff --git a/VNLib.Data.Caching/src/ClientExtensions.cs b/VNLib.Data.Caching/src/ClientExtensions.cs new file mode 100644 index 0000000..18a1aa9 --- /dev/null +++ b/VNLib.Data.Caching/src/ClientExtensions.cs @@ -0,0 +1,310 @@ +using System; +using System.IO; +using System.Linq; +using System.Buffers; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using System.Collections.Generic; +using System.Text.Json.Serialization; +using System.Runtime.CompilerServices; + +using VNLib.Utils.Logging; +using VNLib.Net.Messaging.FBM; +using VNLib.Net.Messaging.FBM.Client; +using VNLib.Net.Messaging.FBM.Server; +using VNLib.Data.Caching.Exceptions; + +using static VNLib.Data.Caching.Constants; + +namespace VNLib.Data.Caching +{ + + /// <summary> + /// Provides caching extension methods for <see cref="FBMClient"/> + /// </summary> + public static class ClientExtensions + { + private static readonly JsonSerializerOptions LocalOptions = new() + { + DictionaryKeyPolicy = JsonNamingPolicy.CamelCase, + NumberHandling = JsonNumberHandling.Strict, + ReadCommentHandling = JsonCommentHandling.Disallow, + WriteIndented = false, + DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull, + IgnoreReadOnlyFields = true, + PropertyNameCaseInsensitive = true, + IncludeFields = false, + + //Use small buffers + DefaultBufferSize = 128 + }; + + + private static readonly ConditionalWeakTable<FBMClient, SemaphoreSlim> GetLock = new(); + private static readonly ConditionalWeakTable<FBMClient, SemaphoreSlim> UpdateLock = new(); + + private static SemaphoreSlim GetLockCtor(FBMClient client) => new (50); + + private static SemaphoreSlim UpdateLockCtor(FBMClient client) => new (25); + + /// <summary> + /// Gets an object from the server if it exists + /// </summary> + /// <typeparam name="T"></typeparam> + /// <param name="client"></param> + /// <param name="objectId">The id of the object to get</param> + /// <param name="cancellationToken">A token to cancel the operation</param> + /// <returns>A task that completes to return the results of the response payload</returns> + /// <exception cref="JsonException"></exception> + /// <exception cref="OutOfMemoryException"></exception> + /// <exception cref="InvalidStatusException"></exception> + /// <exception cref="ObjectDisposedException"></exception> + /// <exception cref="InvalidResponseException"></exception> + public static async Task<T?> GetObjectAsync<T>(this FBMClient client, string objectId, CancellationToken cancellationToken = default) + { + client.Config.DebugLog?.Debug("[DEBUG] Getting object {id}", objectId); + SemaphoreSlim getLock = GetLock.GetValue(client, GetLockCtor); + //Wait for entry + await getLock.WaitAsync(cancellationToken); + //Rent a new request + FBMRequest request = client.RentRequest(); + try + { + //Set action as get/create + request.WriteHeader(HeaderCommand.Action, Actions.Get); + //Set session-id header + request.WriteHeader(Constants.ObjectId, objectId); + + //Make request + using FBMResponse response = await client.SendAsync(request, cancellationToken); + + response.ThrowIfNotSet(); + //Get the status code + ReadOnlyMemory<char> status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value; + if (status.Span.Equals(ResponseCodes.Okay, StringComparison.Ordinal)) + { + return JsonSerializer.Deserialize<T>(response.ResponseBody, LocalOptions); + } + //Session may not exist on the server yet + if (status.Span.Equals(ResponseCodes.NotFound, StringComparison.Ordinal)) + { + return default; + } + throw new InvalidStatusException("Invalid status code recived for object get request", status.ToString()); + } + finally + { + getLock.Release(); + client.ReturnRequest(request); + } + } + + /// <summary> + /// Updates the state of the object, and optionally updates the ID of the object. The data + /// parameter is serialized, buffered, and streamed to the remote server + /// </summary> + /// <typeparam name="T"></typeparam> + /// <param name="client"></param> + /// <param name="objectId">The id of the object to update or replace</param> + /// <param name="newId">An optional parameter to specify a new ID for the old object</param> + /// <param name="data">The payload data to serialize and set as the data state of the session</param> + /// <param name="cancellationToken">A token to cancel the operation</param> + /// <returns>A task that resolves when the server responds</returns> + /// <exception cref="JsonException"></exception> + /// <exception cref="OutOfMemoryException"></exception> + /// <exception cref="InvalidStatusException"></exception> + /// <exception cref="ObjectDisposedException"></exception> + /// <exception cref="InvalidResponseException"></exception> + /// <exception cref="MessageTooLargeException"></exception> + /// <exception cref="ObjectNotFoundException"></exception> + public static async Task AddOrUpdateObjectAsync<T>(this FBMClient client, string objectId, string? newId, T data, CancellationToken cancellationToken = default) + { + client.Config.DebugLog?.Debug("[DEBUG] Updating object {id}, newid {nid}", objectId, newId); + SemaphoreSlim updateLock = UpdateLock.GetValue(client, UpdateLockCtor); + //Wait for entry + await updateLock.WaitAsync(cancellationToken); + //Rent a new request + FBMRequest request = client.RentRequest(); + try + { + //Set action as get/create + request.WriteHeader(HeaderCommand.Action, Actions.AddOrUpdate); + //Set session-id header + request.WriteHeader(Constants.ObjectId, objectId); + //if new-id set, set the new-id header + if (!string.IsNullOrWhiteSpace(newId)) + { + request.WriteHeader(Constants.NewObjectId, newId); + } + //Get the body writer for the message + IBufferWriter<byte> bodyWriter = request.GetBodyWriter(); + //Write json data to the message + using (Utf8JsonWriter jsonWriter = new(bodyWriter)) + { + JsonSerializer.Serialize(jsonWriter, data, LocalOptions); + } + + //Make request + using FBMResponse response = await client.SendAsync(request, cancellationToken); + + response.ThrowIfNotSet(); + //Get the status code + ReadOnlyMemory<char> status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value; + //Check status code + if (status.Span.Equals(ResponseCodes.Okay, StringComparison.OrdinalIgnoreCase)) + { + return; + } + else if(status.Span.Equals(ResponseCodes.NotFound, StringComparison.OrdinalIgnoreCase)) + { + throw new ObjectNotFoundException($"object {objectId} not found on remote server"); + } + //Invalid status + throw new InvalidStatusException("Invalid status code recived for object upsert request", status.ToString()); + } + finally + { + updateLock.Release(); + //Return the request(clears data and reset) + client.ReturnRequest(request); + } + } + + /// <summary> + /// Asynchronously deletes an object in the remote store + /// </summary> + /// <param name="client"></param> + /// <param name="objectId">The id of the object to update or replace</param> + /// <param name="cancellationToken">A token to cancel the operation</param> + /// <returns>A task that resolves when the operation has completed</returns> + /// <exception cref="InvalidStatusException"></exception> + /// <exception cref="ObjectDisposedException"></exception> + /// <exception cref="InvalidResponseException"></exception> + /// <exception cref="ObjectNotFoundException"></exception> + public static async Task DeleteObjectAsync(this FBMClient client, string objectId, CancellationToken cancellationToken = default) + { + client.Config.DebugLog?.Debug("[DEBUG] Deleting object {id}", objectId); + + SemaphoreSlim updateLock = UpdateLock.GetValue(client, UpdateLockCtor); + //Wait for entry + await updateLock.WaitAsync(cancellationToken); + //Rent a new request + FBMRequest request = client.RentRequest(); + try + { + //Set action as delete + request.WriteHeader(HeaderCommand.Action, Actions.Delete); + //Set session-id header + request.WriteHeader(Constants.ObjectId, objectId); + + //Make request + using FBMResponse response = await client.SendAsync(request, cancellationToken); + + response.ThrowIfNotSet(); + //Get the status code + ReadOnlyMemory<char> status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value; + if (status.Span.Equals(ResponseCodes.Okay, StringComparison.Ordinal)) + { + return; + } + else if(status.Span.Equals(ResponseCodes.NotFound, StringComparison.OrdinalIgnoreCase)) + { + throw new ObjectNotFoundException($"object {objectId} not found on remote server"); + } + throw new InvalidStatusException("Invalid status code recived for object get request", status.ToString()); + } + finally + { + updateLock.Release(); + client.ReturnRequest(request); + } + } + + /// <summary> + /// Dequeues a change event from the server event queue for the current connection, or waits until a change happens + /// </summary> + /// <param name="client"></param> + /// <param name="cancellationToken">A token to cancel the deuque operation</param> + /// <returns>A <see cref="KeyValuePair{TKey, TValue}"/> that contains the modified object id and optionally its new id</returns> + public static async Task<WaitForChangeResult> WaitForChangeAsync(this FBMClient client, CancellationToken cancellationToken = default) + { + //Rent a new request + FBMRequest request = client.RentRequest(); + try + { + //Set action as event dequeue to dequeue a change event + request.WriteHeader(HeaderCommand.Action, Actions.Dequeue); + + //Make request + using FBMResponse response = await client.SendAsync(request, cancellationToken); + + response.ThrowIfNotSet(); + + return new() + { + Status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value.ToString(), + CurrentId = response.Headers.SingleOrDefault(static v => v.Key == Constants.ObjectId).Value.ToString(), + NewId = response.Headers.SingleOrDefault(static v => v.Key == Constants.NewObjectId).Value.ToString() + }; + } + finally + { + client.ReturnRequest(request); + } + } + + /// <summary> + /// Gets the Object-id for the request message, or throws an <see cref="InvalidOperationException"/> if not specified + /// </summary> + /// <param name="context"></param> + /// <returns>The id of the object requested</returns> + /// <exception cref="InvalidOperationException"></exception> + public static string ObjectId(this FBMContext context) + { + return context.Request.Headers.First(static kvp => kvp.Key == Constants.ObjectId).Value.ToString(); + } + /// <summary> + /// Gets the new ID of the object if specified from the request. Null if the request did not specify an id update + /// </summary> + /// <param name="context"></param> + /// <returns>The new ID of the object if speicifed, null otherwise</returns> + public static string? NewObjectId(this FBMContext context) + { + return context.Request.Headers.FirstOrDefault(static kvp => kvp.Key == Constants.NewObjectId).Value.ToString(); + } + /// <summary> + /// Gets the request method for the request + /// </summary> + /// <param name="context"></param> + /// <returns>The request method string</returns> + public static string Method(this FBMContext context) + { + return context.Request.Headers.First(static kvp => kvp.Key == HeaderCommand.Action).Value.ToString(); + } + /// <summary> + /// Closes a response with a status code + /// </summary> + /// <param name="context"></param> + /// <param name="responseCode">The status code to send to the client</param> + public static void CloseResponse(this FBMContext context, string responseCode) + { + context.Response.WriteHeader(HeaderCommand.Status, responseCode); + } + + + /// <summary> + /// Initializes the worker for a reconnect policy and returns an object that can listen for changes + /// and configure the connection as necessary + /// </summary> + /// <param name="worker"></param> + /// <param name="retryDelay">The amount of time to wait between retries</param> + /// <param name="serverUri">The uri to reconnect the client to</param> + /// <returns>A <see cref="ClientRetryManager{T}"/> for listening for retry events</returns> + public static ClientRetryManager<T> SetReconnectPolicy<T>(this T worker, TimeSpan retryDelay, Uri serverUri) where T: IStatefulConnection + { + //Return new manager + return new (worker, retryDelay, serverUri); + } + } +} diff --git a/VNLib.Data.Caching/src/ClientRetryManager.cs b/VNLib.Data.Caching/src/ClientRetryManager.cs new file mode 100644 index 0000000..97d3d3a --- /dev/null +++ b/VNLib.Data.Caching/src/ClientRetryManager.cs @@ -0,0 +1,83 @@ +using System; +using System.Threading.Tasks; +using System.Security.Cryptography; + +using VNLib.Utils; +using VNLib.Net.Messaging.FBM.Client; + +namespace VNLib.Data.Caching +{ + /// <summary> + /// Manages a <see cref="FBMClientWorkerBase"/> reconnect policy + /// </summary> + public class ClientRetryManager<T> : VnDisposeable where T: IStatefulConnection + { + const int RetryRandMaxMsDelay = 1000; + + private readonly TimeSpan RetryDelay; + private readonly T Client; + private readonly Uri ServerUri; + + internal ClientRetryManager(T worker, TimeSpan delay, Uri serverUri) + { + this.Client = worker; + this.RetryDelay = delay; + this.ServerUri = serverUri; + //Register disconnect listener + worker.ConnectionClosed += Worker_Disconnected; + } + + private void Worker_Disconnected(object? sender, EventArgs args) + { + //Exec retry on exit + _ = RetryAsync().ConfigureAwait(false); + } + + + /// <summary> + /// Raised before client is to be reconnected + /// </summary> + public event Action<T>? OnBeforeReconnect; + /// <summary> + /// Raised when the client fails to reconnect. Should return a value that instructs the + /// manager to reconnect + /// </summary> + public event Func<T, Exception, bool>? OnReconnectFailed; + + async Task RetryAsync() + { + + //Begin random delay with retry ms + int randomDelayMs = (int)RetryDelay.TotalMilliseconds; + //random delay to add to prevent retry-storm + randomDelayMs += RandomNumberGenerator.GetInt32(RetryRandMaxMsDelay); + //Retry loop + bool retry = true; + while (retry) + { + try + { + //Inform Listener for the retry + OnBeforeReconnect?.Invoke(Client); + //wait for delay before reconnecting + await Task.Delay(randomDelayMs); + //Reconnect async + await Client.ConnectAsync(ServerUri).ConfigureAwait(false); + break; + } + catch (Exception Ex) + { + //Invoke error handler, may be null, incase exit + retry = OnReconnectFailed?.Invoke(Client, Ex) ?? false; + } + } + } + + ///<inheritdoc/> + protected override void Free() + { + //Unregister the event listener + Client.ConnectionClosed -= Worker_Disconnected; + } + } +} diff --git a/VNLib.Data.Caching/src/Constants.cs b/VNLib.Data.Caching/src/Constants.cs new file mode 100644 index 0000000..b06ec1f --- /dev/null +++ b/VNLib.Data.Caching/src/Constants.cs @@ -0,0 +1,32 @@ +using System; + +using VNLib.Net.Messaging.FBM; + +namespace VNLib.Data.Caching +{ + public static class Constants + { + /// <summary> + /// Contains constants the define actions + /// </summary> + public static class Actions + { + public const string Get= "g"; + public const string AddOrUpdate = "u"; + public const string Delete = "d"; + public const string Dequeue = "dq"; + } + /// <summary> + /// Containts constants for operation response codes + /// </summary> + public static class ResponseCodes + { + public const string Okay = "ok"; + public const string Error = "err"; + public const string NotFound = "nf"; + } + + public const HeaderCommand ObjectId = (HeaderCommand)0xAA; + public const HeaderCommand NewObjectId = (HeaderCommand)0xAB; + } +} diff --git a/VNLib.Data.Caching/src/Exceptions/InvalidStatusException.cs b/VNLib.Data.Caching/src/Exceptions/InvalidStatusException.cs new file mode 100644 index 0000000..f5e35f4 --- /dev/null +++ b/VNLib.Data.Caching/src/Exceptions/InvalidStatusException.cs @@ -0,0 +1,39 @@ +using System; + +using VNLib.Net.Messaging.FBM; + +namespace VNLib.Data.Caching.Exceptions +{ + /// <summary> + /// Raised when the response status code of an FBM Request message is not valid for + /// the specified request + /// </summary> + public class InvalidStatusException : InvalidResponseException + { + private readonly string? StatusCode; + /// <summary> + /// Initalizes a new <see cref="InvalidStatusException"/> with the specfied status code + /// </summary> + /// <param name="message"></param> + /// <param name="statusCode"></param> + public InvalidStatusException(string message, string statusCode):this(message) + { + this.StatusCode = statusCode; + } + + ///<inheritdoc/> + public InvalidStatusException() + { + } + ///<inheritdoc/> + public InvalidStatusException(string message) : base(message) + { + } + ///<inheritdoc/> + public InvalidStatusException(string message, Exception innerException) : base(message, innerException) + { + } + ///<inheritdoc/> + public override string Message => $"InvalidStatusException: Status Code {StatusCode} \r\n {base.Message}"; + } +} diff --git a/VNLib.Data.Caching/src/Exceptions/MessageTooLargeException.cs b/VNLib.Data.Caching/src/Exceptions/MessageTooLargeException.cs new file mode 100644 index 0000000..e8f19c5 --- /dev/null +++ b/VNLib.Data.Caching/src/Exceptions/MessageTooLargeException.cs @@ -0,0 +1,26 @@ +using System; +using System.Runtime.Serialization; + +using VNLib.Net.Messaging.FBM; + +namespace VNLib.Data.Caching.Exceptions +{ + /// <summary> + /// Raised when a request (or server response) calculates the size of the message to be too large to proccess + /// </summary> + public class MessageTooLargeException : FBMException + { + ///<inheritdoc/> + public MessageTooLargeException() + {} + ///<inheritdoc/> + public MessageTooLargeException(string message) : base(message) + {} + ///<inheritdoc/> + public MessageTooLargeException(string message, Exception innerException) : base(message, innerException) + {} + ///<inheritdoc/> + protected MessageTooLargeException(SerializationInfo info, StreamingContext context) : base(info, context) + {} + } +} diff --git a/VNLib.Data.Caching/src/Exceptions/ObjectNotFoundException.cs b/VNLib.Data.Caching/src/Exceptions/ObjectNotFoundException.cs new file mode 100644 index 0000000..fb284f3 --- /dev/null +++ b/VNLib.Data.Caching/src/Exceptions/ObjectNotFoundException.cs @@ -0,0 +1,23 @@ +using System; + +namespace VNLib.Data.Caching.Exceptions +{ + /// <summary> + /// Raised when a command was executed on a desired object in the remote cache + /// but the object was not found + /// </summary> + public class ObjectNotFoundException : InvalidStatusException + { + internal ObjectNotFoundException() + {} + + internal ObjectNotFoundException(string message) : base(message) + {} + + internal ObjectNotFoundException(string message, string statusCode) : base(message, statusCode) + {} + + internal ObjectNotFoundException(string message, Exception innerException) : base(message, innerException) + {} + } +} diff --git a/VNLib.Data.Caching/src/VNLib.Data.Caching.csproj b/VNLib.Data.Caching/src/VNLib.Data.Caching.csproj new file mode 100644 index 0000000..b12da09 --- /dev/null +++ b/VNLib.Data.Caching/src/VNLib.Data.Caching.csproj @@ -0,0 +1,34 @@ +<Project Sdk="Microsoft.NET.Sdk"> + + <PropertyGroup> + <TargetFramework>net6.0</TargetFramework> + <Platforms>AnyCPU;x64</Platforms> + <Authors>Vaughn Nugent</Authors> + <Copyright>Copyright © 2022 Vaughn Nugent</Copyright> + <Version>1.0.0.1</Version> + <GenerateDocumentationFile>True</GenerateDocumentationFile> + <PlatformTarget>x64</PlatformTarget> + <Nullable>enable</Nullable> + </PropertyGroup> + + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'"> + <DocumentationFile></DocumentationFile> + </PropertyGroup> + + <ItemGroup> + <PackageReference Include="ErrorProne.NET.CoreAnalyzers" Version="0.1.2"> + <PrivateAssets>all</PrivateAssets> + <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> + </PackageReference> + <PackageReference Include="ErrorProne.NET.Structs" Version="0.1.2"> + <PrivateAssets>all</PrivateAssets> + <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> + </PackageReference> + </ItemGroup> + + <ItemGroup> + <ProjectReference Include="..\..\..\..\VNLib\Utils\src\VNLib.Utils.csproj" /> + <ProjectReference Include="..\..\VNLib.Net.Messaging.FBM\src\VNLib.Net.Messaging.FBM.csproj" /> + </ItemGroup> + +</Project> diff --git a/VNLib.Data.Caching/src/VNLib.Data.Caching.xml b/VNLib.Data.Caching/src/VNLib.Data.Caching.xml new file mode 100644 index 0000000..f1ec423 --- /dev/null +++ b/VNLib.Data.Caching/src/VNLib.Data.Caching.xml @@ -0,0 +1,354 @@ +<?xml version="1.0"?> +<doc> + <assembly> + <name>VNLib.Data.Caching</name> + </assembly> + <members> + <member name="T:VNLib.Data.Caching.BlobCache"> + <summary> + A general purpose binary data storage + </summary> + </member> + <member name="P:VNLib.Data.Caching.BlobCache.IsReadOnly"> + <inheritdoc/> + </member> + <member name="P:VNLib.Data.Caching.BlobCache.MaxCapacity"> + <inheritdoc/> + </member> + <member name="M:VNLib.Data.Caching.BlobCache.#ctor(System.IO.DirectoryInfo,System.Int32,VNLib.Utils.Logging.ILogProvider,VNLib.Utils.Memory.PrivateHeap)"> + <summary> + Initializes a new <see cref="T:VNLib.Data.Caching.BlobCache"/> store + </summary> + <param name="swapDir">The <see cref="T:VNLib.Utils.IO.IsolatedStorageDirectory"/> to swap blob data to when cache</param> + <param name="maxCapacity">The maximum number of items to keep in memory</param> + <param name="log">A <see cref="T:VNLib.Utils.Logging.ILogProvider"/> to write log data to</param> + <param name="heap">A <see cref="T:VNLib.Utils.Memory.PrivateHeap"/> to allocate buffers and store <see cref="T:VNLib.Data.Caching.BlobItem"/> data in memory</param> + </member> + <member name="M:VNLib.Data.Caching.BlobCache.SwapAllToDiskAsync"> + <summary> + Swaps all <see cref="T:VNLib.Data.Caching.BlobItem"/>s that are cached in memory + to disk. + </summary> + </member> + <member name="M:VNLib.Data.Caching.BlobCache.CacheMiss(System.String,VNLib.Data.Caching.BlobItem@)"> + <inheritdoc/> + </member> + <member name="M:VNLib.Data.Caching.BlobCache.Evicted(System.Collections.Generic.KeyValuePair{System.String,VNLib.Data.Caching.BlobItem})"> + <inheritdoc/> + </member> + <member name="M:VNLib.Data.Caching.BlobCache.TryChangeKey(System.String,System.String,VNLib.Data.Caching.BlobItem@)"> + <summary> + If the <see cref="T:VNLib.Data.Caching.BlobItem"/> is found in the store, changes the key + that referrences the blob. + </summary> + <param name="currentKey">The key that currently referrences the blob in the store</param> + <param name="newKey">The new key that will referrence the blob</param> + <param name="blob">The <see cref="T:VNLib.Data.Caching.BlobItem"/> if its found in the store</param> + <returns>True if the record was found and the key was changes</returns> + </member> + <member name="M:VNLib.Data.Caching.BlobCache.Remove(System.String)"> + <summary> + Removes the <see cref="T:VNLib.Data.Caching.BlobItem"/> from the store without disposing the blobl + </summary> + <param name="key">The key that referrences the <see cref="T:VNLib.Data.Caching.BlobItem"/> in the store</param> + <returns>A value indicating if the blob was removed</returns> + </member> + <member name="M:VNLib.Data.Caching.BlobCache.Clear"> + <summary> + Removes and disposes all blobl elements in cache (or in the backing store) + </summary> + </member> + <member name="T:VNLib.Data.Caching.BlobItem"> + <summary> + A general purpose binary storage item + </summary> + </member> + <member name="P:VNLib.Data.Caching.BlobItem.LastModified"> + <summary> + The time the blob was last modified + </summary> + </member> + <member name="P:VNLib.Data.Caching.BlobItem.FileSize"> + <summary> + Gets the current size of the file (in bytes) as an atomic operation + </summary> + </member> + <member name="P:VNLib.Data.Caching.BlobItem.OpLock"> + <summary> + The operation synchronization lock + </summary> + </member> + <member name="M:VNLib.Data.Caching.BlobItem.#ctor(VNLib.Utils.Memory.PrivateHeap)"> + <summary> + Initializes a new <see cref="T:VNLib.Data.Caching.BlobItem"/> + </summary> + <param name="heap">The heap to allocate buffers from</param> + </member> + <member name="M:VNLib.Data.Caching.BlobItem.Free"> + <inheritdoc/> + </member> + <member name="M:VNLib.Data.Caching.BlobItem.Read(System.Span{System.Byte})"> + <summary> + Reads data from the internal buffer and copies it to the specified buffer. + Use the <see cref="P:VNLib.Data.Caching.BlobItem.FileSize"/> property to obtain the size of the internal buffer + </summary> + <param name="buffer">The buffer to copy data to</param> + <returns>When completed, the number of bytes copied to the buffer</returns> + </member> + <member name="M:VNLib.Data.Caching.BlobItem.Write(System.ReadOnlySpan{System.Byte})"> + <summary> + Overwrites the internal buffer with the contents of the supplied buffer + </summary> + <param name="buffer">The buffer containing data to store within the blob</param> + <returns>A <see cref="T:System.Threading.Tasks.ValueTask"/> that completes when write access has been granted and copied</returns> + <exception cref="T:System.InvalidOperationException"></exception> + </member> + <member name="M:VNLib.Data.Caching.BlobItem.SwapToDiskAsync(VNLib.Utils.Memory.PrivateHeap,System.IO.DirectoryInfo,System.String,VNLib.Utils.Logging.ILogProvider)"> + <summary> + Writes the contents of the memory buffer to its designated file on the disk + </summary> + <param name="heap">The heap to allocate buffers from</param> + <param name="swapDir">The <see cref="T:VNLib.Utils.IO.IsolatedStorageDirectory"/> that stores the file</param> + <param name="filename">The name of the file to write data do</param> + <param name="log">A log to write errors to</param> + <returns>A task that completes when the swap to disk is complete</returns> + </member> + <member name="M:VNLib.Data.Caching.BlobItem.SwapFromDiskAsync(VNLib.Utils.Memory.PrivateHeap,System.IO.DirectoryInfo,System.String,VNLib.Utils.Logging.ILogProvider)"> + <summary> + Reads the contents of the blob into a memory buffer from its designated file on disk + </summary> + <param name="heap">The heap to allocate buffers from</param> + <param name="swapDir">The <see cref="T:VNLib.Utils.IO.IsolatedStorageDirectory"/> that stores the file</param> + <param name="filename">The name of the file to write the blob data to</param> + <param name="log">A log to write errors to</param> + <returns>A task that completes when the swap from disk is complete</returns> + </member> + <member name="T:VNLib.Data.Caching.CacheListener"> + <summary> + A base implementation of a memory/disk LRU data cache FBM listener + </summary> + </member> + <member name="P:VNLib.Data.Caching.CacheListener.Cache"> + <summary> + The Cache store to access data blobs + </summary> + </member> + <member name="P:VNLib.Data.Caching.CacheListener.Heap"> + <summary> + The <see cref="T:VNLib.Utils.Memory.PrivateHeap"/> to allocate buffers from + </summary> + </member> + <member name="M:VNLib.Data.Caching.CacheListener.InitCache(System.IO.DirectoryInfo,System.Int32,VNLib.Utils.Memory.PrivateHeap)"> + <summary> + Initializes the <see cref="P:VNLib.Data.Caching.CacheListener.Cache"/> data store + </summary> + <param name="dir"></param> + <param name="cacheSize"></param> + <param name="heap"></param> + </member> + <member name="M:VNLib.Data.Caching.CacheListener.SwapToDiskAsync"> + <summary> + Asynchronously swaps all blobs to the disk + </summary> + <returns></returns> + </member> + <member name="M:VNLib.Data.Caching.CacheListener.Dispose(System.Boolean)"> + <inheritdoc/> + </member> + <member name="M:VNLib.Data.Caching.CacheListener.Dispose"> + <inheritdoc/> + </member> + <member name="T:VNLib.Data.Caching.ClientExtensions"> + <summary> + Provides caching extension methods for <see cref="T:VNLib.Net.Messaging.FBM.Client.FBMClient"/> + </summary> + </member> + <member name="M:VNLib.Data.Caching.ClientExtensions.GetObjectAsync``1(VNLib.Net.Messaging.FBM.Client.FBMClient,System.String,System.Text.Json.JsonSerializerOptions,System.Threading.CancellationToken)"> + <summary> + Gets an object from the server if it exists + </summary> + <typeparam name="T"></typeparam> + <param name="client"></param> + <param name="objectId">The id of the object to get</param> + <param name="jso">The <see cref="T:System.Text.Json.JsonSerializerOptions"/> to use for serialization</param> + <param name="cancellationToken">A token to cancel the operation</param> + <returns>A task that completes to return the results of the response payload</returns> + <exception cref="T:System.Text.Json.JsonException"></exception> + <exception cref="T:System.OutOfMemoryException"></exception> + <exception cref="T:VNLib.Data.Caching.Exceptions.InvalidStatusException"></exception> + <exception cref="T:System.ObjectDisposedException"></exception> + <exception cref="T:VNLib.Net.Messaging.FBM.Client.InvalidResponseException"></exception> + </member> + <member name="M:VNLib.Data.Caching.ClientExtensions.AddOrUpdateObjectAsync``1(VNLib.Net.Messaging.FBM.Client.FBMClient,System.String,System.String,``0,System.Text.Json.JsonSerializerOptions,System.Threading.CancellationToken)"> + <summary> + Updates the state of the session, and optionally updates the ID of the session. The data + property is buffered and streamed to the remote server + </summary> + <typeparam name="T"></typeparam> + <param name="client"></param> + <param name="objectId">The id of the object to update or replace</param> + <param name="newId">An optional parameter to specify a new ID for the old object</param> + <param name="data">The payload data to serialize and set as the data state of the session</param> + <param name="jso">Optional <see cref="T:System.Text.Json.JsonSerializerOptions"/></param> + <param name="cancellationToken">A token to cancel the operation</param> + <returns>A task that resolves when the server responds</returns> + <exception cref="T:System.Text.Json.JsonException"></exception> + <exception cref="T:System.OutOfMemoryException"></exception> + <exception cref="T:VNLib.Data.Caching.Exceptions.InvalidStatusException"></exception> + <exception cref="T:System.ObjectDisposedException"></exception> + <exception cref="T:VNLib.Net.Messaging.FBM.Client.InvalidResponseException"></exception> + <exception cref="T:VNLib.Data.Caching.Exceptions.MessageTooLargeException"></exception> + </member> + <member name="M:VNLib.Data.Caching.ClientExtensions.DeleteObjectAsync(VNLib.Net.Messaging.FBM.Client.FBMClient,System.String,System.Threading.CancellationToken)"> + <summary> + Asynchronously deletes an object in the remote store + </summary> + <param name="client"></param> + <param name="objectId">The id of the object to update or replace</param> + <param name="cancellationToken">A token to cancel the operation</param> + <returns>A task that resolves when the operation has completed</returns> + <exception cref="T:VNLib.Data.Caching.Exceptions.InvalidStatusException"></exception> + <exception cref="T:System.ObjectDisposedException"></exception> + <exception cref="T:VNLib.Net.Messaging.FBM.Client.InvalidResponseException"></exception> + </member> + <member name="M:VNLib.Data.Caching.ClientExtensions.ObjectId(VNLib.Net.Messaging.FBM.Server.FBMContext)"> + <summary> + Gets the Object-id for the request message, or throws an <see cref="T:System.InvalidOperationException"/> if not specified + </summary> + <param name="context"></param> + <returns>The id of the object requested</returns> + <exception cref="T:System.InvalidOperationException"></exception> + </member> + <member name="M:VNLib.Data.Caching.ClientExtensions.NewObjectId(VNLib.Net.Messaging.FBM.Server.FBMContext)"> + <summary> + Gets the new ID of the object if specified from the request. Null if the request did not specify an id update + </summary> + <param name="context"></param> + <returns>The new ID of the object if speicifed, null otherwise</returns> + </member> + <member name="M:VNLib.Data.Caching.ClientExtensions.Method(VNLib.Net.Messaging.FBM.Server.FBMContext)"> + <summary> + Gets the request method for the request + </summary> + <param name="context"></param> + <returns>The request method string</returns> + </member> + <member name="M:VNLib.Data.Caching.ClientExtensions.CloseResponse(VNLib.Net.Messaging.FBM.Server.FBMContext,System.String,System.IO.Stream)"> + <summary> + Closes a response with a status code + </summary> + <param name="context"></param> + <param name="responseCode">The status code to send to the client</param> + <param name="payload">The payload to send to the client</param> + </member> + <member name="M:VNLib.Data.Caching.ClientExtensions.CloseResponse(VNLib.Net.Messaging.FBM.Server.FBMContext,System.String)"> + <summary> + Closes a response with a status code + </summary> + <param name="context"></param> + <param name="responseCode">The status code to send to the client</param> + </member> + <member name="M:VNLib.Data.Caching.ClientExtensions.SetAuth(VNLib.Net.Messaging.FBM.Client.FBMClientWorkerBase,System.ReadOnlySpan{System.Byte},System.Int32)"> + <summary> + Computes the authorization headers for the initial client connection + </summary> + <param name="worker"></param> + <param name="secret">The pre-shared secret used to compute a secure hash of a random token</param> + <param name="saltSize">The size (in bytes) of the salt to compute the hash of</param> + </member> + <member name="M:VNLib.Data.Caching.ClientExtensions.Authorized(VNLib.Net.Http.ConnectionInfo,System.ReadOnlySpan{System.Byte})"> + <summary> + Determines if the client has the proper authorization by verifying the client data can compute the same hash result with + the specified secret + </summary> + <param name="server"></param> + <param name="secret">The pre-shared secret used to verify the data</param> + <returns>True if the authorization headers compute to the proper hash result (keys match), false otherwise</returns> + <remarks>The verification is fixed-time</remarks> + <exception cref="T:System.OutOfMemoryException"></exception> + </member> + <member name="M:VNLib.Data.Caching.ClientExtensions.SetReconnectPolicy(VNLib.Net.Messaging.FBM.Client.FBMClientWorkerBase,System.TimeSpan,System.Uri)"> + <summary> + Initializes the worker for a reconnect policy and returns an object that can listen for changes + and configure the connection as necessary + </summary> + <param name="worker"></param> + <param name="retryDelay">The amount of time to wait between retries</param> + <param name="serverUri">The uri to reconnect the client to</param> + <returns>A <see cref="T:VNLib.Data.Caching.ClientRetryManager"/> for listening for retry events</returns> + </member> + <member name="T:VNLib.Data.Caching.ClientRetryManager"> + <summary> + Manages a <see cref="T:VNLib.Net.Messaging.FBM.Client.FBMClientWorkerBase"/> reconnect policy + </summary> + </member> + <member name="E:VNLib.Data.Caching.ClientRetryManager.OnBeforeReconnect"> + <summary> + Raised before client is to be reconnected + </summary> + </member> + <member name="E:VNLib.Data.Caching.ClientRetryManager.OnReconnectFailed"> + <summary> + Raised when the client fails to reconnect. Should return a value that instructs the + manager to reconnect + </summary> + </member> + <member name="E:VNLib.Data.Caching.ClientRetryManager.OnSuccessfulReconnect"> + <summary> + Raised when the client websocket is successfully reconnected + </summary> + </member> + <member name="T:VNLib.Data.Caching.Constants.Actions"> + <summary> + Contains constants the define actions + </summary> + </member> + <member name="T:VNLib.Data.Caching.Constants.ResponseCodes"> + <summary> + Containts constants for operation response codes + </summary> + </member> + <member name="T:VNLib.Data.Caching.Exceptions.InvalidStatusException"> + <summary> + Raised when the response status code of an FBM Request message is not valid for + the specified request + </summary> + </member> + <member name="M:VNLib.Data.Caching.Exceptions.InvalidStatusException.#ctor(System.String,System.String)"> + <summary> + Initalizes a new <see cref="T:VNLib.Data.Caching.Exceptions.InvalidStatusException"/> with the specfied status code + </summary> + <param name="message"></param> + <param name="statusCode"></param> + </member> + <member name="M:VNLib.Data.Caching.Exceptions.InvalidStatusException.#ctor"> + <inheritdoc/> + </member> + <member name="M:VNLib.Data.Caching.Exceptions.InvalidStatusException.#ctor(System.String)"> + <inheritdoc/> + </member> + <member name="M:VNLib.Data.Caching.Exceptions.InvalidStatusException.#ctor(System.String,System.Exception)"> + <inheritdoc/> + </member> + <member name="P:VNLib.Data.Caching.Exceptions.InvalidStatusException.Message"> + <inheritdoc/> + </member> + <member name="T:VNLib.Data.Caching.Exceptions.MessageTooLargeException"> + <summary> + Raised when a request (or server response) calculates the size of the message to be too large to proccess + </summary> + </member> + <member name="M:VNLib.Data.Caching.Exceptions.MessageTooLargeException.#ctor"> + <inheritdoc/> + </member> + <member name="M:VNLib.Data.Caching.Exceptions.MessageTooLargeException.#ctor(System.String)"> + <inheritdoc/> + </member> + <member name="M:VNLib.Data.Caching.Exceptions.MessageTooLargeException.#ctor(System.String,System.Exception)"> + <inheritdoc/> + </member> + <member name="M:VNLib.Data.Caching.Exceptions.MessageTooLargeException.#ctor(System.Runtime.Serialization.SerializationInfo,System.Runtime.Serialization.StreamingContext)"> + <inheritdoc/> + </member> + </members> +</doc> diff --git a/VNLib.Data.Caching/src/WaitForChangeResult.cs b/VNLib.Data.Caching/src/WaitForChangeResult.cs new file mode 100644 index 0000000..a309c7c --- /dev/null +++ b/VNLib.Data.Caching/src/WaitForChangeResult.cs @@ -0,0 +1,21 @@ +namespace VNLib.Data.Caching +{ + /// <summary> + /// The result of a cache server change event + /// </summary> + public readonly struct WaitForChangeResult + { + /// <summary> + /// The operation status code + /// </summary> + public readonly string Status { get; init; } + /// <summary> + /// The current (or old) id of the element that changed + /// </summary> + public readonly string CurrentId { get; init; } + /// <summary> + /// The new id of the element that changed + /// </summary> + public readonly string NewId { get; init; } + } +} |