diff options
author | vnugent <public@vaughnnugent.com> | 2023-07-15 13:06:00 -0400 |
---|---|---|
committer | vnugent <public@vaughnnugent.com> | 2023-07-15 13:06:00 -0400 |
commit | 8b4fb26473256da5eaa89f3e9d2ac5d44f1e9b88 (patch) | |
tree | 6ff979b6110b9e6c61ff9f22bb0dbdd2094e08cf /plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs | |
parent | 2f674e79d42e7d36225fa9ac7ecefbc5bc62d325 (diff) |
Latest working draft
Diffstat (limited to 'plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs')
-rw-r--r-- | plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs | 56 |
1 files changed, 29 insertions, 27 deletions
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs index 5e794f8..d232fd8 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs @@ -45,13 +45,13 @@ using VNLib.Plugins.Extensions.Loading; using VNLib.Plugins.Essentials.Endpoints; using VNLib.Plugins.Essentials.Extensions; using VNLib.Plugins.Extensions.Loading.Routing; -using VNLib.Data.Caching.ObjectCache.Server.Distribution; using VNLib.Data.Caching.Extensions.Clustering; +using VNLib.Data.Caching.ObjectCache.Server.Cache; +using VNLib.Data.Caching.ObjectCache.Server.Clustering; namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { - [ConfigurationName("connect_endpoint")] internal sealed class ConnectEndpoint : ResourceEndpointBase { private const string LOG_SCOPE_NAME = "CONEP"; @@ -62,10 +62,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints private readonly NodeConfig NodeConfiguration; private readonly ICacheEventQueueManager PubSubManager; private readonly IPeerMonitor Peers; - private readonly BlobCacheListener Store; - - private readonly bool VerifyIp; + private readonly string AudienceLocalServerId; private uint _connectedClients; @@ -88,21 +86,17 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints DisableSessionsRequired = true }; - public ConnectEndpoint(PluginBase plugin, IConfigScope config) + public ConnectEndpoint(PluginBase plugin) { - string? path = config["path"].GetString(); - - InitPathAndLog(path, plugin.Log.CreateScope(LOG_SCOPE_NAME)); + //Get node configuration + NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>(); - //Check for ip-verification flag - VerifyIp = config.TryGetValue("verify_ip", out JsonElement vIpEl) && vIpEl.GetBoolean(); + //Init from config and create a new log scope + InitPathAndLog(NodeConfiguration.ConnectPath, plugin.Log.CreateScope(LOG_SCOPE_NAME)); //Setup pub/sub manager PubSubManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>(); - //Get node configuration - NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>(); - //Get peer monitor Peers = plugin.GetOrCreateSingleton<CachePeerMonitor>(); @@ -186,7 +180,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } } - Log.Debug("Received negotiation request from node {node}", nodeId); + if (isPeer) + { + Log.Debug("Received negotiation request from peer node {node}", nodeId); + } + else + { + Log.Debug("Received negotiation request from client {client}", entity.TrustedRemoteIp.ToString()); + } //Verified, now we can create an auth message with a short expiration using JsonWebToken auth = new(); @@ -256,7 +257,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints //Verify audience, expiration - if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl) || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase)) + if (!doc.RootElement.TryGetProperty("aud", out JsonElement audEl) + || !AudienceLocalServerId.Equals(audEl.GetString(), StringComparison.OrdinalIgnoreCase)) { entity.CloseResponse(HttpStatusCode.Unauthorized); return VfReturnType.VirtualSkip; @@ -270,7 +272,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } //Check node ip address matches if required - if (VerifyIp) + if (NodeConfiguration.VerifyIp) { if (!doc.RootElement.TryGetProperty("ip", out JsonElement ipEl)) { @@ -318,6 +320,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } } + WsUserState state; + try { //Get query config suggestions from the client @@ -340,7 +344,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints int maxMessageSizeClamp = Math.Clamp(maxMessageSize, CacheConfig.MinRecvBufferSize, CacheConfig.MaxRecvBufferSize); //Init new ws state object and clamp the suggested buffer sizes - WsUserState state = new() + state = new() { RecvBufferSize = Math.Clamp(recvBufSize, CacheConfig.MinRecvBufferSize, CacheConfig.MaxRecvBufferSize), MaxHeaderBufferSize = Math.Clamp(maxHeadBufSize, CacheConfig.MinHeaderBufferSize, CacheConfig.MaxHeaderBufferSize), @@ -356,20 +360,18 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints NodeId = nodeId, Advertisment = discoveryAd }; - - Log.Debug("Client recv buffer suggestion {recv}, header buffer size {head}, response buffer size {r}", recvBufCmd, maxHeaderCharCmd, state.MaxResponseBufferSize); - - //Print state message to console - Log.Verbose("Client buffer state {state}", state); - - //Accept socket and pass state object - entity.AcceptWebSocket(WebsocketAcceptedAsync, state); - return VfReturnType.VirtualSkip; } catch (KeyNotFoundException) { return VfReturnType.BadRequest; } + + //Print state message to console + Log.Debug("Client buffer state {state}", state); + + //Accept socket and pass state object + entity.AcceptWebSocket(WebsocketAcceptedAsync, state); + return VfReturnType.VirtualSkip; } private async Task WebsocketAcceptedAsync(WebSocketSession wss) |