aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-06-12 19:04:15 -0400
committerLibravatar vnugent <public@vaughnnugent.com>2023-06-12 19:04:15 -0400
commitdc0fc53fd3c3f6c32c8b0d063922c7018fa2c48f (patch)
tree92f963014624a1016f6cb645af5afd18278c54c3 /plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
parent392b38a40e01f2d4dbd457da122dfaf7a1ffe00f (diff)
Baby steps for autonomous nodes
Diffstat (limited to 'plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs')
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs437
1 files changed, 220 insertions, 217 deletions
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
index 2f896bc..167a7e9 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
@@ -27,40 +27,46 @@ using System.Net;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
-using System.Threading.Channels;
using System.Collections.Generic;
-using System.Collections.Concurrent;
-using VNLib.Plugins;
using VNLib.Hashing;
using VNLib.Net.Http;
using VNLib.Utils.Async;
using VNLib.Utils.Memory;
using VNLib.Utils.Logging;
+using VNLib.Utils.Extensions;
using VNLib.Data.Caching;
+using VNLib.Data.Caching.Extensions;
using VNLib.Hashing.IdentityUtility;
using VNLib.Net.Messaging.FBM;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Net.Messaging.FBM.Server;
+using VNLib.Plugins;
using VNLib.Plugins.Essentials;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Essentials.Endpoints;
using VNLib.Plugins.Essentials.Extensions;
-
+using VNLib.Plugins.Extensions.Loading.Routing;
+using VNLib.Data.Caching.ObjectCache.Server.Distribution;
namespace VNLib.Data.Caching.ObjectCache.Server
{
- [ConfigurationName("store")]
- internal sealed class ConnectEndpoint : ResourceEndpointBase, IDisposable, IAsyncBackgroundWork
+ [ConfigurationName("connect_endpoint")]
+ internal sealed class ConnectEndpoint : ResourceEndpointBase, IAsyncBackgroundWork
{
- private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
+ private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
+
+
+ private readonly CacheNodeConfiguration NodeConfiguration;
+ private readonly ICacheEventQueueManager PubSubManager;
+ private readonly IPeerMonitor Peers;
- private readonly string AudienceLocalServerId;
private readonly BlobCacheListener Store;
- private readonly PluginBase Pbase;
+ private readonly CacheAuthKeyStore KeyStore;
- private readonly ConcurrentDictionary<string, AsyncQueue<ChangeEvent>> StatefulEventQueue;
+ private readonly bool VerifyIp;
+ private readonly string AudienceLocalServerId;
private uint _connectedClients;
@@ -87,27 +93,26 @@ namespace VNLib.Data.Caching.ObjectCache.Server
string? path = config["path"].GetString();
InitPathAndLog(path, plugin.Log);
-
- Pbase = plugin;
- //Parse cache config or use default
- if(config.TryGetValue("cache", out JsonElement confEl))
- {
- CacheConfig = confEl.Deserialize<CacheConfiguration>()!;
- }
- else
- {
- //Init default config if not fount
- CacheConfig = new();
+ KeyStore = new(plugin);
- Log.Verbose("Loading default cache buffer configuration");
- }
+ //Check for ip-verification flag
+ VerifyIp = config.TryGetValue("verify_ip", out JsonElement vIpEl) && vIpEl.GetBoolean();
- //Create event queue client lookup table
- StatefulEventQueue = new(StringComparer.OrdinalIgnoreCase);
+ //Setup pub/sub manager
+ PubSubManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>();
+
+ //Get node configuration
+ NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>().Config;
+
+ //Get peer monitor
+ Peers = plugin.GetOrCreateSingleton<CachePeerMonitor>();
//Init the cache store
- Store = InitializeCache((ObjectCacheServerEntry)plugin, CacheConfig, config);
+ Store = plugin.GetOrCreateSingleton<CacheStore>().Listener;
+
+ //Get the cache store configuration
+ CacheConfig = plugin.GetConfigForType<CacheStore>().Deserialze<CacheConfiguration>();
/*
* Generate a random guid for the current server when created so we
@@ -118,60 +123,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//Schedule the queue worker to be run
_ = plugin.ObserveWork(this, 100);
}
-
-
- private static BlobCacheListener InitializeCache(ObjectCacheServerEntry plugin, CacheConfiguration cacheConf, IConfigScope config)
- {
- if(cacheConf.MaxCacheEntries < 2)
- {
- throw new ArgumentException("You must configure a 'max_cache' size larger than 1 item");
- }
-
- //Suggestion
- if(cacheConf.MaxCacheEntries < 200)
- {
- plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache");
- }
-
- plugin.Log.Verbose("Creating cache store with {bc} buckets, with {mc} items/bucket", cacheConf.BucketCount, cacheConf.MaxCacheEntries);
-
- //Load the blob cache table system
- IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, plugin.CacheHeap, cacheConf);
-
- //Endpoint only allows for a single reader
- return new (bc, plugin.Log, plugin.CacheHeap, true);
- }
-
-
- /// <summary>
- /// Gets the configured cache store
- /// </summary>
- /// <returns></returns>
- public ICacheStore GetCacheStore() => new CacheStore(Store);
- //Dispose will be called by the host plugin on unload
- void IDisposable.Dispose()
- {
- //Dispose the store on cleanup
- Store.Dispose();
- }
-
-
- private async Task<ReadOnlyJsonWebKey> GetClientPubAsync()
- {
- return await Pbase.TryGetSecretAsync("client_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
- }
- private async Task<ReadOnlyJsonWebKey> GetCachePubAsync()
- {
- return await Pbase.TryGetSecretAsync("cache_public_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
- }
- private async Task<ReadOnlyJsonWebKey> GetCachePrivateKeyAsync()
- {
- return await Pbase.TryGetSecretAsync("cache_private_key").ToJsonWebKey() ?? throw new KeyNotFoundException("Missing required secret : client_public_key");
- }
-
-
/*
* Used as a client negotiation and verification request
*
@@ -183,6 +136,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server
*
* The tokens are very short lived as requests are intended to be made
* directly after verification
+ *
+ * Clients must also sign the entire token with their private key and
+ * set the signature in the x-upgrade-sig header so we can verify they
+ * received the messages properly
*/
protected override async ValueTask<VfReturnType> GetAsync(HttpEntity entity)
@@ -205,7 +162,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
bool verified = false;
//Get the client public key certificate to verify the client's message
- using(ReadOnlyJsonWebKey cert = await GetClientPubAsync())
+ using(ReadOnlyJsonWebKey cert = await KeyStore.GetClientPublicKeyAsync())
{
//verify signature for client
if (jwt.VerifyFromJwk(cert))
@@ -215,10 +172,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//May be signed by a cache server
else
{
- using ReadOnlyJsonWebKey cacheCert = await GetCachePubAsync();
-
//Set peer and verified flag since the another cache server signed the request
- isPeer = verified = jwt.VerifyFromJwk(cacheCert);
+ isPeer = verified = NodeConfiguration.VerifyCache(jwt);
}
}
@@ -232,10 +187,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//Recover json body
using JsonDocument doc = jwt.GetPayload();
+
if (doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl))
{
nodeId = servIdEl.GetString();
}
+
if (doc.RootElement.TryGetProperty("chl", out JsonElement challengeEl))
{
challenge = challengeEl.GetString();
@@ -246,133 +203,147 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//Verified, now we can create an auth message with a short expiration
using JsonWebToken auth = new();
+
+ auth.WriteHeader(NodeConfiguration.GetJwtHeader());
+ auth.InitPayloadClaim()
+ .AddClaim("aud", AudienceLocalServerId)
+ .AddClaim("exp", entity.RequestedTimeUtc.Add(AuthTokenExpiration).ToUnixTimeSeconds())
+ .AddClaim("nonce", RandomHash.GetRandomBase32(8))
+ .AddClaim("chl", challenge!)
+ //Set the ispeer flag if the request was signed by a cache server
+ .AddClaim("isPeer", isPeer)
+ //Specify the server's node id if set
+ .AddClaim("sub", nodeId!)
+ //Set ip address
+ .AddClaim("ip", entity.TrustedRemoteIp.ToString())
+ //Add negotiaion args
+ .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, CacheConfig.MaxHeaderBufferSize)
+ .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, CacheConfig.MaxRecvBufferSize)
+ .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, CacheConfig.MaxMessageSize)
+ .CommitClaims();
+
+ //Sign the auth message from our private key
+ NodeConfiguration.SignJwt(auth);
- //Sign the auth message from the cache certificate's private key
- using (ReadOnlyJsonWebKey cert = await GetCachePrivateKeyAsync())
- {
- auth.WriteHeader(cert.JwtHeader);
- auth.InitPayloadClaim()
- .AddClaim("aud", AudienceLocalServerId)
- .AddClaim("exp", entity.RequestedTimeUtc.Add(AuthTokenExpiration).ToUnixTimeSeconds())
- .AddClaim("nonce", RandomHash.GetRandomBase32(8))
- .AddClaim("chl", challenge!)
- //Set the ispeer flag if the request was signed by a cache server
- .AddClaim("isPeer", isPeer)
- //Specify the server's node id if set
- .AddClaim("sub", nodeId!)
- //Add negotiaion args
- .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, CacheConfig.MaxHeaderBufferSize)
- .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, CacheConfig.MaxRecvBufferSize)
- .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, CacheConfig.MaxMessageSize)
- .CommitClaims();
-
- auth.SignFromJwk(cert);
- }
-
//Close response
entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer);
return VfReturnType.VirtualSkip;
}
-
- //Background worker to process event queue items
- async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
+ protected override async ValueTask<VfReturnType> WebsocketRequestedAsync(HttpEntity entity)
{
- try
- {
- //Listen for changes
- while (true)
- {
- ChangeEvent ev = await Store.EventQueue.DequeueAsync(exitToken);
+ //Parse jwt from authorization
+ string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
- //Add event to queues
- foreach (AsyncQueue<ChangeEvent> queue in StatefulEventQueue.Values)
- {
- if (!queue.TryEnque(ev))
- {
- Log.Debug("Listener queue has exeeded capacity, change events will be lost");
- }
- }
- }
- }
- catch (OperationCanceledException)
+ if (string.IsNullOrWhiteSpace(jwtAuth))
{
- //Normal exit
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
}
- }
- private class WsUserState
- {
- public int RecvBufferSize { get; init; }
- public int MaxHeaderBufferSize { get; init; }
- public int MaxMessageSize { get; init; }
- public int MaxResponseBufferSize { get; init; }
- public AsyncQueue<ChangeEvent>? SyncQueue { get; init; }
+ //Get the upgrade signature header
+ string? clientSignature = entity.Server.Headers[FBMDataCacheExtensions.X_UPGRADE_SIG_HEADER];
- public override string ToString()
+ if (string.IsNullOrWhiteSpace(clientSignature))
{
- return
- $"{nameof(RecvBufferSize)}:{RecvBufferSize}, {nameof(MaxHeaderBufferSize)}: {MaxHeaderBufferSize}, {nameof(MaxMessageSize)}:{MaxMessageSize}, {nameof(MaxResponseBufferSize)}:{MaxResponseBufferSize}";
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
}
- }
- protected override async ValueTask<VfReturnType> WebsocketRequestedAsync(HttpEntity entity)
- {
- try
+ string? nodeId = null;
+ ICachePeerAdvertisment? discoveryAd = null;
+
+ //Parse jwt
+ using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
{
- //Parse jwt from authorization
- string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
+ //verify signature against the cache public key, since this server must have signed it
+ if (!NodeConfiguration.VerifyCache(jwt))
+ {
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
+ }
+
+ //Recover json body
+ using JsonDocument doc = jwt.GetPayload();
+
+ //Verify audience, expiration
- if (string.IsNullOrWhiteSpace(jwtAuth))
+ if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl) || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase))
+ {
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
+ }
+
+ if (!doc.RootElement.TryGetProperty("exp", out JsonElement expEl)
+ || DateTimeOffset.FromUnixTimeSeconds(expEl.GetInt64()) < entity.RequestedTimeUtc)
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
}
-
- string? nodeId = null;
- //Parse jwt
- using (JsonWebToken jwt = JsonWebToken.Parse(jwtAuth))
+ //Check node ip address matches if required
+ if (VerifyIp)
{
- //Get the client public key certificate to verify the client's message
- using (ReadOnlyJsonWebKey cert = await GetCachePubAsync())
+ if (!doc.RootElement.TryGetProperty("ip", out JsonElement ipEl))
{
- //verify signature against the cache public key, since this server must have signed it
- if (!jwt.VerifyFromJwk(cert))
- {
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
- }
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
}
-
- //Recover json body
- using JsonDocument doc = jwt.GetPayload();
-
- //Verify audience, expiration
- if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl) || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase))
+ string? clientIp = ipEl.GetString();
+ //Verify the client ip address matches the one in the token
+ if (clientIp == null || !IPAddress.TryParse(clientIp, out IPAddress? clientIpAddr) || !clientIpAddr.Equals(entity.TrustedRemoteIp))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
}
+ }
+
+ //Check if the client is a peer
+ bool isPeer = doc.RootElement.TryGetProperty("isPeer", out JsonElement isPeerEl) && isPeerEl.GetBoolean();
- if (!doc.RootElement.TryGetProperty("exp", out JsonElement expEl)
- || DateTimeOffset.FromUnixTimeSeconds(expEl.GetInt64()) < entity.RequestedTimeUtc)
+ //The node id is optional and stored in the 'sub' field, ignore if the client is not a peer
+ if (isPeer && doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl))
+ {
+ nodeId = servIdEl.GetString();
+ }
+
+ //Verify the signature the client included of the auth token
+
+ if (isPeer)
+ {
+ //Verify token signature against a fellow cache public key
+ if (!NodeConfiguration.VerifyUpgradeToken(clientSignature, jwtAuth))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
}
- //Check if the client is a peer
- bool isPeer = doc.RootElement.TryGetProperty("isPeer", out JsonElement isPeerEl) && isPeerEl.GetBoolean();
+ //Try to get the node advertisement header
+ string? discoveryHeader = entity.Server.Headers[FBMDataCacheExtensions.X_NODE_DISCOVERY_HEADER];
- //The node id is optional and stored in the 'sub' field, ignore if the client is not a peer
- if (isPeer && doc.RootElement.TryGetProperty("sub", out JsonElement servIdEl))
+ //Verify the node advertisement header and publish it
+ if (!string.IsNullOrWhiteSpace(discoveryHeader))
{
- nodeId = servIdEl.GetString();
+ discoveryAd = NodeConfiguration.VerifyPeerAdvertisment(discoveryHeader);
}
}
-
+ else
+ {
+ //Not a peer, so verify against the client's public key
+ using ReadOnlyJsonWebKey clientPub = await KeyStore.GetClientPublicKeyAsync();
+
+ //Verify token signature
+ if (!FBMDataCacheExtensions.VerifyUpgradeToken(clientSignature, jwtAuth, clientPub))
+ {
+ entity.CloseResponse(HttpStatusCode.Unauthorized);
+ return VfReturnType.VirtualSkip;
+ }
+ }
+ }
+
+ try
+ {
//Get query config suggestions from the client
string recvBufCmd = entity.QueryArgs[FBMClient.REQ_RECV_BUF_QUERY_ARG];
string maxHeaderCharCmd = entity.QueryArgs[FBMClient.REQ_HEAD_BUF_QUERY_ARG];
@@ -382,34 +353,6 @@ namespace VNLib.Data.Caching.ObjectCache.Server
int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : CacheConfig.MinRecvBufferSize;
int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : CacheConfig.MinHeaderBufferSize;
int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : CacheConfig.MaxMessageSize;
-
- AsyncQueue<ChangeEvent>? nodeQueue = null;
-
- //The connection may be a caching server node, so get its node-id
- if (!string.IsNullOrWhiteSpace(nodeId))
- {
- /*
- * Store a new async queue, or get an old queue for the current node
- *
- * We should use a bounded queue and disacard LRU items, we also know
- * only a single writer is needed as the queue is processed on a single thread
- * and change events may be processed on mutliple threads.
- */
-
- BoundedChannelOptions queueOptions = new(CacheConfig.MaxEventQueueDepth)
- {
- AllowSynchronousContinuations = true,
- SingleReader = false,
- SingleWriter = true,
- //Drop oldest item in queue if full
- FullMode = BoundedChannelFullMode.DropOldest,
- };
-
- _ = StatefulEventQueue.TryAdd(nodeId, new(queueOptions));
-
- //Get the queue
- nodeQueue = StatefulEventQueue[nodeId];
- }
/*
* Buffer sizing can get messy as the response/resquest sizes can vary
@@ -434,7 +377,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server
*/
MaxResponseBufferSize = (int)MemoryUtil.NearestPage(maxMessageSizeClamp),
- SyncQueue = nodeQueue
+ NodeId = nodeId,
+ Advertisment = discoveryAd
};
Log.Debug("Client recv buffer suggestion {recv}, header buffer size {head}, response buffer size {r}", recvBufCmd, maxHeaderCharCmd, state.MaxResponseBufferSize);
@@ -454,14 +398,19 @@ namespace VNLib.Data.Caching.ObjectCache.Server
private async Task WebsocketAcceptedAsync(WebSocketSession wss)
{
+ WsUserState state = (WsUserState)wss.UserState!;
+
+ //Notify peers of new connection
+ Peers.OnPeerConnected(state);
+
//Inc connected count
Interlocked.Increment(ref _connectedClients);
+
//Register plugin exit token to cancel the connected socket
- CancellationTokenRegistration reg = Pbase.UnloadToken.Register(wss.CancelAll);
+ CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll);
+
try
{
- WsUserState state = (wss.UserState as WsUserState)!;
-
//Init listener args from request
FBMListenerSessionParams args = new()
{
@@ -473,12 +422,33 @@ namespace VNLib.Data.Caching.ObjectCache.Server
HeaderEncoding = Helpers.DefaultEncoding,
};
- //Listen for requests
- await Store.ListenAsync(wss, args, state.SyncQueue);
+ //Check if the client is a peer node, if it is, subscribe to change events
+ if (!string.IsNullOrWhiteSpace(state.NodeId))
+ {
+ //Get the event queue for the current node
+ AsyncQueue<ChangeEvent> queue = PubSubManager.Subscribe(state);
+
+ try
+ {
+ //Begin listening for messages with a queue
+ await Store.ListenAsync(wss, args, queue);
+ }
+ finally
+ {
+ //ALAWYS Detatch listener
+ PubSubManager.Unsubscribe(state);
+ }
+ }
+ else
+ {
+ //Begin listening for messages without a queue
+ await Store.ListenAsync(wss, args, null);
+ }
}
catch (OperationCanceledException)
{
Log.Debug("Websocket connection was canceled");
+
//Disconnect the socket
await wss.CloseSocketOutputAsync(System.Net.WebSockets.WebSocketCloseStatus.NormalClosure, "unload", CancellationToken.None);
}
@@ -490,35 +460,68 @@ namespace VNLib.Data.Caching.ObjectCache.Server
{
//Dec connected count
Interlocked.Decrement(ref _connectedClients);
+
//Unregister the
reg.Unregister();
}
+
+ //Notify monitor of disconnect
+ Peers.OnPeerDisconnected(state);
+
Log.Debug("Server websocket exited");
}
-
- private sealed class CacheStore : ICacheStore
+
+ //Background worker to process event queue items
+ async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
{
- private readonly BlobCacheListener _cache;
+ const int accumulatorSize = 64;
- public CacheStore(BlobCacheListener cache)
+ try
{
- _cache = cache;
- }
+ //Accumulator for events
+ ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize];
+ int ptr = 0;
- ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, GetBodyDataCallback<T> bodyData, T state, CancellationToken token)
- {
- return _cache.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token);
- }
+ //Listen for changes
+ while (true)
+ {
+ //Wait for next event
+ accumulator[ptr++] = await Store.EventQueue.DequeueAsync(exitToken);
- void ICacheStore.Clear()
+ //try to accumulate more events until we can't anymore
+ while (Store.EventQueue.TryDequeue(out ChangeEvent? ev) && ptr < accumulatorSize)
+ {
+ accumulator[ptr++] = ev;
+ }
+
+ //Publish all events to subscribers
+ PubSubManager.PublishMultiple(accumulator.AsSpan(0, ptr));
+
+ //Reset pointer
+ ptr = 0;
+ }
+ }
+ catch (OperationCanceledException)
{
- throw new NotImplementedException();
+ //Normal exit
+ pluginLog.Debug("Change queue listener worker exited");
}
+ }
+
+ private class WsUserState : ICachePeer
+ {
+ public int RecvBufferSize { get; init; }
+ public int MaxHeaderBufferSize { get; init; }
+ public int MaxMessageSize { get; init; }
+ public int MaxResponseBufferSize { get; init; }
+ public string? NodeId { get; init; }
+ public ICachePeerAdvertisment? Advertisment { get; init; }
- ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token)
+ public override string ToString()
{
- return _cache.Cache.DeleteObjectAsync(id, token);
+ return
+ $"{nameof(RecvBufferSize)}:{RecvBufferSize}, {nameof(MaxHeaderBufferSize)}: {MaxHeaderBufferSize}, {nameof(MaxMessageSize)}:{MaxMessageSize}, {nameof(MaxResponseBufferSize)}:{MaxResponseBufferSize}";
}
}
}