diff options
Diffstat (limited to 'lib/VNLib.Data.Caching/src/ClientExtensions.cs')
-rw-r--r-- | lib/VNLib.Data.Caching/src/ClientExtensions.cs | 328 |
1 files changed, 328 insertions, 0 deletions
diff --git a/lib/VNLib.Data.Caching/src/ClientExtensions.cs b/lib/VNLib.Data.Caching/src/ClientExtensions.cs new file mode 100644 index 0000000..a207886 --- /dev/null +++ b/lib/VNLib.Data.Caching/src/ClientExtensions.cs @@ -0,0 +1,328 @@ +/* +* Copyright (c) 2022 Vaughn Nugent +* +* Library: VNLib +* Package: VNLib.Data.Caching +* File: ClientExtensions.cs +* +* ClientExtensions.cs is part of VNLib.Data.Caching which is part of the larger +* VNLib collection of libraries and utilities. +* +* VNLib.Data.Caching is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* VNLib.Data.Caching is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.IO; +using System.Linq; +using System.Buffers; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using System.Collections.Generic; +using System.Text.Json.Serialization; + +using VNLib.Utils.Logging; +using VNLib.Net.Messaging.FBM; +using VNLib.Net.Messaging.FBM.Client; +using VNLib.Net.Messaging.FBM.Server; +using VNLib.Data.Caching.Exceptions; + +using static VNLib.Data.Caching.Constants; + +namespace VNLib.Data.Caching +{ + + /// <summary> + /// Provides caching extension methods for <see cref="FBMClient"/> + /// </summary> + public static class ClientExtensions + { + private static readonly JsonSerializerOptions LocalOptions = new() + { + DictionaryKeyPolicy = JsonNamingPolicy.CamelCase, + NumberHandling = JsonNumberHandling.Strict, + ReadCommentHandling = JsonCommentHandling.Disallow, + WriteIndented = false, + DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull, + IgnoreReadOnlyFields = true, + PropertyNameCaseInsensitive = true, + IncludeFields = false, + + //Use small buffers + DefaultBufferSize = 128 + }; + + private static void LogDebug(this FBMClient client, string message, params object?[] args) + { + if (client.Config.DebugLog != null) + { + client.Config.DebugLog.Debug($"[CACHE] : {message}", args); + } + } + + /// <summary> + /// Gets an object from the server if it exists + /// </summary> + /// <typeparam name="T"></typeparam> + /// <param name="client"></param> + /// <param name="objectId">The id of the object to get</param> + /// <param name="cancellationToken">A token to cancel the operation</param> + /// <returns>A task that completes to return the results of the response payload</returns> + /// <exception cref="JsonException"></exception> + /// <exception cref="OutOfMemoryException"></exception> + /// <exception cref="InvalidStatusException"></exception> + /// <exception cref="ObjectDisposedException"></exception> + /// <exception cref="InvalidResponseException"></exception> + public static async Task<T?> GetObjectAsync<T>(this FBMClient client, string objectId, CancellationToken cancellationToken = default) + { + _ = client ?? throw new ArgumentNullException(nameof(client)); + + client.LogDebug("Getting object {id}", objectId); + + //Rent a new request + FBMRequest request = client.RentRequest(); + try + { + //Set action as get/create + request.WriteHeader(HeaderCommand.Action, Actions.Get); + //Set session-id header + request.WriteHeader(Constants.ObjectId, objectId); + + //Make request + using FBMResponse response = await client.SendAsync(request, cancellationToken); + + response.ThrowIfNotSet(); + //Get the status code + ReadOnlyMemory<char> status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value; + if (status.Span.Equals(ResponseCodes.Okay, StringComparison.Ordinal)) + { + return JsonSerializer.Deserialize<T>(response.ResponseBody, LocalOptions); + } + //Session may not exist on the server yet + if (status.Span.Equals(ResponseCodes.NotFound, StringComparison.Ordinal)) + { + return default; + } + throw new InvalidStatusException("Invalid status code recived for object get request", status.ToString()); + } + finally + { + client.ReturnRequest(request); + } + } + + /// <summary> + /// Updates the state of the object, and optionally updates the ID of the object. The data + /// parameter is serialized, buffered, and streamed to the remote server + /// </summary> + /// <typeparam name="T"></typeparam> + /// <param name="client"></param> + /// <param name="objectId">The id of the object to update or replace</param> + /// <param name="newId">An optional parameter to specify a new ID for the old object</param> + /// <param name="data">The payload data to serialize and set as the data state of the session</param> + /// <param name="cancellationToken">A token to cancel the operation</param> + /// <returns>A task that resolves when the server responds</returns> + /// <exception cref="JsonException"></exception> + /// <exception cref="OutOfMemoryException"></exception> + /// <exception cref="InvalidStatusException"></exception> + /// <exception cref="ObjectDisposedException"></exception> + /// <exception cref="InvalidResponseException"></exception> + /// <exception cref="MessageTooLargeException"></exception> + /// <exception cref="ObjectNotFoundException"></exception> + public static async Task AddOrUpdateObjectAsync<T>(this FBMClient client, string objectId, string? newId, T data, CancellationToken cancellationToken = default) + { + _ = client ?? throw new ArgumentNullException(nameof(client)); + + client.LogDebug("Updating object {id}, newid {nid}", objectId, newId); + + //Rent a new request + FBMRequest request = client.RentRequest(); + try + { + //Set action as get/create + request.WriteHeader(HeaderCommand.Action, Actions.AddOrUpdate); + //Set session-id header + request.WriteHeader(Constants.ObjectId, objectId); + //if new-id set, set the new-id header + if (!string.IsNullOrWhiteSpace(newId)) + { + request.WriteHeader(Constants.NewObjectId, newId); + } + //Get the body writer for the message + IBufferWriter<byte> bodyWriter = request.GetBodyWriter(); + //Write json data to the message + using (Utf8JsonWriter jsonWriter = new(bodyWriter)) + { + JsonSerializer.Serialize(jsonWriter, data, LocalOptions); + } + + //Make request + using FBMResponse response = await client.SendAsync(request, cancellationToken); + + response.ThrowIfNotSet(); + //Get the status code + ReadOnlyMemory<char> status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value; + //Check status code + if (status.Span.Equals(ResponseCodes.Okay, StringComparison.OrdinalIgnoreCase)) + { + return; + } + else if(status.Span.Equals(ResponseCodes.NotFound, StringComparison.OrdinalIgnoreCase)) + { + throw new ObjectNotFoundException($"object {objectId} not found on remote server"); + } + //Invalid status + throw new InvalidStatusException("Invalid status code recived for object upsert request", status.ToString()); + } + finally + { + //Return the request(clears data and reset) + client.ReturnRequest(request); + } + } + + /// <summary> + /// Asynchronously deletes an object in the remote store + /// </summary> + /// <param name="client"></param> + /// <param name="objectId">The id of the object to update or replace</param> + /// <param name="cancellationToken">A token to cancel the operation</param> + /// <returns>A task that resolves when the operation has completed</returns> + /// <exception cref="InvalidStatusException"></exception> + /// <exception cref="ObjectDisposedException"></exception> + /// <exception cref="InvalidResponseException"></exception> + /// <exception cref="ObjectNotFoundException"></exception> + public static async Task DeleteObjectAsync(this FBMClient client, string objectId, CancellationToken cancellationToken = default) + { + _ = client ?? throw new ArgumentNullException(nameof(client)); + + client.LogDebug("Deleting object {id}", objectId); + //Rent a new request + FBMRequest request = client.RentRequest(); + try + { + //Set action as delete + request.WriteHeader(HeaderCommand.Action, Actions.Delete); + //Set session-id header + request.WriteHeader(Constants.ObjectId, objectId); + + //Make request + using FBMResponse response = await client.SendAsync(request, cancellationToken); + + response.ThrowIfNotSet(); + //Get the status code + ReadOnlyMemory<char> status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value; + if (status.Span.Equals(ResponseCodes.Okay, StringComparison.Ordinal)) + { + return; + } + else if(status.Span.Equals(ResponseCodes.NotFound, StringComparison.OrdinalIgnoreCase)) + { + throw new ObjectNotFoundException($"object {objectId} not found on remote server"); + } + throw new InvalidStatusException("Invalid status code recived for object get request", status.ToString()); + } + finally + { + client.ReturnRequest(request); + } + } + + /// <summary> + /// Dequeues a change event from the server event queue for the current connection, or waits until a change happens + /// </summary> + /// <param name="client"></param> + /// <param name="cancellationToken">A token to cancel the deuque operation</param> + /// <returns>A <see cref="KeyValuePair{TKey, TValue}"/> that contains the modified object id and optionally its new id</returns> + public static async Task<WaitForChangeResult> WaitForChangeAsync(this FBMClient client, CancellationToken cancellationToken = default) + { + //Rent a new request + FBMRequest request = client.RentRequest(); + try + { + //Set action as event dequeue to dequeue a change event + request.WriteHeader(HeaderCommand.Action, Actions.Dequeue); + + //Make request + using FBMResponse response = await client.SendAsync(request, cancellationToken); + + response.ThrowIfNotSet(); + + return new() + { + Status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value.ToString(), + CurrentId = response.Headers.SingleOrDefault(static v => v.Key == Constants.ObjectId).Value.ToString(), + NewId = response.Headers.SingleOrDefault(static v => v.Key == Constants.NewObjectId).Value.ToString() + }; + } + finally + { + client.ReturnRequest(request); + } + } + + /// <summary> + /// Gets the Object-id for the request message, or throws an <see cref="InvalidOperationException"/> if not specified + /// </summary> + /// <param name="context"></param> + /// <returns>The id of the object requested</returns> + /// <exception cref="InvalidOperationException"></exception> + public static string ObjectId(this FBMContext context) + { + return context.Request.Headers.First(static kvp => kvp.Key == Constants.ObjectId).Value.ToString(); + } + /// <summary> + /// Gets the new ID of the object if specified from the request. Null if the request did not specify an id update + /// </summary> + /// <param name="context"></param> + /// <returns>The new ID of the object if speicifed, null otherwise</returns> + public static string? NewObjectId(this FBMContext context) + { + return context.Request.Headers.FirstOrDefault(static kvp => kvp.Key == Constants.NewObjectId).Value.ToString(); + } + /// <summary> + /// Gets the request method for the request + /// </summary> + /// <param name="context"></param> + /// <returns>The request method string</returns> + public static string Method(this FBMContext context) + { + return context.Request.Headers.First(static kvp => kvp.Key == HeaderCommand.Action).Value.ToString(); + } + /// <summary> + /// Closes a response with a status code + /// </summary> + /// <param name="context"></param> + /// <param name="responseCode">The status code to send to the client</param> + public static void CloseResponse(this FBMContext context, string responseCode) + { + context.Response.WriteHeader(HeaderCommand.Status, responseCode); + } + + + /// <summary> + /// Initializes the worker for a reconnect policy and returns an object that can listen for changes + /// and configure the connection as necessary + /// </summary> + /// <param name="worker"></param> + /// <param name="retryDelay">The amount of time to wait between retries</param> + /// <param name="serverUri">The uri to reconnect the client to</param> + /// <returns>A <see cref="ClientRetryManager{T}"/> for listening for retry events</returns> + public static ClientRetryManager<T> SetReconnectPolicy<T>(this T worker, TimeSpan retryDelay, Uri serverUri) where T: IStatefulConnection + { + //Return new manager + return new (worker, retryDelay, serverUri); + } + } +} |