From e5bb0ee302e789cb96e7ecfe839cbbcc8e3fd5d7 Mon Sep 17 00:00:00 2001 From: vnugent Date: Sun, 10 Mar 2024 16:46:50 -0400 Subject: Squashed commit of the following: commit 2f7565976472f0f056db60520bf253a776112c10 Merge: 323ff67 6b87785 Author: vnugent Date: Sun Mar 10 16:45:23 2024 -0400 merge master commit 323ff67badfc46ad638d75f059d60d9425ccb2fa Author: vnugent Date: Sun Mar 10 15:50:07 2024 -0400 ci(server): Conainerize and add vncache server packages commit 5d4192880654fd6e00e587814169415b42621327 Author: vnugent Date: Sat Mar 9 19:13:21 2024 -0500 chore: #2 Minor fixes and polish before release commit a4b3504bb891829074d1efde0433eae010862181 Author: vnugent Date: Sat Mar 9 16:30:44 2024 -0500 package updates commit 4d8cfc10382105b0acbd94df93ad3d05ff91db54 Author: vnugent Date: Wed Mar 6 21:30:58 2024 -0500 refactor: #2 Centralize server state, default discovery endpoints & more commit 016a96a80cce025a86c6cf26707738f6a2eb2658 Author: vnugent Date: Thu Feb 29 21:22:38 2024 -0500 feat: add future support for memory diagnostics, and some docs commit 456ead9bc8b0f61357bae93152ad0403c4940101 Author: vnugent Date: Tue Feb 13 14:46:35 2024 -0500 fix: #1 shared cluster index on linux & latested core updates commit a481d63f964a5d5204cac2e95141f37f9a28d573 Author: vnugent Date: Tue Jan 23 15:43:50 2024 -0500 cache extension api tweaks --- plugins/ObjectCacheServer/Taskfile.yaml | 88 ++++++ .../ObjectCacheServer/server/config/config.json | 104 +++++++ .../ObjectCacheServer/server/container/Dockerfile | 82 +++++ .../server/container/Taskfile.yaml | 80 +++++ .../ObjectCacheServer-template.json | 54 ++++ .../config-templates/config-template.json | 105 +++++++ .../server/container/docker-compose.yaml | 45 +++ plugins/ObjectCacheServer/server/container/run.sh | 15 + plugins/ObjectCacheServer/server/install.ps1 | 26 ++ .../ObjectCacheServer/server/install.taskfile.yaml | 20 ++ plugins/ObjectCacheServer/server/taskfile.yaml | 193 ++++++++++++ .../src/BucketLocalManagerFactory.cs | 53 +++- .../src/Cache/CacheConfiguration.cs | 53 ---- .../src/Cache/CacheEventQueueManager.cs | 264 ---------------- .../src/Cache/CacheListenerPubQueue.cs | 31 +- .../src/Cache/CacheMemoryConfiguration.cs | 58 ++++ plugins/ObjectCacheServer/src/Cache/CacheStore.cs | 131 -------- .../ObjectCacheServer/src/Cache/CacheSystemUtil.cs | 8 +- .../src/Cache/PeerEventQueueManager.cs | 248 +++++++++++++++ plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs | 14 +- plugins/ObjectCacheServer/src/CacheConstants.cs | 107 +++++++ .../src/Clustering/CacheNodeReplicationMaanger.cs | 85 +++--- .../src/Clustering/CachePeerMonitor.cs | 11 +- .../src/Clustering/PeerDiscoveryManager.cs | 130 +++----- .../src/Endpoints/CacheNegotationManager.cs | 53 ++-- .../src/Endpoints/ConnectEndpoint.cs | 94 +++--- .../src/Endpoints/PeerDiscoveryEndpoint.cs | 58 ++-- .../src/Endpoints/WellKnownEndpoint.cs | 10 +- plugins/ObjectCacheServer/src/NodeConfig.cs | 191 ------------ .../src/ObjectCacheServerEntry.cs | 71 ++--- .../src/ObjectCacheSystemState.cs | 331 +++++++++++++++++++++ .../ObjectCacheServer/src/ServerClusterConfig.cs | 152 ++++++++++ 32 files changed, 1991 insertions(+), 974 deletions(-) create mode 100644 plugins/ObjectCacheServer/Taskfile.yaml create mode 100644 plugins/ObjectCacheServer/server/config/config.json create mode 100644 plugins/ObjectCacheServer/server/container/Dockerfile create mode 100644 plugins/ObjectCacheServer/server/container/Taskfile.yaml create mode 100644 plugins/ObjectCacheServer/server/container/config-templates/ObjectCacheServer-template.json create mode 100644 plugins/ObjectCacheServer/server/container/config-templates/config-template.json create mode 100644 plugins/ObjectCacheServer/server/container/docker-compose.yaml create mode 100644 plugins/ObjectCacheServer/server/container/run.sh create mode 100644 plugins/ObjectCacheServer/server/install.ps1 create mode 100644 plugins/ObjectCacheServer/server/install.taskfile.yaml create mode 100644 plugins/ObjectCacheServer/server/taskfile.yaml delete mode 100644 plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs delete mode 100644 plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs create mode 100644 plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs delete mode 100644 plugins/ObjectCacheServer/src/Cache/CacheStore.cs create mode 100644 plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs create mode 100644 plugins/ObjectCacheServer/src/CacheConstants.cs delete mode 100644 plugins/ObjectCacheServer/src/NodeConfig.cs create mode 100644 plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs create mode 100644 plugins/ObjectCacheServer/src/ServerClusterConfig.cs (limited to 'plugins/ObjectCacheServer') diff --git a/plugins/ObjectCacheServer/Taskfile.yaml b/plugins/ObjectCacheServer/Taskfile.yaml new file mode 100644 index 0000000..a339359 --- /dev/null +++ b/plugins/ObjectCacheServer/Taskfile.yaml @@ -0,0 +1,88 @@ +# https://taskfile.dev + +#Called by the vnbuild system to produce builds for my website +#https://www.vaughnnugent.com/resources/software + +#This taskfile is called from the root of a project that is being built +#and the purpose of this taskfile is to package up the output of a build +#from the solution file, and package it up into a tgz files for distribution + +version: '3' + +vars: + TARGET: '{{.USER_WORKING_DIR}}/bin' + RELEASE_DIR: "./bin/release/{{.TARGET_FRAMEWORK}}/publish" + SOURCE_OUT: "{{.USER_WORKING_DIR}}/bin/source" + +includes: + ci: + taskfile: server/taskfile.yaml + dir: server/ #must execute from the server directory + optional: true + +tasks: + + #called by ci to build the output + build: + cmds: + - task: ci:build + + #when build succeeds, archive the output into a tgz + postbuild_success: + dir: '{{.USER_WORKING_DIR}}' + cmds: + #pack up source code + - task: packsource + + #run post in debug mode + - task: postbuild + vars: { BUILD_MODE: debug } + + #remove uncessary files from the release dir + - powershell -Command "Get-ChildItem -Recurse '{{.RELEASE_DIR}}/' -Include *.pdb,*.xml | Remove-Item" + + #run post in release mode + - task: postbuild + vars: { BUILD_MODE: release } + + - task: ci:postbuild_success + + + postbuild_failed: + dir: '{{.USER_WORKING_DIR}}' + cmds: + - echo "postbuild failed {{.PROJECT_NAME}}" + + + postbuild: + dir: '{{.USER_WORKING_DIR}}' + internal: true + vars: + #the build output directory + BUILD_OUT: "{{.USER_WORKING_DIR}}/bin/{{.BUILD_MODE}}/{{.TARGET_FRAMEWORK}}/publish" + + cmds: + + #copy license and readme to target + - cd .. && powershell -Command "Copy-Item -Path ./build.readme.md -Destination '{{.BUILD_OUT}}/readme.md'" + + #tar outputs + - cd "{{.BUILD_OUT}}" && tar -czf "{{.TARGET}}/{{.BUILD_MODE}}.tgz" . + + packsource: + dir: '{{.USER_WORKING_DIR}}' + internal: true + cmds: + #copy source code to target + - powershell -Command "Get-ChildItem -Include *.cs,*.csproj -Recurse | Where { \$_.FullName -notlike '*\obj\*' -and \$_.FullName -notlike '*\bin\*' } | Resolve-Path -Relative | tar --files-from - -czf '{{.TARGET}}/src.tgz'" + + +#Remove the output dirs on clean + clean: + dir: '{{.USER_WORKING_DIR}}' + cmds: + - for: [ 'bin/', 'obj/' ] + cmd: powershell Remove-Item -Recurse '{{.ITEM}}' + ignore_error: true + + - task: ci:clean diff --git a/plugins/ObjectCacheServer/server/config/config.json b/plugins/ObjectCacheServer/server/config/config.json new file mode 100644 index 0000000..1f8a382 --- /dev/null +++ b/plugins/ObjectCacheServer/server/config/config.json @@ -0,0 +1,104 @@ +{ + + //Host application config, config is loaded as a read-only DOM that is available + //to the host and loaded child plugins, all elements are available to plugins via the 'HostConfig' property + + "http": { + //The defaut HTTP version to being requests with (does not support http/2 yet) + "default_version": "HTTP/1.1", + //The maxium size (in bytes) of response messges that will be compressed + "compression_limit": 10000, + //Minium response size (in bytes) to compress + "compression_minimum": 2048, + //The size of the buffer to use when parsing multipart/form data uploads + "multipart_max_buf_size": 1024, + //The maxium ammount of data (in bytes) allows for mulitpart/form data file uploads + "multipart_max_size": 0, + //Absolute maximum size (in bytes) of the request entity body (exludes headers) + "max_entity_size": 10240, + //Keepalive ms for HTTP1.1 keepalive connections + "keepalive_ms": 100000, + //The buffer size to use when parsing headers (also the maxium request header size allowed) + "header_buf_size": 8128, + //The maxium number of headers allowed in an HTTP request message + "max_request_header_count": 50, + //The maxium number of allowed network connections, before 503s will be issued automatically and connections closed + "max_connections": 5000, + //The size in bytes of the buffer to use when writing response messages + "response_buf_size": 4096, + //time (in ms) to wait for a response from an active connection in recv mode, before dropping it + "recv_timeout_ms": 5000, + //Time in ms to wait for the client to accept transport data before terminating the connection + "send_timeout_ms": 60000, + //The size (in bytes) of the buffer used to store all response header data + "response_header_buf_size": 16384, + //Max number of file uploads allowed per request + "max_uploads_per_request": 1 + }, + + //Maxium ammount of time a request is allowed to be processed (includes loading or waiting for sessions) before operations will be cancelled and a 503 returned + "max_execution_time_ms": 20000, + + "virtual_hosts": [ + { + "interface": { + "address": "0.0.0.0", + "port": 2557 + }, + + //Collection of "trusted" servers to allow proxy header support from + "downstream_servers": [], + + //The hostname to listen for, "*" as wildcard, and "[system]" as the default hostname for the current machine + "hostname": "*", + "path": "root/", //Point to some place we can read nothing from + + "deny_extensions": [ ], + "default_files": [ ], + "error_files": [], + "cache_default_sec": 864000, + + "DISABLED ssl": {} + } + ], + + + //Defines the directory where plugin's are to be loaded from + "plugins": { + //Hot-reload creates collectable assemblies that allow full re-load support in the host application, should only be used for development purposes! + "hot_reload": false, + "path": "plugins/", + "config_dir": "config/", + "assets": "plugins/assets/" + }, + + "sys_log": { + "path": "data/logs/sys-log.txt", + "flush_sec": 5, + "retained_files": 31, + "file_size_limit": 10485760, + "interval": "infinite" + }, + + "app_log": { + "path": "data/logs/app-log.txt", + "flush_sec": 5, + "retained_files": 31, + "file_size_limit": 10485760, + "interval": "infinite" + }, + + //HASHICORP VAULT + "hashicorp_vault": { + "url": "", + "token": "", + "trust_cert": false + }, + + "secrets": { + //Special key used by the loading library for access to the PasswordHashing library to pepper password hashes + "cache_private_key": "", + "client_public_key": "" + } +} + diff --git a/plugins/ObjectCacheServer/server/container/Dockerfile b/plugins/ObjectCacheServer/server/container/Dockerfile new file mode 100644 index 0000000..6c466d4 --- /dev/null +++ b/plugins/ObjectCacheServer/server/container/Dockerfile @@ -0,0 +1,82 @@ +#Copyright (c) Vaughn Nugent +#Licensed under the GNU AGPL V3.0 + +#use plain alpine latest to build native libraries in +FROM alpine:3.19 as native-cont + +#install public libs and build tools +RUN apk update && apk add build-base cmake npm +#most universal way to use Task is from NPM +RUN npm install -g @go-task/cli + +WORKDIR /build + +#include local artifacts +COPY app/ . + +#build internal libraries and copy the libraries to the /lib output directory +RUN mkdir out/ +RUN task build-libs + +#APP CONTAINER +#move into a clean dotnet apline lean image +FROM mcr.microsoft.com/dotnet/runtime:8.0.2-alpine3.19-amd64 as app-cont + +LABEL name="vnuge/vncache" +LABEL maintainer="Vaughn Nugent " +LABEL description="A simple clustered network data caching service" + +#copy local artifacts again in run container +COPY app/ /app + +#pull compiled libs from build container +COPY --from=native-cont /build/out /app/lib + +RUN apk update && apk add --no-cache gettext icu-libs dumb-init + +#workdir +WORKDIR /app + +#default to 2557 for cache port +EXPOSE 2557/tcp + +VOLUME /app/ssl +#expose an assets directory for custom assets install +VOLUME /app/usr/assets + +#disable dotnet invariant culture on alpine +ENV DOTNET_SYSTEM_GLOBALIZATION_INVARIANT=0 + +#add helper/required libraries +#ENV VNLIB_SHARED_HEAP_FILE_PATH=/app/lib/libvn_rpmalloc.so not ready yet, still need to debug + +#cache varables +ENV MAX_ENTRIES=10000 +ENV CACHE_BUCKETS=100 +ENV CACHE_MAX_MESSAGE=20480 +ENV MAX_CONCURRENT_CONNECTIONS=1000 + +ENV VERIFY_IP=true +ENV MAX_PEER_NODES=10 +ENV DISCOVERY_INTERVAL=360 +ENV CACHE_CONNECT_PATH="/cache" +ENV DISCOVER_PATH="/discover" +ENV KNOWN_PEERS=[] + +#HC Vault +ENV HC_VAULT_ADDR="" +ENV HC_VAULT_TOKEN="" +ENV HC_VAULT_TRUST_CERT=false + +#SECRETS +ENV CACHE_PRIV_KEY="" +ENV CLIENT_PUB_KEY="" + +#HTTP/PROXY Config +ENV HTTP_DOWNSTREAM_SERVERS=[] +ENV HTTP_MAX_CONNS=5000 + +#run the init script within dumb-init +ENTRYPOINT ["dumb-init", "--"] +CMD ["ash", "./run.sh"] + diff --git a/plugins/ObjectCacheServer/server/container/Taskfile.yaml b/plugins/ObjectCacheServer/server/container/Taskfile.yaml new file mode 100644 index 0000000..10ee86b --- /dev/null +++ b/plugins/ObjectCacheServer/server/container/Taskfile.yaml @@ -0,0 +1,80 @@ +# https://taskfile.dev + +#inlcuded by the ci main taskfile to produce containerized builds, and also +#be included by the container itself to run build tasks inside the container + +version: "3" + +vars: + INCLUDE_FILES: "Dockerfile, docker-compose.yaml" + +includes: + install: + taskfile: ../install.taskfile.yaml + optional: true #not needed for inside container build + +tasks: + #called from inside the container to build native libraries + build-libs: + vars: + OUT_DIR: "{{.USER_WORKING_DIR}}/out" + + #build stage generates the following libraries + generates: + - "{{.USER_WORKING_DIR}}/out/libvn_rpmalloc.so" + + cmds: + #build rpmalloc library + - cd lib/vnlib_rpmalloc/ && task && cp build/libvn_rpmalloc.so {{.OUT_DIR}}/libvn_rpmalloc.so + + #called from ci pipline to build the package + build: + cmds: + # clean up the run.sh script to remove windows line endings in my wsl default instance + - cmd: wsl dos2unix ./run.sh + platforms: [ windows/amd64 ] + + #init build image + - task: setup-container-image + + #remove the default config file as it's not needed in the container + - powershell -Command "rm -Force -Recurse build/app/config/" + + #install rpmalloc + - task: install-rpmalloc-lib + + postbuild_success: + cmds: + #tar up the build directory and move it to the output bin directory + - cmd: cd build/ && tar -czf '{{ .BINARY_DIR }}/{{.PACKAGE_FILE_NAME}}' . + #clean up all the build files after build succeeds + - task: clean + + clean: + ignore_error: true + cmds: + - cmd: powershell -Command "rm -Recurse -Force ./build" + + install-rpmalloc-lib: + internal: true + cmds: + #install compressor plugin + - task: install:install + vars: + PROJECT_NAME: 'vnlib_rpmalloc' + MODULE_NAME: "VNLib.Core" + FILE_NAME: "src.tgz" + DIR: './build/app/lib/vnlib_rpmalloc' + + setup-container-image: + internal: true + cmds: + #make build directory + - powershell -Command "mkdir build, build/app, build/app/config-templates/, build/app/static/ -Force" + #copy the existing linux-x64 build to the build folder, this will be the container base + - powershell -Command "cp -Recurse -Force ../build/linux-x64/* build/app/" + #copy local scripts and config data into the build folder + - powershell -Command "cp -Force run.sh, Taskfile.yaml build/app/" + - powershell -Command "cp -Force Dockerfile, docker-compose.yaml build/" + - powershell -Command "cp -Force static/* build/app/static/" + - powershell -Command "cp -Force config-templates/* build/app/config-templates/" diff --git a/plugins/ObjectCacheServer/server/container/config-templates/ObjectCacheServer-template.json b/plugins/ObjectCacheServer/server/container/config-templates/ObjectCacheServer-template.json new file mode 100644 index 0000000..765c3d7 --- /dev/null +++ b/plugins/ObjectCacheServer/server/container/config-templates/ObjectCacheServer-template.json @@ -0,0 +1,54 @@ +{ + "debug": false, + + //enables cache server cluster node data + "cluster": { + //Delay to re-discover peers + "discovery_interval_sec": ${DISCOVERY_INTERVAL}, + + //The maxium number of peers to connect to + "max_peers": ${MAX_PEER_NODES}, + + //Max ev queue depth before LRU eviction + "max_queue_depth": 10000, + + //Time between queue purge + "queue_purge_interval_sec": 360000, + + //Forces strict ip address verification on upgrades (best to leave on) + "verify_ip": ${VERIFY_IP}, + + //The cache websocket endpoint path + "connect_path": "${CACHE_CONNECT_PATH}", + + //Optional to allow nodes to discover nodes we adverties + "discovery_path": "${DISCOVER_PATH}", + + //Optionally change the well-known path (clients must know this) + "well_known_path": null, + + //The maxium number of connections to this node + "max_concurrent_connections": ${MAX_CONCURRENT_CONNECTIONS} + }, + + //Cache configuration object, FBM protocol variables + "cache": { + + //Max number of cache entires to be stored + "max_cache": ${MAX_ENTRIES}, + + //the number of cache buckets to distribute load + "buckets": ${CACHE_BUCKETS}, + + //FBM buffer config + "buffer_recv_max": ${CACHE_MAX_MESSAGE}, //Up to 100Kb transfer buffer + "buffer_recv_min": 8192, //min of 8k transfer buffer + "buffer_header_max": 2048, //2k max header buffer size + "buffer_header_min": 128, //128 byte min request header buffer size + "max_message_size": ${CACHE_MAX_MESSAGE} //Absolute maxium message size allowed, also the maxium size of cache entires + }, + + //Known peers array, must point to well-known endpoint for discovery + "known_peers": ${KNOWN_PEERS} + +} \ No newline at end of file diff --git a/plugins/ObjectCacheServer/server/container/config-templates/config-template.json b/plugins/ObjectCacheServer/server/container/config-templates/config-template.json new file mode 100644 index 0000000..6362432 --- /dev/null +++ b/plugins/ObjectCacheServer/server/container/config-templates/config-template.json @@ -0,0 +1,105 @@ +{ + + //Host application config, config is loaded as a read-only DOM that is available + //to the host and loaded child plugins, all elements are available to plugins via the 'HostConfig' property + + "http": { + //The defaut HTTP version to being requests with (does not support http/2 yet) + "default_version": "HTTP/1.1", + //The maxium size (in bytes) of response messges that will be compressed + "compression_limit": 10000, + //Minium response size (in bytes) to compress + "compression_minimum": 2048, + //The size of the buffer to use when parsing multipart/form data uploads + "multipart_max_buf_size": 1024, + //The maxium ammount of data (in bytes) allows for mulitpart/form data file uploads + "multipart_max_size": 0, + //Absolute maximum size (in bytes) of the request entity body (exludes headers) + "max_entity_size": 10240, + //Keepalive ms for HTTP1.1 keepalive connections + "keepalive_ms": 100000, + //The buffer size to use when parsing headers (also the maxium request header size allowed) + "header_buf_size": 8128, + //The maxium number of headers allowed in an HTTP request message + "max_request_header_count": 50, + //The maxium number of allowed network connections, before 503s will be issued automatically and connections closed + "max_connections": ${HTTP_MAX_CONNS}, + //The size in bytes of the buffer to use when writing response messages + "response_buf_size": 4096, + //time (in ms) to wait for a response from an active connection in recv mode, before dropping it + "recv_timeout_ms": 5000, + //Time in ms to wait for the client to accept transport data before terminating the connection + "send_timeout_ms": 60000, + //The size (in bytes) of the buffer used to store all response header data + "response_header_buf_size": 16384, + //Max number of file uploads allowed per request + "max_uploads_per_request": 1 + }, + + //Maxium ammount of time a request is allowed to be processed (includes loading or waiting for sessions) before operations will be cancelled and a 503 returned + "max_execution_time_ms": 20000, + + "virtual_hosts": [ + { + "interface": { + "address": "0.0.0.0", + "port": 2557 + }, + + //Collection of "trusted" servers to allow proxy header support from + "downstream_servers": ${HTTP_DOWNSTREAM_SERVERS}, + + //The hostname to listen for, "*" as wildcard, and "[system]" as the default hostname for the current machine + "hostname": "*", + "path": "root/", + + "deny_extensions": [ ], + "default_files": [ ], + "error_files": [], + "cache_default_sec": 864000, + + //Disabled until well-tested + //"ssl": ${SSL_JSON} + } + ], + + + //Defines the directory where plugin's are to be loaded from + "plugins": { + //Hot-reload creates collectable assemblies that allow full re-load support in the host application, should only be used for development purposes! + "hot_reload": false, + "path": "plugins/", + "config_dir": "config/", + "assets": "plugins/assets/" + }, + + "sys_log": { + "path": "data/logs/sys-log.txt", + "flush_sec": 5, + "retained_files": 31, + "file_size_limit": 10485760, + "interval": "infinite" + }, + + "app_log": { + "path": "data/logs/app-log.txt", + "flush_sec": 5, + "retained_files": 31, + "file_size_limit": 10485760, + "interval": "infinite" + }, + + //HASHICORP VAULT + "hashicorp_vault": { + "url": "${HC_VAULT_ADDR}", + "token": "${HC_VAULT_TOKEN}", + "trust_cert": ${HC_VAULT_TRUST_CERT} + }, + + "secrets": { + //Special key used by the loading library for access to the PasswordHashing library to pepper password hashes + "cache_private_key": "${CACHE_PRIV_KEY}", + "client_public_key": "${CLIENT_PUB_KEY}" + } +} + diff --git a/plugins/ObjectCacheServer/server/container/docker-compose.yaml b/plugins/ObjectCacheServer/server/container/docker-compose.yaml new file mode 100644 index 0000000..c1b61fa --- /dev/null +++ b/plugins/ObjectCacheServer/server/container/docker-compose.yaml @@ -0,0 +1,45 @@ +#Copyright (c) Vaughn Nugent +#Licensed under the GNU AGPLv3 + +version: '3.6' + +services: + vncache: + image: vnuge/vncache + container_name: vncache + restart: unless-stopped + hostname: vncache-server + volumes: + - ./assets:/app/usr/assets:ro #optional if assets are required + - ./ssl:/app/ssl:ro #optional only if SSL is enabled (currently not a feature) + ports: + - 2557:2557 + environment: + #System memory consumption is calculated as follows: + # MAX_ENTIRES x CACHE_BUCKETS x CACHE_MAX_MESSAGE = max memory consumption + + MAX_CONCURRENT_CONNECTIONS: "1000" #max number of concurrent connections + MAX_ENTRIES: "10000" #max number of cache entries per bucket + CACHE_BUCKETS: "100" #number of cache buckets for load balancing + CACHE_MAX_MESSAGE: "20480" #20KB + VERIFY_IP: "true" #verfies the IP address of clients during negotiation (recommended) + MAX_PEER_NODES: "10" #max number of other peer nodes this node shoud connect to + DISCOVERY_INTERVAL: "360" #time (in seconds) between peer node discovery + KNOWN_PEERS: '[]' #array of known peer nodes in the cluster + + #SECRETS (must be JWK formatted keys) + CACHE_PRIV_KEY: "" #REQUIRED local private key used to identify and sign messages to clients and other nodes + CLIENT_PUB_KEY: "" #REQUIRED used to verify client messages + + #HC vault + #HC_VAULT_ADDR: "" + #HC_VAULT_TOKEN: "" + #HC_VAULT_TRUST_CERT: "false" + + #HTTP + #HTTP_DOWNSTREAM_SERVERS: '[]' + #SSL_JSON: '{"cert": "ssl/cert.pem", "privkey":"ssl/priv.pem"}' + HTTP_MAX_CONNS: "5000" + + SERVER_ARGS: "--input-off" + diff --git a/plugins/ObjectCacheServer/server/container/run.sh b/plugins/ObjectCacheServer/server/container/run.sh new file mode 100644 index 0000000..2c2636c --- /dev/null +++ b/plugins/ObjectCacheServer/server/container/run.sh @@ -0,0 +1,15 @@ +#! /bin/sh + +#this script will be invoked by dumb-init in the container on statup and is located at /app + +rm -rf config && mkdir config + +#substitude all -template files in the config-templates dir and write them to the config dir +for file in config-templates/*-template.json; do + envsubst < $file > config/$(basename $file -template.json).json +done + +cp usr/assets/* plugins/assets/ -rf + +#start the server +dotnet webserver/VNLib.WebServer.dll --config config/config.json $SERVER_ARGS \ No newline at end of file diff --git a/plugins/ObjectCacheServer/server/install.ps1 b/plugins/ObjectCacheServer/server/install.ps1 new file mode 100644 index 0000000..4c42c18 --- /dev/null +++ b/plugins/ObjectCacheServer/server/install.ps1 @@ -0,0 +1,26 @@ +param([String] $BaseUrl, [String] $ModuleName, [String] $ProjectName, [String]$FileName) + +#get the latest file +Invoke-WebRequest "$BaseUrl/$ModuleName/@latest" -OutFile latest.txt +#read the file into a variable +$latest = Get-Content latest.txt + +#download the latest version +Invoke-WebRequest "$BaseUrl/$ModuleName/$latest/$ProjectName/$FileName" -OutFile $FileName + +#download latest sha256 +Invoke-WebRequest "$BaseUrl/$ModuleName/$latest/$ProjectName/$FileName.sha256" -OutFile "$FileName.sha256" + +#verify the file +$hash = (Get-FileHash $FileName -Algorithm SHA256).Hash + +#read the sha256 file +$sha256 = Get-Content "$FileName.sha256" + +#compare the hashes +if ($hash -eq $sha256) { + Write-Host "Hashes match, file is valid" -ForegroundColor Blue +} else { + throw "Hashes do not match, file is invalid" +} + diff --git a/plugins/ObjectCacheServer/server/install.taskfile.yaml b/plugins/ObjectCacheServer/server/install.taskfile.yaml new file mode 100644 index 0000000..37baf12 --- /dev/null +++ b/plugins/ObjectCacheServer/server/install.taskfile.yaml @@ -0,0 +1,20 @@ +# https://taskfile.dev + +#Called by the vnbuild system to produce builds for my website +#https://www.vaughnnugent.com/resources/software + +version: "3" + +tasks: + + install: + internal: true + cmds: + #make the plugin directory + - cmd: powershell -Command "mkdir {{.DIR}} -Force" + ignore_error: true + - cmd: powershell -Command "pwd" + - cd {{.DIR}} && powershell "{{.SCRIPT_DIR}}/install.ps1" -BaseUrl {{.BUILDS_URL}} -ModuleName {{.MODULE_NAME}} -ProjectName {{.PROJECT_NAME}} -FileName {{.FILE_NAME}} + - cd {{.DIR}} && tar -xzf {{.FILE_NAME}} + #remove the archive file + - cd {{.DIR}} && powershell -Command "rm {{.FILE_NAME}}" \ No newline at end of file diff --git a/plugins/ObjectCacheServer/server/taskfile.yaml b/plugins/ObjectCacheServer/server/taskfile.yaml new file mode 100644 index 0000000..38eae79 --- /dev/null +++ b/plugins/ObjectCacheServer/server/taskfile.yaml @@ -0,0 +1,193 @@ +# https://taskfile.dev + +#Inlcuded taskfile for object cache server that is used to produce +#ci builds for standalone caching servers + +version: "3" + +vars: + BUILDS_URL: https://www.vaughnnugent.com/public/resources/software/builds + SCRIPT_DIR: '{{.TASKFILE_DIR}}' + BINARY_DIR: '{{.PROJECT_DIR}}/bin' #binary dir is not available for dotnet plugis + +includes: + install: + taskfile: install.taskfile.yaml + optional: true + + container: + dir: container #always run from the container directory + taskfile: container/Taskfile.yaml + optional: true + vars: + BUILDS_URL: '{{.BUILDS_URL}}' + PACKAGE_FILE_NAME: "vncache-alpine3.19-oci.tgz" #the name of the output package file + +tasks: +# CLIENT-SIDE TASKS + default: + desc: "Runs the VNCache server" + cmds: + - task: run + + run: + desc: "Runs the VNCache server" + silent: true + env: + #server should detect the file extension and load the correct library + VNLIB_SHARED_HEAP_FILE_PATH: lib/libvn_rpmalloc + + cmds: + - cmd: dotnet webserver/VNLib.WebServer.dll --config config/config.json --input-off --inline-scheduler {{.ARGS}} + #setup sever environment + + + setup-debian: + desc: "Performs initial setup on Debian x64 based machines" + silent: true + cmds: + - apt update + - apt install -y dotnet-runtime-8.0 gcc cmake + - task: setup + - echo "Setup complete" + + setup-fedora: + desc: "Performs initial setup on Fedora/Redhat x64 (dnf) based machines" + silent: true + cmds: + - dnf update + - dnf install -y dotnet-runtime-8.0 gcc cmake + - task: setup + - echo "Setup complete" + + setup-alpine: + desc: "Performs initial setup on Alpine x64 based machines" + silent: true + cmds: + - apk update + - apk add --no-cache dotnet8-runtime gcc cmake + - task: setup + - echo "Setup complete" + + setup: + cmds: + #build rpmalloc lib + - task: build-rpmalloc + + build-rpmalloc: + internal: true + dir: 'lib/' + vars: + RPMALLOC_DIR: 'vnlib_rpmalloc' + cmds: + #build rpmalloc library + - cmd: cd vnlib_rpmalloc/ && task + + - cmd: cp vnlib_rpmalloc/build/libvn_rpmalloc.so libvn_rpmalloc.so + platforms: [ linux ] + + - cmd: cp vnlib_rpmalloc/build/libvn_rpmalloc.dylib libvn_rpmalloc.dylib + platforms: [ darwin ] + + - cmd: powershell -Command "cp vnlib_rpmalloc/build/Release/vnlib_rpmalloc.dll libvn_rpmalloc.dll" + platforms: [ windows/amd64 ] + +# CI BUILD TASKS + build: + desc: "CI ONLY! DO NOT RUN" + cmds: + - task: install-plugins + - task: install-webserver + + #run container build last + - task: container:build + + install-webserver: + internal: true + cmds: + - for: [ win-x64, linux-x64, osx-x64, linux-arm64 ] + task: create-env + vars: + TARGET_OS: '{{.ITEM}}' + + install-plugins: + internal: true + cmds: + - cmd: powershell -Command "mkdir lib -Force" + ignore_error: true + + #copy the object-cache plugin output to the local plugins directory + - cmd: powershell -Command "cp -Recurse -Force {{.PROJECT_DIR}}/bin/Release/net8.0/publish/ plugins/{{.PROJECT_NAME}}/" + + #download rpmalloc + - task: install:install + vars: + PROJECT_NAME: 'vnlib_rpmalloc' + MODULE_NAME: "VNLib.Core" + FILE_NAME: "src.tgz" + DIR: './lib/vnlib_rpmalloc' + + postbuild_success: + desc: "CI ONLY! DO NOT RUN" + cmds: + - for: [ win-x64, linux-x64, osx-x64, linux-arm64 ] + task: pack + vars: + TARGET_OS: '{{.ITEM}}' + + #cleanup unnecessary build files that clog up the pipeline + - for: [ build, plugins, lib ] + cmd: powershell -Command "rm -Recurse '{{.ITEM}}'" + ignore_error: true + + - task: container:postbuild_success + + build-container: + internal: true + cmds: + - task: container:build + + #Creates a new webserver build environment for an operating system configuration + create-env: + internal: true + vars: + BUILD_DIR: './build/{{.TARGET_OS}}' + cmds: + #create dir for env + - cmd: powershell -Command "mkdir {{.BUILD_DIR}} -Force" + ignore_error: true + + #copy build files + - for: [ plugins, lib, config, taskfile.yaml ] + cmd: powershell -Command "cp -Recurse -Force {{.ITEM}} {{.BUILD_DIR}}" + + - task: get-webserver + vars: + TARGET_OS: '{{.TARGET_OS}}' + BUILD_DIR: '{{.BUILD_DIR}}' + + #fetches a copy of (the desired os version) VNLib.WebServer project and installs it into the build directory + get-webserver: + internal: true + cmds: + - task: install:install + vars: + PROJECT_NAME: 'VNLib.Webserver' + MODULE_NAME: "VNLib.Webserver" + FILE_NAME: "{{.TARGET_OS}}-release.tgz" + DIR: '{{.BUILD_DIR}}/webserver' + + pack: + internal: true + cmds: + - cd build/{{.TARGET_OS}} && tar -czf '{{ .BINARY_DIR }}/{{ .TARGET_OS }}-release.tgz' . + + clean: + desc: "CI ONLY! DO NOT RUN" + ignore_error: true + cmds: + - for: [ build/, bin/, plugins/, lib/] + cmd: powershell -Command "rm -Recurse -Force '{{.ITEM}}'" + + - task: container:clean + diff --git a/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs b/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs index 40f4c29..6f733ed 100644 --- a/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs +++ b/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: VNLib.Plugins.Extensions.VNCache @@ -24,6 +24,7 @@ using System; +using System.Linq; using System.Buffers; using System.Text.Json; using System.Collections.Generic; @@ -32,9 +33,10 @@ using System.Runtime.CompilerServices; using VNLib.Plugins; using VNLib.Utils; using VNLib.Utils.Memory; +using VNLib.Utils.Memory.Diagnostics; +using VNLib.Utils.Logging; using VNLib.Utils.Extensions; using VNLib.Plugins.Extensions.Loading; - /* * How bucket local memory works: * @@ -53,6 +55,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server { private readonly LinkedList _managers = new (); private readonly bool _zeroAll; + private readonly bool _enableHeapTracking; + private readonly ILogProvider _statsLogger; /// public ICacheEntryMemoryManager CreateForBucket(uint bucketId) @@ -60,6 +64,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server //Init a new heap for the individual bucket IUnmangedHeap localHeap = MemoryUtil.InitializeNewHeapForProcess(); + if (_enableHeapTracking) + { +#pragma warning disable CA2000 // Dispose objects before losing scope + localHeap = new TrackedHeapWrapper(localHeap, true); +#pragma warning restore CA2000 // Dispose objects before losing scope + } + BucketLocalManager manager = new (localHeap, bucketId, _zeroAll); _managers.AddLast(manager); @@ -74,11 +85,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server if (config != null) { //Try to get the zero all flag - if (config.TryGetValue("zero_all", out JsonElement zeroEl)) - { - _zeroAll = zeroEl.GetBoolean(); - } + _zeroAll = config.TryGetValue("zero_all", out JsonElement zeroEl) && zeroEl.GetBoolean(); + + //Get the heap tracking flag + _enableHeapTracking = config.TryGetValue("diag_mem", out JsonElement trackEl) && trackEl.GetBoolean(); } + + _statsLogger = plugin.Log.CreateScope("Cache MemStats"); } protected override void Free() @@ -90,6 +103,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server } } + public void LogHeapStats() + { + //If tracking is not enabled, the heap instances stored by the managers will not be tracked, and the cast in the code below will fail + if (!_enableHeapTracking) + { + return; + } + + string[] statsPerHeap = _managers.Select(hm => + { + HeapStatistics stats = (hm.Heap as TrackedHeapWrapper)!.GetCurrentStats(); + return $"\tBucket {hm.BucketId}: Current {stats.AllocatedBytes / 1024}kB, Blocks {stats.AllocatedBlocks}, Max size {stats.MaxHeapSize / 1024}kB"; + + }).ToArray(); + + _statsLogger.Debug("Memory statistics for cache memory manager: {hm}\n{stats}", GetHashCode(), statsPerHeap); + } + /* * Buckets are mutually exclusive, so we can use a single heap for each bucket * to get a little more performance on memory operations @@ -104,7 +135,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server /// public void FreeHandle(object handle) { - _ = handle ?? throw new ArgumentNullException(nameof(handle)); + ArgumentNullException.ThrowIfNull(handle); MemoryHandle _handle = Unsafe.As>(ref handle); //Free the handle @@ -114,7 +145,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server /// public uint GetHandleSize(object handle) { - _ = handle ?? throw new ArgumentNullException(nameof(handle)); + ArgumentNullException.ThrowIfNull(handle); MemoryHandle _handle = Unsafe.As>(ref handle); return (uint)_handle.Length; @@ -123,7 +154,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server /// public Span GetSpan(object handle, uint offset, uint length) { - _ = handle ?? throw new ArgumentNullException(nameof(handle)); + ArgumentNullException.ThrowIfNull(handle); MemoryHandle _handle = Unsafe.As>(ref handle); return _handle.GetOffsetSpan(offset, checked((int)length)); @@ -132,7 +163,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server /// public MemoryHandle PinHandle(object handle, int offset) { - _ = handle ?? throw new ArgumentNullException(nameof(handle)); + ArgumentNullException.ThrowIfNull(handle); MemoryHandle _handle = Unsafe.As>(ref handle); //Pin the handle @@ -142,7 +173,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server /// public void ResizeHandle(object handle, uint newSize) { - _ = handle ?? throw new ArgumentNullException(nameof(handle)); + ArgumentNullException.ThrowIfNull(handle); MemoryHandle _handle = Unsafe.As>(ref handle); //Resize the handle diff --git a/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs b/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs deleted file mode 100644 index bd15d24..0000000 --- a/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs +++ /dev/null @@ -1,53 +0,0 @@ -/* -* Copyright (c) 2023 Vaughn Nugent -* -* Library: VNLib -* Package: ObjectCacheServer -* File: CacheConfiguration.cs -* -* CacheConfiguration.cs is part of ObjectCacheServer which is part of the larger -* VNLib collection of libraries and utilities. -* -* ObjectCacheServer is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* ObjectCacheServer is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System.Text.Json.Serialization; - -namespace VNLib.Data.Caching.ObjectCache.Server.Cache -{ - internal sealed class CacheConfiguration - { - [JsonPropertyName("buffer_recv_max")] - public int MaxRecvBufferSize { get; set; } = 1000 * 1024; - [JsonPropertyName("buffer_recv_min")] - public int MinRecvBufferSize { get; set; } = 8 * 1024; - - - [JsonPropertyName("buffer_header_max")] - public int MaxHeaderBufferSize { get; set; } = 2 * 1024; - [JsonPropertyName("buffer_header_min")] - public int MinHeaderBufferSize { get; set; } = 128; - - - [JsonPropertyName("max_message_size")] - public int MaxMessageSize { get; set; } = 1000 * 1024; - - - [JsonPropertyName("max_cache")] - public uint MaxCacheEntries { get; set; } = 10000; - - [JsonPropertyName("buckets")] - public uint BucketCount { get; set; } = 10; - } -} diff --git a/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs b/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs deleted file mode 100644 index e3c613d..0000000 --- a/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs +++ /dev/null @@ -1,264 +0,0 @@ -/* -* Copyright (c) 2023 Vaughn Nugent -* -* Library: VNLib -* Package: ObjectCacheServer -* File: CacheEventQueueManager.cs -* -* CacheEventQueueManager.cs is part of ObjectCacheServer which is -* part of the larger VNLib collection of libraries and utilities. -* -* ObjectCacheServer is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* ObjectCacheServer is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using System.Threading.Channels; -using System.Collections.Generic; - -using VNLib.Plugins; -using VNLib.Utils.Async; -using VNLib.Utils.Logging; -using VNLib.Plugins.Extensions.Loading; -using VNLib.Plugins.Extensions.Loading.Events; - - -namespace VNLib.Data.Caching.ObjectCache.Server.Cache -{ - internal sealed class CacheEventQueueManager : ICacheEventQueueManager, IDisposable, IIntervalScheduleable - { - private readonly int MaxQueueDepth; - - private readonly object SubLock; - private readonly LinkedList Subscribers; - - private readonly object StoreLock; - private readonly Dictionary QueueStore; - - - public CacheEventQueueManager(PluginBase plugin) - { - //Get node config - NodeConfig config = plugin.GetOrCreateSingleton(); - - //Get max queue depth - MaxQueueDepth = config.MaxQueueDepth; - - /* - * Schedule purge interval to clean up stale queues - */ - plugin.ScheduleInterval(this, config.EventQueuePurgeInterval); - - SubLock = new(); - Subscribers = new(); - - StoreLock = new(); - QueueStore = new(StringComparer.OrdinalIgnoreCase); - } - - /// - public IPeerEventQueue Subscribe(ICachePeer peer) - { - NodeQueue? nq; - - bool isNew = false; - - //Enter sync lock - lock (StoreLock) - { - //Try to recover the queue for the node - if (!QueueStore.TryGetValue(peer.NodeId, out nq)) - { - //Create new queue - nq = new(peer.NodeId, MaxQueueDepth); - QueueStore.Add(peer.NodeId, nq); - isNew = true; - } - - //Increment listener count - nq.Listeners++; - } - - //Publish new peer to subscribers list - if (isNew) - { - lock (SubLock) - { - //Add peer to subscribers list - Subscribers.AddLast(nq); - } - } - - //Return the node's queue - return nq; - } - - /// - public void Unsubscribe(ICachePeer peer) - { - //Detach a listener for a node - lock (StoreLock) - { - //Get the queue and decrement the listener count - NodeQueue nq = QueueStore[peer.NodeId]; - nq.Listeners--; - } - } - - /// - public void PublishSingle(ChangeEvent change) - { - //Wait to enter the sub lock - lock (SubLock) - { - //Loop through ll the fast way - LinkedListNode? q = Subscribers.First; - - while (q != null) - { - //Pub single event node - q.Value.PublishChange(change); - - //Get next queue - q = q.Next; - } - } - } - - /// - public void PublishMultiple(Span changes) - { - //Wait to enter the sub lock - lock (SubLock) - { - //Loop through ll the fast way - LinkedListNode? q = Subscribers.First; - - while (q != null) - { - //Publish multiple - q.Value.PublishChanges(changes); - - //Get next queue - q = q.Next; - } - } - } - - /// - public void PurgeStaleSubscribers() - { - //Enter locks - lock (SubLock) - { - lock (StoreLock) - { - //Get all stale queues (queues without listeners) - NodeQueue[] staleQueues = QueueStore.Values.Where(static nq => nq.Listeners == 0).ToArray(); - - foreach (NodeQueue nq in staleQueues) - { - //Remove from store - QueueStore.Remove(nq.NodeId); - - //remove from subscribers - Subscribers.Remove(nq); - } - } - } - } - - //Interval to purge stale subscribers - Task IIntervalScheduleable.OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken) - { - log.Debug("Purging stale peer event queues"); - - PurgeStaleSubscribers(); - - return Task.CompletedTask; - } - - void IDisposable.Dispose() - { - QueueStore.Clear(); - Subscribers.Clear(); - } - - /* - * Holds queues for each node and keeps track of the number of listeners - * attached to the queue - */ - - private sealed class NodeQueue : IPeerEventQueue - { - public int Listeners; - - public string NodeId { get; } - - public AsyncQueue Queue { get; } - - public NodeQueue(string nodeId, int maxDepth) - { - NodeId = nodeId; - - /* - * Create a bounded channel that acts as a lru and evicts - * the oldest item when the queue is full - * - * There will also only ever be a single thread writing events - * to the queue - */ - - BoundedChannelOptions queueOptions = new(maxDepth) - { - AllowSynchronousContinuations = true, - SingleReader = false, - SingleWriter = true, - //Drop oldest item in queue if full - FullMode = BoundedChannelFullMode.DropOldest, - }; - - //Init queue/channel - Queue = new(queueOptions); - } - - public void PublishChange(ChangeEvent change) - { - Queue.TryEnque(change); - } - - public void PublishChanges(Span changes) - { - for (int i = 0; i < changes.Length; i++) - { - Queue.TryEnque(changes[i]); - } - } - - /// - public ValueTask DequeueAsync(CancellationToken cancellation) - { - return Queue.DequeueAsync(cancellation); - } - - /// - public bool TryDequeue(out ChangeEvent change) - { - return Queue.TryDequeue(out change); - } - } - } -} diff --git a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs index 6942828..aef0255 100644 --- a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs +++ b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -45,39 +45,33 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache internal sealed class CacheListenerPubQueue : ICacheListenerEventQueue, IAsyncBackgroundWork { - private const int MAX_LOCAL_QUEUE_ITEMS = 10000; - private const string LOG_SCOPE_NAME = "QUEUE"; - private readonly AsyncQueue _listenerQueue; private readonly ILogProvider _logProvider; - private readonly ICacheEventQueueManager _queueManager; + private readonly PeerEventQueueManager _queueManager; - public CacheListenerPubQueue(PluginBase plugin) + public CacheListenerPubQueue(PluginBase plugin, PeerEventQueueManager queueMan) { - _queueManager = plugin.GetOrCreateSingleton(); - _logProvider = plugin.Log.CreateScope(LOG_SCOPE_NAME); + _queueManager = queueMan; + _logProvider = plugin.Log.CreateScope(CacheConstants.LogScopes.CacheListenerPubQueue); //Init local queue to store published events - _listenerQueue = new(new BoundedChannelOptions(MAX_LOCAL_QUEUE_ITEMS) + _listenerQueue = new(new BoundedChannelOptions(CacheConstants.CacheListenerChangeQueueSize) { AllowSynchronousContinuations = true, FullMode = BoundedChannelFullMode.DropOldest, - SingleReader = true, + SingleReader = true, //Always a singe thread reading events SingleWriter = false, }); } /// - async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) + async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider _, CancellationToken exitToken) { const int accumulatorSize = 64; - //Create scope - pluginLog = pluginLog.CreateScope(LOG_SCOPE_NAME); - try { - pluginLog.Debug("Change queue worker listening for local cache changes"); + _logProvider.Debug("Change queue worker listening for local cache changes"); //Accumulator for events ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize]; @@ -105,15 +99,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache catch (OperationCanceledException) { //Normal exit - pluginLog.Debug("Change queue listener worker exited"); + _logProvider.Debug("Change queue listener worker exited"); } } /// - public bool IsEnabled([NotNullWhen(true)] IPeerEventQueue? userState) - { - return userState is IPeerEventQueue; - } + public bool IsEnabled([NotNullWhen(true)] IPeerEventQueue? userState) => userState is not null; /// public void PublishEvent(ChangeEvent changeEvent) diff --git a/plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs b/plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs new file mode 100644 index 0000000..c404cc5 --- /dev/null +++ b/plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs @@ -0,0 +1,58 @@ +/* +* Copyright (c) 2024 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: CacheMemoryConfiguration.cs +* +* CacheMemoryConfiguration.cs is part of ObjectCacheServer which +* is part of the larger VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System.Text.Json.Serialization; + +namespace VNLib.Data.Caching.ObjectCache.Server.Cache +{ + internal sealed class CacheMemoryConfiguration + { + [JsonPropertyName("buffer_recv_max")] + public int MaxRecvBufferSize { get; set; } = 1000 * 1024; + [JsonPropertyName("buffer_recv_min")] + public int MinRecvBufferSize { get; set; } = 8 * 1024; + + + [JsonPropertyName("buffer_header_max")] + public int MaxHeaderBufferSize { get; set; } = 2 * 1024; + + [JsonPropertyName("buffer_header_min")] + public int MinHeaderBufferSize { get; set; } = 128; + + + [JsonPropertyName("max_message_size")] + public int MaxMessageSize { get; set; } = 1000 * 1024; + + + [JsonPropertyName("max_cache")] + public uint MaxCacheEntries { get; set; } = 10000; + + [JsonPropertyName("buckets")] + public uint BucketCount { get; set; } = 10; + + + [JsonPropertyName("memory_lib_path")] + public string? ExternLibPath { get; set; } + } +} diff --git a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs deleted file mode 100644 index 75abe37..0000000 --- a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs +++ /dev/null @@ -1,131 +0,0 @@ -/* -* Copyright (c) 2024 Vaughn Nugent -* -* Library: VNLib -* Package: ObjectCacheServer -* File: CacheStore.cs -* -* CacheStore.cs is part of ObjectCacheServer which is part of the larger -* VNLib collection of libraries and utilities. -* -* ObjectCacheServer is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* ObjectCacheServer is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Threading; -using System.Threading.Tasks; - -using VNLib.Utils.Logging; -using VNLib.Net.Messaging.FBM; -using VNLib.Plugins; -using VNLib.Plugins.Extensions.Loading; - -namespace VNLib.Data.Caching.ObjectCache.Server.Cache -{ - /* - * Implements the blob cache store, which is an abstraction around the blob cache listener. - * This allows for publishing local events (say from other nodes) to keep caches in sync. - */ - - [ConfigurationName("cache")] - internal sealed class CacheStore : ICacheStore, IDisposable - { - /// - /// Gets the underlying cache listener - /// - public BlobCacheListener Listener { get; } - - - public CacheStore(PluginBase plugin, IConfigScope config) - { - //Init cache - Listener = InitializeCache((ObjectCacheServerEntry)plugin, config); - } - - /// - ValueTask ICacheStore.AddOrUpdateBlobAsync(string objectId, string? alternateId, ObjectDataGet bodyData, T state, CancellationToken token) - { - return Listener.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token); - } - - /// - void ICacheStore.Clear() - { - throw new NotImplementedException(); - } - - /// - ValueTask ICacheStore.DeleteItemAsync(string id, CancellationToken token) - { - return Listener.Cache.DeleteObjectAsync(id, token); - } - - private static BlobCacheListener InitializeCache(ObjectCacheServerEntry plugin, IConfigScope config) - { - const string CacheConfigTemplate = -@" -Cache Configuration: - Max memory: {max} Mb - Buckets: {bc} - Entries per-bucket: {mc} -"; - - //Deserialize the cache config - CacheConfiguration cacheConf = config.Deserialze(); - - if (cacheConf.MaxCacheEntries < 2) - { - throw new ArgumentException("You must configure a 'max_cache' size larger than 1 item"); - } - - //Suggestion - if (cacheConf.MaxCacheEntries < 200) - { - plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache"); - } - - //calculate the max memory usage - ulong maxByteSize = cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize; - - //Log the cache config - plugin.Log.Information(CacheConfigTemplate, - maxByteSize / (1024 * 1000), - cacheConf.BucketCount, - cacheConf.MaxCacheEntries - ); - - //Get the event listener - ICacheListenerEventQueue queue = plugin.GetOrCreateSingleton(); - - //Get the memory manager - ICacheMemoryManagerFactory manager = plugin.GetOrCreateSingleton(); - - //Load the blob cache table system - IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, manager, cacheConf); - - FallbackFBMMemoryManager fbmMemManager = new(plugin.ListenerHeap); - - //Endpoint only allows for a single reader - return new(bc, queue, plugin.Log, fbmMemManager); - } - - /* - * Cleaned up by the plugin on exit - */ - public void Dispose() - { - Listener.Dispose(); - } - } -} diff --git a/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs index b7bf83f..8f196b0 100644 --- a/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs +++ b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs @@ -29,6 +29,7 @@ using System.Text.Json; using VNLib.Utils.Resources; using VNLib.Plugins; using VNLib.Plugins.Extensions.Loading; +using VNLib.Utils.Extensions; namespace VNLib.Data.Caching.ObjectCache.Server.Cache { @@ -49,7 +50,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache /// The cache configuration object /// The loaded implementation /// - public static IBlobCacheTable LoadMemoryCacheSystem(this PluginBase plugin, IConfigScope config, ICacheMemoryManagerFactory heap, CacheConfiguration cacheConf) + public static IBlobCacheTable LoadMemoryCacheSystem(this PluginBase plugin, IConfigScope config, ICacheMemoryManagerFactory heap, CacheMemoryConfiguration cacheConf) { #pragma warning disable CA2000 // Dispose objects before losing scope @@ -94,10 +95,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache if(initMethod != null) { //Itterate all buckets - foreach (IBlobCacheBucket bucket in table) - { - initMethod.Invoke(bucket.Id); - } + table.ForEach(bucket => initMethod(bucket.Id)); } } diff --git a/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs b/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs new file mode 100644 index 0000000..4b76a9b --- /dev/null +++ b/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs @@ -0,0 +1,248 @@ +/* +* Copyright (c) 2024 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: PeerEventQueueManager.cs +* +* PeerEventQueueManager.cs is part of ObjectCacheServer which is +* part of the larger VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Channels; +using System.Collections.Generic; + +using VNLib.Plugins; +using VNLib.Utils.Async; +using VNLib.Utils.Logging; +using VNLib.Plugins.Extensions.Loading; +using VNLib.Plugins.Extensions.Loading.Events; + + +namespace VNLib.Data.Caching.ObjectCache.Server.Cache +{ + internal sealed class PeerEventQueueManager : ICacheEventQueueManager, IIntervalScheduleable + { + private readonly int MaxQueueDepth; + + private readonly object SubLock = new(); + private readonly LinkedList Subscribers = []; + + private readonly object StoreLock = new(); + private readonly Dictionary QueueStore = new(StringComparer.OrdinalIgnoreCase); + + public PeerEventQueueManager(PluginBase plugin, ServerClusterConfig config) + { + MaxQueueDepth = config.MaxQueueDepth; + + /* + * Schedule purge interval to clean up stale queues + */ + plugin.ScheduleInterval(this, config.EventQueuePurgeInterval); + + //Cleanup disposeables on unload + _ = plugin.RegisterForUnload(() => + { + QueueStore.Clear(); + Subscribers.Clear(); + }); + } + + /// + public IPeerEventQueue Subscribe(ICachePeer peer) + { + PeerEventListenerQueue? nq; + + bool isNew = false; + + //Enter sync lock + lock (StoreLock) + { + //Try to recover the queue for the node + if (!QueueStore.TryGetValue(peer.NodeId, out nq)) + { + //Create new queue since an existing queue was not found + nq = new(peer.NodeId, MaxQueueDepth); + QueueStore.Add(peer.NodeId, nq); + isNew = true; + } + + //Increment listener count since a new listener has attached + nq.Listeners++; + } + + //Publish new peer to subscribers list + if (isNew) + { + lock (SubLock) + { + //Add peer to subscribers list + Subscribers.AddLast(nq); + } + } + + //Return the node's queue + return nq; + } + + /// + public void Unsubscribe(ICachePeer peer) + { + /* + * The reason I am not purging queues that no longer have listeners + * now is because it is possible that a listener needed to detach because of + * a network issue and will be reconnecting shortly. If the node doesnt + * come back before the next purge interval, it's events will be purged. + * + * Point is: there is a reason for the garbage collection style purging + */ + + //Detach a listener for a node + lock (StoreLock) + { + //Get the queue and decrement the listener count + PeerEventListenerQueue nq = QueueStore[peer.NodeId]; + nq.Listeners--; + } + } + + /// + public void PublishSingle(ChangeEvent change) + { + //Wait to enter the sub lock + lock (SubLock) + { + //Loop through ll the fast way + LinkedListNode? q = Subscribers.First; + + while (q != null) + { + //Pub single event node + q.Value.PublishChange(change); + + //Get next queue + q = q.Next; + } + } + } + + /// + public void PublishMultiple(Span changes) + { + //Wait to enter the sub lock + lock (SubLock) + { + //Loop through ll the fast way + LinkedListNode? q = Subscribers.First; + + while (q != null) + { + //Publish multiple + q.Value.PublishChanges(changes); + + //Get next queue + q = q.Next; + } + } + } + + /// + public void PurgeStaleSubscribers() + { + //Enter locks + lock (SubLock) + { + lock (StoreLock) + { + //Get all stale queues (queues without listeners) + PeerEventListenerQueue[] staleQueues = QueueStore.Values.Where(static nq => nq.Listeners == 0).ToArray(); + + foreach (PeerEventListenerQueue nq in staleQueues) + { + //Remove from store + QueueStore.Remove(nq.NodeId); + + //remove from subscribers + Subscribers.Remove(nq); + } + } + } + } + + //Interval to purge stale subscribers + Task IIntervalScheduleable.OnIntervalAsync(ILogProvider log, CancellationToken cancellationToken) + { + log.Debug("Purging stale peer event queues"); + + PurgeStaleSubscribers(); + + return Task.CompletedTask; + } + + + /* + * Holds queues for each node and keeps track of the number of listeners + * attached to the queue + * + * The role of this class is to store change events for a given peer node, + * and return them when the peer requests them. It also keeps track of the + * number of active listeners (server connections) to the queue. + */ + + private sealed class PeerEventListenerQueue(string nodeId, int maxDepth) : IPeerEventQueue + { + public int Listeners; + + public string NodeId => nodeId; + + /* + * Create a bounded channel that acts as a lru and evicts + * the oldest item when the queue is full + * + * There will also only ever be a single thread writing events + * to the queue + */ + private readonly AsyncQueue Queue = new(new BoundedChannelOptions(maxDepth) + { + AllowSynchronousContinuations = true, + SingleReader = false, + SingleWriter = true, + //Drop oldest item in queue if full + FullMode = BoundedChannelFullMode.DropOldest, + }); + + public void PublishChange(ChangeEvent change) => Queue.TryEnque(change); + + public void PublishChanges(Span changes) + { + for (int i = 0; i < changes.Length; i++) + { + Queue.TryEnque(changes[i]); + } + } + + /// + public ValueTask DequeueAsync(CancellationToken cancellation) => Queue.DequeueAsync(cancellation); + + /// + public bool TryDequeue(out ChangeEvent change) => Queue.TryDequeue(out change); + } + } +} diff --git a/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs b/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs index 5fc700b..5be0776 100644 --- a/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs +++ b/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -34,16 +34,10 @@ using VNLib.Data.Caching.Extensions; namespace VNLib.Data.Caching.ObjectCache.Server { - sealed record class CacheAuthKeyStore : ICacheAuthManager + sealed class CacheAuthKeyStore(PluginBase plugin) : ICacheAuthManager { - private readonly IAsyncLazy _clientPub; - private readonly IAsyncLazy _cachePriv; - - public CacheAuthKeyStore(PluginBase plugin) - { - _clientPub = plugin.GetSecretAsync("client_public_key").ToLazy(r => r.GetJsonWebKey()); - _cachePriv = plugin.GetSecretAsync("cache_private_key").ToLazy(r => r.GetJsonWebKey()); - } + private readonly IAsyncLazy _clientPub = plugin.Secrets().GetSecretAsync("client_public_key").ToLazy(r => r.GetJsonWebKey()); + private readonly IAsyncLazy _cachePriv = plugin.Secrets().GetSecretAsync("cache_private_key").ToLazy(r => r.GetJsonWebKey()); /// public IReadOnlyDictionary GetJwtHeader() diff --git a/plugins/ObjectCacheServer/src/CacheConstants.cs b/plugins/ObjectCacheServer/src/CacheConstants.cs new file mode 100644 index 0000000..85f737d --- /dev/null +++ b/plugins/ObjectCacheServer/src/CacheConstants.cs @@ -0,0 +1,107 @@ +/* +* Copyright (c) 2024 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: CacheConstants.cs +* +* CacheConstants.cs is part of ObjectCacheServer which is +* part of the larger VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; + +namespace VNLib.Data.Caching.ObjectCache.Server +{ + internal static class CacheConstants + { + /// + /// The default path for the VNCache well known endpoint (aka discovery endpoint) + /// + public const string DefaultWellKnownPath = "/.well-known/vncache"; + + /// + /// The maximum size of buffers for FBM messages sent between servers. + /// + public const int MaxSyncMessageSize = 12 * 1024; + + /// + /// The maximum size of the change queue for the cache listener + /// + public const int CacheListenerChangeQueueSize = 10000; + + /// + /// The time a client authorization token is valid for + /// + public static readonly TimeSpan ClientAuthTokenExpiration = TimeSpan.FromSeconds(30); + + public static class LogScopes + { + /// + /// The log scope for the cache listener + /// + public const string BlobCacheListener = "CacheListener"; + + /// + /// The peer discovery log scope + /// + public const string PeerDiscovery = "DISC"; + + /// + /// The log scope for the replication FBM client debug log (if debugging is enabled) + /// + public const string ReplicationFbmDebug = "REPL-CLNT"; + + /// + /// The log scope for cache replication events + /// + public const string RepliactionManager = "REPL-MGR"; + + /// + /// The log scope for the cache listener change event queue + /// + public const string CacheListenerPubQueue = "QUEUE"; + + /// + /// The log scope for the cache connection websocket endpoint + /// + public const string ConnectionEndpoint = "CONEP"; + } + + public static class Delays + { + /// + /// The amount of startup delay before starting an initial peer discovery + /// + public static readonly TimeSpan InitialDiscovery = TimeSpan.FromSeconds(15); + + /// + /// The amount of time to wait before retrying a failed resolve + /// of a well-known peers + /// + public static readonly TimeSpan WellKnownResolveFailed = TimeSpan.FromSeconds(20); + + /// + /// The amount of time to wait when getting the value of a changed item from the cache + /// + /// + /// When an item change was detected from another peer, the cache will wait this + /// amount of time to get the new value from the cache before timing out. + /// + public static readonly TimeSpan CacheSyncGetItemTimeout = TimeSpan.FromSeconds(10); + } + } +} diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs index a240dde..92f0352 100644 --- a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs +++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -36,7 +36,6 @@ using VNLib.Net.Messaging.FBM; using VNLib.Net.Messaging.FBM.Client; using VNLib.Plugins.Extensions.Loading; using VNLib.Data.Caching.Extensions.Clustering; -using VNLib.Data.Caching.ObjectCache.Server.Cache; namespace VNLib.Data.Caching.ObjectCache.Server.Clustering { @@ -55,43 +54,36 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork { - private const string LOG_SCOPE_NAME = "REPL"; - - private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10); - private const int MAX_MESSAGE_SIZE = 12 * 1024; - private readonly PluginBase _plugin; private readonly ILogProvider _log; - private readonly NodeConfig _nodeConfig; - private readonly ICacheStore _cacheStore; - private readonly ICachePeerAdapter _peerAdapter; private readonly FBMClientFactory _clientFactory; - + private readonly ObjectCacheSystemState _sysState; + private readonly bool _isDebug; private int _openConnections; public CacheNodeReplicationMaanger(PluginBase plugin) { - //Load the node config - _nodeConfig = plugin.GetOrCreateSingleton(); - _cacheStore = plugin.GetOrCreateSingleton(); - _peerAdapter = plugin.GetOrCreateSingleton(); + _sysState = plugin.GetOrCreateSingleton(); //Init fbm config with fixed message size FBMClientConfig clientConfig = FBMDataCacheExtensions.GetDefaultConfig( - (plugin as ObjectCacheServerEntry)!.ListenerHeap, - MAX_MESSAGE_SIZE, - debugLog: plugin.IsDebug() ? plugin.Log : null + _sysState.SharedCacheHeap, + CacheConstants.MaxSyncMessageSize, + debugLog: plugin.IsDebug() ? plugin.Log.CreateScope(CacheConstants.LogScopes.ReplicationFbmDebug) : null ); //Init ws fallback factory and client factory - FBMFallbackClientWsFactory wsFactory = new(); - _clientFactory = new(in clientConfig, wsFactory); + _clientFactory = new( + ref clientConfig, + new FBMFallbackClientWsFactory(), + (int)_sysState.ClusterConfig.MaxPeerConnections + ); _plugin = plugin; _isDebug = plugin.IsDebug(); - _log = plugin.Log.CreateScope(LOG_SCOPE_NAME); + _log = plugin.Log.CreateScope(CacheConstants.LogScopes.RepliactionManager); } public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) @@ -103,7 +95,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering while (true) { //Get all new peers - CacheNodeAdvertisment[] peers = _peerAdapter.GetNewPeers(); + CacheNodeAdvertisment[] peers = _sysState.PeerDiscovery.GetNewPeers(); if (peers.Length == 0 && _isDebug) { @@ -111,7 +103,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering } //Make sure we don't exceed the max connections - if(_openConnections >= _nodeConfig.MaxPeerConnections) + if(_openConnections >= _sysState.ClusterConfig.MaxPeerConnections) { if (_isDebug) { @@ -148,15 +140,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering _log.Information("Node replication worker exited"); } + /* + * This method is called when a new peer has connected (or discovered) to establish a + * replication connection. + */ private async Task OnNewPeerDoWorkAsync(CacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken) { - _ = newPeer ?? throw new ArgumentNullException(nameof(newPeer)); - - //Setup client + ArgumentNullException.ThrowIfNull(newPeer); + FBMClient client = _clientFactory.CreateClient(); - //Add peer to monitor - _peerAdapter.OnPeerListenerAttached(newPeer); + /* + * Notify discovery that we will be listening to this peer + * + * This exists so when a new discovery happens, the work loop will produce + * the difference of new peers to existing peers, and we can connect to them. + * Avoiding infinite connections to the same peer. + */ + _sysState.PeerDiscovery.OnPeerListenerAttached(newPeer); Interlocked.Increment(ref _openConnections); @@ -165,12 +166,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId); //Connect to the server - await client.ConnectToCacheAsync(newPeer, _nodeConfig.Config, exitToken); + await client.ConnectToCacheAsync(newPeer, _sysState.NodeConfig, exitToken); log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId); //Start worker tasks - List workerTasks = new(); + List workerTasks = []; for (int i = 0; i < Environment.ProcessorCount; i++) { @@ -187,6 +188,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering //Disconnect client gracefully await client.DisconnectAsync(CancellationToken.None); } + catch(FBMServerNegiationException fbm) + { + log.Error("Failed to negotiate buffer configuration, check your cache memory configuration. Error:{err}", fbm.Message); + } catch (InvalidResponseException ie) { //See if the plugin is unloading @@ -218,7 +223,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering } catch (Exception ex) { - log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex); + //Avoid call stacks unless debug or higher logging levels + if (log.IsEnabled(LogLevel.Debug)) + { + log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex); + } + else + { + log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex.Message); + } } finally { @@ -226,8 +239,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering client.Dispose(); - //Notify monitor of disconnect - _peerAdapter.OnPeerListenerDetatched(newPeer); + //Notify monitor of disconnect to make it available again later + _sysState.PeerDiscovery.OnPeerListenerDetatched(newPeer); } } @@ -259,7 +272,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering return; case "deleted": //Delete the object from the store - await _cacheStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None); + await _sysState.InternalStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None); break; case "modified": //Reload the record from the store @@ -287,7 +300,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId); //Make request - using FBMResponse response = await client.SendAsync(modRequest, GetItemTimeout, cancellation); + using FBMResponse response = await client.SendAsync(modRequest, CacheConstants.Delays.CacheSyncGetItemTimeout, cancellation); response.ThrowIfNotSet(); @@ -297,7 +310,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal)) { //Update the record - await _cacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation); + await _sysState.InternalStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation); log.Debug("Updated object {id}", objectId); } else diff --git a/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs index c49a54b..c3fbd8e 100644 --- a/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs +++ b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -29,7 +29,6 @@ using System.Collections.Generic; using VNLib.Utils; using VNLib.Utils.Extensions; -using VNLib.Plugins; namespace VNLib.Data.Caching.ObjectCache.Server.Clustering { @@ -37,12 +36,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering internal sealed class CachePeerMonitor : VnDisposeable, IPeerMonitor { - private readonly LinkedList peers = new(); + private readonly List peers = new(); private readonly ManualResetEvent newPeerTrigger = new (false); - public CachePeerMonitor(PluginBase plugin) - { } - /// /// Waits for new peers to connect to the server /// @@ -70,7 +66,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering //When a peer is connected we can add it to the list so the replication manager can see it lock(peers) { - peers.AddLast(peer); + peers.Add(peer); } //Trigger monitor when change occurs @@ -92,6 +88,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering protected override void Free() { + peers.Clear(); newPeerTrigger.Dispose(); } } diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs index 6475f9c..b9a220d 100644 --- a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs +++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -24,14 +24,11 @@ using System; using System.Linq; -using System.Net.Http; using System.Threading; -using System.Net.Sockets; using System.Threading.Tasks; using System.Collections.Generic; using VNLib.Utils.Logging; -using VNLib.Plugins; using VNLib.Plugins.Extensions.Loading; using VNLib.Data.Caching.Extensions; using VNLib.Data.Caching.Extensions.Clustering; @@ -43,54 +40,19 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering * This class is responsible for resolving and discovering peer nodes in the cluster network. */ - internal sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter + internal sealed class PeerDiscoveryManager( + CacheNodeConfiguration config, + ServerClusterConfig clusterConf, + CachePeerMonitor Monitor, + ILogProvider Log, + bool IsDebug, + bool HasWellKnown + ) + : IAsyncBackgroundWork, ICachePeerAdapter { - private const string LOG_SCOPE_NAME = "DISC"; - /* - * The initial discovery delay. This allows for the server to initialize before - * starting the discovery process. This will probably be a shorter delay - * than a usual discovery interval. - */ - private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(15); - private static readonly TimeSpan WhenWellKnownResolveFailed = TimeSpan.FromSeconds(20); - - - private readonly List _connectedPeers; - private readonly NodeConfig Config; - private readonly CachePeerMonitor Monitor; - private readonly ILogProvider Log; - private readonly bool IsDebug; - private readonly bool HasWellKnown; - - public PeerDiscoveryManager(PluginBase plugin) - { - //Get config - Config = plugin.GetOrCreateSingleton(); - - //Get the known peers array from config, its allowed to be null for master nodes - IConfigScope? config = plugin.TryGetConfig("known_peers"); - string[] kownPeers = config?.Deserialze() ?? Array.Empty(); - - //Add known peers to the monitor - Config.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s))); - - HasWellKnown = kownPeers.Length > 0; - - //Get the peer monitor - Monitor = plugin.GetOrCreateSingleton(); - - _connectedPeers = new(); - - //Create scoped logger - Log = plugin.Log.CreateScope(LOG_SCOPE_NAME); - - Log.Information("Inital peer nodes: {nodes}", kownPeers); - //Setup discovery error handler - Config.Config.WithErrorHandler(new ErrorHandler(Log)); - - IsDebug = plugin.IsDebug(); - } + private readonly List _connectedPeers = []; + private readonly VNCacheClusterManager clusterMan = new(config); async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) { @@ -103,12 +65,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering //Start the change listener Task watcher = WatchForPeersAsync(exitToken); - Log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", InitialDelay); + Log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", CacheConstants.Delays.InitialDiscovery); try - { - //Wait for the initial delay - await Task.Delay(InitialDelay, exitToken); + { + await Task.Delay(CacheConstants.Delays.InitialDiscovery, exitToken); Log.Debug("Begining discovery loop"); @@ -123,26 +84,32 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering Log.Debug("Begining node discovery"); } - //Resolve all known peers - CacheNodeAdvertisment[] wellKnown = await Config.Config.ResolveWellKnownAsync(exitToken); - wellKnownFailed = wellKnown.Length == 0; + /* + * On every loop we will need to resolve well-known servers incase they go down + * or change. There probably should be some more advanced logic and caching here. + * + * Node may not have any well-known nodes, so we need to check for that. + */ + CacheNodeAdvertisment[] wellKnown = HasWellKnown ? + await clusterMan.ResolveWellKnownAsync(exitToken) : + Array.Empty(); //Use the monitor to get the initial peers IEnumerable ads = GetMonitorAds(); - //Combine well-known with new connected peers + //Combine well-known peers that are currently connected to this server CacheNodeAdvertisment[] allAds = ads.Union(wellKnown).ToArray(); if (allAds.Length > 0) { - //Discover all known nodes - await Config.Config.DiscoverNodesAsync(allAds, exitToken); + //Build the discovery map from all the known nodes to find all known nodes in the entire cluster + await clusterMan.DiscoverNodesAsync(allAds, exitToken); } //Log the discovered nodes if verbose logging is enabled if (IsDebug) { - CacheNodeAdvertisment[] found = Config.Config.NodeCollection.GetAllNodes(); + CacheNodeAdvertisment[] found = clusterMan.DiscoveredNodes.GetAllNodes(); Log.Debug("Discovered {count} nodes\n\t{nodes}", found.Length, found.Select(static s => s.NodeId)); } @@ -168,16 +135,16 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering { if (IsDebug) { - Log.Debug("Well known node discovery failed, waiting for {idel} before retrying", WhenWellKnownResolveFailed); + Log.Debug("Well known node discovery failed, waiting for {idel} before retrying", CacheConstants.Delays.WellKnownResolveFailed); } //Wait for shorter duration - await Task.Delay(WhenWellKnownResolveFailed, exitToken); + await Task.Delay(CacheConstants.Delays.WellKnownResolveFailed, exitToken); } else { //Delay the next discovery - await Task.Delay(Config.DiscoveryInterval, exitToken); + await Task.Delay(clusterConf.DiscoveryInterval, exitToken); } } } @@ -188,7 +155,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering } finally { - + Monitor.Dispose(); } //Wait for the watcher to exit @@ -197,10 +164,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering private IEnumerable GetMonitorAds() { + string selfId = (clusterMan.Config as CacheNodeConfiguration)!.NodeId; return Monitor.GetAllPeers() .Where(static p => p.Advertisment != null) //Without us - .Where(n => n.NodeId != Config.Config.NodeId) + .Where(n => !string.Equals(n.NodeId, selfId, StringComparison.OrdinalIgnoreCase)) .Select(static p => p.Advertisment!); } @@ -222,7 +190,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering //Use the monitor to get the initial peers IEnumerable ads = GetMonitorAds(); - ((NodeDiscoveryCollection)Config.Config.NodeCollection).AddManualNodes(ads); + clusterMan.AddManualNodes(ads); } } catch (OperationCanceledException) @@ -239,7 +207,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering lock (_connectedPeers) { //Get all discovered peers - CacheNodeAdvertisment[] peers = Config.Config.NodeCollection.GetAllNodes(); + CacheNodeAdvertisment[] peers = clusterMan.DiscoveredNodes.GetAllNodes(); //Get the difference between the discovered peers and the connected peers return peers.Except(_connectedPeers).ToArray(); @@ -265,31 +233,5 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering _connectedPeers.Remove(peer); } } - - - private sealed record class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler - { - public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex) - { - - if (ex is HttpRequestException hre) - { - if (hre.InnerException is SocketException se) - { - //traisnport failed - Logger.Warn("Failed to connect to server {serv} because {err}", errorNode, se.Message); - } - else - { - Logger.Error("Failed to connect to node {n}\n{err}", errorNode, hre); - } - } - else - { - Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex); - } - - } - } } } diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs index d1591f8..99433e1 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -45,7 +45,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints public bool IsPeer { get; set; } } - internal sealed class CacheNegotationManager + internal sealed class CacheNegotationManager(PluginBase plugin) { /* * Cache keys are centralized and may be shared between all cache server nodes. This means @@ -62,23 +62,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints * client could use trial and error to find the servers buffer configuration. */ - private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30); + private readonly string AudienceLocalServerId = Guid.NewGuid().ToString("N"); - private readonly string AudienceLocalServerId; - private readonly NodeConfig _nodeConfig; - private readonly CacheConfiguration _cacheConfig; + private readonly ObjectCacheSystemState _sysState = plugin.GetOrCreateSingleton(); - public CacheNegotationManager(PluginBase plugin) - { - //Get node configuration - _nodeConfig = plugin.GetOrCreateSingleton(); - - //Get the cache store configuration - _cacheConfig = plugin.GetConfigForType().Deserialze(); - - AudienceLocalServerId = Guid.NewGuid().ToString("N"); - } - + private CacheMemoryConfiguration CacheConfig => _sysState.MemoryConfiguration; public bool IsClientNegotiationValid(string authToken, out ClientNegotiationState state) { @@ -88,12 +76,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints using JsonWebToken jwt = JsonWebToken.Parse(authToken); //verify signature for client - if (_nodeConfig.KeyStore.VerifyJwt(jwt, false)) + if (_sysState.KeyStore.VerifyJwt(jwt, false)) { //Validated as normal client } //May be signed by a cache server - else if (_nodeConfig.KeyStore.VerifyJwt(jwt, true)) + else if (_sysState.KeyStore.VerifyJwt(jwt, true)) { //Set peer and verified flag since the another cache server signed the request state.IsPeer = true; @@ -117,16 +105,16 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints return true; } - public JsonWebToken ConfirmCLientNegotiation(ClientNegotiationState state, IPAddress clientIp, DateTimeOffset now) + public JsonWebToken ConfirmClientNegotiation(ClientNegotiationState state, IPAddress clientIp, DateTimeOffset now) { //Verified, now we can create an auth message with a short expiration JsonWebToken auth = new(); - auth.WriteHeader(_nodeConfig.KeyStore.GetJwtHeader()); + auth.WriteHeader(_sysState.KeyStore.GetJwtHeader()); auth.InitPayloadClaim() .AddClaim("aud", AudienceLocalServerId) .AddClaim("iat", now.ToUnixTimeSeconds()) - .AddClaim("exp", now.Add(AuthTokenExpiration).ToUnixTimeSeconds()) + .AddClaim("exp", now.Add(CacheConstants.ClientAuthTokenExpiration).ToUnixTimeSeconds()) .AddClaim("nonce", RandomHash.GetRandomBase32(8)) .AddClaim("chl", state.Challenge!) //Set the ispeer flag if the request was signed by a cache server @@ -136,24 +124,29 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints //Set ip address .AddClaim("ip", clientIp.ToString()) //Add negotiaion args - .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, _cacheConfig.MaxHeaderBufferSize) - .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, _cacheConfig.MaxRecvBufferSize) - .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, _cacheConfig.MaxMessageSize) + .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, CacheConfig.MaxHeaderBufferSize) + .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, CacheConfig.MaxRecvBufferSize) + .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, CacheConfig.MaxMessageSize) .CommitClaims(); //Sign the auth message from our private key - _nodeConfig.KeyStore.SignJwt(auth); + _sysState.KeyStore.SignJwt(auth); return auth; } - public bool ValidateUpgrade(string upgradeToken, string tokenSignature, DateTimeOffset now, IPAddress connectionIp, ref string? nodeId, ref bool isPeer) + public bool ValidateUpgrade(string? upgradeToken, string? tokenSignature, DateTimeOffset now, IPAddress connectionIp, ref string? nodeId, ref bool isPeer) { + if(string.IsNullOrWhiteSpace(upgradeToken) || string.IsNullOrWhiteSpace(tokenSignature)) + { + return false; + } + //Parse jwt using JsonWebToken jwt = JsonWebToken.Parse(upgradeToken); //verify signature against the cache public key, since this server must have signed it - if (!_nodeConfig.KeyStore.VerifyCachePeer(jwt)) + if (!_sysState.KeyStore.VerifyCachePeer(jwt)) { return false; } @@ -175,7 +168,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } //Check node ip address matches if required - if (_nodeConfig.VerifyIp) + if (_sysState.ClusterConfig.VerifyIp) { if (!doc.RootElement.TryGetProperty("ip", out JsonElement ipEl)) { @@ -201,7 +194,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } //Verify token signature against a fellow cache public key - return _nodeConfig.KeyStore.VerifyUpgradeToken(tokenSignature, upgradeToken, isPeer); + return _sysState.KeyStore.VerifyUpgradeToken(tokenSignature, upgradeToken, isPeer); } } } diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs index 816e6c3..8368d3a 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -53,13 +53,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints internal sealed class ConnectEndpoint : ResourceEndpointBase { - internal const string LOG_SCOPE_NAME = "CONEP"; + private readonly ObjectCacheSystemState _sysState; - - private readonly ICacheEventQueueManager PubSubManager; - private readonly IPeerMonitor Peers; - private readonly BlobCacheListener Store; - private readonly NodeConfig NodeConfiguration; + private PeerEventQueueManager PubSubManager => _sysState.PeerEventQueue; + private CachePeerMonitor Peers => _sysState.PeerMonitor; + private BlobCacheListener Listener => _sysState.Listener; + private ServerClusterConfig ClusterConfiguration => _sysState.ClusterConfig; + private readonly CacheNegotationManager AuthManager; private uint _connectedClients; @@ -72,7 +72,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints /// /// The cache store configuration /// - public CacheConfiguration CacheConfig { get; } + public CacheMemoryConfiguration CacheConfig => _sysState.MemoryConfiguration; //Loosen up protection settings /// @@ -83,24 +83,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints public ConnectEndpoint(PluginBase plugin) { - //Get node configuration - NodeConfiguration = plugin.GetOrCreateSingleton(); + _sysState = plugin.GetOrCreateSingleton(); //Init from config and create a new log scope - InitPathAndLog(NodeConfiguration.ConnectPath, plugin.Log.CreateScope(LOG_SCOPE_NAME)); - - //Setup pub/sub manager - PubSubManager = plugin.GetOrCreateSingleton(); - - //Get peer monitor - Peers = plugin.GetOrCreateSingleton(); - - //Init the cache store - Store = plugin.GetOrCreateSingleton().Listener; - - //Get the cache store configuration - CacheConfig = plugin.GetConfigForType().Deserialze(); - + InitPathAndLog(ClusterConfiguration.ConnectPath, plugin.Log.CreateScope(CacheConstants.LogScopes.ConnectionEndpoint)); + //Get the auth manager AuthManager = plugin.GetOrCreateSingleton(); } @@ -127,6 +114,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { //Parse jwt from authoriation string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; + if (string.IsNullOrWhiteSpace(jwtAuth)) { return VirtualClose(entity, HttpStatusCode.Forbidden); @@ -149,26 +137,34 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } //Verified, now we can create an auth message with a short expiration - using JsonWebToken auth = AuthManager.ConfirmCLientNegotiation(state, entity.TrustedRemoteIp, entity.RequestedTimeUtc); + using JsonWebToken auth = AuthManager.ConfirmClientNegotiation(state, entity.TrustedRemoteIp, entity.RequestedTimeUtc); - //Close response + //Close response by sending a copy of the signed token entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer); return VfReturnType.VirtualSkip; } protected override VfReturnType WebsocketRequested(HttpEntity entity) { + /* + * Check to see if any more connections are allowed, + * otherwise deny the connection + * + * This is done here to prevent the server from being overloaded + * on a new connection. It would be ideal to not grant new tokens + * but malicious clients could cache a bunch of tokens and use them + * later, exhausting resources. + */ + if(_connectedClients >= ClusterConfiguration.MaxConcurrentConnections) + { + return VirtualClose(entity, HttpStatusCode.ServiceUnavailable); + } + //Parse jwt from authorization string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization]; string? clientSignature = entity.Server.Headers[FBMDataCacheExtensions.X_UPGRADE_SIG_HEADER]; string? optionalDiscovery = entity.Server.Headers[FBMDataCacheExtensions.X_NODE_DISCOVERY_HEADER]; - //Not null - if (string.IsNullOrWhiteSpace(jwtAuth) || string.IsNullOrWhiteSpace(clientSignature)) - { - return VfReturnType.Forbidden; - } - string? nodeId = null; bool isPeer = false; @@ -178,17 +174,17 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints return VirtualClose(entity, HttpStatusCode.Unauthorized); } - CacheNodeAdvertisment? discoveryAd = null; - /* * If the client is a peer server, it may offer a signed advertisment * that this node will have the duty of making available to other peers * if it is valid */ - if (isPeer && !string.IsNullOrWhiteSpace(optionalDiscovery)) + CacheNodeAdvertisment? discoveryAd = null; + + if (isPeer) { - discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(optionalDiscovery); + discoveryAd = _sysState.KeyStore.VerifyPeerAdvertisment(optionalDiscovery); } WsUserState state; @@ -196,11 +192,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints try { //Get query config suggestions from the client - string recvBufCmd = entity.QueryArgs[FBMClient.REQ_RECV_BUF_QUERY_ARG]; - string maxHeaderCharCmd = entity.QueryArgs[FBMClient.REQ_HEAD_BUF_QUERY_ARG]; - string maxMessageSizeCmd = entity.QueryArgs[FBMClient.REQ_MAX_MESS_QUERY_ARG]; + string? recvBufCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_RECV_BUF_QUERY_ARG); + string? maxHeaderCharCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_HEAD_BUF_QUERY_ARG); + string? maxMessageSizeCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_MAX_MESS_QUERY_ARG); - //Parse recv buffer size int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : CacheConfig.MinRecvBufferSize; int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : CacheConfig.MinHeaderBufferSize; int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : CacheConfig.MaxMessageSize; @@ -253,9 +248,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints Peers.OnPeerConnected(state); //Register plugin exit token to cancel the connected socket - CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll); - - //Inc connected count + await using CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll); + Interlocked.Increment(ref _connectedClients); try @@ -280,7 +274,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints try { //Begin listening for messages with a queue - await Store.ListenAsync(wss, queue, args); + await Listener.ListenAsync(wss, queue, args); } finally { @@ -291,7 +285,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints else { //Begin listening for messages without a queue - await Store.ListenAsync(wss, null!, args); + await Listener.ListenAsync(wss, null!, args); } } catch (OperationCanceledException) @@ -303,15 +297,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints } catch (Exception ex) { - Log.Debug(ex); + //If debug logging is enabled print a more detailed error message + Log.Error("An error occured on websocket connection: node {con} -> {error}", state.NodeId, ex.Message); + Log.Debug("Websocket connection error: node {con}\n{error}", state.NodeId, ex); } - - //Dec connected count + Interlocked.Decrement(ref _connectedClients); - //Unregister the token - reg.Unregister(); - //Notify monitor of disconnect Peers.OnPeerDisconnected(state); diff --git a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs index 7d376b8..8038b70 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -40,25 +40,27 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints { internal sealed class PeerDiscoveryEndpoint : ResourceEndpointBase { - private readonly IPeerMonitor PeerMonitor; - private readonly NodeConfig Config; + private readonly ObjectCacheSystemState _sysState; + + private CacheAuthKeyStore KeyStore => _sysState.KeyStore; + + private CachePeerMonitor PeerMonitor => _sysState.PeerMonitor; - //Loosen up protection settings /// protected override ProtectionSettings EndpointProtectionSettings { get; } = new() { - DisableSessionsRequired = true + /* + * Sessions will not be used or required for this endpoint. + * We should also assume the session system is not even loaded + */ + DisableSessionsRequired = true }; public PeerDiscoveryEndpoint(PluginBase plugin) { - //Get the peer monitor - PeerMonitor = plugin.GetOrCreateSingleton(); + _sysState = plugin.GetOrCreateSingleton(); - //Get the node config - Config = plugin.GetOrCreateSingleton(); - - InitPathAndLog(Config.DiscoveryPath, plugin.Log); + InitPathAndLog(_sysState.ClusterConfig.DiscoveryPath!, plugin.Log); } protected override VfReturnType Get(HttpEntity entity) @@ -68,36 +70,41 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints if(string.IsNullOrWhiteSpace(authToken)) { - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; + return VirtualClose(entity, HttpStatusCode.Unauthorized); } string subject = string.Empty; string challenge = string.Empty; - //Parse auth token - using(JsonWebToken jwt = JsonWebToken.Parse(authToken)) + try { + //Parse auth token + using JsonWebToken jwt = JsonWebToken.Parse(authToken); + //try to verify against cache node first - if (!Config.KeyStore.VerifyJwt(jwt, true)) + if (!KeyStore.VerifyJwt(jwt, true)) { //failed... //try to verify against client key - if (!Config.KeyStore.VerifyJwt(jwt, false)) + if (!KeyStore.VerifyJwt(jwt, false)) { //invalid token - entity.CloseResponse(HttpStatusCode.Unauthorized); - return VfReturnType.VirtualSkip; + return VirtualClose(entity, HttpStatusCode.Unauthorized); } } using JsonDocument payload = jwt.GetPayload(); //Get client info to pass back - subject = payload.RootElement.TryGetProperty("sub", out JsonElement subEl) ? subEl.GetString() ?? string.Empty : string.Empty; + subject = payload.RootElement.TryGetProperty("sub", out JsonElement subEl) ? subEl.GetString() ?? string.Empty : string.Empty; challenge = payload.RootElement.GetProperty("chl").GetString() ?? string.Empty; } + catch (FormatException) + { + //If tokens are invalid format, let the client know instead of a server error + return VfReturnType.BadRequest; + } //Valid key, get peer list to send to client CacheNodeAdvertisment[] peers = PeerMonitor.GetAllPeers() @@ -109,10 +116,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints using JsonWebToken response = new(); //set header from cache config - response.WriteHeader(Config.KeyStore.GetJwtHeader()); + response.WriteHeader(KeyStore.GetJwtHeader()); response.InitPayloadClaim() - .AddClaim("iss", Config.Config.NodeId) + .AddClaim("iss", _sysState.NodeConfig.NodeId) //Audience is the requestor id .AddClaim("sub", subject) .AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds()) @@ -122,10 +129,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints .AddClaim("chl", challenge) .CommitClaims(); - //Sign the response - Config.KeyStore.SignJwt(response); - - //Send response to client + + KeyStore.SignJwt(response); + entity.CloseResponse(HttpStatusCode.OK, Net.Http.ContentType.Text, response.DataBuffer); return VfReturnType.VirtualSkip; } diff --git a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs index 87a471b..18855e3 100644 --- a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs +++ b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -59,13 +59,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints public WellKnownEndpoint(PluginBase plugin) { //Get the node config - NodeConfig nodeConfig = plugin.GetOrCreateSingleton(); + ObjectCacheSystemState conf = plugin.GetOrCreateSingleton(); //serialize the config, discovery may not be enabled - _advertisment = nodeConfig.Config.Advertisment; - _keyStore = nodeConfig.KeyStore; + _advertisment = conf.NodeConfig.Advertisment; + _keyStore = conf.KeyStore; - InitPathAndLog(nodeConfig.WellKnownPath, plugin.Log); + InitPathAndLog(conf.ClusterConfig.WellKnownPath, plugin.Log); } protected override VfReturnType Get(HttpEntity entity) diff --git a/plugins/ObjectCacheServer/src/NodeConfig.cs b/plugins/ObjectCacheServer/src/NodeConfig.cs deleted file mode 100644 index 3a2e10e..0000000 --- a/plugins/ObjectCacheServer/src/NodeConfig.cs +++ /dev/null @@ -1,191 +0,0 @@ -/* -* Copyright (c) 2023 Vaughn Nugent -* -* Library: VNLib -* Package: ObjectCacheServer -* File: NodeConfig.cs -* -* NodeConfig.cs is part of ObjectCacheServer which is part of the larger -* VNLib collection of libraries and utilities. -* -* ObjectCacheServer is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License as -* published by the Free Software Foundation, either version 3 of the -* License, or (at your option) any later version. -* -* ObjectCacheServer is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see https://www.gnu.org/licenses/. -*/ - -using System; -using System.Net; -using System.Linq; -using System.Text.Json; -using System.Collections.Generic; - -using VNLib.Plugins; -using VNLib.Utils.Logging; -using VNLib.Utils.Extensions; -using VNLib.Plugins.Extensions.Loading; -using VNLib.Data.Caching.Extensions.Clustering; - - -namespace VNLib.Data.Caching.ObjectCache.Server -{ - [ConfigurationName("cluster")] - internal sealed class NodeConfig - { - //Default path for the well known endpoint - const string DefaultPath = "/.well-known/vncache"; - - public CacheNodeConfiguration Config { get; } - - public CacheAuthKeyStore KeyStore { get; } - - public TimeSpan DiscoveryInterval { get; } - - public TimeSpan EventQueuePurgeInterval { get; } - - public int MaxQueueDepth { get; } - - public string? DiscoveryPath { get; } - - public string ConnectPath { get; } - - public string WellKnownPath { get; } - - public bool VerifyIp { get; } - - /// - /// The maximum number of peer connections to allow - /// - public uint MaxPeerConnections { get; } = 10; - - public NodeConfig(PluginBase plugin, IConfigScope config) - { - //Get the port of the primary webserver - int port; - bool usingTls; - { - //Get the port number of the first virtual host - JsonElement firstHost = plugin.HostConfig.GetProperty("virtual_hosts") - .EnumerateArray() - .First(); - - port = firstHost.GetProperty("interface") - .GetProperty("port") - .GetInt32(); - - //If the ssl element is present, ssl is enabled for the server - usingTls = firstHost.TryGetProperty("ssl", out _); - } - string hostname = Dns.GetHostName(); - - //Server id is just dns name for now - string nodeId = $"{hostname}:{port}"; - - //Init key store - KeyStore = new(plugin); - - - DiscoveryInterval = config["discovery_interval_sec"].GetTimeSpan(TimeParseType.Seconds); - - //Get the event queue purge interval - EventQueuePurgeInterval = config["queue_purge_interval_sec"].GetTimeSpan(TimeParseType.Seconds); - - //Get the max queue depth - MaxQueueDepth = (int)config["max_queue_depth"].GetUInt32(); - - - //Get the connect path - ConnectPath = config["connect_path"].GetString() ?? throw new KeyNotFoundException("Missing required key 'connect_path' in cluster config"); - - //Get the verify ip setting - VerifyIp = config["verify_ip"].GetBoolean(); - - Uri connectEp = BuildUri(usingTls, hostname, port, ConnectPath); - Uri? discoveryEp = null; - - Config = new(); - - //Setup cache node config - Config.WithCacheEndpoint(connectEp) - .WithNodeId(nodeId) - .WithAuthenticator(KeyStore) - .WithTls(usingTls); - - //Get the discovery path (optional) - if (config.TryGetValue("discovery_path", out JsonElement discoveryPathEl)) - { - DiscoveryPath = discoveryPathEl.GetString(); - - //Enable advertisment if a discovery path is present - if (!string.IsNullOrEmpty(DiscoveryPath)) - { - //Build the discovery endpoint, it must be an absolute uri - discoveryEp = BuildUri(usingTls, hostname, port, DiscoveryPath); - Config.EnableAdvertisment(discoveryEp); - } - } - - //Allow custom well-known path - if(config.TryGetValue("well_known_path", out JsonElement wkEl)) - { - WellKnownPath = wkEl.GetString() ?? DefaultPath; - } - //Default if not set - WellKnownPath ??= DefaultPath; - - //Get the max peer connections - if (config.TryGetValue("max_peers", out JsonElement maxPeerEl)) - { - MaxPeerConnections = maxPeerEl.GetUInt32(); - } - - const string CacheConfigTemplate = -@" -Cluster Configuration: - Node Id: {id} - TlsEndabled: {tls} - Verify Ip: {vi} - Well-Known: {wk} - Cache Endpoint: {ep} - Discovery Endpoint: {dep} - Discovery Interval: {di} - Max Peer Connections: {mpc} - Max Queue Depth: {mqd} - Event Queue Purge Interval: {eqpi} -"; - - //log the config - plugin.Log.Information(CacheConfigTemplate, - nodeId, - usingTls, - VerifyIp, - WellKnownPath, - connectEp, - discoveryEp, - DiscoveryInterval, - MaxPeerConnections, - MaxQueueDepth, - EventQueuePurgeInterval - ); - } - - private static Uri BuildUri(bool tls, string host, int port, string path) - { - return new UriBuilder - { - Scheme = tls ? "https" : "http", - Host = host, - Port = port, - Path = path - }.Uri; - } - } -} diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs index aada787..42bd0c7 100644 --- a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs +++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023 Vaughn Nugent +* Copyright (c) 2024 Vaughn Nugent * * Library: VNLib * Package: ObjectCacheServer @@ -23,13 +23,11 @@ */ using System; -using System.Threading; using System.Collections.Generic; using VNLib.Plugins; -using VNLib.Utils.Memory; +using VNLib.Utils; using VNLib.Utils.Logging; -using VNLib.Utils.Memory.Diagnostics; using VNLib.Plugins.Extensions.Loading; using VNLib.Plugins.Extensions.Loading.Routing; @@ -43,38 +41,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server { public override string PluginName => "ObjectCache.Service"; - private readonly Lazy _cacheHeap; - - internal IUnmangedHeap ListenerHeap => _cacheHeap.Value; - - public ObjectCacheServerEntry() - { - //Init heap - _cacheHeap = new Lazy(InitializeHeap, LazyThreadSafetyMode.PublicationOnly); - } - - internal IUnmangedHeap InitializeHeap() - { - //Create default heap - IUnmangedHeap _heap = MemoryUtil.InitializeNewHeapForProcess(); - try - { - //If the plugin is in debug mode enable heap tracking - return this.IsDebug() ? new TrackedHeapWrapper(_heap, true) : _heap; - } - catch - { - _heap.Dispose(); - throw; - } - } + ObjectCacheSystemState? sysState; protected override void OnLoad() { try { - //Get the node configuration first - NodeConfig config = this.GetOrCreateSingleton(); + //Initialize the cache node builder + sysState = this.GetOrCreateSingleton(); + sysState.Initialize(); //Route well-known endpoint this.Route(); @@ -85,8 +60,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server //We must initialize the replication manager _ = this.GetOrCreateSingleton(); - //Setup discovery endpoint - if(!string.IsNullOrWhiteSpace(config.DiscoveryPath)) + //Setup discovery endpoint only if the user enabled clustering + if(!string.IsNullOrWhiteSpace(sysState.ClusterConfig.DiscoveryPath)) { this.Route(); } @@ -101,18 +76,34 @@ namespace VNLib.Data.Caching.ObjectCache.Server protected override void OnUnLoad() { - //dispose heap if initialized - if(_cacheHeap.IsValueCreated) - { - _cacheHeap.Value.Dispose(); - } - Log.Information("Plugin unloaded"); } protected override void ProcessHostCommand(string cmd) { - throw new NotImplementedException(); + if(string.IsNullOrWhiteSpace(cmd)) + { + return; + } + + ArgumentList al = new(cmd.Split(" ")); + + if(al.Count == 0) + { + Log.Warn("Invalid command"); + return; + } + + switch (al[0].ToLower(null)) + { + case "memstats": + sysState?.LogMemoryStats(); + break; + + default: + Log.Warn("Invalid command"); + break; + } } } } diff --git a/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs new file mode 100644 index 0000000..cd5bf1b --- /dev/null +++ b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs @@ -0,0 +1,331 @@ +/* +* Copyright (c) 2024 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: ObjectCacheSystemState.cs +* +* ObjectCacheSystemState.cs is part of ObjectCacheServer which is +* part of the larger VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Linq; +using System.Net.Http; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; +using System.Collections.Generic; + +using VNLib.Utils.Logging; +using VNLib.Utils.Memory; +using VNLib.Utils.Memory.Diagnostics; +using VNLib.Net.Messaging.FBM; +using VNLib.Plugins; +using VNLib.Plugins.Extensions.Loading; + +using VNLib.Data.Caching.Extensions.Clustering; +using VNLib.Data.Caching.ObjectCache.Server.Cache; +using VNLib.Data.Caching.ObjectCache.Server.Clustering; + +namespace VNLib.Data.Caching.ObjectCache.Server +{ + /* + * The purpose of this class is to manage the state of the entire cache server. + * All configuration and state should be creatd and managed by this class. To make it + * easier to manage. + */ + [ConfigurationName("cache")] + internal sealed class ObjectCacheSystemState(PluginBase plugin, IConfigScope config) : IDisposable + { + public BlobCacheListener Listener { get; private set; } = null!; + + public ICacheStore InternalStore { get; private set; } = null!; + + /// + /// Used for miscellaneous shared memory allocations (like the cache listener) + /// + public IUnmangedHeap SharedCacheHeap { get; private set; } = null!; + + /// + /// The plugin-wide, shared node configuration + /// + public ServerClusterConfig ClusterConfig { get; } = plugin.GetOrCreateSingleton(); + + /// + /// The system wide cache authenticator + /// + public CacheAuthKeyStore KeyStore { get; } = new(plugin); + + /// + /// The system cache node configuration + /// + public CacheNodeConfiguration NodeConfig { get; private set; } + + /// + /// The peer discovery manager + /// + public PeerDiscoveryManager PeerDiscovery { get; private set; } = null!; + + /// + /// System wide peer monitor + /// + public CachePeerMonitor PeerMonitor { get; } = new(); + + public CacheMemoryConfiguration MemoryConfiguration { get; } = config.Deserialze(); + + /// + /// The system wide peer event queue manager + /// + public PeerEventQueueManager PeerEventQueue { get; private set; } + + private ICacheMemoryManagerFactory _cacheMemManager; + + void IDisposable.Dispose() + { + SharedCacheHeap.Dispose(); + Listener.Dispose(); + } + + /// + /// Initializes the cache node state + /// + public void Initialize() + { + CacheMemoryConfiguration cacheConf = MemoryConfiguration; + + ArgumentOutOfRangeException.ThrowIfLessThan(cacheConf.MaxCacheEntries, 2u); + + //Suggestion + if (cacheConf.MaxCacheEntries < 200) + { + plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache"); + } + + LogMemConfiguration(); + + PeerEventQueue = new(plugin, ClusterConfig); + + //If the plugin is in debug mode enable heap tracking + SharedCacheHeap = plugin.IsDebug() ? + new TrackedHeapWrapper(MemoryUtil.InitializeNewHeapForProcess(), true) + : MemoryUtil.InitializeNewHeapForProcess(); + + //Load node configuration first + (NodeConfig = ClusterConfig.BuildNodeConfig()) + .WithAuthenticator(KeyStore); //Also pass the key store to the node config + + ConfigurePeerDiscovery(); + + ConfigureCacheListener(); + } + + private void ConfigurePeerDiscovery() + { + //Get the known peers array from config, its allowed to be null for master nodes + IConfigScope? config = plugin.TryGetConfig("known_peers"); + string[] kownPeers = config?.Deserialze() ?? []; + + ILogProvider discLogger = plugin.Log.CreateScope(CacheConstants.LogScopes.PeerDiscovery); + + //Allow just origin nodes to be used as known peers + IEnumerable peerUris = kownPeers.Select(static p => + { + Uri bUri = new(p, UriKind.Absolute); + return bUri.LocalPath == "/" ? new Uri(bUri, CacheConstants.DefaultWellKnownPath) : bUri; + }); + + NodeConfig.WithInitialPeers(peerUris) + .WithErrorHandler(new ErrorHandler(discLogger)); + + discLogger.Information("Inital peer nodes: {nodes}", kownPeers); + + PeerDiscovery = new PeerDiscoveryManager( + NodeConfig, + ClusterConfig, + PeerMonitor, + discLogger, + plugin.IsDebug(), + kownPeers.Length > 0 + ); + + //Discovery manager needs to be scheduled for background work to run the discovery loop + _ = plugin.ObserveWork(PeerDiscovery, 10); + } + + private void ConfigureCacheListener() + { + /* + * Allow loading external managed dll for a bucket-local memory manager + */ + ICacheMemoryManagerFactory manager; + + if (string.IsNullOrWhiteSpace(MemoryConfiguration.ExternLibPath)) + { + //Get the memory manager + manager = plugin.GetOrCreateSingleton(); + } + else + { + manager = plugin.CreateServiceExternal(MemoryConfiguration.ExternLibPath); + } + + _cacheMemManager = manager; + + CacheListenerPubQueue queue = new(plugin, PeerEventQueue); + + //Must register background worker to listen for changes + _ = plugin.ObserveWork(queue, 150); + + //Endpoint only allows for a single reader + Listener = new( + plugin.LoadMemoryCacheSystem(config, manager, MemoryConfiguration), + queue, + plugin.Log.CreateScope(CacheConstants.LogScopes.BlobCacheListener), + new SharedHeapFBMMemoryManager(SharedCacheHeap) + ); + + InternalStore = new CacheStore(Listener.Cache); + } + + private void LogMemConfiguration() + { + const string CacheConfigTemplate = +@" +Cache Configuration: + Max memory: {max} Mb + Buckets: {bc} + Entries per-bucket: {mc} + HeapTracking: {ht} +"; + + CacheMemoryConfiguration cacheConf = MemoryConfiguration; + + //calculate the max memory usage + ulong maxByteSize = cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize; + + //Log the cache config + plugin.Log.Information( + CacheConfigTemplate, + maxByteSize / (1024 * 1000), + cacheConf.BucketCount, + cacheConf.MaxCacheEntries, + plugin.IsDebug() + ); + } + + public void LogMemoryStats() + { + if(SharedCacheHeap is TrackedHeapWrapper thw) + { + const string shStatTemplate = +@" VNCache shared heap stats: + Current: {cur}kB + Blocks: {blks} + Max size: {max}kB +"; + HeapStatistics stats = thw.GetCurrentStats(); + plugin.Log.Debug( + shStatTemplate, + stats.AllocatedBytes / 1024, + stats.AllocatedBlocks, + stats.MaxHeapSize / 1024 + ); + + } + + //Also print logs for the bucket local managers if they are enabled + if(_cacheMemManager is BucketLocalManagerFactory blmf) + { + blmf.LogHeapStats(); + } + } + + private sealed class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler + { + public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex) + => LogError(ex, errorNode.NodeId, errorNode.ConnectEndpoint); + + public void OnDiscoveryError(Uri errorAddress, Exception ex) + => LogError(ex, null, errorAddress); + + private void LogError(Exception ex, string? nodId, Uri? connectAddress) + { + //For logging purposes, use the node id if its available, otherwise use the address + if(nodId == null && connectAddress != null) + { + nodId = connectAddress.ToString(); + } + + if (ex is HttpRequestException hre) + { + if (hre.InnerException is SocketException se) + { + //transport failed + Logger.Warn("Failed to connect to server {serv} because {err}", nodId, se.Message); + } + else + { + Logger.Error("Failed to connect to node {n}\n{err}", nodId, hre); + } + } + if (ex is OperationCanceledException) + { + Logger.Error("Failed to discover nodes from nodeid {nid}, because the operation was canceled"); + } + else if (ex is TimeoutException) + { + //Only log exception stack when in debug logging mode + Logger.Warn("Failed to discover nodes from nodeid {nid}, because a timeout occured", nodId); + } + else + { + //Only log exception stack when in debug logging mode + if (Logger.IsEnabled(LogLevel.Debug)) + { + Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", nodId, ex); + } + else + { + Logger.Error("Failed to discover nodes from nodeid {nid}, with error: {err}", nodId, ex.Message); + } + } + } + } + + internal sealed class CacheStore(IBlobCacheTable table) : ICacheStore + { + + /// + ValueTask ICacheStore.AddOrUpdateBlobAsync(string objectId, string? alternateId, ObjectDataGet bodyData, T state, CancellationToken token) + { + return table.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token); + } + + /// + void ICacheStore.Clear() + { + throw new NotImplementedException(); + } + + /// + ValueTask ICacheStore.DeleteItemAsync(string id, CancellationToken token) + { + return table.DeleteObjectAsync(id, token); + } + } + } +} diff --git a/plugins/ObjectCacheServer/src/ServerClusterConfig.cs b/plugins/ObjectCacheServer/src/ServerClusterConfig.cs new file mode 100644 index 0000000..8e098cd --- /dev/null +++ b/plugins/ObjectCacheServer/src/ServerClusterConfig.cs @@ -0,0 +1,152 @@ +/* +* Copyright (c) 2024 Vaughn Nugent +* +* Library: VNLib +* Package: ObjectCacheServer +* File: ServerClusterConfig.cs +* +* ServerClusterConfig.cs is part of ObjectCacheServer which is part of the larger +* VNLib collection of libraries and utilities. +* +* ObjectCacheServer is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* ObjectCacheServer is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see https://www.gnu.org/licenses/. +*/ + +using System; +using System.Net; +using System.Linq; +using System.Text.Json; +using System.Collections.Generic; + +using VNLib.Plugins; +using VNLib.Utils.Logging; +using VNLib.Utils.Extensions; +using VNLib.Plugins.Extensions.Loading; +using VNLib.Data.Caching.Extensions.Clustering; + +namespace VNLib.Data.Caching.ObjectCache.Server +{ + [ConfigurationName("cluster")] + internal sealed class ServerClusterConfig(PluginBase plugin, IConfigScope config) + { + public TimeSpan DiscoveryInterval { get; } = config.GetRequiredProperty("discovery_interval_sec", p => p.GetTimeSpan(TimeParseType.Seconds)); + + public TimeSpan EventQueuePurgeInterval { get; } = config.GetRequiredProperty("queue_purge_interval_sec", p => p.GetTimeSpan(TimeParseType.Seconds)); + + public int MaxQueueDepth { get; } = (int)config.GetRequiredProperty("max_queue_depth", p => p.GetUInt32()); + + public string? DiscoveryPath { get; } = config.GetValueOrDefault("discovery_path", p => p.GetString(), null); + + public string ConnectPath { get; } = config.GetRequiredProperty("connect_path", p => p.GetString()!); + + public string WellKnownPath { get; } = config.GetValueOrDefault("well_known_path", p => p.GetString()!, CacheConstants.DefaultWellKnownPath) + ?? CacheConstants.DefaultWellKnownPath; + + public bool VerifyIp { get; } = config.GetRequiredProperty("verify_ip", p => p.GetBoolean()); + + /// + /// The maximum number of peer connections to allow + /// + public uint MaxPeerConnections { get; } = config.GetValueOrDefault("max_peers", p => p.GetUInt32(), 10u); + + /// + /// The maxium number of concurrent client connections to allow + /// before rejecting new connections + /// + public uint MaxConcurrentConnections { get; } = config.GetValueOrDefault("max_concurrent_connections", p => p.GetUInt32(), 100u); + + const string CacheConfigTemplate = +@" +Cluster Configuration: + Node Id: {id} + TlsEndabled: {tls} + Verify Ip: {vi} + Well-Known: {wk} + Cache Endpoint: {ep} + Discovery Endpoint: {dep} + Discovery Interval: {di} + Max Peer Connections: {mpc} + Max Queue Depth: {mqd} + Event Queue Purge Interval: {eqpi} +"; + + internal CacheNodeConfiguration BuildNodeConfig() + { + CacheNodeConfiguration conf = new(); + + //Get the port of the primary webserver + int port; + bool usingTls; + { + //Get the port number of the first virtual host + JsonElement firstHost = plugin.HostConfig.GetProperty("virtual_hosts") + .EnumerateArray() + .First(); + + port = firstHost.GetProperty("interface") + .GetProperty("port") + .GetInt32(); + + //If the ssl element is present, ssl is enabled for the server + usingTls = firstHost.TryGetProperty("ssl", out _); + } + string hostname = Dns.GetHostName(); + + //Server id is just dns name for now + string nodeId = $"{hostname}:{port}"; + + Uri connectEp = BuildUri(usingTls, hostname, port, ConnectPath); + Uri? discoveryEp = null; + + + conf.WithCacheEndpoint(connectEp) + .WithNodeId(nodeId) + .WithTls(usingTls); + + //Get the discovery path (optional) + if (!string.IsNullOrWhiteSpace(DiscoveryPath)) + { + //Build the discovery endpoint, it must be an absolute uri + discoveryEp = BuildUri(usingTls, hostname, port, DiscoveryPath); + conf.EnableAdvertisment(discoveryEp); + } + + //print the cluster configuration to the log + plugin.Log.Information(CacheConfigTemplate, + nodeId, + usingTls, + VerifyIp, + WellKnownPath, + connectEp, + discoveryEp, + DiscoveryInterval, + MaxPeerConnections, + MaxQueueDepth, + EventQueuePurgeInterval + ); + + return conf; + } + + private static Uri BuildUri(bool tls, string host, int port, string path) + { + return new UriBuilder + { + Scheme = tls ? "https" : "http", + Host = host, + Port = port, + Path = path + }.Uri; + } + } +} -- cgit