diff options
Diffstat (limited to 'plugins/ObjectCacheServer/src/Endpoints')
4 files changed, 102 insertions, 102 deletions
diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs index d1591f8..48f4448 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -45,7 +45,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints public bool IsPeer { get; set; } } - internal sealed class CacheNegotationManager + internal sealed class CacheNegotationManager(PluginBase plugin) { /* * Cache keys are centralized and may be shared between all cache server nodes. This means @@ -64,21 +64,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30); - private readonly string AudienceLocalServerId; - private readonly NodeConfig _nodeConfig; - private readonly CacheConfiguration _cacheConfig; + private readonly string AudienceLocalServerId = Guid.NewGuid().ToString("N"); - public CacheNegotationManager(PluginBase plugin) - { - //Get node configuration - _nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>(); + private readonly ObjectCacheSystemState _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>(); - //Get the cache store configuration - _cacheConfig = plugin.GetConfigForType<CacheStore>().Deserialze<CacheConfiguration>(); + private NodeConfig NodeConfig => _sysState.Configuration; - AudienceLocalServerId = Guid.NewGuid().ToString("N"); - } - + private CacheMemoryConfiguration CacheConfig => _sysState.MemoryConfiguration; public bool IsClientNegotiationValid(string authToken, out ClientNegotiationState state) { @@ -88,12 +80,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints using JsonWebToken jwt = JsonWebToken.Parse(authToken); //verify signature for client - if (_nodeConfig.KeyStore.VerifyJwt(jwt, false)) + if (NodeConfig.KeyStore.VerifyJwt(jwt, false)) { //Validated as normal client } //May be signed by a cache server - else if (_nodeConfig.KeyStore.VerifyJwt(jwt, true)) + else if (NodeConfig.KeyStore.VerifyJwt(jwt, true)) { //Set peer and verified flag since the another cache server signed the request state.IsPeer = true; @@ -117,12 +109,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints return true; } - public JsonWebToken ConfirmCLientNegotiation(ClientNegotiationState state, IPAddress clientIp, DateTimeOffset now) + public JsonWebToken ConfirmClientNegotiation(ClientNegotiationState state, IPAddress clientIp, DateTimeOffset now) { //Verified, now we can create an auth message with a short expiration JsonWebToken auth = new(); - auth.WriteHeader(_nodeConfig.KeyStore.GetJwtHeader()); + auth.WriteHeader(NodeConfig.KeyStore.GetJwtHeader()); auth.InitPayloadClaim() .AddClaim("aud", AudienceLocalServerId) .AddClaim("iat", now.ToUnixTimeSeconds()) @@ -136,24 +128,29 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints //Set ip address .AddClaim("ip", clientIp.ToString()) //Add negotiaion args - .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, _cacheConfig.MaxHeaderBufferSize) - .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, _cacheConfig.MaxRecvBufferSize) - .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, _cacheConfig.MaxMessageSize) + .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, CacheConfig.MaxHeaderBufferSize) + .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, CacheConfig.MaxRecvBufferSize) + .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, CacheConfig.MaxMessageSize) .CommitClaims(); //Sign the auth message from our private key - _nodeConfig.KeyStore.SignJwt(auth); + NodeConfig.KeyStore.SignJwt(auth); return auth; } - public bool ValidateUpgrade(string upgradeToken, string tokenSignature, DateTimeOffset now, IPAddress connectionIp, ref string? nodeId, ref bool isPeer) + public bool ValidateUpgrade(string? upgradeToken, string? tokenSignature, DateTimeOffset now, IPAddress connectionIp, ref string? nodeId, ref bool isPeer) { + if(string.IsNullOrWhiteSpace(upgradeToken) || string.IsNullOrWhiteSpace(tokenSignature)) + { + return false; + } + //Parse jwt using JsonWebToken jwt = JsonWebToken.Parse(upgradeToken); //verify signature against the cache public key, since this server must have signed it - if (!_nodeConfig.KeyStore.VerifyCachePeer(jwt)) + if (!NodeConfig.KeyStore.VerifyCachePeer(jwt)) { return false; } @@ -175,7 +172,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } //Check node ip address matches if required - if (_nodeConfig.VerifyIp) + if (NodeConfig.VerifyIp) { if (!doc.RootElement.TryGetProperty("ip", out JsonElement ipEl)) { @@ -201,7 +198,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } //Verify token signature against a fellow cache public key - return _nodeConfig.KeyStore.VerifyUpgradeToken(tokenSignature, upgradeToken, isPeer); + return NodeConfig.KeyStore.VerifyUpgradeToken(tokenSignature, upgradeToken, isPeer); } } } diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs index 816e6c3..d6b733c 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -55,11 +55,14 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { internal const string LOG_SCOPE_NAME = "CONEP"; - - private readonly ICacheEventQueueManager PubSubManager; - private readonly IPeerMonitor Peers; - private readonly BlobCacheListener<IPeerEventQueue> Store; - private readonly NodeConfig NodeConfiguration; + + private readonly ObjectCacheSystemState _sysState; + + private PeerEventQueueManager PubSubManager => _sysState.PeerEventQueue; + private CachePeerMonitor Peers => _sysState.PeerMonitor; + private BlobCacheListener<IPeerEventQueue> Listener => _sysState.Listener; + private NodeConfig NodeConfiguration => _sysState.Configuration; + private readonly CacheNegotationManager AuthManager; private uint _connectedClients; @@ -72,7 +75,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints /// <summary> /// The cache store configuration /// </summary> - public CacheConfiguration CacheConfig { get; } + public CacheMemoryConfiguration CacheConfig => _sysState.MemoryConfiguration; //Loosen up protection settings ///<inheritdoc/> @@ -83,24 +86,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints public ConnectEndpoint(PluginBase plugin) { - //Get node configuration - NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>(); + _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>(); //Init from config and create a new log scope InitPathAndLog(NodeConfiguration.ConnectPath, plugin.Log.CreateScope(LOG_SCOPE_NAME)); - - //Setup pub/sub manager - PubSubManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>(); - - //Get peer monitor - Peers = plugin.GetOrCreateSingleton<CachePeerMonitor>(); - - //Init the cache store - Store = plugin.GetOrCreateSingleton<CacheStore>().Listener; - - //Get the cache store configuration - CacheConfig = plugin.GetConfigForType<CacheStore>().Deserialze<CacheConfiguration>(); - + //Get the auth manager AuthManager = plugin.GetOrCreateSingleton<CacheNegotationManager>(); } @@ -127,6 +117,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { //Parse jwt from authoriation string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; + if (string.IsNullOrWhiteSpace(jwtAuth)) { return VirtualClose(entity, HttpStatusCode.Forbidden); @@ -149,26 +140,34 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } //Verified, now we can create an auth message with a short expiration - using JsonWebToken auth = AuthManager.ConfirmCLientNegotiation(state, entity.TrustedRemoteIp, entity.RequestedTimeUtc); + using JsonWebToken auth = AuthManager.ConfirmClientNegotiation(state, entity.TrustedRemoteIp, entity.RequestedTimeUtc); - //Close response + //Close response by sending a copy of the signed token entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer); return VfReturnType.VirtualSkip; } protected override VfReturnType WebsocketRequested(HttpEntity entity) { + /* + * Check to see if any more connections are allowed, + * otherwise deny the connection + * + * This is done here to prevent the server from being overloaded + * on a new connection. It would be ideal to not grant new tokens + * but malicious clients could cache a bunch of tokens and use them + * later, exhausting resources. + */ + if(_connectedClients >= NodeConfiguration.MaxConcurrentConnections) + { + return VirtualClose(entity, HttpStatusCode.ServiceUnavailable); + } + //Parse jwt from authorization string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; string? clientSignature = entity.Server.Headers[FBMDataCacheExtensions.X_UPGRADE_SIG_HEADER]; string? optionalDiscovery = entity.Server.Headers[FBMDataCacheExtensions.X_NODE_DISCOVERY_HEADER]; - //Not null - if (string.IsNullOrWhiteSpace(jwtAuth) || string.IsNullOrWhiteSpace(clientSignature)) - { - return VfReturnType.Forbidden; - } - string? nodeId = null; bool isPeer = false; @@ -178,15 +177,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints return VirtualClose(entity, HttpStatusCode.Unauthorized); } - CacheNodeAdvertisment? discoveryAd = null; - /* * If the client is a peer server, it may offer a signed advertisment * that this node will have the duty of making available to other peers * if it is valid */ - if (isPeer && !string.IsNullOrWhiteSpace(optionalDiscovery)) + CacheNodeAdvertisment? discoveryAd = null; + + if (isPeer) { discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(optionalDiscovery); } @@ -196,11 +195,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints try { //Get query config suggestions from the client - string recvBufCmd = entity.QueryArgs[FBMClient.REQ_RECV_BUF_QUERY_ARG]; - string maxHeaderCharCmd = entity.QueryArgs[FBMClient.REQ_HEAD_BUF_QUERY_ARG]; - string maxMessageSizeCmd = entity.QueryArgs[FBMClient.REQ_MAX_MESS_QUERY_ARG]; + string? recvBufCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_RECV_BUF_QUERY_ARG); + string? maxHeaderCharCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_HEAD_BUF_QUERY_ARG); + string? maxMessageSizeCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_MAX_MESS_QUERY_ARG); - //Parse recv buffer size int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : CacheConfig.MinRecvBufferSize; int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : CacheConfig.MinHeaderBufferSize; int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : CacheConfig.MaxMessageSize; @@ -253,9 +251,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints Peers.OnPeerConnected(state); //Register plugin exit token to cancel the connected socket - CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll); - - //Inc connected count + await using CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll); + Interlocked.Increment(ref _connectedClients); try @@ -280,7 +277,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints try { //Begin listening for messages with a queue - await Store.ListenAsync(wss, queue, args); + await Listener.ListenAsync(wss, queue, args); } finally { @@ -291,7 +288,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints else { //Begin listening for messages without a queue - await Store.ListenAsync(wss, null!, args); + await Listener.ListenAsync(wss, null!, args); } } catch (OperationCanceledException) @@ -303,15 +300,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } catch (Exception ex) { - Log.Debug(ex); + //If debug logging is enabled print a more detailed error message + Log.Error("An error occured on websocket connection: node {con} -> {error}", state.NodeId, ex.Message); + Log.Debug("Websocket connection error: node {con}\n{error}", state.NodeId, ex); } - - //Dec connected count + Interlocked.Decrement(ref _connectedClients); - //Unregister the token - reg.Unregister(); - //Notify monitor of disconnect Peers.OnPeerDisconnected(state); diff --git a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs index 7d376b8..56fe8cd 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -40,25 +40,29 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { internal sealed class PeerDiscoveryEndpoint : ResourceEndpointBase { - private readonly IPeerMonitor PeerMonitor; - private readonly NodeConfig Config; + private readonly ObjectCacheSystemState SysState; + + private CacheAuthKeyStore KeyStore => SysState.Configuration.KeyStore; + + private CachePeerMonitor PeerMonitor => SysState.PeerMonitor; + + private CacheNodeConfiguration NodeConfig => SysState.Configuration.Config; - //Loosen up protection settings ///<inheritdoc/> protected override ProtectionSettings EndpointProtectionSettings { get; } = new() { - DisableSessionsRequired = true + /* + * Sessions will not be used or required for this endpoint. + * We should also assume the session system is not even loaded + */ + DisableSessionsRequired = true }; public PeerDiscoveryEndpoint(PluginBase plugin) { - //Get the peer monitor - PeerMonitor = plugin.GetOrCreateSingleton<CachePeerMonitor>(); - - //Get the node config - Config = plugin.GetOrCreateSingleton<NodeConfig>(); + SysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>(); - InitPathAndLog(Config.DiscoveryPath, plugin.Log); + InitPathAndLog(SysState.Configuration.DiscoveryPath!, plugin.Log); } protected override VfReturnType Get(HttpEntity entity) @@ -68,36 +72,41 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints if(string.IsNullOrWhiteSpace(authToken)) { - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; + return VirtualClose(entity, HttpStatusCode.Unauthorized); } string subject = string.Empty; string challenge = string.Empty; - //Parse auth token - using(JsonWebToken jwt = JsonWebToken.Parse(authToken)) + try { + //Parse auth token + using JsonWebToken jwt = JsonWebToken.Parse(authToken); + //try to verify against cache node first - if (!Config.KeyStore.VerifyJwt(jwt, true)) + if (!KeyStore.VerifyJwt(jwt, true)) { //failed... //try to verify against client key - if (!Config.KeyStore.VerifyJwt(jwt, false)) + if (!KeyStore.VerifyJwt(jwt, false)) { //invalid token - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; + return VirtualClose(entity, HttpStatusCode.Unauthorized); } } using JsonDocument payload = jwt.GetPayload(); //Get client info to pass back - subject = payload.RootElement.TryGetProperty("sub", out JsonElement subEl) ? subEl.GetString() ?? string.Empty : string.Empty; + subject = payload.RootElement.TryGetProperty("sub", out JsonElement subEl) ? subEl.GetString() ?? string.Empty : string.Empty; challenge = payload.RootElement.GetProperty("chl").GetString() ?? string.Empty; } + catch (FormatException) + { + //If tokens are invalid format, let the client know instead of a server error + return VfReturnType.BadRequest; + } //Valid key, get peer list to send to client CacheNodeAdvertisment[] peers = PeerMonitor.GetAllPeers() @@ -109,10 +118,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints using JsonWebToken response = new(); //set header from cache config - response.WriteHeader(Config.KeyStore.GetJwtHeader()); + response.WriteHeader(KeyStore.GetJwtHeader()); response.InitPayloadClaim() - .AddClaim("iss", Config.Config.NodeId) + .AddClaim("iss", NodeConfig.NodeId) //Audience is the requestor id .AddClaim("sub", subject) .AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds()) @@ -122,10 +131,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints .AddClaim("chl", challenge) .CommitClaims(); - //Sign the response - Config.KeyStore.SignJwt(response); - - //Send response to client + + KeyStore.SignJwt(response); + entity.CloseResponse(HttpStatusCode.OK, Net.Http.ContentType.Text, response.DataBuffer); return VfReturnType.VirtualSkip; } diff --git a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs index 87a471b..04380c5 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -59,7 +59,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints public WellKnownEndpoint(PluginBase plugin) { //Get the node config - NodeConfig nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>(); + NodeConfig nodeConfig = plugin.GetOrCreateSingleton<ObjectCacheSystemState>().Configuration; //serialize the config, discovery may not be enabled _advertisment = nodeConfig.Config.Advertisment; |