aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs')
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs91
1 files changed, 43 insertions, 48 deletions
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
index 816e6c3..d6b733c 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -55,11 +55,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
internal const string LOG_SCOPE_NAME = "CONEP";
-
- private readonly ICacheEventQueueManager PubSubManager;
- private readonly IPeerMonitor Peers;
- private readonly BlobCacheListener<IPeerEventQueue> Store;
- private readonly NodeConfig NodeConfiguration;
+
+ private readonly ObjectCacheSystemState _sysState;
+
+ private PeerEventQueueManager PubSubManager => _sysState.PeerEventQueue;
+ private CachePeerMonitor Peers => _sysState.PeerMonitor;
+ private BlobCacheListener<IPeerEventQueue> Listener => _sysState.Listener;
+ private NodeConfig NodeConfiguration => _sysState.Configuration;
+
private readonly CacheNegotationManager AuthManager;
private uint _connectedClients;
@@ -72,7 +75,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
/// <summary>
/// The cache store configuration
/// </summary>
- public CacheConfiguration CacheConfig { get; }
+ public CacheMemoryConfiguration CacheConfig => _sysState.MemoryConfiguration;
//Loosen up protection settings
///<inheritdoc/>
@@ -83,24 +86,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
public ConnectEndpoint(PluginBase plugin)
{
- //Get node configuration
- NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>();
+ _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>();
//Init from config and create a new log scope
InitPathAndLog(NodeConfiguration.ConnectPath, plugin.Log.CreateScope(LOG_SCOPE_NAME));
-
- //Setup pub/sub manager
- PubSubManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>();
-
- //Get peer monitor
- Peers = plugin.GetOrCreateSingleton<CachePeerMonitor>();
-
- //Init the cache store
- Store = plugin.GetOrCreateSingleton<CacheStore>().Listener;
-
- //Get the cache store configuration
- CacheConfig = plugin.GetConfigForType<CacheStore>().Deserialze<CacheConfiguration>();
-
+
//Get the auth manager
AuthManager = plugin.GetOrCreateSingleton<CacheNegotationManager>();
}
@@ -127,6 +117,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
//Parse jwt from authoriation
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
+
if (string.IsNullOrWhiteSpace(jwtAuth))
{
return VirtualClose(entity, HttpStatusCode.Forbidden);
@@ -149,26 +140,34 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
//Verified, now we can create an auth message with a short expiration
- using JsonWebToken auth = AuthManager.ConfirmCLientNegotiation(state, entity.TrustedRemoteIp, entity.RequestedTimeUtc);
+ using JsonWebToken auth = AuthManager.ConfirmClientNegotiation(state, entity.TrustedRemoteIp, entity.RequestedTimeUtc);
- //Close response
+ //Close response by sending a copy of the signed token
entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer);
return VfReturnType.VirtualSkip;
}
protected override VfReturnType WebsocketRequested(HttpEntity entity)
{
+ /*
+ * Check to see if any more connections are allowed,
+ * otherwise deny the connection
+ *
+ * This is done here to prevent the server from being overloaded
+ * on a new connection. It would be ideal to not grant new tokens
+ * but malicious clients could cache a bunch of tokens and use them
+ * later, exhausting resources.
+ */
+ if(_connectedClients >= NodeConfiguration.MaxConcurrentConnections)
+ {
+ return VirtualClose(entity, HttpStatusCode.ServiceUnavailable);
+ }
+
//Parse jwt from authorization
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
string? clientSignature = entity.Server.Headers[FBMDataCacheExtensions.X_UPGRADE_SIG_HEADER];
string? optionalDiscovery = entity.Server.Headers[FBMDataCacheExtensions.X_NODE_DISCOVERY_HEADER];
- //Not null
- if (string.IsNullOrWhiteSpace(jwtAuth) || string.IsNullOrWhiteSpace(clientSignature))
- {
- return VfReturnType.Forbidden;
- }
-
string? nodeId = null;
bool isPeer = false;
@@ -178,15 +177,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
return VirtualClose(entity, HttpStatusCode.Unauthorized);
}
- CacheNodeAdvertisment? discoveryAd = null;
-
/*
* If the client is a peer server, it may offer a signed advertisment
* that this node will have the duty of making available to other peers
* if it is valid
*/
- if (isPeer && !string.IsNullOrWhiteSpace(optionalDiscovery))
+ CacheNodeAdvertisment? discoveryAd = null;
+
+ if (isPeer)
{
discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(optionalDiscovery);
}
@@ -196,11 +195,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
try
{
//Get query config suggestions from the client
- string recvBufCmd = entity.QueryArgs[FBMClient.REQ_RECV_BUF_QUERY_ARG];
- string maxHeaderCharCmd = entity.QueryArgs[FBMClient.REQ_HEAD_BUF_QUERY_ARG];
- string maxMessageSizeCmd = entity.QueryArgs[FBMClient.REQ_MAX_MESS_QUERY_ARG];
+ string? recvBufCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_RECV_BUF_QUERY_ARG);
+ string? maxHeaderCharCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_HEAD_BUF_QUERY_ARG);
+ string? maxMessageSizeCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_MAX_MESS_QUERY_ARG);
- //Parse recv buffer size
int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : CacheConfig.MinRecvBufferSize;
int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : CacheConfig.MinHeaderBufferSize;
int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : CacheConfig.MaxMessageSize;
@@ -253,9 +251,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
Peers.OnPeerConnected(state);
//Register plugin exit token to cancel the connected socket
- CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll);
-
- //Inc connected count
+ await using CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll);
+
Interlocked.Increment(ref _connectedClients);
try
@@ -280,7 +277,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
try
{
//Begin listening for messages with a queue
- await Store.ListenAsync(wss, queue, args);
+ await Listener.ListenAsync(wss, queue, args);
}
finally
{
@@ -291,7 +288,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
else
{
//Begin listening for messages without a queue
- await Store.ListenAsync(wss, null!, args);
+ await Listener.ListenAsync(wss, null!, args);
}
}
catch (OperationCanceledException)
@@ -303,15 +300,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
catch (Exception ex)
{
- Log.Debug(ex);
+ //If debug logging is enabled print a more detailed error message
+ Log.Error("An error occured on websocket connection: node {con} -> {error}", state.NodeId, ex.Message);
+ Log.Debug("Websocket connection error: node {con}\n{error}", state.NodeId, ex);
}
-
- //Dec connected count
+
Interlocked.Decrement(ref _connectedClients);
- //Unregister the token
- reg.Unregister();
-
//Notify monitor of disconnect
Peers.OnPeerDisconnected(state);