From cea64e619e714f6dbe51d37ca8329b58d8c271fb Mon Sep 17 00:00:00 2001 From: vman Date: Mon, 26 Dec 2022 16:47:10 -0500 Subject: License + global cache removal --- VNLib.Data.Caching/ClientExtensions.cs | 329 +++++++++++++++++++++++++++++++++ 1 file changed, 329 insertions(+) create mode 100644 VNLib.Data.Caching/ClientExtensions.cs (limited to 'VNLib.Data.Caching/ClientExtensions.cs') diff --git a/VNLib.Data.Caching/ClientExtensions.cs b/VNLib.Data.Caching/ClientExtensions.cs new file mode 100644 index 0000000..0ae61c7 --- /dev/null +++ b/VNLib.Data.Caching/ClientExtensions.cs @@ -0,0 +1,329 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching +* File: ClientExtensions.cs +* +* ClientExtensions.cs is part of VNLib.Data.Caching which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.IO; +using System.Linq; +using System.Buffers; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using System.Collections.Generic; +using System.Text.Json.Serialization; +using System.Runtime.CompilerServices; + +using VNLib.Utils.Logging; +using VNLib.Net.Messaging.FBM; +using VNLib.Net.Messaging.FBM.Client; +using VNLib.Net.Messaging.FBM.Server; +using VNLib.Data.Caching.Exceptions; + +using static VNLib.Data.Caching.Constants; + +namespace VNLib.Data.Caching +{ + + /// + /// Provides caching extension methods for + /// + public static class ClientExtensions + { + private static readonly JsonSerializerOptions LocalOptions = new() + { + DictionaryKeyPolicy = JsonNamingPolicy.CamelCase, + NumberHandling = JsonNumberHandling.Strict, + ReadCommentHandling = JsonCommentHandling.Disallow, + WriteIndented = false, + DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull, + IgnoreReadOnlyFields = true, + PropertyNameCaseInsensitive = true, + IncludeFields = false, + + //Use small buffers + DefaultBufferSize = 128 + }; + + private static void LogDebug(this FBMClient client, string message, params object?[] args) + { + if (client.Config.DebugLog != null) + { + client.Config.DebugLog.Debug($"[CACHE] : {message}", args); + } + } + + /// + /// Gets an object from the server if it exists + /// + /// + /// + /// The id of the object to get + /// A token to cancel the operation + /// A task that completes to return the results of the response payload + /// + /// + /// + /// + /// + public static async Task GetObjectAsync(this FBMClient client, string objectId, CancellationToken cancellationToken = default) + { + _ = client ?? throw new ArgumentNullException(nameof(client)); + + client.LogDebug("Getting object {id}", objectId); + + //Rent a new request + FBMRequest request = client.RentRequest(); + try + { + //Set action as get/create + request.WriteHeader(HeaderCommand.Action, Actions.Get); + //Set session-id header + request.WriteHeader(Constants.ObjectId, objectId); + + //Make request + using FBMResponse response = await client.SendAsync(request, cancellationToken); + + response.ThrowIfNotSet(); + //Get the status code + ReadOnlyMemory status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value; + if (status.Span.Equals(ResponseCodes.Okay, StringComparison.Ordinal)) + { + return JsonSerializer.Deserialize(response.ResponseBody, LocalOptions); + } + //Session may not exist on the server yet + if (status.Span.Equals(ResponseCodes.NotFound, StringComparison.Ordinal)) + { + return default; + } + throw new InvalidStatusException("Invalid status code recived for object get request", status.ToString()); + } + finally + { + client.ReturnRequest(request); + } + } + + /// + /// Updates the state of the object, and optionally updates the ID of the object. The data + /// parameter is serialized, buffered, and streamed to the remote server + /// + /// + /// + /// The id of the object to update or replace + /// An optional parameter to specify a new ID for the old object + /// The payload data to serialize and set as the data state of the session + /// A token to cancel the operation + /// A task that resolves when the server responds + /// + /// + /// + /// + /// + /// + /// + public static async Task AddOrUpdateObjectAsync(this FBMClient client, string objectId, string? newId, T data, CancellationToken cancellationToken = default) + { + _ = client ?? throw new ArgumentNullException(nameof(client)); + + client.LogDebug("Updating object {id}, newid {nid}", objectId, newId); + + //Rent a new request + FBMRequest request = client.RentRequest(); + try + { + //Set action as get/create + request.WriteHeader(HeaderCommand.Action, Actions.AddOrUpdate); + //Set session-id header + request.WriteHeader(Constants.ObjectId, objectId); + //if new-id set, set the new-id header + if (!string.IsNullOrWhiteSpace(newId)) + { + request.WriteHeader(Constants.NewObjectId, newId); + } + //Get the body writer for the message + IBufferWriter bodyWriter = request.GetBodyWriter(); + //Write json data to the message + using (Utf8JsonWriter jsonWriter = new(bodyWriter)) + { + JsonSerializer.Serialize(jsonWriter, data, LocalOptions); + } + + //Make request + using FBMResponse response = await client.SendAsync(request, cancellationToken); + + response.ThrowIfNotSet(); + //Get the status code + ReadOnlyMemory status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value; + //Check status code + if (status.Span.Equals(ResponseCodes.Okay, StringComparison.OrdinalIgnoreCase)) + { + return; + } + else if(status.Span.Equals(ResponseCodes.NotFound, StringComparison.OrdinalIgnoreCase)) + { + throw new ObjectNotFoundException($"object {objectId} not found on remote server"); + } + //Invalid status + throw new InvalidStatusException("Invalid status code recived for object upsert request", status.ToString()); + } + finally + { + //Return the request(clears data and reset) + client.ReturnRequest(request); + } + } + + /// + /// Asynchronously deletes an object in the remote store + /// + /// + /// The id of the object to update or replace + /// A token to cancel the operation + /// A task that resolves when the operation has completed + /// + /// + /// + /// + public static async Task DeleteObjectAsync(this FBMClient client, string objectId, CancellationToken cancellationToken = default) + { + _ = client ?? throw new ArgumentNullException(nameof(client)); + + client.LogDebug("Deleting object {id}", objectId); + //Rent a new request + FBMRequest request = client.RentRequest(); + try + { + //Set action as delete + request.WriteHeader(HeaderCommand.Action, Actions.Delete); + //Set session-id header + request.WriteHeader(Constants.ObjectId, objectId); + + //Make request + using FBMResponse response = await client.SendAsync(request, cancellationToken); + + response.ThrowIfNotSet(); + //Get the status code + ReadOnlyMemory status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value; + if (status.Span.Equals(ResponseCodes.Okay, StringComparison.Ordinal)) + { + return; + } + else if(status.Span.Equals(ResponseCodes.NotFound, StringComparison.OrdinalIgnoreCase)) + { + throw new ObjectNotFoundException($"object {objectId} not found on remote server"); + } + throw new InvalidStatusException("Invalid status code recived for object get request", status.ToString()); + } + finally + { + client.ReturnRequest(request); + } + } + + /// + /// Dequeues a change event from the server event queue for the current connection, or waits until a change happens + /// + /// + /// A token to cancel the deuque operation + /// A that contains the modified object id and optionally its new id + public static async Task WaitForChangeAsync(this FBMClient client, CancellationToken cancellationToken = default) + { + //Rent a new request + FBMRequest request = client.RentRequest(); + try + { + //Set action as event dequeue to dequeue a change event + request.WriteHeader(HeaderCommand.Action, Actions.Dequeue); + + //Make request + using FBMResponse response = await client.SendAsync(request, cancellationToken); + + response.ThrowIfNotSet(); + + return new() + { + Status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value.ToString(), + CurrentId = response.Headers.SingleOrDefault(static v => v.Key == Constants.ObjectId).Value.ToString(), + NewId = response.Headers.SingleOrDefault(static v => v.Key == Constants.NewObjectId).Value.ToString() + }; + } + finally + { + client.ReturnRequest(request); + } + } + + /// + /// Gets the Object-id for the request message, or throws an if not specified + /// + /// + /// The id of the object requested + /// + public static string ObjectId(this FBMContext context) + { + return context.Request.Headers.First(static kvp => kvp.Key == Constants.ObjectId).Value.ToString(); + } + /// + /// Gets the new ID of the object if specified from the request. Null if the request did not specify an id update + /// + /// + /// The new ID of the object if speicifed, null otherwise + public static string? NewObjectId(this FBMContext context) + { + return context.Request.Headers.FirstOrDefault(static kvp => kvp.Key == Constants.NewObjectId).Value.ToString(); + } + /// + /// Gets the request method for the request + /// + /// + /// The request method string + public static string Method(this FBMContext context) + { + return context.Request.Headers.First(static kvp => kvp.Key == HeaderCommand.Action).Value.ToString(); + } + /// + /// Closes a response with a status code + /// + /// + /// The status code to send to the client + public static void CloseResponse(this FBMContext context, string responseCode) + { + context.Response.WriteHeader(HeaderCommand.Status, responseCode); + } + + + /// + /// Initializes the worker for a reconnect policy and returns an object that can listen for changes + /// and configure the connection as necessary + /// + /// + /// The amount of time to wait between retries + /// The uri to reconnect the client to + /// A for listening for retry events + public static ClientRetryManager SetReconnectPolicy(this T worker, TimeSpan retryDelay, Uri serverUri) where T: IStatefulConnection + { + //Return new manager + return new (worker, retryDelay, serverUri); + } + } +} -- cgit