/*
* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Data.Caching
* File: ClientExtensions.cs
*
* ClientExtensions.cs is part of VNLib.Data.Caching which is part of the larger
* VNLib collection of libraries and utilities.
*
* VNLib.Data.Caching is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* VNLib.Data.Caching is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
using System;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using System.Runtime.CompilerServices;
using VNLib.Utils.Logging;
using VNLib.Net.Messaging.FBM;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Net.Messaging.FBM.Server;
using VNLib.Data.Caching.Exceptions;
using static VNLib.Data.Caching.Constants;
namespace VNLib.Data.Caching
{
/// <summary>
/// Provides caching extension methods for <see cref="FBMClient"/>
/// </summary>
public static class ClientExtensions
{
// Shared serializer instance used by the overloads in this class that do not
// accept an explicit serializer/deserializer argument.
// NOTE(review): the meaning of the 256 constructor argument is not visible in
// this file - presumably an initial buffer size; confirm against JsonCacheObjectSerializer.
private static readonly JsonCacheObjectSerializer DefaultSerializer = new(256);
/// <summary>
/// Writes a debug message, prefixed with the cache tag, to the debug log
/// configured on the client. Nothing is written when no debug log is configured.
/// </summary>
/// <param name="client">The client whose configured debug log is written to</param>
/// <param name="message">The message template to write</param>
/// <param name="args">Arguments forwarded to the log along with the template</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void LogDebug(this FBMClient client, string message, params object?[] args)
    => client.Config.DebugLog?.Debug($"[CACHE] : {message}", args);
/// <summary>
/// Updates the state of the object, and optionally updates the ID of the object. The data
/// parameter is serialized with the default (JSON) serializer, buffered, and streamed to
/// the remote server
/// </summary>
/// <typeparam name="T">The type of the object to serialize</typeparam>
/// <param name="client">The client used to send the request</param>
/// <param name="objectId">The id of the object to update or replace</param>
/// <param name="newId">An optional parameter to specify a new ID for the old object</param>
/// <param name="data">The payload data to serialize and set as the data state of the session</param>
/// <param name="cancellationToken">A token to cancel the operation</param>
/// <returns>A task that resolves when the server responds</returns>
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="ObjectNotFoundException"></exception>
/// <exception cref="InvalidStatusException"></exception>
public static Task AddOrUpdateObjectAsync<T>(this FBMClient client, string objectId, string? newId, T data, CancellationToken cancellationToken = default)
{
    //Use the default/json serializer if not specified
    return AddOrUpdateObjectAsync(client, objectId, newId, data, DefaultSerializer, cancellationToken);
}
/// <summary>
/// Updates the state of the object, and optionally updates the ID of the object. The data
/// parameter is serialized, buffered, and streamed to the remote server
/// </summary>
/// <typeparam name="T">The type of the object to serialize</typeparam>
/// <param name="client">The client used to send the request</param>
/// <param name="objectId">The id of the object to update or replace</param>
/// <param name="newId">An optional parameter to specify a new ID for the old object</param>
/// <param name="data">The payload data to serialize and set as the data state of the session</param>
/// <param name="serializer">The custom serializer used to serialize the object to binary</param>
/// <param name="cancellationToken">A token to cancel the operation</param>
/// <returns>A task that resolves when the server responds</returns>
/// <exception cref="ArgumentNullException">The client or serializer is null</exception>
/// <exception cref="ObjectNotFoundException">The server responded with a not-found status</exception>
/// <exception cref="InvalidStatusException">The server responded with any other non-okay status</exception>
public static async Task AddOrUpdateObjectAsync<T>(
    this FBMClient client,
    string objectId,
    string? newId,
    T data,
    ICacheObjectSerializer serializer,
    CancellationToken cancellationToken = default)
{
    _ = client ?? throw new ArgumentNullException(nameof(client));
    _ = serializer ?? throw new ArgumentNullException(nameof(serializer));

    client.LogDebug("Updating object {id}, newid {nid}", objectId, newId);

    //Rent a new request
    FBMRequest request = client.RentRequest();
    try
    {
        //Set action as add/update
        request.WriteHeader(HeaderCommand.Action, Actions.AddOrUpdate);

        //Set object-id header
        request.WriteHeader(Constants.ObjectId, objectId);

        //The new-id header is only written when a non-blank new id was supplied
        if (!string.IsNullOrWhiteSpace(newId))
        {
            request.WriteHeader(Constants.NewObjectId, newId);
        }

        //Serialize the message directly into the request body writer
        serializer.Serialize(data, request.GetBodyWriter());

        //Make request
        using FBMResponse response = await client.SendAsync(request, cancellationToken);
        response.ThrowIfNotSet();

        //Get the status header (default header if the server did not send one)
        FBMMessageHeader status = response.Headers.FirstOrDefault(static a => a.Header == HeaderCommand.Status);

        //Check status code
        if (status.Value.Equals(ResponseCodes.Okay, StringComparison.OrdinalIgnoreCase))
        {
            return;
        }
        else if (status.Value.Equals(ResponseCodes.NotFound, StringComparison.OrdinalIgnoreCase))
        {
            throw new ObjectNotFoundException($"object {objectId} not found on remote server");
        }

        //Invalid status
        throw new InvalidStatusException("Invalid status code recived for object upsert request", status.ToString());
    }
    finally
    {
        //Always hand the request back to the client, even on failure
        client.ReturnRequest(request);
    }
}
/// <summary>
/// Updates the state of the object, and optionally updates the ID of the object. The bytes
/// returned by the <see cref="IObjectData"/> instance are sent as the request body.
/// </summary>
/// <param name="client">The client used to send the request</param>
/// <param name="objectId">The id of the object to update or replace</param>
/// <param name="newId">An optional parameter to specify a new ID for the old object</param>
/// <param name="data">An <see cref="IObjectData"/> that represents the data to set</param>
/// <param name="cancellationToken">A token to cancel the operation</param>
/// <returns>A task that resolves when the server responds</returns>
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="ObjectNotFoundException"></exception>
/// <exception cref="InvalidStatusException"></exception>
public static Task AddOrUpdateObjectAsync(this FBMClient client, string objectId, string? newId, IObjectData data, CancellationToken cancellationToken = default)
    //Forward to the callback overload, using the data object itself as the callback state
    => AddOrUpdateObjectAsync(client, objectId, newId, static source => source.GetData(), data, cancellationToken);
/// <summary>
/// Updates the state of the object, and optionally updates the ID of the object. The data
/// returned by the callback is written as the request body and sent to the remote server
/// </summary>
/// <typeparam name="T">The type of the state passed to the callback</typeparam>
/// <param name="client">The client used to send the request</param>
/// <param name="objectId">The id of the object to update or replace</param>
/// <param name="newId">An optional parameter to specify a new ID for the old object</param>
/// <param name="callback">A callback method that will return the desired object data</param>
/// <param name="state">The state to be passed to the callback</param>
/// <param name="cancellationToken">A token to cancel the operation</param>
/// <returns>A task that resolves when the server responds</returns>
/// <exception cref="ArgumentNullException">The client or callback is null</exception>
/// <exception cref="ObjectNotFoundException">The server responded with a not-found status</exception>
/// <exception cref="InvalidStatusException">The server responded with any other non-okay status</exception>
public static async Task AddOrUpdateObjectAsync<T>(this FBMClient client, string objectId, string? newId, ObjectDataReader<T> callback, T state, CancellationToken cancellationToken = default)
{
    _ = client ?? throw new ArgumentNullException(nameof(client));
    _ = callback ?? throw new ArgumentNullException(nameof(callback));

    client.LogDebug("Updating object {id}, newid {nid}", objectId, newId);

    //Rent a new request
    FBMRequest request = client.RentRequest();
    try
    {
        //Set action as add/update
        request.WriteHeader(HeaderCommand.Action, Actions.AddOrUpdate);

        //Set object-id header
        request.WriteHeader(Constants.ObjectId, objectId);

        //The new-id header is only written when a non-blank new id was supplied
        if (!string.IsNullOrWhiteSpace(newId))
        {
            request.WriteHeader(Constants.NewObjectId, newId);
        }

        //Write the message body as the object data produced by the callback
        request.WriteBody(callback(state));

        //Make request
        using FBMResponse response = await client.SendAsync(request, cancellationToken);
        response.ThrowIfNotSet();

        //Get the status header (default header if the server did not send one)
        FBMMessageHeader status = response.Headers.FirstOrDefault(static a => a.Header == HeaderCommand.Status);

        //Check status code
        if (status.Value.Equals(ResponseCodes.Okay, StringComparison.OrdinalIgnoreCase))
        {
            return;
        }
        else if (status.Value.Equals(ResponseCodes.NotFound, StringComparison.OrdinalIgnoreCase))
        {
            throw new ObjectNotFoundException($"object {objectId} not found on remote server");
        }

        //Invalid status
        throw new InvalidStatusException("Invalid status code recived for object upsert request", status.ToString());
    }
    finally
    {
        //Always hand the request back to the client, even on failure
        client.ReturnRequest(request);
    }
}
/// <summary>
/// Gets an object from the server if it exists, and uses the default serializer to
/// recover the object
/// </summary>
/// <typeparam name="T">The type of the object to recover</typeparam>
/// <param name="client">The client used to send the request</param>
/// <param name="objectId">The id of the object to get</param>
/// <param name="cancellationToken">A token to cancel the operation</param>
/// <returns>
/// A task that completes with the deserialized response payload, or the default
/// value of <typeparamref name="T"/> when the server reports the object was not found
/// </returns>
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="InvalidStatusException"></exception>
public static Task<T?> GetObjectAsync<T>(this FBMClient client, string objectId, CancellationToken cancellationToken = default)
{
    //T cannot be inferred from the arguments, so it must be passed explicitly
    return GetObjectAsync<T>(client, objectId, DefaultSerializer, cancellationToken);
}
/// <summary>
/// Gets an object from the server if it exists
/// </summary>
/// <typeparam name="T">The type of the object to recover</typeparam>
/// <param name="client">The client used to send the request</param>
/// <param name="objectId">The id of the object to get</param>
/// <param name="deserialzer">The custom data deserializer used to deserialize the binary cache result</param>
/// <param name="cancellationToken">A token to cancel the operation</param>
/// <returns>
/// A task that completes with the deserialized response payload, or the default
/// value of <typeparamref name="T"/> when the server reports the object was not found
/// </returns>
/// <exception cref="ArgumentNullException">The client or deserializer is null</exception>
/// <exception cref="InvalidStatusException">The server responded with a status other than okay or not-found</exception>
public static async Task<T?> GetObjectAsync<T>(this FBMClient client, string objectId, ICacheObjectDeserializer deserialzer, CancellationToken cancellationToken = default)
{
    _ = client ?? throw new ArgumentNullException(nameof(client));
    _ = deserialzer ?? throw new ArgumentNullException(nameof(deserialzer));

    client.LogDebug("Getting object {id}", objectId);

    //Rent a new request
    FBMRequest request = client.RentRequest();
    try
    {
        //Set action as get
        request.WriteHeader(HeaderCommand.Action, Actions.Get);

        //Set object id header
        request.WriteHeader(Constants.ObjectId, objectId);

        //Make request
        using FBMResponse response = await client.SendAsync(request, cancellationToken);
        response.ThrowIfNotSet();

        //Get the status header (default header if the server did not send one)
        FBMMessageHeader status = response.Headers.FirstOrDefault(static a => a.Header == HeaderCommand.Status);

        //Check ok status code, then its safe to deserialize
        if (status.Value.Equals(ResponseCodes.Okay, StringComparison.Ordinal))
        {
            return deserialzer.Deserialize<T>(response.ResponseBody);
        }

        //Object may not exist on the server yet
        if (status.Value.Equals(ResponseCodes.NotFound, StringComparison.Ordinal))
        {
            return default;
        }

        throw new InvalidStatusException("Invalid status code recived for object get request", status.ToString());
    }
    finally
    {
        //Always hand the request back to the client, even on failure
        client.ReturnRequest(request);
    }
}
/// <summary>
/// Gets an object from the server if it exists. If data is retrieved, it is passed to
/// <c>SetData</c> on the supplied <see cref="IObjectData"/> instance; if no data is
/// found, this method returns and never calls SetData.
/// </summary>
/// <param name="client">The client used to send the request</param>
/// <param name="objectId">The id of the object to get</param>
/// <param name="data">An object data instance used to store the found object data</param>
/// <param name="cancellationToken">A token to cancel the operation</param>
/// <returns>A task that completes with true if the object was found, false if it was not found</returns>
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="InvalidStatusException"></exception>
public static Task<bool> GetObjectAsync(this FBMClient client, string objectId, IObjectData data, CancellationToken cancellationToken = default)
{
    //Forward to the setter overload, using the data object itself as the callback state
    return GetObjectAsync(client, objectId, static (p, d) => p.SetData(d), data, cancellationToken);
}
/// <summary>
/// Gets an object from the server if it exists. If data is retrieved, it is handed to
/// the <paramref name="setter"/> callback; if no data is found, this method returns
/// and never invokes the callback.
/// </summary>
/// <typeparam name="T">The type of the state passed to the callback</typeparam>
/// <param name="client">The client used to send the request</param>
/// <param name="objectId">The id of the object to get</param>
/// <param name="setter">A callback method used to store the recovered object data</param>
/// <param name="state">The state parameter to pass to the callback method</param>
/// <param name="cancellationToken">A token to cancel the operation</param>
/// <returns>When complete, true if the object was found, false if not found, and an exception otherwise</returns>
/// <exception cref="ArgumentNullException">The client or setter is null</exception>
/// <exception cref="InvalidStatusException">The server responded with a status other than okay or not-found</exception>
public static async Task<bool> GetObjectAsync<T>(this FBMClient client, string objectId, ObjectDataSet<T> setter, T state, CancellationToken cancellationToken = default)
{
    _ = client ?? throw new ArgumentNullException(nameof(client));
    _ = setter ?? throw new ArgumentNullException(nameof(setter));

    client.LogDebug("Getting object {id}", objectId);

    //Rent a new request
    FBMRequest request = client.RentRequest();
    try
    {
        //Set action as get
        request.WriteHeader(HeaderCommand.Action, Actions.Get);

        //Set object id header
        request.WriteHeader(Constants.ObjectId, objectId);

        //Make request
        using FBMResponse response = await client.SendAsync(request, cancellationToken);
        response.ThrowIfNotSet();

        //Get the status header (default header if the server did not send one)
        FBMMessageHeader status = response.Headers.FirstOrDefault(static a => a.Header == HeaderCommand.Status);

        //Check ok status code, then its safe to hand the body to the callback
        if (status.Value.Equals(ResponseCodes.Okay, StringComparison.Ordinal))
        {
            //Write the object data while the response is still alive
            setter(state, response.ResponseBody);
            return true;
        }

        //Object may not exist on the server yet
        if (status.Value.Equals(ResponseCodes.NotFound, StringComparison.Ordinal))
        {
            return false;
        }

        throw new InvalidStatusException("Invalid status code recived for object get request", status.ToString());
    }
    finally
    {
        //Always hand the request back to the client, even on failure
        client.ReturnRequest(request);
    }
}
/// <summary>
/// Asynchronously deletes an object in the remote store
/// </summary>
/// <param name="client">The client used to send the request</param>
/// <param name="objectId">The id of the object to delete</param>
/// <param name="cancellationToken">A token to cancel the operation</param>
/// <returns>
/// A task that resolves when the operation has completed: true if the server
/// reported success, false if the server reported the object was not found
/// </returns>
/// <exception cref="ArgumentNullException">The client is null</exception>
/// <exception cref="InvalidStatusException">The server responded with a status other than okay or not-found</exception>
public static async Task<bool> DeleteObjectAsync(this FBMClient client, string objectId, CancellationToken cancellationToken = default)
{
    _ = client ?? throw new ArgumentNullException(nameof(client));

    client.LogDebug("Deleting object {id}", objectId);

    //Rent a new request
    FBMRequest request = client.RentRequest();
    try
    {
        //Set action as delete
        request.WriteHeader(HeaderCommand.Action, Actions.Delete);

        //Set object-id header
        request.WriteHeader(Constants.ObjectId, objectId);

        //Make request
        using FBMResponse response = await client.SendAsync(request, cancellationToken);
        response.ThrowIfNotSet();

        //Get the status header (default header if the server did not send one)
        FBMMessageHeader status = response.Headers.FirstOrDefault(static a => a.Header == HeaderCommand.Status);

        if (status.Value.Equals(ResponseCodes.Okay, StringComparison.Ordinal))
        {
            return true;
        }
        else if (status.Value.Equals(ResponseCodes.NotFound, StringComparison.OrdinalIgnoreCase))
        {
            return false;
        }

        //Message names the delete operation so failures are not mistaken for a get
        throw new InvalidStatusException("Invalid status code recived for object delete request", status.ToString());
    }
    finally
    {
        //Always hand the request back to the client, even on failure
        client.ReturnRequest(request);
    }
}
/// <summary>
/// Dequeues a change event from the server event queue for the current connection, or waits until a change happens
/// </summary>
/// <param name="client">The client used to send the dequeue request</param>
/// <param name="change">The instance to store change event data to</param>
/// <param name="cancellationToken">A token to cancel the dequeue operation</param>
/// <returns>
/// A task that completes once the status, current id and new id from the response
/// headers have been stored in <paramref name="change"/>
/// </returns>
/// <exception cref="ArgumentNullException">The client or change instance is null</exception>
/// <exception cref="InvalidOperationException">The response contains more than one object-id or new-object-id header</exception>
public static async Task WaitForChangeAsync(this FBMClient client, WaitForChangeResult change, CancellationToken cancellationToken = default)
{
    //Guard the client like the other client extensions do, instead of failing with a null reference
    _ = client ?? throw new ArgumentNullException(nameof(client));
    _ = change ?? throw new ArgumentNullException(nameof(change));

    //Rent a new request
    FBMRequest request = client.RentRequest();
    try
    {
        //Set action as event dequeue to dequeue a change event
        request.WriteHeader(HeaderCommand.Action, Actions.Dequeue);

        //Make request
        using FBMResponse response = await client.SendAsync(request, cancellationToken);
        response.ThrowIfNotSet();

        //Copy the header values to strings before the response is disposed
        change.Status = response.Headers.FirstOrDefault(static a => a.Header == HeaderCommand.Status).Value.ToString();
        change.CurrentId = response.Headers.SingleOrDefault(static v => v.Header == Constants.ObjectId).Value.ToString();
        change.NewId = response.Headers.SingleOrDefault(static v => v.Header == Constants.NewObjectId).Value.ToString();
    }
    finally
    {
        //Always hand the request back to the client, even on failure
        client.ReturnRequest(request);
    }
}
/// <summary>
/// Gets the Object-id for the request message, or throws an <see cref="InvalidOperationException"/> if not specified
/// </summary>
/// <param name="context">The context whose request headers are searched</param>
/// <returns>The id of the object requested</returns>
/// <exception cref="InvalidOperationException">The request has no object-id header</exception>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static string ObjectId(this FBMContext context)
{
    //First() throws when the request carries no object-id header
    FBMMessageHeader idHeader = context.Request.Headers.First(static hdr => hdr.Header == Constants.ObjectId);
    return idHeader.Value.ToString();
}
/// <summary>
/// Gets the new ID of the object if specified from the request. Null if the request did not specify an id update
/// </summary>
/// <param name="context">The context whose request headers are searched</param>
/// <returns>The new ID of the object if specified, null otherwise</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static string? NewObjectId(this FBMContext context)
{
    //A missing header yields the default header value rather than an exception
    FBMMessageHeader newIdHeader = context.Request.Headers.FirstOrDefault(static hdr => hdr.Header == Constants.NewObjectId);
    return newIdHeader.GetValueString();
}
/// <summary>
/// Gets the request method for the request
/// </summary>
/// <param name="context">The context whose request headers are searched</param>
/// <returns>The request method string</returns>
/// <exception cref="InvalidOperationException">The request has no action header</exception>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static string Method(this FBMContext context)
{
    //First() throws when the request carries no action header
    FBMMessageHeader actionHeader = context.Request.Headers.First(static hdr => hdr.Header == HeaderCommand.Action);
    return actionHeader.Value.ToString();
}
/// <summary>
/// Closes a response with a status code
/// </summary>
/// <param name="context">The context whose response receives the status header</param>
/// <param name="responseCode">The status code to send to the client</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void CloseResponse(this FBMContext context, string responseCode)
    //The status code is written as the status header of the response
    => context.Response.WriteHeader(HeaderCommand.Status, responseCode);
/// <summary>
/// Initializes the worker for a reconnect policy and returns an object that can listen for changes
/// and configure the connection as necessary
/// </summary>
/// <typeparam name="T">The stateful connection type of the worker</typeparam>
/// <param name="worker">The connection worker to manage</param>
/// <param name="retryDelay">The amount of time to wait between retries</param>
/// <param name="serverUri">The uri to reconnect the client to</param>
/// <returns>A <see cref="ClientRetryManager{T}"/> for listening for retry events</returns>
public static ClientRetryManager<T> SetReconnectPolicy<T>(this T worker, TimeSpan retryDelay, Uri serverUri) where T : IStatefulConnection
{
    //Return new manager (target-typed to the declared return type)
    return new(worker, retryDelay, serverUri);
}
}
}