/*
* Copyright (c) 2023 Vaughn Nugent
* 
* Library: VNLib
* Package: VNLib.Data.Caching
* File: ClientExtensions.cs 
*
* ClientExtensions.cs is part of VNLib.Data.Caching which is part of the larger 
* VNLib collection of libraries and utilities.
*
* VNLib.Data.Caching is free software: you can redistribute it and/or modify 
* it under the terms of the GNU Affero General Public License as 
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* VNLib.Data.Caching is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see https://www.gnu.org/licenses/.
*/

using System;
using System.Linq;
using System.Buffers;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using System.Runtime.CompilerServices;

using VNLib.Utils.Logging;
using VNLib.Net.Messaging.FBM;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Net.Messaging.FBM.Server;
using VNLib.Data.Caching.Exceptions;

using static VNLib.Data.Caching.Constants;

namespace VNLib.Data.Caching
{
    /// <summary>
    /// Provides caching extension methods for <see cref="FBMClient"/> (client side)
    /// and request/response helpers for <see cref="FBMContext"/> (server side)
    /// </summary>
    public static class ClientExtensions
    {
        //Shared serializer used by the overloads that do not accept a custom (de)serializer
        private static readonly JsonCacheObjectSerializer DefaultSerializer = new();

        /// <summary>
        /// Writes a debug message, prefixed with "[CACHE] : ", to the client's configured
        /// debug log. Does nothing when no debug log is configured.
        /// </summary>
        /// <param name="client">The client whose debug log is written to</param>
        /// <param name="message">The message template</param>
        /// <param name="args">Arguments for the message template</param>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private static void LogDebug(this FBMClient client, string message, params object?[] args)
        {
            client.Config.DebugLog?.Debug($"[CACHE] : {message}", args);
        }

        /// <summary>
        /// Gets an object from the server if it exists, and uses the default serializer to 
        /// recover the object
        /// </summary>
        /// <typeparam name="T">The type of the object to recover</typeparam>
        /// <param name="client"></param>
        /// <param name="objectId">The id of the object to get</param>
        /// <param name="cancellationToken">A token to cancel the operation</param>
        /// <returns>
        /// A task that completes to return the deserialized response payload, or the default 
        /// value when the server reports the object was not found
        /// </returns>
        /// <exception cref="ArgumentNullException"></exception>
        /// <exception cref="InvalidStatusException"></exception>
        public static Task<T?> GetObjectAsync<T>(this FBMClient client, string objectId, CancellationToken cancellationToken = default)
        {
            //Forward to the custom deserializer overload using the shared default serializer
            return GetObjectAsync<T>(client, objectId, DefaultSerializer, cancellationToken);
        }

        /// <summary>
        /// Updates the state of the object, and optionally updates the ID of the object. The data 
        /// parameter is serialized with the default serializer and sent to the remote server
        /// </summary>
        /// <typeparam name="T">The type of the object to store</typeparam>
        /// <param name="client"></param>
        /// <param name="objectId">The id of the object to update or replace</param>
        /// <param name="newId">An optional parameter to specify a new ID for the old object</param>
        /// <param name="data">The payload data to serialize and set as the data state of the object</param>
        /// <param name="cancellationToken">A token to cancel the operation</param>
        /// <returns>A task that resolves when the server responds</returns>
        /// <exception cref="ArgumentNullException"></exception>
        /// <exception cref="InvalidStatusException"></exception>
        /// <exception cref="ObjectNotFoundException"></exception>
        public static Task AddOrUpdateObjectAsync<T>(this FBMClient client, string objectId, string? newId, T data, CancellationToken cancellationToken = default)
        {
            //Use the default/json serializer if not specified
            return AddOrUpdateObjectAsync<T>(client, objectId, newId, data, DefaultSerializer, cancellationToken);
        }

        /// <summary>
        /// Gets an object from the server if it exists
        /// </summary>
        /// <typeparam name="T">The type of the object to recover</typeparam>
        /// <param name="client"></param>
        /// <param name="objectId">The id of the object to get</param>
        /// <param name="deserialzer">The custom data deserializer used to deserialize the binary cache result</param>
        /// <param name="cancellationToken">A token to cancel the operation</param>
        /// <returns>
        /// A task that completes to return the deserialized response payload, or the default 
        /// value when the server reports the object was not found
        /// </returns>
        /// <exception cref="ArgumentNullException"></exception>
        /// <exception cref="InvalidStatusException"></exception>
        public static async Task<T?> GetObjectAsync<T>(
            this FBMClient client,
            string objectId,
            ICacheObjectDeserialzer deserialzer,
            CancellationToken cancellationToken = default)
        {
            _ = client ?? throw new ArgumentNullException(nameof(client));
            _ = deserialzer ?? throw new ArgumentNullException(nameof(deserialzer));

            client.LogDebug("Getting object {id}", objectId);

            //Rent a new request
            FBMRequest request = client.RentRequest();
            try
            {
                //Set action as get
                request.WriteHeader(HeaderCommand.Action, Actions.Get);
                //Set object id header
                request.WriteHeader(Constants.ObjectId, objectId);

                //Make request
                using FBMResponse response = await client.SendAsync(request, cancellationToken);
                response.ThrowIfNotSet();

                //Get the status code, a missing status header yields a default header and falls through to the invalid status path
                FBMMessageHeader status = response.Headers.FirstOrDefault(static a => a.Header == HeaderCommand.Status);

                //Check ok status code, then its safe to deserialize
                if (status.Value.Equals(ResponseCodes.Okay, StringComparison.Ordinal))
                {
                    return deserialzer.Deserialze<T>(response.ResponseBody);
                }

                //Object may not exist on the server yet
                if (status.Value.Equals(ResponseCodes.NotFound, StringComparison.Ordinal))
                {
                    return default;
                }

                throw new InvalidStatusException("Invalid status code recived for object get request", status.ToString());
            }
            finally
            {
                //Always hand the request back to the client, even on failure
                client.ReturnRequest(request);
            }
        }

        /// <summary>
        /// Updates the state of the object, and optionally updates the ID of the object. The data 
        /// parameter is serialized into the request body and sent to the remote server
        /// </summary>
        /// <typeparam name="T">The type of the object to store</typeparam>
        /// <param name="client"></param>
        /// <param name="objectId">The id of the object to update or replace</param>
        /// <param name="newId">An optional parameter to specify a new ID for the old object</param>
        /// <param name="data">The payload data to serialize and set as the data state of the object</param>
        /// <param name="serializer">The custom serializer used to serialize the object to binary</param>
        /// <param name="cancellationToken">A token to cancel the operation</param>
        /// <returns>A task that resolves when the server responds</returns>
        /// <exception cref="ArgumentNullException"></exception>
        /// <exception cref="InvalidStatusException"></exception>
        /// <exception cref="ObjectNotFoundException"></exception>
        public static async Task AddOrUpdateObjectAsync<T>(
            this FBMClient client,
            string objectId,
            string? newId,
            T data,
            ICacheObjectSerialzer serializer,
            CancellationToken cancellationToken = default)
        {
            _ = client ?? throw new ArgumentNullException(nameof(client));
            _ = serializer ?? throw new ArgumentNullException(nameof(serializer));

            client.LogDebug("Updating object {id}, newid {nid}", objectId, newId);

            //Rent a new request
            FBMRequest request = client.RentRequest();
            try
            {
                //Set action as add/update
                request.WriteHeader(HeaderCommand.Action, Actions.AddOrUpdate);
                //Set object id header
                request.WriteHeader(Constants.ObjectId, objectId);

                //if new-id set, set the new-id header
                if (!string.IsNullOrWhiteSpace(newId))
                {
                    request.WriteHeader(Constants.NewObjectId, newId);
                }

                //Get the body writer for the message
                IBufferWriter<byte> bodyWriter = request.GetBodyWriter();

                //Serialize the message directly into the request body
                serializer.Serialize(data, bodyWriter);

                //Make request
                using FBMResponse response = await client.SendAsync(request, cancellationToken);
                response.ThrowIfNotSet();

                //Get the status code
                FBMMessageHeader status = response.Headers.FirstOrDefault(static a => a.Header == HeaderCommand.Status);

                //Check status code
                if (status.Value.Equals(ResponseCodes.Okay, StringComparison.OrdinalIgnoreCase))
                {
                    return;
                }
                else if (status.Value.Equals(ResponseCodes.NotFound, StringComparison.OrdinalIgnoreCase))
                {
                    throw new ObjectNotFoundException($"object {objectId} not found on remote server");
                }

                //Invalid status
                throw new InvalidStatusException("Invalid status code recived for object upsert request", status.ToString());
            }
            finally
            {
                //Always hand the request back to the client, even on failure
                client.ReturnRequest(request);
            }
        }

        /// <summary>
        /// Asynchronously deletes an object in the remote store
        /// </summary>
        /// <param name="client"></param>
        /// <param name="objectId">The id of the object to delete</param>
        /// <param name="cancellationToken">A token to cancel the operation</param>
        /// <returns>A task that resolves when the operation has completed</returns>
        /// <exception cref="ArgumentNullException"></exception>
        /// <exception cref="InvalidStatusException"></exception>
        /// <exception cref="ObjectNotFoundException"></exception>
        public static async Task DeleteObjectAsync(this FBMClient client, string objectId, CancellationToken cancellationToken = default)
        {
            _ = client ?? throw new ArgumentNullException(nameof(client));

            client.LogDebug("Deleting object {id}", objectId);

            //Rent a new request
            FBMRequest request = client.RentRequest();
            try
            {
                //Set action as delete
                request.WriteHeader(HeaderCommand.Action, Actions.Delete);
                //Set object id header
                request.WriteHeader(Constants.ObjectId, objectId);

                //Make request
                using FBMResponse response = await client.SendAsync(request, cancellationToken);
                response.ThrowIfNotSet();

                //Get the status code
                FBMMessageHeader status = response.Headers.FirstOrDefault(static a => a.Header == HeaderCommand.Status);

                //NOTE(review): the two checks use different comparison modes, kept as-is to preserve behavior
                if (status.Value.Equals(ResponseCodes.Okay, StringComparison.Ordinal))
                {
                    return;
                }
                else if (status.Value.Equals(ResponseCodes.NotFound, StringComparison.OrdinalIgnoreCase))
                {
                    throw new ObjectNotFoundException($"object {objectId} not found on remote server");
                }

                //Report the failing operation as a delete (message previously named a get request)
                throw new InvalidStatusException("Invalid status code recived for object delete request", status.ToString());
            }
            finally
            {
                //Always hand the request back to the client, even on failure
                client.ReturnRequest(request);
            }
        }

        /// <summary>
        /// Updates the state of the object, and optionally updates the ID of the object. The raw 
        /// data returned from <see cref="IObjectData.GetData"/> is written as the request body 
        /// and sent to the remote server
        /// </summary>
        /// <param name="client"></param>
        /// <param name="objectId">The id of the object to update or replace</param>
        /// <param name="newId">An optional parameter to specify a new ID for the old object</param>
        /// <param name="data">An <see cref="IObjectData"/> that represents the data to set</param>
        /// <param name="cancellationToken">A token to cancel the operation</param>
        /// <returns>A task that resolves when the server responds</returns>
        /// <exception cref="ArgumentNullException"></exception>
        /// <exception cref="InvalidStatusException"></exception>
        /// <exception cref="ObjectNotFoundException"></exception>
        public static async Task AddOrUpdateObjectAsync(
            this FBMClient client,
            string objectId,
            string? newId,
            IObjectData data,
            CancellationToken cancellationToken = default)
        {
            _ = client ?? throw new ArgumentNullException(nameof(client));
            _ = data ?? throw new ArgumentNullException(nameof(data));

            client.LogDebug("Updating object {id}, newid {nid}", objectId, newId);

            //Rent a new request
            FBMRequest request = client.RentRequest();
            try
            {
                //Set action as add/update
                request.WriteHeader(HeaderCommand.Action, Actions.AddOrUpdate);
                //Set object id header
                request.WriteHeader(Constants.ObjectId, objectId);

                //if new-id set, set the new-id header
                if (!string.IsNullOrWhiteSpace(newId))
                {
                    request.WriteHeader(Constants.NewObjectId, newId);
                }

                //Write the message body as the object data
                request.WriteBody(data.GetData());

                //Make request
                using FBMResponse response = await client.SendAsync(request, cancellationToken);
                response.ThrowIfNotSet();

                //Get the status code
                FBMMessageHeader status = response.Headers.FirstOrDefault(static a => a.Header == HeaderCommand.Status);

                //Check status code
                if (status.Value.Equals(ResponseCodes.Okay, StringComparison.OrdinalIgnoreCase))
                {
                    return;
                }
                else if (status.Value.Equals(ResponseCodes.NotFound, StringComparison.OrdinalIgnoreCase))
                {
                    throw new ObjectNotFoundException($"object {objectId} not found on remote server");
                }

                //Invalid status
                throw new InvalidStatusException("Invalid status code recived for object upsert request", status.ToString());
            }
            finally
            {
                //Always hand the request back to the client, even on failure
                client.ReturnRequest(request);
            }
        }

        /// <summary>
        /// Gets an object from the server if it exists. If data is retrieved, it is passed to 
        /// <see cref="IObjectData.SetData"/>; if no data is found, this method returns 
        /// and never calls SetData.
        /// </summary>
        /// <param name="client"></param>
        /// <param name="objectId">The id of the object to get</param>
        /// <param name="data">An <see cref="IObjectData"/> that receives the object data</param>
        /// <param name="cancellationToken">A token to cancel the operation</param>
        /// <returns>A task that completes when the response has been processed</returns>
        /// <exception cref="ArgumentNullException"></exception>
        /// <exception cref="InvalidStatusException"></exception>
        public static async Task GetObjectAsync(this FBMClient client, string objectId, IObjectData data, CancellationToken cancellationToken = default)
        {
            _ = client ?? throw new ArgumentNullException(nameof(client));
            _ = data ?? throw new ArgumentNullException(nameof(data));

            client.LogDebug("Getting object {id}", objectId);

            //Rent a new request
            FBMRequest request = client.RentRequest();
            try
            {
                //Set action as get
                request.WriteHeader(HeaderCommand.Action, Actions.Get);
                //Set object id header
                request.WriteHeader(Constants.ObjectId, objectId);

                //Make request
                using FBMResponse response = await client.SendAsync(request, cancellationToken);
                response.ThrowIfNotSet();

                //Get the status code
                FBMMessageHeader status = response.Headers.FirstOrDefault(static a => a.Header == HeaderCommand.Status);

                //Check ok status code, then its safe to read the body
                if (status.Value.Equals(ResponseCodes.Okay, StringComparison.Ordinal))
                {
                    //Write the object data
                    data.SetData(response.ResponseBody);
                    return;
                }

                //Object may not exist on the server yet
                if (status.Value.Equals(ResponseCodes.NotFound, StringComparison.Ordinal))
                {
                    return;
                }

                throw new InvalidStatusException("Invalid status code recived for object get request", status.ToString());
            }
            finally
            {
                //Always hand the request back to the client, even on failure
                client.ReturnRequest(request);
            }
        }

        /// <summary>
        /// Dequeues a change event from the server event queue for the current connection, or waits until a change happens
        /// </summary>
        /// <param name="client"></param>
        /// <param name="cancellationToken">A token to cancel the dequeue operation</param>
        /// <returns>
        /// A <see cref="WaitForChangeResult"/> that contains the status, current id, and new id 
        /// header values of the response
        /// </returns>
        /// <exception cref="ArgumentNullException"></exception>
        public static async Task<WaitForChangeResult> WaitForChangeAsync(this FBMClient client, CancellationToken cancellationToken = default)
        {
            //Guard the client like every other client extension instead of failing with a null dereference
            _ = client ?? throw new ArgumentNullException(nameof(client));

            //Rent a new request
            FBMRequest request = client.RentRequest();
            try
            {
                //Set action as event dequeue to dequeue a change event
                request.WriteHeader(HeaderCommand.Action, Actions.Dequeue);

                //Make request
                using FBMResponse response = await client.SendAsync(request, cancellationToken);
                response.ThrowIfNotSet();

                //Header values are copied to strings before the response is disposed
                return new()
                {
                    Status = response.Headers.FirstOrDefault(static a => a.Header == HeaderCommand.Status).Value.ToString(),
                    CurrentId = response.Headers.SingleOrDefault(static v => v.Header == Constants.ObjectId).Value.ToString(),
                    NewId = response.Headers.SingleOrDefault(static v => v.Header == Constants.NewObjectId).Value.ToString()
                };
            }
            finally
            {
                //Always hand the request back to the client, even on failure
                client.ReturnRequest(request);
            }
        }

        /// <summary>
        /// Gets the Object-id for the request message, or throws an <see cref="InvalidOperationException"/> 
        /// if no object id header was specified
        /// </summary>
        /// <param name="context"></param>
        /// <returns>The id of the object requested</returns>
        /// <exception cref="InvalidOperationException"></exception>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public static string ObjectId(this FBMContext context)
        {
            return context.Request.Headers.First(static kvp => kvp.Header == Constants.ObjectId).Value.ToString();
        }

        /// <summary>
        /// Gets the new ID of the object if specified from the request. Null if the request did not specify an id update
        /// </summary>
        /// <param name="context"></param>
        /// <returns>The new ID of the object if specified, null otherwise</returns>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public static string? NewObjectId(this FBMContext context)
        {
            return context.Request.Headers.FirstOrDefault(static kvp => kvp.Header == Constants.NewObjectId).GetValueString();
        }

        /// <summary>
        /// Gets the request method (the action header value) for the request, or throws an 
        /// <see cref="InvalidOperationException"/> if no action header was specified
        /// </summary>
        /// <param name="context"></param>
        /// <returns>The request method string</returns>
        /// <exception cref="InvalidOperationException"></exception>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public static string Method(this FBMContext context)
        {
            return context.Request.Headers.First(static kvp => kvp.Header == HeaderCommand.Action).Value.ToString();
        }

        /// <summary>
        /// Closes a response with a status code by writing the status header to the response
        /// </summary>
        /// <param name="context"></param>
        /// <param name="responseCode">The status code to send to the client</param>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public static void CloseResponse(this FBMContext context, string responseCode)
        {
            context.Response.WriteHeader(HeaderCommand.Status, responseCode);
        }

        /// <summary>
        /// Initializes the worker for a reconnect policy and returns an object that can listen for changes
        /// and configure the connection as necessary
        /// </summary>
        /// <typeparam name="T">The stateful connection type</typeparam>
        /// <param name="worker"></param>
        /// <param name="retryDelay">The amount of time to wait between retries</param>
        /// <param name="serverUri">The uri to reconnect the client to</param>
        /// <returns>A <see cref="ClientRetryManager{T}"/> for listening for retry events</returns>
        public static ClientRetryManager<T> SetReconnectPolicy<T>(this T worker, TimeSpan retryDelay, Uri serverUri) where T : IStatefulConnection
        {
            //Return new manager
            return new(worker, retryDelay, serverUri);
        }
    }
}