/*
* Copyright (c) 2024 Vaughn Nugent
* 
* Library: VNLib
* Package: VNLib.Data.Caching.ObjectCache
* File: BlobCacheExtensions.cs 
*
* BlobCacheExtensions.cs is part of VNLib.Data.Caching.ObjectCache which is part of the larger 
* VNLib collection of libraries and utilities.
*
* VNLib.Data.Caching.ObjectCache is free software: you can redistribute it and/or modify 
* it under the terms of the GNU Affero General Public License as 
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* VNLib.Data.Caching.ObjectCache is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see https://www.gnu.org/licenses/.
*/

using System;
using System.Threading;
using System.Threading.Tasks;

namespace VNLib.Data.Caching.ObjectCache
{
    /// <summary>
    /// Provides blob cache extension methods
    /// </summary>
    public static class BlobCacheExtensions
    {
        /// <summary>
        /// Gets a <see cref="CacheBucketHandle"/> that holds an exclusive lock 
        /// for the current bucket and holds a reference to the stored
        /// <see cref="IBlobCache"/>
        /// </summary>
        /// <param name="bucket">The bucket to wait on</param>
        /// <param name="cancellation">A token to cancel the wait operation</param>
        /// <returns>A <see cref="CacheBucketHandle"/> that holds the <see cref="IBlobCache"/> reference</returns>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="bucket"/> is null</exception>
        public static ValueTask<CacheBucketHandle> WaitAsync(this IBlobCacheBucket bucket, CancellationToken cancellation)
        {
            ArgumentNullException.ThrowIfNull(bucket);

            //Try enter the bucket lock
            ValueTask<IBlobCache> cacheWait = bucket.ManualWaitAsync(cancellation);

            if (cacheWait.IsCompleted)
            {
                //Wait already finished, so the result can be read without an async state machine
                IBlobCache bucketHandle = cacheWait.GetAwaiter().GetResult();
                return new ValueTask<CacheBucketHandle>(new CacheBucketHandle(bucket, bucketHandle));
            }
            else
            {
                return GetHandleAsync(cacheWait, bucket);
            }

            //Slow path: await the pending wait, then wrap the cache in a handle
            static async ValueTask<CacheBucketHandle> GetHandleAsync(ValueTask<IBlobCache> waitTask, IBlobCacheBucket bucket)
            {
                IBlobCache cache = await waitTask.ConfigureAwait(false);
                return new CacheBucketHandle(bucket, cache);
            }
        }

        /// <summary>
        /// Creates a new entry from the initial data, adds it to the cache under
        /// <paramref name="objectId"/> and sets its time. The new entry is disposed
        /// if adding it (or setting its time) throws.
        /// </summary>
        /// <param name="cache">The cache to add the entry to</param>
        /// <param name="objectId">The id to store the new entry under</param>
        /// <param name="initialData">The initial data of the entry</param>
        /// <param name="time">The time to set on the new entry</param>
        /// <returns>The newly created entry</returns>
        internal static CacheEntry CreateEntry(this IBlobCache cache, string objectId, ReadOnlySpan<byte> initialData, DateTime time)
        {
            CacheEntry entry = CacheEntry.Create(initialData, cache.MemoryManager);
            try
            {
                //try to add the entry, but if exists, let it throw
                cache.Add(objectId, entry);
                entry.SetTime(time);
                return entry;
            }
            catch
            {
                //Entry was never handed off to the caller, so release it here
                entry.Dispose();
                throw;
            }
        }

        /// <summary>
        /// Updates the data and time of the entry stored under <paramref name="objectId"/>, 
        /// or creates a new entry when none is found.
        /// </summary>
        /// <param name="cache">The cache to search/add to</param>
        /// <param name="objectId">The id of the entry</param>
        /// <param name="data">The data to store in the entry</param>
        /// <param name="time">The time to set on the entry</param>
        /// <returns>The updated or newly created entry</returns>
        internal static CacheEntry AddOrUpdateEntry(this IBlobCache cache, string objectId, ReadOnlySpan<byte> data, DateTime time)
        {
            //See if blob exists
            if (cache.TryGetValue(objectId, out CacheEntry entry))
            {
                //Update the entry since it exists
                entry.UpdateData(data);
                entry.SetTime(time);
            }
            else
            {
                //Create the new entry
                entry = cache.CreateEntry(objectId, data, time);
            }

            return entry;
        }

        /// <summary>
        /// Attempts to move the entry stored under <paramref name="objectId"/> to 
        /// <paramref name="alternateId"/> within the same cache and update its data. 
        /// When no entry can be moved, a new entry is created at <paramref name="alternateId"/>.
        /// </summary>
        /// <param name="cache">The cache that holds the entry</param>
        /// <param name="objectId">The current (old) id of the entry</param>
        /// <param name="alternateId">The new id of the entry</param>
        /// <param name="data">The data to store; when empty, a moved entry keeps its existing data and time</param>
        /// <param name="time">The time to set on the entry when its data is written</param>
        /// <returns>The moved or newly created entry</returns>
        internal static CacheEntry TryChangeKey(this IBlobCache cache, string objectId, string alternateId, ReadOnlySpan<byte> data, DateTime time)
        {
            //Change the key of the blob item and update its data
            if (cache.TryChangeKey(objectId, alternateId, out CacheEntry entry))
            {
                //If data is 0 length do not overwrite the old entry if found
                if (data.IsEmpty)
                {
                    return entry;
                }

                //Otherwise update the entry
                entry.UpdateData(data);
                entry.SetTime(time);

                return entry;
            }
            else
            {
                //entry does not exist at the old id, so we can create a new one at the alternate id
                return cache.CreateEntry(alternateId, data, time);
            }
        }

        /// <summary>
        /// Asynchronously adds or updates an object in the store and optionally updates its id.
        /// If the alternate key already exists, its data is overwritten.
        /// </summary>
        /// <typeparam name="T">The type of the state passed to the data callback</typeparam>
        /// <param name="table">The table that contains the buckets to store the object in</param>
        /// <param name="objectId">The current (or old) id of the object</param>
        /// <param name="alternateId">An optional id to update the blob to</param>
        /// <param name="bodyData">A callback that returns the data for the blob</param>
        /// <param name="state">The state parameter to pass to the data callback</param>
        /// <param name="time">The time to set on the cache record</param>
        /// <param name="cancellation">A token to cancel the async operation</param>
        /// <returns>A value task that represents the async operation</returns>
        /// <exception cref="ArgumentNullException">
        /// Thrown when <paramref name="table"/>, <paramref name="bodyData"/> or <paramref name="objectId"/> is null
        /// </exception>
        /// <exception cref="ArgumentException">Thrown when <paramref name="objectId"/> is empty or whitespace</exception>
        public static async ValueTask AddOrUpdateObjectAsync<T>(
            this IBlobCacheTable table,
            string objectId,
            string? alternateId,
            ObjectDataGet<T> bodyData,
            T state,
            DateTime time,
            CancellationToken cancellation = default)
        {
            ArgumentNullException.ThrowIfNull(table);
            ArgumentNullException.ThrowIfNull(bodyData);
            ArgumentException.ThrowIfNullOrWhiteSpace(objectId);

            //See if an id change is required
            if (string.IsNullOrWhiteSpace(alternateId))
            {
                //safe to get the bucket for the primary id
                IBlobCacheBucket bucket = table.GetBucket(objectId);

                //Wait for the bucket to be available
                IBlobCache cache = await bucket.ManualWaitAsync(cancellation).ConfigureAwait(false);

                try
                {
                    _ = cache.AddOrUpdateEntry(objectId, bodyData(state), time);
                }
                finally
                {
                    bucket.Release();
                }
            }
            else
            {
                //Buckets for each id need to be obtained
                IBlobCacheBucket primary = table.GetBucket(objectId);
                IBlobCacheBucket alternate = table.GetBucket(alternateId);

                //Same bucket
                if (ReferenceEquals(primary, alternate))
                {
                    IBlobCache cache = await primary.ManualWaitAsync(cancellation).ConfigureAwait(false);

                    try
                    {
                        //Update the entry for the single bucket
                        _ = cache.TryChangeKey(objectId, alternateId, bodyData(state), time);
                    }
                    finally
                    {
                        primary.Release();
                    }
                }
                else
                {
                    /*
                     * Buckets are different and must be awaited individually.
                     * 
                     * NOTE(review): the locks are taken in primary -> alternate order. If 
                     * bucket locks are exclusive and another caller concurrently renames in 
                     * the opposite direction, the two could wait on each other - confirm 
                     * against the bucket implementation.
                     */
                    using CacheBucketHandle primaryHandle = await primary.WaitAsync(cancellation).ConfigureAwait(false);
                    using CacheBucketHandle alternateHandle = await alternate.WaitAsync(cancellation).ConfigureAwait(false);

                    //Get the entry from the primary handle
                    if (primaryHandle.Cache.Remove(objectId, out CacheEntry entry))
                    {
                        try
                        {
                            //Try to see if the alternate key already exists
                            if (alternateHandle.Cache.TryGetValue(alternateId, out CacheEntry existing))
                            {
                                //Overwrite the existing entry and stamp it with the requested time
                                existing.UpdateData(bodyData(state));
                                existing.SetTime(time);

                                //dispose the old entry since we don't need it
                                entry.Dispose();
                            }
                            else
                            {
                                //Update the entry buffer and time, and reuse the entry
                                entry.UpdateData(bodyData(state));
                                entry.SetTime(time);

                                //Add the updated entry to the alternate table
                                alternateHandle.Cache.Add(alternateId, entry);
                            }
                        }
                        catch
                        {
                            //Cleanup removed entry if error adding
                            entry.Dispose();
                            throw;
                        }
                    }
                    else
                    {
                        //Try to see if the alternate key already exists in the target store
                        if (alternateHandle.Cache.TryGetValue(alternateId, out CacheEntry existing))
                        {
                            //overwrite the existing entry data and stamp it with the requested time
                            existing.UpdateData(bodyData(state));
                            existing.SetTime(time);
                        }
                        else
                        {
                            //Old entry did not exist, we need to create a new entry for the alternate bucket
                            _ = alternateHandle.Cache.CreateEntry(alternateId, bodyData(state), time);
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Asynchronously deletes a previously stored item
        /// </summary>
        /// <param name="table">The table to find the object's bucket in</param>
        /// <param name="objectId">The id of the object to delete</param>
        /// <param name="cancellation">A token to cancel the async lock await</param>
        /// <returns>
        /// A task that completes when the item has been deleted, and yields 
        /// the result of the cache's remove operation
        /// </returns>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="table"/> is null</exception>
        public static ValueTask<bool> DeleteObjectAsync(this IBlobCacheTable table, string objectId, CancellationToken cancellation = default)
        {
            ArgumentNullException.ThrowIfNull(table);

            //Try to get the bucket that the id should belong to
            IBlobCacheBucket bucket = table.GetBucket(objectId);

            return DeleteObjectAsync(bucket, objectId, cancellation);
        }

        /// <summary>
        /// Asynchronously deletes a previously stored item
        /// </summary>
        /// <param name="bucket">The bucket that should contain the object</param>
        /// <param name="objectId">The id of the object to delete</param>
        /// <param name="cancellation">A token to cancel the async lock await</param>
        /// <returns>
        /// A task that completes when the item has been deleted, and yields 
        /// the result of the cache's remove operation
        /// </returns>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="bucket"/> is null</exception>
        public static async ValueTask<bool> DeleteObjectAsync(this IBlobCacheBucket bucket, string objectId, CancellationToken cancellation = default)
        {
            ArgumentNullException.ThrowIfNull(bucket);

            //Wait for the bucket
            IBlobCache cache = await bucket.ManualWaitAsync(cancellation).ConfigureAwait(false);

            try
            {
                return cache.Remove(objectId);
            }
            finally
            {
                //Always give the bucket back, even if the removal throws
                bucket.Release();
            }
        }
    }
}