/*
* Copyright (c) 2022 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Data.Caching
* File: ClientExtensions.cs
*
* ClientExtensions.cs is part of VNLib.Data.Caching which is part of the larger
* VNLib collection of libraries and utilities.
*
* VNLib.Data.Caching is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published
* by the Free Software Foundation, either version 2 of the License,
* or (at your option) any later version.
*
* VNLib.Data.Caching is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with VNLib.Data.Caching. If not, see http://www.gnu.org/licenses/.
*/

using System;
using System.IO;
using System.Linq;
using System.Buffers;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Text.Json.Serialization;
using System.Runtime.CompilerServices;

using VNLib.Utils.Logging;
using VNLib.Net.Messaging.FBM;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Net.Messaging.FBM.Server;
using VNLib.Data.Caching.Exceptions;

using static VNLib.Data.Caching.Constants;

namespace VNLib.Data.Caching
{
    /// <summary>
    /// Provides caching extension methods for <see cref="FBMClient"/>
    /// </summary>
    public static class ClientExtensions
    {
        //Serializer options shared by every get/upsert call in this class
        private static readonly JsonSerializerOptions LocalOptions = new()
        {
            DictionaryKeyPolicy = JsonNamingPolicy.CamelCase,
            NumberHandling = JsonNumberHandling.Strict,
            ReadCommentHandling = JsonCommentHandling.Disallow,
            WriteIndented = false,
            DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
            IgnoreReadOnlyFields = true,
            PropertyNameCaseInsensitive = true,
            IncludeFields = false,

            //Use small buffers
            DefaultBufferSize = 128
        };

        //One semaphore per client instance; the weak table does not keep the client alive
        private static readonly ConditionalWeakTable<FBMClient, SemaphoreSlim> GetLock = new();
        private static readonly ConditionalWeakTable<FBMClient, SemaphoreSlim> UpdateLock = new();

        //At most 50 concurrent get operations per client
        private static SemaphoreSlim GetLockCtor(FBMClient client) => new (50);

        //At most 25 concurrent update/delete operations per client
        private static SemaphoreSlim UpdateLockCtor(FBMClient client) => new (25);

        /// <summary>
        /// Gets an object from the server if it exists
        /// </summary>
        /// <typeparam name="T">The type the response payload is deserialized to</typeparam>
        /// <param name="client"></param>
        /// <param name="objectId">The id of the object to get</param>
        /// <param name="cancellationToken">A token to cancel the operation</param>
        /// <returns>
        /// A task that completes to return the results of the response payload, or the
        /// default value if the server reports the object was not found
        /// </returns>
        /// <exception cref="InvalidStatusException">The server responded with an unexpected status code</exception>
        public static async Task<T?> GetObjectAsync<T>(this FBMClient client, string objectId, CancellationToken cancellationToken = default)
        {
            client.Config.DebugLog?.Debug("[DEBUG] Getting object {id}", objectId);

            SemaphoreSlim getLock = GetLock.GetValue(client, GetLockCtor);

            //Wait for entry
            await getLock.WaitAsync(cancellationToken);
            try
            {
                //Rent inside the guarded region so a failed rent cannot leak the semaphore slot
                FBMRequest request = client.RentRequest();
                try
                {
                    //Set action as get
                    request.WriteHeader(HeaderCommand.Action, Actions.Get);
                    //Set object-id header
                    request.WriteHeader(Constants.ObjectId, objectId);

                    //Make request
                    using FBMResponse response = await client.SendAsync(request, cancellationToken);

                    response.ThrowIfNotSet();

                    //Get the status code
                    ReadOnlyMemory<char> status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value;

                    if (status.Span.Equals(ResponseCodes.Okay, StringComparison.Ordinal))
                    {
                        return JsonSerializer.Deserialize<T>(response.ResponseBody, LocalOptions);
                    }

                    //Object may not exist on the server yet
                    if (status.Span.Equals(ResponseCodes.NotFound, StringComparison.Ordinal))
                    {
                        return default;
                    }

                    throw new InvalidStatusException("Invalid status code recived for object get request", status.ToString());
                }
                finally
                {
                    //Return the request before the slot is handed to the next waiter
                    client.ReturnRequest(request);
                }
            }
            finally
            {
                getLock.Release();
            }
        }

        /// <summary>
        /// Updates the state of the object, and optionally updates the ID of the object. The data
        /// parameter is serialized, buffered, and streamed to the remote server
        /// </summary>
        /// <typeparam name="T">The type of the payload to serialize</typeparam>
        /// <param name="client"></param>
        /// <param name="objectId">The id of the object to update or replace</param>
        /// <param name="newId">An optional parameter to specify a new ID for the old object</param>
        /// <param name="data">The payload data to serialize and set as the data state of the object</param>
        /// <param name="cancellationToken">A token to cancel the operation</param>
        /// <returns>A task that resolves when the server responds</returns>
        /// <exception cref="ObjectNotFoundException">The server reported the object was not found</exception>
        /// <exception cref="InvalidStatusException">The server responded with an unexpected status code</exception>
        public static async Task AddOrUpdateObjectAsync<T>(this FBMClient client, string objectId, string? newId, T data, CancellationToken cancellationToken = default)
        {
            client.Config.DebugLog?.Debug("[DEBUG] Updating object {id}, newid {nid}", objectId, newId);

            SemaphoreSlim updateLock = UpdateLock.GetValue(client, UpdateLockCtor);

            //Wait for entry
            await updateLock.WaitAsync(cancellationToken);
            try
            {
                //Rent inside the guarded region so a failed rent cannot leak the semaphore slot
                FBMRequest request = client.RentRequest();
                try
                {
                    //Set action as add-or-update
                    request.WriteHeader(HeaderCommand.Action, Actions.AddOrUpdate);
                    //Set object-id header
                    request.WriteHeader(Constants.ObjectId, objectId);

                    //if new-id set, set the new-id header
                    if (!string.IsNullOrWhiteSpace(newId))
                    {
                        request.WriteHeader(Constants.NewObjectId, newId);
                    }

                    //Get the body writer for the message
                    IBufferWriter<byte> bodyWriter = request.GetBodyWriter();

                    //Write json data to the message, disposing the writer flushes it before sending
                    using (Utf8JsonWriter jsonWriter = new(bodyWriter))
                    {
                        JsonSerializer.Serialize(jsonWriter, data, LocalOptions);
                    }

                    //Make request
                    using FBMResponse response = await client.SendAsync(request, cancellationToken);

                    response.ThrowIfNotSet();

                    //Get the status code
                    ReadOnlyMemory<char> status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value;

                    //Check status code
                    if (status.Span.Equals(ResponseCodes.Okay, StringComparison.OrdinalIgnoreCase))
                    {
                        return;
                    }
                    else if (status.Span.Equals(ResponseCodes.NotFound, StringComparison.OrdinalIgnoreCase))
                    {
                        throw new ObjectNotFoundException($"object {objectId} not found on remote server");
                    }

                    //Invalid status
                    throw new InvalidStatusException("Invalid status code recived for object upsert request", status.ToString());
                }
                finally
                {
                    //Return the request(clears data and reset)
                    client.ReturnRequest(request);
                }
            }
            finally
            {
                updateLock.Release();
            }
        }

        /// <summary>
        /// Asynchronously deletes an object in the remote store
        /// </summary>
        /// <param name="client"></param>
        /// <param name="objectId">The id of the object to delete</param>
        /// <param name="cancellationToken">A token to cancel the operation</param>
        /// <returns>A task that resolves when the operation has completed</returns>
        /// <exception cref="ObjectNotFoundException">The server reported the object was not found</exception>
        /// <exception cref="InvalidStatusException">The server responded with an unexpected status code</exception>
        public static async Task DeleteObjectAsync(this FBMClient client, string objectId, CancellationToken cancellationToken = default)
        {
            client.Config.DebugLog?.Debug("[DEBUG] Deleting object {id}", objectId);

            //Deletes share the update semaphore with add-or-update operations
            SemaphoreSlim updateLock = UpdateLock.GetValue(client, UpdateLockCtor);

            //Wait for entry
            await updateLock.WaitAsync(cancellationToken);
            try
            {
                //Rent inside the guarded region so a failed rent cannot leak the semaphore slot
                FBMRequest request = client.RentRequest();
                try
                {
                    //Set action as delete
                    request.WriteHeader(HeaderCommand.Action, Actions.Delete);
                    //Set object-id header
                    request.WriteHeader(Constants.ObjectId, objectId);

                    //Make request
                    using FBMResponse response = await client.SendAsync(request, cancellationToken);

                    response.ThrowIfNotSet();

                    //Get the status code
                    ReadOnlyMemory<char> status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value;

                    if (status.Span.Equals(ResponseCodes.Okay, StringComparison.Ordinal))
                    {
                        return;
                    }
                    else if (status.Span.Equals(ResponseCodes.NotFound, StringComparison.OrdinalIgnoreCase))
                    {
                        throw new ObjectNotFoundException($"object {objectId} not found on remote server");
                    }

                    throw new InvalidStatusException("Invalid status code recived for object delete request", status.ToString());
                }
                finally
                {
                    client.ReturnRequest(request);
                }
            }
            finally
            {
                updateLock.Release();
            }
        }

        /// <summary>
        /// Dequeues a change event from the server event queue for the current connection, or waits until a change happens
        /// </summary>
        /// <param name="client"></param>
        /// <param name="cancellationToken">A token to cancel the dequeue operation</param>
        /// <returns>A <see cref="WaitForChangeResult"/> that contains the modified object id and optionally its new id</returns>
        public static async Task<WaitForChangeResult> WaitForChangeAsync(this FBMClient client, CancellationToken cancellationToken = default)
        {
            //Rent a new request
            FBMRequest request = client.RentRequest();
            try
            {
                //Set action as event dequeue to dequeue a change event
                request.WriteHeader(HeaderCommand.Action, Actions.Dequeue);

                //Make request
                using FBMResponse response = await client.SendAsync(request, cancellationToken);

                response.ThrowIfNotSet();

                //Copy the header values to strings before the response is disposed
                return new()
                {
                    Status = response.Headers.FirstOrDefault(static a => a.Key == HeaderCommand.Status).Value.ToString(),
                    CurrentId = response.Headers.SingleOrDefault(static v => v.Key == Constants.ObjectId).Value.ToString(),
                    NewId = response.Headers.SingleOrDefault(static v => v.Key == Constants.NewObjectId).Value.ToString()
                };
            }
            finally
            {
                client.ReturnRequest(request);
            }
        }

        /// <summary>
        /// Gets the Object-id for the request message, or throws an <see cref="InvalidOperationException"/> if not specified
        /// </summary>
        /// <param name="context"></param>
        /// <returns>The id of the object requested</returns>
        /// <exception cref="InvalidOperationException">The request does not contain an object-id header</exception>
        public static string ObjectId(this FBMContext context)
        {
            return context.Request.Headers.First(static kvp => kvp.Key == Constants.ObjectId).Value.ToString();
        }

        /// <summary>
        /// Gets the new ID of the object if specified from the request. Null if the request did not specify an id update
        /// </summary>
        /// <param name="context"></param>
        /// <returns>The new ID of the object if specified, null otherwise</returns>
        public static string? NewObjectId(this FBMContext context)
        {
            //When the header is absent FirstOrDefault yields a default pair. Assuming header values are
            //ReadOnlyMemory<char> (as on the client side), its string form is empty rather than null,
            //so normalize to null to honor the documented contract
            string newId = context.Request.Headers.FirstOrDefault(static kvp => kvp.Key == Constants.NewObjectId).Value.ToString();
            return string.IsNullOrEmpty(newId) ? null : newId;
        }

        /// <summary>
        /// Gets the request method for the request
        /// </summary>
        /// <param name="context"></param>
        /// <returns>The request method string</returns>
        /// <exception cref="InvalidOperationException">The request does not contain an action header</exception>
        public static string Method(this FBMContext context)
        {
            return context.Request.Headers.First(static kvp => kvp.Key == HeaderCommand.Action).Value.ToString();
        }

        /// <summary>
        /// Closes a response with a status code
        /// </summary>
        /// <param name="context"></param>
        /// <param name="responseCode">The status code to send to the client</param>
        public static void CloseResponse(this FBMContext context, string responseCode)
        {
            context.Response.WriteHeader(HeaderCommand.Status, responseCode);
        }

        /// <summary>
        /// Initializes the worker for a reconnect policy and returns an object that can listen for changes
        /// and configure the connection as necessary
        /// </summary>
        /// <typeparam name="T">The stateful connection type of the worker</typeparam>
        /// <param name="worker"></param>
        /// <param name="retryDelay">The amount of time to wait between retries</param>
        /// <param name="serverUri">The uri to reconnect the client to</param>
        /// <returns>A <see cref="ClientRetryManager{T}"/> for listening for retry events</returns>
        public static ClientRetryManager<T> SetReconnectPolicy<T>(this T worker, TimeSpan retryDelay, Uri serverUri) where T: IStatefulConnection
        {
            //Return new manager
            return new (worker, retryDelay, serverUri);
        }
    }
}