/*
* Copyright (c) 2023 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Data.Caching.ObjectCache
* File: BlobCacheExtensions.cs
*
* BlobCacheExtensions.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger
* VNLib collection of libraries and utilities.
*
* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
using System;
using System.Threading;
using System.Threading.Tasks;
namespace VNLib.Data.Caching.ObjectCache
{
///
/// Provides blob cache extension methods
///
public static class BlobCacheExtensions
{
///
/// Gets a that holds an exclusive lock
/// for the current bucekt and holds a referrence to the stored
///
///
///
/// A token to cancel the wait operation
/// A that holds the referrence
public static ValueTask WaitAsync(this IBlobCacheBucket bucket, CancellationToken cancellation)
{
_ = bucket ?? throw new ArgumentNullException(nameof(bucket));
//Try enter the bucket lock
ValueTask cacheWait = bucket.ManualWaitAsync(cancellation);
if (cacheWait.IsCompleted)
{
IBlobCache bucketHandle = cacheWait.GetAwaiter().GetResult();
return new ValueTask(new CacheBucketHandle(bucket, bucketHandle));
}
else
{
return GetHandleAsync(cacheWait, bucket);
}
static async ValueTask GetHandleAsync(ValueTask waitTask, IBlobCacheBucket bucket)
{
IBlobCache cache = await waitTask.ConfigureAwait(false);
return new CacheBucketHandle(bucket, cache);
}
}
internal static CacheEntry CreateEntry(this IBlobCache cache, string objectId, ReadOnlySpan initialData, DateTime time)
{
CacheEntry entry = CacheEntry.Create(initialData, cache.MemoryManager);
try
{
//try to add the entry, but if exists, let it throw
cache.Add(objectId, entry);
entry.SetTime(time);
return entry;
}
catch
{
entry.Dispose();
throw;
}
}
internal static CacheEntry AddOrUpdateEntry(this IBlobCache cache, string objectId, ReadOnlySpan data, DateTime time)
{
//See if blob exists
if (cache.TryGetValue(objectId, out CacheEntry entry))
{
//Update the entry since it exists
entry.UpdateData(data);
entry.SetTime(time);
}
else
{
//Create the new entry
entry = cache.CreateEntry(objectId, data, time);
}
return entry;
}
internal static CacheEntry TryChangeKey(this IBlobCache cache, string objectId, string alternateId, ReadOnlySpan data, DateTime time)
{
//Change the key of the blob item and update its data
if (cache.TryChangeKey(objectId, alternateId, out CacheEntry entry))
{
//If date is 0 length do not overwrite the old entry if found
if (data.IsEmpty)
{
return entry;
}
//Otherwise update the entry
entry.UpdateData(data);
entry.SetTime(time);
return entry;
}
else
{
//entry does not exist at the old id, so we can create a new one at the alternate id
return cache.CreateEntry(objectId, data, time);
}
}
///
/// Asynchronously adds or updates an object in the store and optionally update's it's id.
/// If the alternate key already exists, it's data is overwritten.
///
///
/// The current (or old) id of the object
/// An optional id to update the blob to
/// A callback that returns the data for the blob
/// The state parameter to pass to the data callback
/// The time to set on the cache record
/// A token to cancel the async operation
/// A value task that represents the async operation
public static async ValueTask AddOrUpdateObjectAsync(
this IBlobCacheTable table,
string objectId,
string? alternateId,
ObjectDataReader bodyData,
T state,
DateTime time,
CancellationToken cancellation = default)
{
_ = table ?? throw new ArgumentNullException(nameof(table));
_ = bodyData ?? throw new ArgumentNullException(nameof(bodyData));
//See if an id change is required
if (string.IsNullOrWhiteSpace(alternateId))
{
//safe to get the bucket for the primary id
IBlobCacheBucket bucket = table.GetBucket(objectId);
//Wait for the bucket to be available
IBlobCache cache = await bucket.ManualWaitAsync(cancellation);
try
{
_ = cache.AddOrUpdateEntry(objectId, bodyData(state), time);
}
finally
{
bucket.Release();
}
}
else
{
//Buckets for each id need to be obtained
IBlobCacheBucket primary = table.GetBucket(objectId);
IBlobCacheBucket alternate = table.GetBucket(alternateId);
//Same bucket
if (ReferenceEquals(primary, alternate))
{
IBlobCache cache = await primary.ManualWaitAsync(cancellation);
try
{
//Update the entry for the single bucket
_ = cache.TryChangeKey(objectId, alternateId, bodyData(state), time);
}
finally
{
primary.Release();
}
}
else
{
//Buckets are different must be awaited individually
using CacheBucketHandle primaryHandle = await primary.WaitAsync(cancellation);
using CacheBucketHandle alternateHandle = await alternate.WaitAsync(cancellation);
//Get the entry from the primary hande
if (primaryHandle.Cache.Remove(objectId, out CacheEntry entry))
{
try
{
//Try to see if the alternate key already exists
if (alternateHandle.Cache.TryGetValue(alternateId, out CacheEntry existing))
{
existing.UpdateData(bodyData(state));
//dispose the old entry since we don't need it
entry.Dispose();
}
else
{
//Update the entry buffer and reuse the entry
entry.UpdateData(bodyData(state));
//Add the updated entry to the alternate table
alternateHandle.Cache.Add(alternateId, entry);
}
}
catch
{
//Cleanup removed entry if error adding
entry.Dispose();
throw;
}
}
else
{
//Try to see if the alternate key already exists in the target store
if (alternateHandle.Cache.TryGetValue(alternateId, out CacheEntry existing))
{
//overwrite the existing entry data
existing.UpdateData(bodyData(state));
}
else
{
//Old entry did not exist, we need to create a new entry for the alternate bucket
_ = alternateHandle.Cache.CreateEntry(alternateId, bodyData(state), time);
}
}
}
}
}
///
/// Asynchronously deletes a previously stored item
///
///
/// The id of the object to delete
/// A token to cancel the async lock await
/// A task that completes when the item has been deleted
public static async ValueTask DeleteObjectAsync(this IBlobCacheTable table, string objectId, CancellationToken cancellation = default)
{
//Try to get the bucket that the id should belong to
IBlobCacheBucket bucket = table.GetBucket(objectId);
//Wait for the bucket
IBlobCache cache = await bucket.ManualWaitAsync(cancellation);
try
{
return cache.Remove(objectId);
}
finally
{
bucket.Release();
}
}
}
}