aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2023-07-15 13:06:00 -0400
committerLibravatar vnugent <public@vaughnnugent.com>2023-07-15 13:06:00 -0400
commit8b4fb26473256da5eaa89f3e9d2ac5d44f1e9b88 (patch)
tree6ff979b6110b9e6c61ff9f22bb0dbdd2094e08cf /plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
parent2f674e79d42e7d36225fa9ac7ecefbc5bc62d325 (diff)
Latest working draft
Diffstat (limited to 'plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs')
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs56
1 files changed, 29 insertions, 27 deletions
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
index 5e794f8..d232fd8 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
@@ -45,13 +45,13 @@ using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Essentials.Endpoints;
using VNLib.Plugins.Essentials.Extensions;
using VNLib.Plugins.Extensions.Loading.Routing;
-using VNLib.Data.Caching.ObjectCache.Server.Distribution;
using VNLib.Data.Caching.Extensions.Clustering;
+using VNLib.Data.Caching.ObjectCache.Server.Cache;
+using VNLib.Data.Caching.ObjectCache.Server.Clustering;
namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
- [ConfigurationName("connect_endpoint")]
internal sealed class ConnectEndpoint : ResourceEndpointBase
{
private const string LOG_SCOPE_NAME = "CONEP";
@@ -62,10 +62,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
private readonly NodeConfig NodeConfiguration;
private readonly ICacheEventQueueManager PubSubManager;
private readonly IPeerMonitor Peers;
-
private readonly BlobCacheListener Store;
-
- private readonly bool VerifyIp;
+
private readonly string AudienceLocalServerId;
private uint _connectedClients;
@@ -88,21 +86,17 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
DisableSessionsRequired = true
};
- public ConnectEndpoint(PluginBase plugin, IConfigScope config)
+ public ConnectEndpoint(PluginBase plugin)
{
- string? path = config["path"].GetString();
-
- InitPathAndLog(path, plugin.Log.CreateScope(LOG_SCOPE_NAME));
+ //Get node configuration
+ NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>();
- //Check for ip-verification flag
- VerifyIp = config.TryGetValue("verify_ip", out JsonElement vIpEl) && vIpEl.GetBoolean();
+ //Init from config and create a new log scope
+ InitPathAndLog(NodeConfiguration.ConnectPath, plugin.Log.CreateScope(LOG_SCOPE_NAME));
//Setup pub/sub manager
PubSubManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>();
- //Get node configuration
- NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>();
-
//Get peer monitor
Peers = plugin.GetOrCreateSingleton<CachePeerMonitor>();
@@ -186,7 +180,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
}
- Log.Debug("Received negotiation request from node {node}", nodeId);
+ if (isPeer)
+ {
+ Log.Debug("Received negotiation request from peer node {node}", nodeId);
+ }
+ else
+ {
+ Log.Debug("Received negotiation request from client {client}", entity.TrustedRemoteIp.ToString());
+ }
//Verified, now we can create an auth message with a short expiration
using JsonWebToken auth = new();
@@ -256,7 +257,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
//Verify audience, expiration
- if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl) || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase))
+ if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl)
+ || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase))
{
entity.CloseResponse(HttpStatusCode.Unauthorized);
return VfReturnType.VirtualSkip;
@@ -270,7 +272,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
//Check node ip address matches if required
- if (VerifyIp)
+ if (NodeConfiguration.VerifyIp)
{
if (!doc.RootElement.TryGetProperty("ip", out JsonElement ipEl))
{
@@ -318,6 +320,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
}
+ WsUserState state;
+
try
{
//Get query config suggestions from the client
@@ -340,7 +344,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
int maxMessageSizeClamp = Math.Clamp(maxMessageSize, CacheConfig.MinRecvBufferSize, CacheConfig.MaxRecvBufferSize);
//Init new ws state object and clamp the suggested buffer sizes
- WsUserState state = new()
+ state = new()
{
RecvBufferSize = Math.Clamp(recvBufSize, CacheConfig.MinRecvBufferSize, CacheConfig.MaxRecvBufferSize),
MaxHeaderBufferSize = Math.Clamp(maxHeadBufSize, CacheConfig.MinHeaderBufferSize, CacheConfig.MaxHeaderBufferSize),
@@ -356,20 +360,18 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
NodeId = nodeId,
Advertisment = discoveryAd
};
-
- Log.Debug("Client recv buffer suggestion {recv}, header buffer size {head}, response buffer size {r}", recvBufCmd, maxHeaderCharCmd, state.MaxResponseBufferSize);
-
- //Print state message to console
- Log.Verbose("Client buffer state {state}", state);
-
- //Accept socket and pass state object
- entity.AcceptWebSocket(WebsocketAcceptedAsync, state);
- return VfReturnType.VirtualSkip;
}
catch (KeyNotFoundException)
{
return VfReturnType.BadRequest;
}
+
+ //Print state message to console
+ Log.Debug("Client buffer state {state}", state);
+
+ //Accept socket and pass state object
+ entity.AcceptWebSocket(WebsocketAcceptedAsync, state);
+ return VfReturnType.VirtualSkip;
}
private async Task WebsocketAcceptedAsync(WebSocketSession wss)