aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer
diff options
context:
space:
mode:
authorLibravatar vnugent <public@vaughnnugent.com>2024-03-10 16:46:50 -0400
committerLibravatar vnugent <public@vaughnnugent.com>2024-03-10 16:46:50 -0400
commite5bb0ee302e789cb96e7ecfe839cbbcc8e3fd5d7 (patch)
tree6f4d824eaea0f4c433f98c0685bf66c06b30e16a /plugins/ObjectCacheServer
parent6b87785026ca57d6f41cff87ddbd066362f3cacc (diff)
Squashed commit of the following:
commit 2f7565976472f0f056db60520bf253a776112c10 Merge: 323ff67 6b87785 Author: vnugent <public@vaughnnugent.com> Date: Sun Mar 10 16:45:23 2024 -0400 merge master commit 323ff67badfc46ad638d75f059d60d9425ccb2fa Author: vnugent <public@vaughnnugent.com> Date: Sun Mar 10 15:50:07 2024 -0400 ci(server): Conainerize and add vncache server packages commit 5d4192880654fd6e00e587814169415b42621327 Author: vnugent <public@vaughnnugent.com> Date: Sat Mar 9 19:13:21 2024 -0500 chore: #2 Minor fixes and polish before release commit a4b3504bb891829074d1efde0433eae010862181 Author: vnugent <public@vaughnnugent.com> Date: Sat Mar 9 16:30:44 2024 -0500 package updates commit 4d8cfc10382105b0acbd94df93ad3d05ff91db54 Author: vnugent <public@vaughnnugent.com> Date: Wed Mar 6 21:30:58 2024 -0500 refactor: #2 Centralize server state, default discovery endpoints & more commit 016a96a80cce025a86c6cf26707738f6a2eb2658 Author: vnugent <public@vaughnnugent.com> Date: Thu Feb 29 21:22:38 2024 -0500 feat: add future support for memory diagnostics, and some docs commit 456ead9bc8b0f61357bae93152ad0403c4940101 Author: vnugent <public@vaughnnugent.com> Date: Tue Feb 13 14:46:35 2024 -0500 fix: #1 shared cluster index on linux & latested core updates commit a481d63f964a5d5204cac2e95141f37f9a28d573 Author: vnugent <public@vaughnnugent.com> Date: Tue Jan 23 15:43:50 2024 -0500 cache extension api tweaks
Diffstat (limited to 'plugins/ObjectCacheServer')
-rw-r--r--plugins/ObjectCacheServer/Taskfile.yaml88
-rw-r--r--plugins/ObjectCacheServer/server/config/config.json104
-rw-r--r--plugins/ObjectCacheServer/server/container/Dockerfile82
-rw-r--r--plugins/ObjectCacheServer/server/container/Taskfile.yaml80
-rw-r--r--plugins/ObjectCacheServer/server/container/config-templates/ObjectCacheServer-template.json54
-rw-r--r--plugins/ObjectCacheServer/server/container/config-templates/config-template.json105
-rw-r--r--plugins/ObjectCacheServer/server/container/docker-compose.yaml45
-rw-r--r--plugins/ObjectCacheServer/server/container/run.sh15
-rw-r--r--plugins/ObjectCacheServer/server/install.ps126
-rw-r--r--plugins/ObjectCacheServer/server/install.taskfile.yaml20
-rw-r--r--plugins/ObjectCacheServer/server/taskfile.yaml193
-rw-r--r--plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs53
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs31
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs (renamed from plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs)15
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheStore.cs131
-rw-r--r--plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs8
-rw-r--r--plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs (renamed from plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs)132
-rw-r--r--plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs14
-rw-r--r--plugins/ObjectCacheServer/src/CacheConstants.cs107
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs85
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs11
-rw-r--r--plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs130
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs53
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs94
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs58
-rw-r--r--plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs10
-rw-r--r--plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs71
-rw-r--r--plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs331
-rw-r--r--plugins/ObjectCacheServer/src/ServerClusterConfig.cs (renamed from plugins/ObjectCacheServer/src/NodeConfig.cs)139
29 files changed, 1651 insertions, 634 deletions
diff --git a/plugins/ObjectCacheServer/Taskfile.yaml b/plugins/ObjectCacheServer/Taskfile.yaml
new file mode 100644
index 0000000..a339359
--- /dev/null
+++ b/plugins/ObjectCacheServer/Taskfile.yaml
@@ -0,0 +1,88 @@
+# https://taskfile.dev
+
+#Called by the vnbuild system to produce builds for my website
+#https://www.vaughnnugent.com/resources/software
+
+#This taskfile is called from the root of a project that is being built
+#and the purpose of this taskfile is to package up the output of a build
+#from the solution file, and package it up into a tgz files for distribution
+
+version: '3'
+
+vars:
+ TARGET: '{{.USER_WORKING_DIR}}/bin'
+ RELEASE_DIR: "./bin/release/{{.TARGET_FRAMEWORK}}/publish"
+ SOURCE_OUT: "{{.USER_WORKING_DIR}}/bin/source"
+
+includes:
+ ci:
+ taskfile: server/taskfile.yaml
+ dir: server/ #must execute from the server directory
+ optional: true
+
+tasks:
+
+ #called by ci to build the output
+ build:
+ cmds:
+ - task: ci:build
+
+ #when build succeeds, archive the output into a tgz
+ postbuild_success:
+ dir: '{{.USER_WORKING_DIR}}'
+ cmds:
+ #pack up source code
+ - task: packsource
+
+ #run post in debug mode
+ - task: postbuild
+ vars: { BUILD_MODE: debug }
+
+ #remove uncessary files from the release dir
+ - powershell -Command "Get-ChildItem -Recurse '{{.RELEASE_DIR}}/' -Include *.pdb,*.xml | Remove-Item"
+
+ #run post in release mode
+ - task: postbuild
+ vars: { BUILD_MODE: release }
+
+ - task: ci:postbuild_success
+
+
+ postbuild_failed:
+ dir: '{{.USER_WORKING_DIR}}'
+ cmds:
+ - echo "postbuild failed {{.PROJECT_NAME}}"
+
+
+ postbuild:
+ dir: '{{.USER_WORKING_DIR}}'
+ internal: true
+ vars:
+ #the build output directory
+ BUILD_OUT: "{{.USER_WORKING_DIR}}/bin/{{.BUILD_MODE}}/{{.TARGET_FRAMEWORK}}/publish"
+
+ cmds:
+
+ #copy license and readme to target
+ - cd .. && powershell -Command "Copy-Item -Path ./build.readme.md -Destination '{{.BUILD_OUT}}/readme.md'"
+
+ #tar outputs
+ - cd "{{.BUILD_OUT}}" && tar -czf "{{.TARGET}}/{{.BUILD_MODE}}.tgz" .
+
+ packsource:
+ dir: '{{.USER_WORKING_DIR}}'
+ internal: true
+ cmds:
+ #copy source code to target
+ - powershell -Command "Get-ChildItem -Include *.cs,*.csproj -Recurse | Where { \$_.FullName -notlike '*\obj\*' -and \$_.FullName -notlike '*\bin\*' } | Resolve-Path -Relative | tar --files-from - -czf '{{.TARGET}}/src.tgz'"
+
+
+#Remove the output dirs on clean
+ clean:
+ dir: '{{.USER_WORKING_DIR}}'
+ cmds:
+ - for: [ 'bin/', 'obj/' ]
+ cmd: powershell Remove-Item -Recurse '{{.ITEM}}'
+ ignore_error: true
+
+ - task: ci:clean
diff --git a/plugins/ObjectCacheServer/server/config/config.json b/plugins/ObjectCacheServer/server/config/config.json
new file mode 100644
index 0000000..1f8a382
--- /dev/null
+++ b/plugins/ObjectCacheServer/server/config/config.json
@@ -0,0 +1,104 @@
+{
+
+ //Host application config, config is loaded as a read-only DOM that is available
+ //to the host and loaded child plugins, all elements are available to plugins via the 'HostConfig' property
+
+ "http": {
+ //The defaut HTTP version to being requests with (does not support http/2 yet)
+ "default_version": "HTTP/1.1",
+ //The maxium size (in bytes) of response messges that will be compressed
+ "compression_limit": 10000,
+ //Minium response size (in bytes) to compress
+ "compression_minimum": 2048,
+ //The size of the buffer to use when parsing multipart/form data uploads
+ "multipart_max_buf_size": 1024,
+ //The maxium ammount of data (in bytes) allows for mulitpart/form data file uploads
+ "multipart_max_size": 0,
+ //Absolute maximum size (in bytes) of the request entity body (exludes headers)
+ "max_entity_size": 10240,
+ //Keepalive ms for HTTP1.1 keepalive connections
+ "keepalive_ms": 100000,
+ //The buffer size to use when parsing headers (also the maxium request header size allowed)
+ "header_buf_size": 8128,
+ //The maxium number of headers allowed in an HTTP request message
+ "max_request_header_count": 50,
+ //The maxium number of allowed network connections, before 503s will be issued automatically and connections closed
+ "max_connections": 5000,
+ //The size in bytes of the buffer to use when writing response messages
+ "response_buf_size": 4096,
+ //time (in ms) to wait for a response from an active connection in recv mode, before dropping it
+ "recv_timeout_ms": 5000,
+ //Time in ms to wait for the client to accept transport data before terminating the connection
+ "send_timeout_ms": 60000,
+ //The size (in bytes) of the buffer used to store all response header data
+ "response_header_buf_size": 16384,
+ //Max number of file uploads allowed per request
+ "max_uploads_per_request": 1
+ },
+
+ //Maxium ammount of time a request is allowed to be processed (includes loading or waiting for sessions) before operations will be cancelled and a 503 returned
+ "max_execution_time_ms": 20000,
+
+ "virtual_hosts": [
+ {
+ "interface": {
+ "address": "0.0.0.0",
+ "port": 2557
+ },
+
+ //Collection of "trusted" servers to allow proxy header support from
+ "downstream_servers": [],
+
+ //The hostname to listen for, "*" as wildcard, and "[system]" as the default hostname for the current machine
+ "hostname": "*",
+ "path": "root/", //Point to some place we can read nothing from
+
+ "deny_extensions": [ ],
+ "default_files": [ ],
+ "error_files": [],
+ "cache_default_sec": 864000,
+
+ "DISABLED ssl": {}
+ }
+ ],
+
+
+ //Defines the directory where plugin's are to be loaded from
+ "plugins": {
+ //Hot-reload creates collectable assemblies that allow full re-load support in the host application, should only be used for development purposes!
+ "hot_reload": false,
+ "path": "plugins/",
+ "config_dir": "config/",
+ "assets": "plugins/assets/"
+ },
+
+ "sys_log": {
+ "path": "data/logs/sys-log.txt",
+ "flush_sec": 5,
+ "retained_files": 31,
+ "file_size_limit": 10485760,
+ "interval": "infinite"
+ },
+
+ "app_log": {
+ "path": "data/logs/app-log.txt",
+ "flush_sec": 5,
+ "retained_files": 31,
+ "file_size_limit": 10485760,
+ "interval": "infinite"
+ },
+
+ //HASHICORP VAULT
+ "hashicorp_vault": {
+ "url": "",
+ "token": "",
+ "trust_cert": false
+ },
+
+ "secrets": {
+ //Special key used by the loading library for access to the PasswordHashing library to pepper password hashes
+ "cache_private_key": "",
+ "client_public_key": ""
+ }
+}
+
diff --git a/plugins/ObjectCacheServer/server/container/Dockerfile b/plugins/ObjectCacheServer/server/container/Dockerfile
new file mode 100644
index 0000000..6c466d4
--- /dev/null
+++ b/plugins/ObjectCacheServer/server/container/Dockerfile
@@ -0,0 +1,82 @@
+#Copyright (c) Vaughn Nugent
+#Licensed under the GNU AGPL V3.0
+
+#use plain alpine latest to build native libraries in
+FROM alpine:3.19 as native-cont
+
+#install public libs and build tools
+RUN apk update && apk add build-base cmake npm
+#most universal way to use Task is from NPM
+RUN npm install -g @go-task/cli
+
+WORKDIR /build
+
+#include local artifacts
+COPY app/ .
+
+#build internal libraries and copy the libraries to the /lib output directory
+RUN mkdir out/
+RUN task build-libs
+
+#APP CONTAINER
+#move into a clean dotnet apline lean image
+FROM mcr.microsoft.com/dotnet/runtime:8.0.2-alpine3.19-amd64 as app-cont
+
+LABEL name="vnuge/vncache"
+LABEL maintainer="Vaughn Nugent <vnpublic@proton.me>"
+LABEL description="A simple clustered network data caching service"
+
+#copy local artifacts again in run container
+COPY app/ /app
+
+#pull compiled libs from build container
+COPY --from=native-cont /build/out /app/lib
+
+RUN apk update && apk add --no-cache gettext icu-libs dumb-init
+
+#workdir
+WORKDIR /app
+
+#default to 2557 for cache port
+EXPOSE 2557/tcp
+
+VOLUME /app/ssl
+#expose an assets directory for custom assets install
+VOLUME /app/usr/assets
+
+#disable dotnet invariant culture on alpine
+ENV DOTNET_SYSTEM_GLOBALIZATION_INVARIANT=0
+
+#add helper/required libraries
+#ENV VNLIB_SHARED_HEAP_FILE_PATH=/app/lib/libvn_rpmalloc.so not ready yet, still need to debug
+
+#cache varables
+ENV MAX_ENTRIES=10000
+ENV CACHE_BUCKETS=100
+ENV CACHE_MAX_MESSAGE=20480
+ENV MAX_CONCURRENT_CONNECTIONS=1000
+
+ENV VERIFY_IP=true
+ENV MAX_PEER_NODES=10
+ENV DISCOVERY_INTERVAL=360
+ENV CACHE_CONNECT_PATH="/cache"
+ENV DISCOVER_PATH="/discover"
+ENV KNOWN_PEERS=[]
+
+#HC Vault
+ENV HC_VAULT_ADDR=""
+ENV HC_VAULT_TOKEN=""
+ENV HC_VAULT_TRUST_CERT=false
+
+#SECRETS
+ENV CACHE_PRIV_KEY=""
+ENV CLIENT_PUB_KEY=""
+
+#HTTP/PROXY Config
+ENV HTTP_DOWNSTREAM_SERVERS=[]
+ENV HTTP_MAX_CONNS=5000
+
+#run the init script within dumb-init
+ENTRYPOINT ["dumb-init", "--"]
+CMD ["ash", "./run.sh"]
+
diff --git a/plugins/ObjectCacheServer/server/container/Taskfile.yaml b/plugins/ObjectCacheServer/server/container/Taskfile.yaml
new file mode 100644
index 0000000..10ee86b
--- /dev/null
+++ b/plugins/ObjectCacheServer/server/container/Taskfile.yaml
@@ -0,0 +1,80 @@
+# https://taskfile.dev
+
+#inlcuded by the ci main taskfile to produce containerized builds, and also
+#be included by the container itself to run build tasks inside the container
+
+version: "3"
+
+vars:
+ INCLUDE_FILES: "Dockerfile, docker-compose.yaml"
+
+includes:
+ install:
+ taskfile: ../install.taskfile.yaml
+ optional: true #not needed for inside container build
+
+tasks:
+ #called from inside the container to build native libraries
+ build-libs:
+ vars:
+ OUT_DIR: "{{.USER_WORKING_DIR}}/out"
+
+ #build stage generates the following libraries
+ generates:
+ - "{{.USER_WORKING_DIR}}/out/libvn_rpmalloc.so"
+
+ cmds:
+ #build rpmalloc library
+ - cd lib/vnlib_rpmalloc/ && task && cp build/libvn_rpmalloc.so {{.OUT_DIR}}/libvn_rpmalloc.so
+
+ #called from ci pipline to build the package
+ build:
+ cmds:
+ # clean up the run.sh script to remove windows line endings in my wsl default instance
+ - cmd: wsl dos2unix ./run.sh
+ platforms: [ windows/amd64 ]
+
+ #init build image
+ - task: setup-container-image
+
+ #remove the default config file as it's not needed in the container
+ - powershell -Command "rm -Force -Recurse build/app/config/"
+
+ #install rpmalloc
+ - task: install-rpmalloc-lib
+
+ postbuild_success:
+ cmds:
+ #tar up the build directory and move it to the output bin directory
+ - cmd: cd build/ && tar -czf '{{ .BINARY_DIR }}/{{.PACKAGE_FILE_NAME}}' .
+ #clean up all the build files after build succeeds
+ - task: clean
+
+ clean:
+ ignore_error: true
+ cmds:
+ - cmd: powershell -Command "rm -Recurse -Force ./build"
+
+ install-rpmalloc-lib:
+ internal: true
+ cmds:
+ #install compressor plugin
+ - task: install:install
+ vars:
+ PROJECT_NAME: 'vnlib_rpmalloc'
+ MODULE_NAME: "VNLib.Core"
+ FILE_NAME: "src.tgz"
+ DIR: './build/app/lib/vnlib_rpmalloc'
+
+ setup-container-image:
+ internal: true
+ cmds:
+ #make build directory
+ - powershell -Command "mkdir build, build/app, build/app/config-templates/, build/app/static/ -Force"
+ #copy the existing linux-x64 build to the build folder, this will be the container base
+ - powershell -Command "cp -Recurse -Force ../build/linux-x64/* build/app/"
+ #copy local scripts and config data into the build folder
+ - powershell -Command "cp -Force run.sh, Taskfile.yaml build/app/"
+ - powershell -Command "cp -Force Dockerfile, docker-compose.yaml build/"
+ - powershell -Command "cp -Force static/* build/app/static/"
+ - powershell -Command "cp -Force config-templates/* build/app/config-templates/"
diff --git a/plugins/ObjectCacheServer/server/container/config-templates/ObjectCacheServer-template.json b/plugins/ObjectCacheServer/server/container/config-templates/ObjectCacheServer-template.json
new file mode 100644
index 0000000..765c3d7
--- /dev/null
+++ b/plugins/ObjectCacheServer/server/container/config-templates/ObjectCacheServer-template.json
@@ -0,0 +1,54 @@
+{
+ "debug": false,
+
+ //enables cache server cluster node data
+ "cluster": {
+ //Delay to re-discover peers
+ "discovery_interval_sec": ${DISCOVERY_INTERVAL},
+
+ //The maxium number of peers to connect to
+ "max_peers": ${MAX_PEER_NODES},
+
+ //Max ev queue depth before LRU eviction
+ "max_queue_depth": 10000,
+
+ //Time between queue purge
+ "queue_purge_interval_sec": 360000,
+
+ //Forces strict ip address verification on upgrades (best to leave on)
+ "verify_ip": ${VERIFY_IP},
+
+ //The cache websocket endpoint path
+ "connect_path": "${CACHE_CONNECT_PATH}",
+
+ //Optional to allow nodes to discover nodes we adverties
+ "discovery_path": "${DISCOVER_PATH}",
+
+ //Optionally change the well-known path (clients must know this)
+ "well_known_path": null,
+
+ //The maxium number of connections to this node
+ "max_concurrent_connections": ${MAX_CONCURRENT_CONNECTIONS}
+ },
+
+ //Cache configuration object, FBM protocol variables
+ "cache": {
+
+ //Max number of cache entires to be stored
+ "max_cache": ${MAX_ENTRIES},
+
+ //the number of cache buckets to distribute load
+ "buckets": ${CACHE_BUCKETS},
+
+ //FBM buffer config
+ "buffer_recv_max": ${CACHE_MAX_MESSAGE}, //Up to 100Kb transfer buffer
+ "buffer_recv_min": 8192, //min of 8k transfer buffer
+ "buffer_header_max": 2048, //2k max header buffer size
+ "buffer_header_min": 128, //128 byte min request header buffer size
+ "max_message_size": ${CACHE_MAX_MESSAGE} //Absolute maxium message size allowed, also the maxium size of cache entires
+ },
+
+ //Known peers array, must point to well-known endpoint for discovery
+ "known_peers": ${KNOWN_PEERS}
+
+} \ No newline at end of file
diff --git a/plugins/ObjectCacheServer/server/container/config-templates/config-template.json b/plugins/ObjectCacheServer/server/container/config-templates/config-template.json
new file mode 100644
index 0000000..6362432
--- /dev/null
+++ b/plugins/ObjectCacheServer/server/container/config-templates/config-template.json
@@ -0,0 +1,105 @@
+{
+
+ //Host application config, config is loaded as a read-only DOM that is available
+ //to the host and loaded child plugins, all elements are available to plugins via the 'HostConfig' property
+
+ "http": {
+ //The defaut HTTP version to being requests with (does not support http/2 yet)
+ "default_version": "HTTP/1.1",
+ //The maxium size (in bytes) of response messges that will be compressed
+ "compression_limit": 10000,
+ //Minium response size (in bytes) to compress
+ "compression_minimum": 2048,
+ //The size of the buffer to use when parsing multipart/form data uploads
+ "multipart_max_buf_size": 1024,
+ //The maxium ammount of data (in bytes) allows for mulitpart/form data file uploads
+ "multipart_max_size": 0,
+ //Absolute maximum size (in bytes) of the request entity body (exludes headers)
+ "max_entity_size": 10240,
+ //Keepalive ms for HTTP1.1 keepalive connections
+ "keepalive_ms": 100000,
+ //The buffer size to use when parsing headers (also the maxium request header size allowed)
+ "header_buf_size": 8128,
+ //The maxium number of headers allowed in an HTTP request message
+ "max_request_header_count": 50,
+ //The maxium number of allowed network connections, before 503s will be issued automatically and connections closed
+ "max_connections": ${HTTP_MAX_CONNS},
+ //The size in bytes of the buffer to use when writing response messages
+ "response_buf_size": 4096,
+ //time (in ms) to wait for a response from an active connection in recv mode, before dropping it
+ "recv_timeout_ms": 5000,
+ //Time in ms to wait for the client to accept transport data before terminating the connection
+ "send_timeout_ms": 60000,
+ //The size (in bytes) of the buffer used to store all response header data
+ "response_header_buf_size": 16384,
+ //Max number of file uploads allowed per request
+ "max_uploads_per_request": 1
+ },
+
+ //Maxium ammount of time a request is allowed to be processed (includes loading or waiting for sessions) before operations will be cancelled and a 503 returned
+ "max_execution_time_ms": 20000,
+
+ "virtual_hosts": [
+ {
+ "interface": {
+ "address": "0.0.0.0",
+ "port": 2557
+ },
+
+ //Collection of "trusted" servers to allow proxy header support from
+ "downstream_servers": ${HTTP_DOWNSTREAM_SERVERS},
+
+ //The hostname to listen for, "*" as wildcard, and "[system]" as the default hostname for the current machine
+ "hostname": "*",
+ "path": "root/",
+
+ "deny_extensions": [ ],
+ "default_files": [ ],
+ "error_files": [],
+ "cache_default_sec": 864000,
+
+ //Disabled until well-tested
+ //"ssl": ${SSL_JSON}
+ }
+ ],
+
+
+ //Defines the directory where plugin's are to be loaded from
+ "plugins": {
+ //Hot-reload creates collectable assemblies that allow full re-load support in the host application, should only be used for development purposes!
+ "hot_reload": false,
+ "path": "plugins/",
+ "config_dir": "config/",
+ "assets": "plugins/assets/"
+ },
+
+ "sys_log": {
+ "path": "data/logs/sys-log.txt",
+ "flush_sec": 5,
+ "retained_files": 31,
+ "file_size_limit": 10485760,
+ "interval": "infinite"
+ },
+
+ "app_log": {
+ "path": "data/logs/app-log.txt",
+ "flush_sec": 5,
+ "retained_files": 31,
+ "file_size_limit": 10485760,
+ "interval": "infinite"
+ },
+
+ //HASHICORP VAULT
+ "hashicorp_vault": {
+ "url": "${HC_VAULT_ADDR}",
+ "token": "${HC_VAULT_TOKEN}",
+ "trust_cert": ${HC_VAULT_TRUST_CERT}
+ },
+
+ "secrets": {
+ //Special key used by the loading library for access to the PasswordHashing library to pepper password hashes
+ "cache_private_key": "${CACHE_PRIV_KEY}",
+ "client_public_key": "${CLIENT_PUB_KEY}"
+ }
+}
+
diff --git a/plugins/ObjectCacheServer/server/container/docker-compose.yaml b/plugins/ObjectCacheServer/server/container/docker-compose.yaml
new file mode 100644
index 0000000..c1b61fa
--- /dev/null
+++ b/plugins/ObjectCacheServer/server/container/docker-compose.yaml
@@ -0,0 +1,45 @@
+#Copyright (c) Vaughn Nugent
+#Licensed under the GNU AGPLv3
+
+version: '3.6'
+
+services:
+ vncache:
+ image: vnuge/vncache
+ container_name: vncache
+ restart: unless-stopped
+ hostname: vncache-server
+ volumes:
+ - ./assets:/app/usr/assets:ro #optional if assets are required
+ - ./ssl:/app/ssl:ro #optional only if SSL is enabled (currently not a feature)
+ ports:
+ - 2557:2557
+ environment:
+ #System memory consumption is calculated as follows:
+ # MAX_ENTIRES x CACHE_BUCKETS x CACHE_MAX_MESSAGE = max memory consumption
+
+ MAX_CONCURRENT_CONNECTIONS: "1000" #max number of concurrent connections
+ MAX_ENTRIES: "10000" #max number of cache entries per bucket
+ CACHE_BUCKETS: "100" #number of cache buckets for load balancing
+ CACHE_MAX_MESSAGE: "20480" #20KB
+ VERIFY_IP: "true" #verfies the IP address of clients during negotiation (recommended)
+ MAX_PEER_NODES: "10" #max number of other peer nodes this node shoud connect to
+ DISCOVERY_INTERVAL: "360" #time (in seconds) between peer node discovery
+ KNOWN_PEERS: '[]' #array of known peer nodes in the cluster
+
+ #SECRETS (must be JWK formatted keys)
+ CACHE_PRIV_KEY: "" #REQUIRED local private key used to identify and sign messages to clients and other nodes
+ CLIENT_PUB_KEY: "" #REQUIRED used to verify client messages
+
+ #HC vault
+ #HC_VAULT_ADDR: ""
+ #HC_VAULT_TOKEN: ""
+ #HC_VAULT_TRUST_CERT: "false"
+
+ #HTTP
+ #HTTP_DOWNSTREAM_SERVERS: '[]'
+ #SSL_JSON: '{"cert": "ssl/cert.pem", "privkey":"ssl/priv.pem"}'
+ HTTP_MAX_CONNS: "5000"
+
+ SERVER_ARGS: "--input-off"
+
diff --git a/plugins/ObjectCacheServer/server/container/run.sh b/plugins/ObjectCacheServer/server/container/run.sh
new file mode 100644
index 0000000..2c2636c
--- /dev/null
+++ b/plugins/ObjectCacheServer/server/container/run.sh
@@ -0,0 +1,15 @@
+#! /bin/sh
+
+#this script will be invoked by dumb-init in the container on statup and is located at /app
+
+rm -rf config && mkdir config
+
+#substitude all -template files in the config-templates dir and write them to the config dir
+for file in config-templates/*-template.json; do
+ envsubst < $file > config/$(basename $file -template.json).json
+done
+
+cp usr/assets/* plugins/assets/ -rf
+
+#start the server
+dotnet webserver/VNLib.WebServer.dll --config config/config.json $SERVER_ARGS \ No newline at end of file
diff --git a/plugins/ObjectCacheServer/server/install.ps1 b/plugins/ObjectCacheServer/server/install.ps1
new file mode 100644
index 0000000..4c42c18
--- /dev/null
+++ b/plugins/ObjectCacheServer/server/install.ps1
@@ -0,0 +1,26 @@
+param([String] $BaseUrl, [String] $ModuleName, [String] $ProjectName, [String]$FileName)
+
+#get the latest file
+Invoke-WebRequest "$BaseUrl/$ModuleName/@latest" -OutFile latest.txt
+#read the file into a variable
+$latest = Get-Content latest.txt
+
+#download the latest version
+Invoke-WebRequest "$BaseUrl/$ModuleName/$latest/$ProjectName/$FileName" -OutFile $FileName
+
+#download latest sha256
+Invoke-WebRequest "$BaseUrl/$ModuleName/$latest/$ProjectName/$FileName.sha256" -OutFile "$FileName.sha256"
+
+#verify the file
+$hash = (Get-FileHash $FileName -Algorithm SHA256).Hash
+
+#read the sha256 file
+$sha256 = Get-Content "$FileName.sha256"
+
+#compare the hashes
+if ($hash -eq $sha256) {
+ Write-Host "Hashes match, file is valid" -ForegroundColor Blue
+} else {
+ throw "Hashes do not match, file is invalid"
+}
+
diff --git a/plugins/ObjectCacheServer/server/install.taskfile.yaml b/plugins/ObjectCacheServer/server/install.taskfile.yaml
new file mode 100644
index 0000000..37baf12
--- /dev/null
+++ b/plugins/ObjectCacheServer/server/install.taskfile.yaml
@@ -0,0 +1,20 @@
+# https://taskfile.dev
+
+#Called by the vnbuild system to produce builds for my website
+#https://www.vaughnnugent.com/resources/software
+
+version: "3"
+
+tasks:
+
+ install:
+ internal: true
+ cmds:
+ #make the plugin directory
+ - cmd: powershell -Command "mkdir {{.DIR}} -Force"
+ ignore_error: true
+ - cmd: powershell -Command "pwd"
+ - cd {{.DIR}} && powershell "{{.SCRIPT_DIR}}/install.ps1" -BaseUrl {{.BUILDS_URL}} -ModuleName {{.MODULE_NAME}} -ProjectName {{.PROJECT_NAME}} -FileName {{.FILE_NAME}}
+ - cd {{.DIR}} && tar -xzf {{.FILE_NAME}}
+ #remove the archive file
+ - cd {{.DIR}} && powershell -Command "rm {{.FILE_NAME}}" \ No newline at end of file
diff --git a/plugins/ObjectCacheServer/server/taskfile.yaml b/plugins/ObjectCacheServer/server/taskfile.yaml
new file mode 100644
index 0000000..38eae79
--- /dev/null
+++ b/plugins/ObjectCacheServer/server/taskfile.yaml
@@ -0,0 +1,193 @@
+# https://taskfile.dev
+
+#Inlcuded taskfile for object cache server that is used to produce
+#ci builds for standalone caching servers
+
+version: "3"
+
+vars:
+ BUILDS_URL: https://www.vaughnnugent.com/public/resources/software/builds
+ SCRIPT_DIR: '{{.TASKFILE_DIR}}'
+ BINARY_DIR: '{{.PROJECT_DIR}}/bin' #binary dir is not available for dotnet plugis
+
+includes:
+ install:
+ taskfile: install.taskfile.yaml
+ optional: true
+
+ container:
+ dir: container #always run from the container directory
+ taskfile: container/Taskfile.yaml
+ optional: true
+ vars:
+ BUILDS_URL: '{{.BUILDS_URL}}'
+ PACKAGE_FILE_NAME: "vncache-alpine3.19-oci.tgz" #the name of the output package file
+
+tasks:
+# CLIENT-SIDE TASKS
+ default:
+ desc: "Runs the VNCache server"
+ cmds:
+ - task: run
+
+ run:
+ desc: "Runs the VNCache server"
+ silent: true
+ env:
+ #server should detect the file extension and load the correct library
+ VNLIB_SHARED_HEAP_FILE_PATH: lib/libvn_rpmalloc
+
+ cmds:
+ - cmd: dotnet webserver/VNLib.WebServer.dll --config config/config.json --input-off --inline-scheduler {{.ARGS}}
+ #setup sever environment
+
+
+ setup-debian:
+ desc: "Performs initial setup on Debian x64 based machines"
+ silent: true
+ cmds:
+ - apt update
+ - apt install -y dotnet-runtime-8.0 gcc cmake
+ - task: setup
+ - echo "Setup complete"
+
+ setup-fedora:
+ desc: "Performs initial setup on Fedora/Redhat x64 (dnf) based machines"
+ silent: true
+ cmds:
+ - dnf update
+ - dnf install -y dotnet-runtime-8.0 gcc cmake
+ - task: setup
+ - echo "Setup complete"
+
+ setup-alpine:
+ desc: "Performs initial setup on Alpine x64 based machines"
+ silent: true
+ cmds:
+ - apk update
+ - apk add --no-cache dotnet8-runtime gcc cmake
+ - task: setup
+ - echo "Setup complete"
+
+ setup:
+ cmds:
+ #build rpmalloc lib
+ - task: build-rpmalloc
+
+ build-rpmalloc:
+ internal: true
+ dir: 'lib/'
+ vars:
+ RPMALLOC_DIR: 'vnlib_rpmalloc'
+ cmds:
+ #build rpmalloc library
+ - cmd: cd vnlib_rpmalloc/ && task
+
+ - cmd: cp vnlib_rpmalloc/build/libvn_rpmalloc.so libvn_rpmalloc.so
+ platforms: [ linux ]
+
+ - cmd: cp vnlib_rpmalloc/build/libvn_rpmalloc.dylib libvn_rpmalloc.dylib
+ platforms: [ darwin ]
+
+ - cmd: powershell -Command "cp vnlib_rpmalloc/build/Release/vnlib_rpmalloc.dll libvn_rpmalloc.dll"
+ platforms: [ windows/amd64 ]
+
+# CI BUILD TASKS
+ build:
+ desc: "CI ONLY! DO NOT RUN"
+ cmds:
+ - task: install-plugins
+ - task: install-webserver
+
+ #run container build last
+ - task: container:build
+
+ install-webserver:
+ internal: true
+ cmds:
+ - for: [ win-x64, linux-x64, osx-x64, linux-arm64 ]
+ task: create-env
+ vars:
+ TARGET_OS: '{{.ITEM}}'
+
+ install-plugins:
+ internal: true
+ cmds:
+ - cmd: powershell -Command "mkdir lib -Force"
+ ignore_error: true
+
+ #copy the object-cache plugin output to the local plugins directory
+ - cmd: powershell -Command "cp -Recurse -Force {{.PROJECT_DIR}}/bin/Release/net8.0/publish/ plugins/{{.PROJECT_NAME}}/"
+
+ #download rpmalloc
+ - task: install:install
+ vars:
+ PROJECT_NAME: 'vnlib_rpmalloc'
+ MODULE_NAME: "VNLib.Core"
+ FILE_NAME: "src.tgz"
+ DIR: './lib/vnlib_rpmalloc'
+
+ postbuild_success:
+ desc: "CI ONLY! DO NOT RUN"
+ cmds:
+ - for: [ win-x64, linux-x64, osx-x64, linux-arm64 ]
+ task: pack
+ vars:
+ TARGET_OS: '{{.ITEM}}'
+
+ #cleanup unnecessary build files that clog up the pipeline
+ - for: [ build, plugins, lib ]
+ cmd: powershell -Command "rm -Recurse '{{.ITEM}}'"
+ ignore_error: true
+
+ - task: container:postbuild_success
+
+ build-container:
+ internal: true
+ cmds:
+ - task: container:build
+
+ #Creates a new webserver build environment for an operating system configuration
+ create-env:
+ internal: true
+ vars:
+ BUILD_DIR: './build/{{.TARGET_OS}}'
+ cmds:
+ #create dir for env
+ - cmd: powershell -Command "mkdir {{.BUILD_DIR}} -Force"
+ ignore_error: true
+
+ #copy build files
+ - for: [ plugins, lib, config, taskfile.yaml ]
+ cmd: powershell -Command "cp -Recurse -Force {{.ITEM}} {{.BUILD_DIR}}"
+
+ - task: get-webserver
+ vars:
+ TARGET_OS: '{{.TARGET_OS}}'
+ BUILD_DIR: '{{.BUILD_DIR}}'
+
+ #fetches a copy of (the desired os version) VNLib.WebServer project and installs it into the build directory
+ get-webserver:
+ internal: true
+ cmds:
+ - task: install:install
+ vars:
+ PROJECT_NAME: 'VNLib.Webserver'
+ MODULE_NAME: "VNLib.Webserver"
+ FILE_NAME: "{{.TARGET_OS}}-release.tgz"
+ DIR: '{{.BUILD_DIR}}/webserver'
+
+ pack:
+ internal: true
+ cmds:
+ - cd build/{{.TARGET_OS}} && tar -czf '{{ .BINARY_DIR }}/{{ .TARGET_OS }}-release.tgz' .
+
+ clean:
+ desc: "CI ONLY! DO NOT RUN"
+ ignore_error: true
+ cmds:
+ - for: [ build/, bin/, plugins/, lib/]
+ cmd: powershell -Command "rm -Recurse -Force '{{.ITEM}}'"
+
+ - task: container:clean
+
diff --git a/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs b/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs
index 40f4c29..6f733ed 100644
--- a/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs
+++ b/plugins/ObjectCacheServer/src/BucketLocalManagerFactory.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: VNLib.Plugins.Extensions.VNCache
@@ -24,6 +24,7 @@
using System;
+using System.Linq;
using System.Buffers;
using System.Text.Json;
using System.Collections.Generic;
@@ -32,9 +33,10 @@ using System.Runtime.CompilerServices;
using VNLib.Plugins;
using VNLib.Utils;
using VNLib.Utils.Memory;
+using VNLib.Utils.Memory.Diagnostics;
+using VNLib.Utils.Logging;
using VNLib.Utils.Extensions;
using VNLib.Plugins.Extensions.Loading;
-
/*
* How bucket local memory works:
*
@@ -53,6 +55,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server
{
private readonly LinkedList<BucketLocalManager> _managers = new ();
private readonly bool _zeroAll;
+ private readonly bool _enableHeapTracking;
+ private readonly ILogProvider _statsLogger;
///<inheritdoc/>
public ICacheEntryMemoryManager CreateForBucket(uint bucketId)
@@ -60,6 +64,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//Init a new heap for the individual bucket
IUnmangedHeap localHeap = MemoryUtil.InitializeNewHeapForProcess();
+ if (_enableHeapTracking)
+ {
+#pragma warning disable CA2000 // Dispose objects before losing scope
+ localHeap = new TrackedHeapWrapper(localHeap, true);
+#pragma warning restore CA2000 // Dispose objects before losing scope
+ }
+
BucketLocalManager manager = new (localHeap, bucketId, _zeroAll);
_managers.AddLast(manager);
@@ -74,11 +85,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server
if (config != null)
{
//Try to get the zero all flag
- if (config.TryGetValue("zero_all", out JsonElement zeroEl))
- {
- _zeroAll = zeroEl.GetBoolean();
- }
+ _zeroAll = config.TryGetValue("zero_all", out JsonElement zeroEl) && zeroEl.GetBoolean();
+
+ //Get the heap tracking flag
+ _enableHeapTracking = config.TryGetValue("diag_mem", out JsonElement trackEl) && trackEl.GetBoolean();
}
+
+ _statsLogger = plugin.Log.CreateScope("Cache MemStats");
}
protected override void Free()
@@ -90,6 +103,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server
}
}
+ public void LogHeapStats()
+ {
+ //If tracking is not enabled, the heap instances stored by the managers will not be tracked, and the cast in the code below will fail
+ if (!_enableHeapTracking)
+ {
+ return;
+ }
+
+ string[] statsPerHeap = _managers.Select(hm =>
+ {
+ HeapStatistics stats = (hm.Heap as TrackedHeapWrapper)!.GetCurrentStats();
+ return $"\tBucket {hm.BucketId}: Current {stats.AllocatedBytes / 1024}kB, Blocks {stats.AllocatedBlocks}, Max size {stats.MaxHeapSize / 1024}kB";
+
+ }).ToArray();
+
+ _statsLogger.Debug("Memory statistics for cache memory manager: {hm}\n{stats}", GetHashCode(), statsPerHeap);
+ }
+
/*
* Buckets are mutually exclusive, so we can use a single heap for each bucket
* to get a little more performance on memory operations
@@ -104,7 +135,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
///<inheritdoc/>
public void FreeHandle(object handle)
{
- _ = handle ?? throw new ArgumentNullException(nameof(handle));
+ ArgumentNullException.ThrowIfNull(handle);
MemoryHandle<byte> _handle = Unsafe.As<object, MemoryHandle<byte>>(ref handle);
//Free the handle
@@ -114,7 +145,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
///<inheritdoc/>
public uint GetHandleSize(object handle)
{
- _ = handle ?? throw new ArgumentNullException(nameof(handle));
+ ArgumentNullException.ThrowIfNull(handle);
MemoryHandle<byte> _handle = Unsafe.As<object, MemoryHandle<byte>>(ref handle);
return (uint)_handle.Length;
@@ -123,7 +154,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
///<inheritdoc/>
public Span<byte> GetSpan(object handle, uint offset, uint length)
{
- _ = handle ?? throw new ArgumentNullException(nameof(handle));
+ ArgumentNullException.ThrowIfNull(handle);
MemoryHandle<byte> _handle = Unsafe.As<object, MemoryHandle<byte>>(ref handle);
return _handle.GetOffsetSpan(offset, checked((int)length));
@@ -132,7 +163,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
///<inheritdoc/>
public MemoryHandle PinHandle(object handle, int offset)
{
- _ = handle ?? throw new ArgumentNullException(nameof(handle));
+ ArgumentNullException.ThrowIfNull(handle);
MemoryHandle<byte> _handle = Unsafe.As<object, MemoryHandle<byte>>(ref handle);
//Pin the handle
@@ -142,7 +173,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server
///<inheritdoc/>
public void ResizeHandle(object handle, uint newSize)
{
- _ = handle ?? throw new ArgumentNullException(nameof(handle));
+ ArgumentNullException.ThrowIfNull(handle);
MemoryHandle<byte> _handle = Unsafe.As<object, MemoryHandle<byte>>(ref handle);
//Resize the handle
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs
index 6942828..aef0255 100644
--- a/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs
+++ b/plugins/ObjectCacheServer/src/Cache/CacheListenerPubQueue.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -45,39 +45,33 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
internal sealed class CacheListenerPubQueue : ICacheListenerEventQueue<IPeerEventQueue>, IAsyncBackgroundWork
{
- private const int MAX_LOCAL_QUEUE_ITEMS = 10000;
- private const string LOG_SCOPE_NAME = "QUEUE";
-
private readonly AsyncQueue<ChangeEvent> _listenerQueue;
private readonly ILogProvider _logProvider;
- private readonly ICacheEventQueueManager _queueManager;
+ private readonly PeerEventQueueManager _queueManager;
- public CacheListenerPubQueue(PluginBase plugin)
+ public CacheListenerPubQueue(PluginBase plugin, PeerEventQueueManager queueMan)
{
- _queueManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>();
- _logProvider = plugin.Log.CreateScope(LOG_SCOPE_NAME);
+ _queueManager = queueMan;
+ _logProvider = plugin.Log.CreateScope(CacheConstants.LogScopes.CacheListenerPubQueue);
//Init local queue to store published events
- _listenerQueue = new(new BoundedChannelOptions(MAX_LOCAL_QUEUE_ITEMS)
+ _listenerQueue = new(new BoundedChannelOptions(CacheConstants.CacheListenerChangeQueueSize)
{
AllowSynchronousContinuations = true,
FullMode = BoundedChannelFullMode.DropOldest,
- SingleReader = true,
+ SingleReader = true, //Always a singe thread reading events
SingleWriter = false,
});
}
///<inheritdoc/>
- async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
+ async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider _, CancellationToken exitToken)
{
const int accumulatorSize = 64;
- //Create scope
- pluginLog = pluginLog.CreateScope(LOG_SCOPE_NAME);
-
try
{
- pluginLog.Debug("Change queue worker listening for local cache changes");
+ _logProvider.Debug("Change queue worker listening for local cache changes");
//Accumulator for events
ChangeEvent[] accumulator = new ChangeEvent[accumulatorSize];
@@ -105,15 +99,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
catch (OperationCanceledException)
{
//Normal exit
- pluginLog.Debug("Change queue listener worker exited");
+ _logProvider.Debug("Change queue listener worker exited");
}
}
///<inheritdoc/>
- public bool IsEnabled([NotNullWhen(true)] IPeerEventQueue? userState)
- {
- return userState is IPeerEventQueue;
- }
+ public bool IsEnabled([NotNullWhen(true)] IPeerEventQueue? userState) => userState is not null;
///<inheritdoc/>
public void PublishEvent(ChangeEvent changeEvent)
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs b/plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs
index bd15d24..c404cc5 100644
--- a/plugins/ObjectCacheServer/src/Cache/CacheConfiguration.cs
+++ b/plugins/ObjectCacheServer/src/Cache/CacheMemoryConfiguration.cs
@@ -1,12 +1,12 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
-* File: CacheConfiguration.cs
+* File: CacheMemoryConfiguration.cs
*
-* CacheConfiguration.cs is part of ObjectCacheServer which is part of the larger
-* VNLib collection of libraries and utilities.
+* CacheMemoryConfiguration.cs is part of ObjectCacheServer which
+* is part of the larger VNLib collection of libraries and utilities.
*
* ObjectCacheServer is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
@@ -26,7 +26,7 @@ using System.Text.Json.Serialization;
namespace VNLib.Data.Caching.ObjectCache.Server.Cache
{
- internal sealed class CacheConfiguration
+ internal sealed class CacheMemoryConfiguration
{
[JsonPropertyName("buffer_recv_max")]
public int MaxRecvBufferSize { get; set; } = 1000 * 1024;
@@ -36,6 +36,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
[JsonPropertyName("buffer_header_max")]
public int MaxHeaderBufferSize { get; set; } = 2 * 1024;
+
[JsonPropertyName("buffer_header_min")]
public int MinHeaderBufferSize { get; set; } = 128;
@@ -49,5 +50,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
[JsonPropertyName("buckets")]
public uint BucketCount { get; set; } = 10;
+
+
+ [JsonPropertyName("memory_lib_path")]
+ public string? ExternLibPath { get; set; }
}
}
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs b/plugins/ObjectCacheServer/src/Cache/CacheStore.cs
deleted file mode 100644
index 75abe37..0000000
--- a/plugins/ObjectCacheServer/src/Cache/CacheStore.cs
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
-* Copyright (c) 2024 Vaughn Nugent
-*
-* Library: VNLib
-* Package: ObjectCacheServer
-* File: CacheStore.cs
-*
-* CacheStore.cs is part of ObjectCacheServer which is part of the larger
-* VNLib collection of libraries and utilities.
-*
-* ObjectCacheServer is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License as
-* published by the Free Software Foundation, either version 3 of the
-* License, or (at your option) any later version.
-*
-* ObjectCacheServer is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see https://www.gnu.org/licenses/.
-*/
-
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-
-using VNLib.Utils.Logging;
-using VNLib.Net.Messaging.FBM;
-using VNLib.Plugins;
-using VNLib.Plugins.Extensions.Loading;
-
-namespace VNLib.Data.Caching.ObjectCache.Server.Cache
-{
- /*
- * Implements the blob cache store, which is an abstraction around the blob cache listener.
- * This allows for publishing local events (say from other nodes) to keep caches in sync.
- */
-
- [ConfigurationName("cache")]
- internal sealed class CacheStore : ICacheStore, IDisposable
- {
- /// <summary>
- /// Gets the underlying cache listener
- /// </summary>
- public BlobCacheListener<IPeerEventQueue> Listener { get; }
-
-
- public CacheStore(PluginBase plugin, IConfigScope config)
- {
- //Init cache
- Listener = InitializeCache((ObjectCacheServerEntry)plugin, config);
- }
-
- ///<inheritdoc/>
- ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, ObjectDataGet<T> bodyData, T state, CancellationToken token)
- {
- return Listener.Cache.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token);
- }
-
- ///<inheritdoc/>
- void ICacheStore.Clear()
- {
- throw new NotImplementedException();
- }
-
- ///<inheritdoc/>
- ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token)
- {
- return Listener.Cache.DeleteObjectAsync(id, token);
- }
-
- private static BlobCacheListener<IPeerEventQueue> InitializeCache(ObjectCacheServerEntry plugin, IConfigScope config)
- {
- const string CacheConfigTemplate =
-@"
-Cache Configuration:
- Max memory: {max} Mb
- Buckets: {bc}
- Entries per-bucket: {mc}
-";
-
- //Deserialize the cache config
- CacheConfiguration cacheConf = config.Deserialze<CacheConfiguration>();
-
- if (cacheConf.MaxCacheEntries < 2)
- {
- throw new ArgumentException("You must configure a 'max_cache' size larger than 1 item");
- }
-
- //Suggestion
- if (cacheConf.MaxCacheEntries < 200)
- {
- plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache");
- }
-
- //calculate the max memory usage
- ulong maxByteSize = cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize;
-
- //Log the cache config
- plugin.Log.Information(CacheConfigTemplate,
- maxByteSize / (1024 * 1000),
- cacheConf.BucketCount,
- cacheConf.MaxCacheEntries
- );
-
- //Get the event listener
- ICacheListenerEventQueue<IPeerEventQueue> queue = plugin.GetOrCreateSingleton<CacheListenerPubQueue>();
-
- //Get the memory manager
- ICacheMemoryManagerFactory manager = plugin.GetOrCreateSingleton<BucketLocalManagerFactory>();
-
- //Load the blob cache table system
- IBlobCacheTable bc = plugin.LoadMemoryCacheSystem(config, manager, cacheConf);
-
- FallbackFBMMemoryManager fbmMemManager = new(plugin.ListenerHeap);
-
- //Endpoint only allows for a single reader
- return new(bc, queue, plugin.Log, fbmMemManager);
- }
-
- /*
- * Cleaned up by the plugin on exit
- */
- public void Dispose()
- {
- Listener.Dispose();
- }
- }
-}
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs
index b7bf83f..8f196b0 100644
--- a/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs
+++ b/plugins/ObjectCacheServer/src/Cache/CacheSystemUtil.cs
@@ -29,6 +29,7 @@ using System.Text.Json;
using VNLib.Utils.Resources;
using VNLib.Plugins;
using VNLib.Plugins.Extensions.Loading;
+using VNLib.Utils.Extensions;
namespace VNLib.Data.Caching.ObjectCache.Server.Cache
{
@@ -49,7 +50,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
/// <param name="cacheConf">The cache configuration object</param>
/// <returns>The loaded <see cref="IBlobCacheTable"/> implementation</returns>
/// <exception cref="FileNotFoundException"></exception>
- public static IBlobCacheTable LoadMemoryCacheSystem(this PluginBase plugin, IConfigScope config, ICacheMemoryManagerFactory heap, CacheConfiguration cacheConf)
+ public static IBlobCacheTable LoadMemoryCacheSystem(this PluginBase plugin, IConfigScope config, ICacheMemoryManagerFactory heap, CacheMemoryConfiguration cacheConf)
{
#pragma warning disable CA2000 // Dispose objects before losing scope
@@ -94,10 +95,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
if(initMethod != null)
{
//Itterate all buckets
- foreach (IBlobCacheBucket bucket in table)
- {
- initMethod.Invoke(bucket.Id);
- }
+ table.ForEach(bucket => initMethod(bucket.Id));
}
}
diff --git a/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs b/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs
index e3c613d..4b76a9b 100644
--- a/plugins/ObjectCacheServer/src/Cache/CacheEventQueueManager.cs
+++ b/plugins/ObjectCacheServer/src/Cache/PeerEventQueueManager.cs
@@ -1,11 +1,11 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
-* File: CacheEventQueueManager.cs
+* File: PeerEventQueueManager.cs
*
-* CacheEventQueueManager.cs is part of ObjectCacheServer which is
+* PeerEventQueueManager.cs is part of ObjectCacheServer which is
* part of the larger VNLib collection of libraries and utilities.
*
* ObjectCacheServer is free software: you can redistribute it and/or modify
@@ -38,41 +38,37 @@ using VNLib.Plugins.Extensions.Loading.Events;
namespace VNLib.Data.Caching.ObjectCache.Server.Cache
{
- internal sealed class CacheEventQueueManager : ICacheEventQueueManager, IDisposable, IIntervalScheduleable
+ internal sealed class PeerEventQueueManager : ICacheEventQueueManager, IIntervalScheduleable
{
private readonly int MaxQueueDepth;
- private readonly object SubLock;
- private readonly LinkedList<NodeQueue> Subscribers;
+ private readonly object SubLock = new();
+ private readonly LinkedList<PeerEventListenerQueue> Subscribers = [];
- private readonly object StoreLock;
- private readonly Dictionary<string, NodeQueue> QueueStore;
+ private readonly object StoreLock = new();
+ private readonly Dictionary<string, PeerEventListenerQueue> QueueStore = new(StringComparer.OrdinalIgnoreCase);
-
- public CacheEventQueueManager(PluginBase plugin)
+ public PeerEventQueueManager(PluginBase plugin, ServerClusterConfig config)
{
- //Get node config
- NodeConfig config = plugin.GetOrCreateSingleton<NodeConfig>();
-
- //Get max queue depth
MaxQueueDepth = config.MaxQueueDepth;
/*
- * Schedule purge interval to clean up stale queues
- */
+ * Schedule purge interval to clean up stale queues
+ */
plugin.ScheduleInterval(this, config.EventQueuePurgeInterval);
-
- SubLock = new();
- Subscribers = new();
-
- StoreLock = new();
- QueueStore = new(StringComparer.OrdinalIgnoreCase);
+
+ //Cleanup disposeables on unload
+ _ = plugin.RegisterForUnload(() =>
+ {
+ QueueStore.Clear();
+ Subscribers.Clear();
+ });
}
///<inheritdoc/>
public IPeerEventQueue Subscribe(ICachePeer peer)
{
- NodeQueue? nq;
+ PeerEventListenerQueue? nq;
bool isNew = false;
@@ -82,13 +78,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
//Try to recover the queue for the node
if (!QueueStore.TryGetValue(peer.NodeId, out nq))
{
- //Create new queue
+ //Create new queue since an existing queue was not found
nq = new(peer.NodeId, MaxQueueDepth);
QueueStore.Add(peer.NodeId, nq);
isNew = true;
}
- //Increment listener count
+ //Increment listener count since a new listener has attached
nq.Listeners++;
}
@@ -109,11 +105,20 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
///<inheritdoc/>
public void Unsubscribe(ICachePeer peer)
{
+ /*
+ * The reason I am not purging queues that no longer have listeners
+ * now is because it is possible that a listener needed to detach because of
+ * a network issue and will be reconnecting shortly. If the node doesnt
+ * come back before the next purge interval, it's events will be purged.
+ *
+ * Point is: there is a reason for the garbage collection style purging
+ */
+
//Detach a listener for a node
lock (StoreLock)
{
//Get the queue and decrement the listener count
- NodeQueue nq = QueueStore[peer.NodeId];
+ PeerEventListenerQueue nq = QueueStore[peer.NodeId];
nq.Listeners--;
}
}
@@ -125,7 +130,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
lock (SubLock)
{
//Loop through ll the fast way
- LinkedListNode<NodeQueue>? q = Subscribers.First;
+ LinkedListNode<PeerEventListenerQueue>? q = Subscribers.First;
while (q != null)
{
@@ -145,7 +150,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
lock (SubLock)
{
//Loop through ll the fast way
- LinkedListNode<NodeQueue>? q = Subscribers.First;
+ LinkedListNode<PeerEventListenerQueue>? q = Subscribers.First;
while (q != null)
{
@@ -167,9 +172,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
lock (StoreLock)
{
//Get all stale queues (queues without listeners)
- NodeQueue[] staleQueues = QueueStore.Values.Where(static nq => nq.Listeners == 0).ToArray();
+ PeerEventListenerQueue[] staleQueues = QueueStore.Values.Where(static nq => nq.Listeners == 0).ToArray();
- foreach (NodeQueue nq in staleQueues)
+ foreach (PeerEventListenerQueue nq in staleQueues)
{
//Remove from store
QueueStore.Remove(nq.NodeId);
@@ -191,54 +196,39 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
return Task.CompletedTask;
}
- void IDisposable.Dispose()
- {
- QueueStore.Clear();
- Subscribers.Clear();
- }
/*
* Holds queues for each node and keeps track of the number of listeners
* attached to the queue
+ *
+ * The role of this class is to store change events for a given peer node,
+ * and return them when the peer requests them. It also keeps track of the
+ * number of active listeners (server connections) to the queue.
*/
- private sealed class NodeQueue : IPeerEventQueue
+ private sealed class PeerEventListenerQueue(string nodeId, int maxDepth) : IPeerEventQueue
{
public int Listeners;
- public string NodeId { get; }
-
- public AsyncQueue<ChangeEvent> Queue { get; }
+ public string NodeId => nodeId;
- public NodeQueue(string nodeId, int maxDepth)
+ /*
+ * Create a bounded channel that acts as a lru and evicts
+ * the oldest item when the queue is full
+ *
+ * There will also only ever be a single thread writing events
+ * to the queue
+ */
+ private readonly AsyncQueue<ChangeEvent> Queue = new(new BoundedChannelOptions(maxDepth)
{
- NodeId = nodeId;
-
- /*
- * Create a bounded channel that acts as a lru and evicts
- * the oldest item when the queue is full
- *
- * There will also only ever be a single thread writing events
- * to the queue
- */
-
- BoundedChannelOptions queueOptions = new(maxDepth)
- {
- AllowSynchronousContinuations = true,
- SingleReader = false,
- SingleWriter = true,
- //Drop oldest item in queue if full
- FullMode = BoundedChannelFullMode.DropOldest,
- };
-
- //Init queue/channel
- Queue = new(queueOptions);
- }
+ AllowSynchronousContinuations = true,
+ SingleReader = false,
+ SingleWriter = true,
+ //Drop oldest item in queue if full
+ FullMode = BoundedChannelFullMode.DropOldest,
+ });
- public void PublishChange(ChangeEvent change)
- {
- Queue.TryEnque(change);
- }
+ public void PublishChange(ChangeEvent change) => Queue.TryEnque(change);
public void PublishChanges(Span<ChangeEvent> changes)
{
@@ -249,16 +239,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Cache
}
///<inheritdoc/>
- public ValueTask<ChangeEvent> DequeueAsync(CancellationToken cancellation)
- {
- return Queue.DequeueAsync(cancellation);
- }
+ public ValueTask<ChangeEvent> DequeueAsync(CancellationToken cancellation) => Queue.DequeueAsync(cancellation);
///<inheritdoc/>
- public bool TryDequeue(out ChangeEvent change)
- {
- return Queue.TryDequeue(out change);
- }
+ public bool TryDequeue(out ChangeEvent change) => Queue.TryDequeue(out change);
}
}
}
diff --git a/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs b/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs
index 5fc700b..5be0776 100644
--- a/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs
+++ b/plugins/ObjectCacheServer/src/CacheAuthKeyStore.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -34,16 +34,10 @@ using VNLib.Data.Caching.Extensions;
namespace VNLib.Data.Caching.ObjectCache.Server
{
- sealed record class CacheAuthKeyStore : ICacheAuthManager
+ sealed class CacheAuthKeyStore(PluginBase plugin) : ICacheAuthManager
{
- private readonly IAsyncLazy<ReadOnlyJsonWebKey> _clientPub;
- private readonly IAsyncLazy<ReadOnlyJsonWebKey> _cachePriv;
-
- public CacheAuthKeyStore(PluginBase plugin)
- {
- _clientPub = plugin.GetSecretAsync("client_public_key").ToLazy(r => r.GetJsonWebKey());
- _cachePriv = plugin.GetSecretAsync("cache_private_key").ToLazy(r => r.GetJsonWebKey());
- }
+ private readonly IAsyncLazy<ReadOnlyJsonWebKey> _clientPub = plugin.Secrets().GetSecretAsync("client_public_key").ToLazy(r => r.GetJsonWebKey());
+ private readonly IAsyncLazy<ReadOnlyJsonWebKey> _cachePriv = plugin.Secrets().GetSecretAsync("cache_private_key").ToLazy(r => r.GetJsonWebKey());
///<inheritdoc/>
public IReadOnlyDictionary<string, string?> GetJwtHeader()
diff --git a/plugins/ObjectCacheServer/src/CacheConstants.cs b/plugins/ObjectCacheServer/src/CacheConstants.cs
new file mode 100644
index 0000000..85f737d
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/CacheConstants.cs
@@ -0,0 +1,107 @@
+/*
+* Copyright (c) 2024 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: CacheConstants.cs
+*
+* CacheConstants.cs is part of ObjectCacheServer which is
+* part of the larger VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+
+namespace VNLib.Data.Caching.ObjectCache.Server
+{
+ internal static class CacheConstants
+ {
+ /// <summary>
+ /// The default path for the VNCache well known endpoint (aka discovery endpoint)
+ /// </summary>
+ public const string DefaultWellKnownPath = "/.well-known/vncache";
+
+ /// <summary>
+ /// The maximum size of buffers for FBM messages sent between servers.
+ /// </summary>
+ public const int MaxSyncMessageSize = 12 * 1024;
+
+ /// <summary>
+ /// The maximum size of the change queue for the cache listener
+ /// </summary>
+ public const int CacheListenerChangeQueueSize = 10000;
+
+ /// <summary>
+ /// The time a client authorization token is valid for
+ /// </summary>
+ public static readonly TimeSpan ClientAuthTokenExpiration = TimeSpan.FromSeconds(30);
+
+ public static class LogScopes
+ {
+ /// <summary>
+ /// The log scope for the cache listener
+ /// </summary>
+ public const string BlobCacheListener = "CacheListener";
+
+ /// <summary>
+ /// The peer discovery log scope
+ /// </summary>
+ public const string PeerDiscovery = "DISC";
+
+ /// <summary>
+ /// The log scope for the replication FBM client debug log (if debugging is enabled)
+ /// </summary>
+ public const string ReplicationFbmDebug = "REPL-CLNT";
+
+ /// <summary>
+ /// The log scope for cache replication events
+ /// </summary>
+ public const string RepliactionManager = "REPL-MGR";
+
+ /// <summary>
+ /// The log scope for the cache listener change event queue
+ /// </summary>
+ public const string CacheListenerPubQueue = "QUEUE";
+
+ /// <summary>
+ /// The log scope for the cache connection websocket endpoint
+ /// </summary>
+ public const string ConnectionEndpoint = "CONEP";
+ }
+
+ public static class Delays
+ {
+ /// <summary>
+ /// The amount of startup delay before starting an initial peer discovery
+ /// </summary>
+ public static readonly TimeSpan InitialDiscovery = TimeSpan.FromSeconds(15);
+
+ /// <summary>
+ /// The amount of time to wait before retrying a failed resolve
+ /// of a well-known peers
+ /// </summary>
+ public static readonly TimeSpan WellKnownResolveFailed = TimeSpan.FromSeconds(20);
+
+ /// <summary>
+ /// The amount of time to wait when getting the value of a changed item from the cache
+ /// </summary>
+ /// <remarks>
+ /// When an item change was detected from another peer, the cache will wait this
+ /// amount of time to get the new value from the cache before timing out.
+ /// </remarks>
+ public static readonly TimeSpan CacheSyncGetItemTimeout = TimeSpan.FromSeconds(10);
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
index a240dde..92f0352 100644
--- a/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/CacheNodeReplicationMaanger.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -36,7 +36,6 @@ using VNLib.Net.Messaging.FBM;
using VNLib.Net.Messaging.FBM.Client;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.Extensions.Clustering;
-using VNLib.Data.Caching.ObjectCache.Server.Cache;
namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
@@ -55,43 +54,36 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
internal sealed class CacheNodeReplicationMaanger : IAsyncBackgroundWork
{
- private const string LOG_SCOPE_NAME = "REPL";
-
- private static readonly TimeSpan GetItemTimeout = TimeSpan.FromSeconds(10);
- private const int MAX_MESSAGE_SIZE = 12 * 1024;
-
private readonly PluginBase _plugin;
private readonly ILogProvider _log;
- private readonly NodeConfig _nodeConfig;
- private readonly ICacheStore _cacheStore;
- private readonly ICachePeerAdapter _peerAdapter;
private readonly FBMClientFactory _clientFactory;
-
+ private readonly ObjectCacheSystemState _sysState;
+
private readonly bool _isDebug;
private int _openConnections;
public CacheNodeReplicationMaanger(PluginBase plugin)
{
- //Load the node config
- _nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>();
- _cacheStore = plugin.GetOrCreateSingleton<CacheStore>();
- _peerAdapter = plugin.GetOrCreateSingleton<PeerDiscoveryManager>();
+ _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>();
//Init fbm config with fixed message size
FBMClientConfig clientConfig = FBMDataCacheExtensions.GetDefaultConfig(
- (plugin as ObjectCacheServerEntry)!.ListenerHeap,
- MAX_MESSAGE_SIZE,
- debugLog: plugin.IsDebug() ? plugin.Log : null
+ _sysState.SharedCacheHeap,
+ CacheConstants.MaxSyncMessageSize,
+ debugLog: plugin.IsDebug() ? plugin.Log.CreateScope(CacheConstants.LogScopes.ReplicationFbmDebug) : null
);
//Init ws fallback factory and client factory
- FBMFallbackClientWsFactory wsFactory = new();
- _clientFactory = new(in clientConfig, wsFactory);
+ _clientFactory = new(
+ ref clientConfig,
+ new FBMFallbackClientWsFactory(),
+ (int)_sysState.ClusterConfig.MaxPeerConnections
+ );
_plugin = plugin;
_isDebug = plugin.IsDebug();
- _log = plugin.Log.CreateScope(LOG_SCOPE_NAME);
+ _log = plugin.Log.CreateScope(CacheConstants.LogScopes.RepliactionManager);
}
public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
@@ -103,7 +95,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
while (true)
{
//Get all new peers
- CacheNodeAdvertisment[] peers = _peerAdapter.GetNewPeers();
+ CacheNodeAdvertisment[] peers = _sysState.PeerDiscovery.GetNewPeers();
if (peers.Length == 0 && _isDebug)
{
@@ -111,7 +103,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
}
//Make sure we don't exceed the max connections
- if(_openConnections >= _nodeConfig.MaxPeerConnections)
+ if(_openConnections >= _sysState.ClusterConfig.MaxPeerConnections)
{
if (_isDebug)
{
@@ -148,15 +140,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
_log.Information("Node replication worker exited");
}
+ /*
+ * This method is called when a new peer has connected (or discovered) to establish a
+ * replication connection.
+ */
private async Task OnNewPeerDoWorkAsync(CacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken)
{
- _ = newPeer ?? throw new ArgumentNullException(nameof(newPeer));
-
- //Setup client
+ ArgumentNullException.ThrowIfNull(newPeer);
+
FBMClient client = _clientFactory.CreateClient();
- //Add peer to monitor
- _peerAdapter.OnPeerListenerAttached(newPeer);
+ /*
+ * Notify discovery that we will be listening to this peer
+ *
+ * This exists so when a new discovery happens, the work loop will produce
+ * the difference of new peers to existing peers, and we can connect to them.
+ * Avoiding infinite connections to the same peer.
+ */
+ _sysState.PeerDiscovery.OnPeerListenerAttached(newPeer);
Interlocked.Increment(ref _openConnections);
@@ -165,12 +166,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId);
//Connect to the server
- await client.ConnectToCacheAsync(newPeer, _nodeConfig.Config, exitToken);
+ await client.ConnectToCacheAsync(newPeer, _sysState.NodeConfig, exitToken);
log.Information("Connected to {server}, starting queue listeners", newPeer.NodeId);
//Start worker tasks
- List<Task> workerTasks = new();
+ List<Task> workerTasks = [];
for (int i = 0; i < Environment.ProcessorCount; i++)
{
@@ -187,6 +188,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//Disconnect client gracefully
await client.DisconnectAsync(CancellationToken.None);
}
+ catch(FBMServerNegiationException fbm)
+ {
+ log.Error("Failed to negotiate buffer configuration, check your cache memory configuration. Error:{err}", fbm.Message);
+ }
catch (InvalidResponseException ie)
{
//See if the plugin is unloading
@@ -218,7 +223,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
}
catch (Exception ex)
{
- log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex);
+ //Avoid call stacks unless debug or higher logging levels
+ if (log.IsEnabled(LogLevel.Debug))
+ {
+ log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex);
+ }
+ else
+ {
+ log.Warn("Lost connection to peer {h}\n {m}", newPeer.NodeId, ex.Message);
+ }
}
finally
{
@@ -226,8 +239,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
client.Dispose();
- //Notify monitor of disconnect
- _peerAdapter.OnPeerListenerDetatched(newPeer);
+ //Notify monitor of disconnect to make it available again later
+ _sysState.PeerDiscovery.OnPeerListenerDetatched(newPeer);
}
}
@@ -259,7 +272,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
return;
case "deleted":
//Delete the object from the store
- await _cacheStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None);
+ await _sysState.InternalStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None);
break;
case "modified":
//Reload the record from the store
@@ -287,7 +300,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
modRequest.WriteHeader(ObjectId, string.IsNullOrWhiteSpace(newId) ? objectId : newId);
//Make request
- using FBMResponse response = await client.SendAsync(modRequest, GetItemTimeout, cancellation);
+ using FBMResponse response = await client.SendAsync(modRequest, CacheConstants.Delays.CacheSyncGetItemTimeout, cancellation);
response.ThrowIfNotSet();
@@ -297,7 +310,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal))
{
//Update the record
- await _cacheStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation);
+ await _sysState.InternalStore.AddOrUpdateBlobAsync(objectId, newId, static (t) => t.ResponseBody, response, cancellation);
log.Debug("Updated object {id}", objectId);
}
else
diff --git a/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs
index c49a54b..c3fbd8e 100644
--- a/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/CachePeerMonitor.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -29,7 +29,6 @@ using System.Collections.Generic;
using VNLib.Utils;
using VNLib.Utils.Extensions;
-using VNLib.Plugins;
namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
@@ -37,12 +36,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
internal sealed class CachePeerMonitor : VnDisposeable, IPeerMonitor
{
- private readonly LinkedList<ICachePeer> peers = new();
+ private readonly List<ICachePeer> peers = new();
private readonly ManualResetEvent newPeerTrigger = new (false);
- public CachePeerMonitor(PluginBase plugin)
- { }
-
/// <summary>
/// Waits for new peers to connect to the server
/// </summary>
@@ -70,7 +66,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//When a peer is connected we can add it to the list so the replication manager can see it
lock(peers)
{
- peers.AddLast(peer);
+ peers.Add(peer);
}
//Trigger monitor when change occurs
@@ -92,6 +88,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
protected override void Free()
{
+ peers.Clear();
newPeerTrigger.Dispose();
}
}
diff --git a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
index 6475f9c..b9a220d 100644
--- a/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
+++ b/plugins/ObjectCacheServer/src/Clustering/PeerDiscoveryManager.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -24,14 +24,11 @@
using System;
using System.Linq;
-using System.Net.Http;
using System.Threading;
-using System.Net.Sockets;
using System.Threading.Tasks;
using System.Collections.Generic;
using VNLib.Utils.Logging;
-using VNLib.Plugins;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.Extensions;
using VNLib.Data.Caching.Extensions.Clustering;
@@ -43,54 +40,19 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
* This class is responsible for resolving and discovering peer nodes in the cluster network.
*/
- internal sealed class PeerDiscoveryManager : IAsyncBackgroundWork, ICachePeerAdapter
+ internal sealed class PeerDiscoveryManager(
+ CacheNodeConfiguration config,
+ ServerClusterConfig clusterConf,
+ CachePeerMonitor Monitor,
+ ILogProvider Log,
+ bool IsDebug,
+ bool HasWellKnown
+ )
+ : IAsyncBackgroundWork, ICachePeerAdapter
{
- private const string LOG_SCOPE_NAME = "DISC";
- /*
- * The initial discovery delay. This allows for the server to initialize before
- * starting the discovery process. This will probably be a shorter delay
- * than a usual discovery interval.
- */
- private static readonly TimeSpan InitialDelay = TimeSpan.FromSeconds(15);
- private static readonly TimeSpan WhenWellKnownResolveFailed = TimeSpan.FromSeconds(20);
-
-
- private readonly List<CacheNodeAdvertisment> _connectedPeers;
- private readonly NodeConfig Config;
- private readonly CachePeerMonitor Monitor;
- private readonly ILogProvider Log;
- private readonly bool IsDebug;
- private readonly bool HasWellKnown;
-
- public PeerDiscoveryManager(PluginBase plugin)
- {
- //Get config
- Config = plugin.GetOrCreateSingleton<NodeConfig>();
-
- //Get the known peers array from config, its allowed to be null for master nodes
- IConfigScope? config = plugin.TryGetConfig("known_peers");
- string[] kownPeers = config?.Deserialze<string[]>() ?? Array.Empty<string>();
-
- //Add known peers to the monitor
- Config.Config.WithInitialPeers(kownPeers.Select(static s => new Uri(s)));
-
- HasWellKnown = kownPeers.Length > 0;
-
- //Get the peer monitor
- Monitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
-
- _connectedPeers = new();
-
- //Create scoped logger
- Log = plugin.Log.CreateScope(LOG_SCOPE_NAME);
-
- Log.Information("Inital peer nodes: {nodes}", kownPeers);
- //Setup discovery error handler
- Config.Config.WithErrorHandler(new ErrorHandler(Log));
-
- IsDebug = plugin.IsDebug();
- }
+ private readonly List<CacheNodeAdvertisment> _connectedPeers = [];
+ private readonly VNCacheClusterManager clusterMan = new(config);
async Task IAsyncBackgroundWork.DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
{
@@ -103,12 +65,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//Start the change listener
Task watcher = WatchForPeersAsync(exitToken);
- Log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", InitialDelay);
+ Log.Information("Node discovery worker started, waiting for {idel} to start initial discovery", CacheConstants.Delays.InitialDiscovery);
try
- {
- //Wait for the initial delay
- await Task.Delay(InitialDelay, exitToken);
+ {
+ await Task.Delay(CacheConstants.Delays.InitialDiscovery, exitToken);
Log.Debug("Begining discovery loop");
@@ -123,26 +84,32 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
Log.Debug("Begining node discovery");
}
- //Resolve all known peers
- CacheNodeAdvertisment[] wellKnown = await Config.Config.ResolveWellKnownAsync(exitToken);
- wellKnownFailed = wellKnown.Length == 0;
+ /*
+ * On every loop we will need to resolve well-known servers incase they go down
+ * or change. There probably should be some more advanced logic and caching here.
+ *
+ * Node may not have any well-known nodes, so we need to check for that.
+ */
+ CacheNodeAdvertisment[] wellKnown = HasWellKnown ?
+ await clusterMan.ResolveWellKnownAsync(exitToken) :
+ Array.Empty<CacheNodeAdvertisment>();
//Use the monitor to get the initial peers
IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds();
- //Combine well-known with new connected peers
+ //Combine well-known peers that are currently connected to this server
CacheNodeAdvertisment[] allAds = ads.Union(wellKnown).ToArray();
if (allAds.Length > 0)
{
- //Discover all known nodes
- await Config.Config.DiscoverNodesAsync(allAds, exitToken);
+ //Build the discovery map from all the known nodes to find all known nodes in the entire cluster
+ await clusterMan.DiscoverNodesAsync(allAds, exitToken);
}
//Log the discovered nodes if verbose logging is enabled
if (IsDebug)
{
- CacheNodeAdvertisment[] found = Config.Config.NodeCollection.GetAllNodes();
+ CacheNodeAdvertisment[] found = clusterMan.DiscoveredNodes.GetAllNodes();
Log.Debug("Discovered {count} nodes\n\t{nodes}", found.Length, found.Select(static s => s.NodeId));
}
@@ -168,16 +135,16 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
{
if (IsDebug)
{
- Log.Debug("Well known node discovery failed, waiting for {idel} before retrying", WhenWellKnownResolveFailed);
+ Log.Debug("Well known node discovery failed, waiting for {idel} before retrying", CacheConstants.Delays.WellKnownResolveFailed);
}
//Wait for shorter duration
- await Task.Delay(WhenWellKnownResolveFailed, exitToken);
+ await Task.Delay(CacheConstants.Delays.WellKnownResolveFailed, exitToken);
}
else
{
//Delay the next discovery
- await Task.Delay(Config.DiscoveryInterval, exitToken);
+ await Task.Delay(clusterConf.DiscoveryInterval, exitToken);
}
}
}
@@ -188,7 +155,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
}
finally
{
-
+ Monitor.Dispose();
}
//Wait for the watcher to exit
@@ -197,10 +164,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
private IEnumerable<CacheNodeAdvertisment> GetMonitorAds()
{
+ string selfId = (clusterMan.Config as CacheNodeConfiguration)!.NodeId;
return Monitor.GetAllPeers()
.Where(static p => p.Advertisment != null)
//Without us
- .Where(n => n.NodeId != Config.Config.NodeId)
+ .Where(n => !string.Equals(n.NodeId, selfId, StringComparison.OrdinalIgnoreCase))
.Select(static p => p.Advertisment!);
}
@@ -222,7 +190,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
//Use the monitor to get the initial peers
IEnumerable<CacheNodeAdvertisment> ads = GetMonitorAds();
- ((NodeDiscoveryCollection)Config.Config.NodeCollection).AddManualNodes(ads);
+ clusterMan.AddManualNodes(ads);
}
}
catch (OperationCanceledException)
@@ -239,7 +207,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
lock (_connectedPeers)
{
//Get all discovered peers
- CacheNodeAdvertisment[] peers = Config.Config.NodeCollection.GetAllNodes();
+ CacheNodeAdvertisment[] peers = clusterMan.DiscoveredNodes.GetAllNodes();
//Get the difference between the discovered peers and the connected peers
return peers.Except(_connectedPeers).ToArray();
@@ -265,31 +233,5 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Clustering
_connectedPeers.Remove(peer);
}
}
-
-
- private sealed record class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
- {
- public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
- {
-
- if (ex is HttpRequestException hre)
- {
- if (hre.InnerException is SocketException se)
- {
- //traisnport failed
- Logger.Warn("Failed to connect to server {serv} because {err}", errorNode, se.Message);
- }
- else
- {
- Logger.Error("Failed to connect to node {n}\n{err}", errorNode, hre);
- }
- }
- else
- {
- Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", errorNode, ex);
- }
-
- }
- }
}
}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs b/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs
index d1591f8..99433e1 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/CacheNegotationManager.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -45,7 +45,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
public bool IsPeer { get; set; }
}
- internal sealed class CacheNegotationManager
+ internal sealed class CacheNegotationManager(PluginBase plugin)
{
/*
* Cache keys are centralized and may be shared between all cache server nodes. This means
@@ -62,23 +62,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
* client could use trial and error to find the servers buffer configuration.
*/
- private static readonly TimeSpan AuthTokenExpiration = TimeSpan.FromSeconds(30);
+ private readonly string AudienceLocalServerId = Guid.NewGuid().ToString("N");
- private readonly string AudienceLocalServerId;
- private readonly NodeConfig _nodeConfig;
- private readonly CacheConfiguration _cacheConfig;
+ private readonly ObjectCacheSystemState _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>();
- public CacheNegotationManager(PluginBase plugin)
- {
- //Get node configuration
- _nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>();
-
- //Get the cache store configuration
- _cacheConfig = plugin.GetConfigForType<CacheStore>().Deserialze<CacheConfiguration>();
-
- AudienceLocalServerId = Guid.NewGuid().ToString("N");
- }
-
+ private CacheMemoryConfiguration CacheConfig => _sysState.MemoryConfiguration;
public bool IsClientNegotiationValid(string authToken, out ClientNegotiationState state)
{
@@ -88,12 +76,12 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
using JsonWebToken jwt = JsonWebToken.Parse(authToken);
//verify signature for client
- if (_nodeConfig.KeyStore.VerifyJwt(jwt, false))
+ if (_sysState.KeyStore.VerifyJwt(jwt, false))
{
//Validated as normal client
}
//May be signed by a cache server
- else if (_nodeConfig.KeyStore.VerifyJwt(jwt, true))
+ else if (_sysState.KeyStore.VerifyJwt(jwt, true))
{
//Set peer and verified flag since the another cache server signed the request
state.IsPeer = true;
@@ -117,16 +105,16 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
return true;
}
- public JsonWebToken ConfirmCLientNegotiation(ClientNegotiationState state, IPAddress clientIp, DateTimeOffset now)
+ public JsonWebToken ConfirmClientNegotiation(ClientNegotiationState state, IPAddress clientIp, DateTimeOffset now)
{
//Verified, now we can create an auth message with a short expiration
JsonWebToken auth = new();
- auth.WriteHeader(_nodeConfig.KeyStore.GetJwtHeader());
+ auth.WriteHeader(_sysState.KeyStore.GetJwtHeader());
auth.InitPayloadClaim()
.AddClaim("aud", AudienceLocalServerId)
.AddClaim("iat", now.ToUnixTimeSeconds())
- .AddClaim("exp", now.Add(AuthTokenExpiration).ToUnixTimeSeconds())
+ .AddClaim("exp", now.Add(CacheConstants.ClientAuthTokenExpiration).ToUnixTimeSeconds())
.AddClaim("nonce", RandomHash.GetRandomBase32(8))
.AddClaim("chl", state.Challenge!)
//Set the ispeer flag if the request was signed by a cache server
@@ -136,24 +124,29 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
//Set ip address
.AddClaim("ip", clientIp.ToString())
//Add negotiaion args
- .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, _cacheConfig.MaxHeaderBufferSize)
- .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, _cacheConfig.MaxRecvBufferSize)
- .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, _cacheConfig.MaxMessageSize)
+ .AddClaim(FBMClient.REQ_HEAD_BUF_QUERY_ARG, CacheConfig.MaxHeaderBufferSize)
+ .AddClaim(FBMClient.REQ_RECV_BUF_QUERY_ARG, CacheConfig.MaxRecvBufferSize)
+ .AddClaim(FBMClient.REQ_MAX_MESS_QUERY_ARG, CacheConfig.MaxMessageSize)
.CommitClaims();
//Sign the auth message from our private key
- _nodeConfig.KeyStore.SignJwt(auth);
+ _sysState.KeyStore.SignJwt(auth);
return auth;
}
- public bool ValidateUpgrade(string upgradeToken, string tokenSignature, DateTimeOffset now, IPAddress connectionIp, ref string? nodeId, ref bool isPeer)
+ public bool ValidateUpgrade(string? upgradeToken, string? tokenSignature, DateTimeOffset now, IPAddress connectionIp, ref string? nodeId, ref bool isPeer)
{
+ if(string.IsNullOrWhiteSpace(upgradeToken) || string.IsNullOrWhiteSpace(tokenSignature))
+ {
+ return false;
+ }
+
//Parse jwt
using JsonWebToken jwt = JsonWebToken.Parse(upgradeToken);
//verify signature against the cache public key, since this server must have signed it
- if (!_nodeConfig.KeyStore.VerifyCachePeer(jwt))
+ if (!_sysState.KeyStore.VerifyCachePeer(jwt))
{
return false;
}
@@ -175,7 +168,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
//Check node ip address matches if required
- if (_nodeConfig.VerifyIp)
+ if (_sysState.ClusterConfig.VerifyIp)
{
if (!doc.RootElement.TryGetProperty("ip", out JsonElement ipEl))
{
@@ -201,7 +194,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
//Verify token signature against a fellow cache public key
- return _nodeConfig.KeyStore.VerifyUpgradeToken(tokenSignature, upgradeToken, isPeer);
+ return _sysState.KeyStore.VerifyUpgradeToken(tokenSignature, upgradeToken, isPeer);
}
}
}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
index 816e6c3..8368d3a 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/ConnectEndpoint.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -53,13 +53,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
internal sealed class ConnectEndpoint : ResourceEndpointBase
{
- internal const string LOG_SCOPE_NAME = "CONEP";
+ private readonly ObjectCacheSystemState _sysState;
-
- private readonly ICacheEventQueueManager PubSubManager;
- private readonly IPeerMonitor Peers;
- private readonly BlobCacheListener<IPeerEventQueue> Store;
- private readonly NodeConfig NodeConfiguration;
+ private PeerEventQueueManager PubSubManager => _sysState.PeerEventQueue;
+ private CachePeerMonitor Peers => _sysState.PeerMonitor;
+ private BlobCacheListener<IPeerEventQueue> Listener => _sysState.Listener;
+ private ServerClusterConfig ClusterConfiguration => _sysState.ClusterConfig;
+
private readonly CacheNegotationManager AuthManager;
private uint _connectedClients;
@@ -72,7 +72,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
/// <summary>
/// The cache store configuration
/// </summary>
- public CacheConfiguration CacheConfig { get; }
+ public CacheMemoryConfiguration CacheConfig => _sysState.MemoryConfiguration;
//Loosen up protection settings
///<inheritdoc/>
@@ -83,24 +83,11 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
public ConnectEndpoint(PluginBase plugin)
{
- //Get node configuration
- NodeConfiguration = plugin.GetOrCreateSingleton<NodeConfig>();
+ _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>();
//Init from config and create a new log scope
- InitPathAndLog(NodeConfiguration.ConnectPath, plugin.Log.CreateScope(LOG_SCOPE_NAME));
-
- //Setup pub/sub manager
- PubSubManager = plugin.GetOrCreateSingleton<CacheEventQueueManager>();
-
- //Get peer monitor
- Peers = plugin.GetOrCreateSingleton<CachePeerMonitor>();
-
- //Init the cache store
- Store = plugin.GetOrCreateSingleton<CacheStore>().Listener;
-
- //Get the cache store configuration
- CacheConfig = plugin.GetConfigForType<CacheStore>().Deserialze<CacheConfiguration>();
-
+ InitPathAndLog(ClusterConfiguration.ConnectPath, plugin.Log.CreateScope(CacheConstants.LogScopes.ConnectionEndpoint));
+
//Get the auth manager
AuthManager = plugin.GetOrCreateSingleton<CacheNegotationManager>();
}
@@ -127,6 +114,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
//Parse jwt from authoriation
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
+
if (string.IsNullOrWhiteSpace(jwtAuth))
{
return VirtualClose(entity, HttpStatusCode.Forbidden);
@@ -149,26 +137,34 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
//Verified, now we can create an auth message with a short expiration
- using JsonWebToken auth = AuthManager.ConfirmCLientNegotiation(state, entity.TrustedRemoteIp, entity.RequestedTimeUtc);
+ using JsonWebToken auth = AuthManager.ConfirmClientNegotiation(state, entity.TrustedRemoteIp, entity.RequestedTimeUtc);
- //Close response
+ //Close response by sending a copy of the signed token
entity.CloseResponse(HttpStatusCode.OK, ContentType.Text, auth.DataBuffer);
return VfReturnType.VirtualSkip;
}
protected override VfReturnType WebsocketRequested(HttpEntity entity)
{
+ /*
+ * Check to see if any more connections are allowed,
+ * otherwise deny the connection
+ *
+ * This is done here to prevent the server from being overloaded
+ * on a new connection. It would be ideal to not grant new tokens
+ * but malicious clients could cache a bunch of tokens and use them
+ * later, exhausting resources.
+ */
+ if(_connectedClients >= ClusterConfiguration.MaxConcurrentConnections)
+ {
+ return VirtualClose(entity, HttpStatusCode.ServiceUnavailable);
+ }
+
//Parse jwt from authorization
string? jwtAuth = entity.Server.Headers[HttpRequestHeader.Authorization];
string? clientSignature = entity.Server.Headers[FBMDataCacheExtensions.X_UPGRADE_SIG_HEADER];
string? optionalDiscovery = entity.Server.Headers[FBMDataCacheExtensions.X_NODE_DISCOVERY_HEADER];
- //Not null
- if (string.IsNullOrWhiteSpace(jwtAuth) || string.IsNullOrWhiteSpace(clientSignature))
- {
- return VfReturnType.Forbidden;
- }
-
string? nodeId = null;
bool isPeer = false;
@@ -178,17 +174,17 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
return VirtualClose(entity, HttpStatusCode.Unauthorized);
}
- CacheNodeAdvertisment? discoveryAd = null;
-
/*
* If the client is a peer server, it may offer a signed advertisment
* that this node will have the duty of making available to other peers
* if it is valid
*/
- if (isPeer && !string.IsNullOrWhiteSpace(optionalDiscovery))
+ CacheNodeAdvertisment? discoveryAd = null;
+
+ if (isPeer)
{
- discoveryAd = NodeConfiguration.KeyStore.VerifyPeerAdvertisment(optionalDiscovery);
+ discoveryAd = _sysState.KeyStore.VerifyPeerAdvertisment(optionalDiscovery);
}
WsUserState state;
@@ -196,11 +192,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
try
{
//Get query config suggestions from the client
- string recvBufCmd = entity.QueryArgs[FBMClient.REQ_RECV_BUF_QUERY_ARG];
- string maxHeaderCharCmd = entity.QueryArgs[FBMClient.REQ_HEAD_BUF_QUERY_ARG];
- string maxMessageSizeCmd = entity.QueryArgs[FBMClient.REQ_MAX_MESS_QUERY_ARG];
+ string? recvBufCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_RECV_BUF_QUERY_ARG);
+ string? maxHeaderCharCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_HEAD_BUF_QUERY_ARG);
+ string? maxMessageSizeCmd = entity.QueryArgs.GetValueOrDefault(FBMClient.REQ_MAX_MESS_QUERY_ARG);
- //Parse recv buffer size
int recvBufSize = int.TryParse(recvBufCmd, out int rbs) ? rbs : CacheConfig.MinRecvBufferSize;
int maxHeadBufSize = int.TryParse(maxHeaderCharCmd, out int hbs) ? hbs : CacheConfig.MinHeaderBufferSize;
int maxMessageSize = int.TryParse(maxMessageSizeCmd, out int mxs) ? mxs : CacheConfig.MaxMessageSize;
@@ -253,9 +248,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
Peers.OnPeerConnected(state);
//Register plugin exit token to cancel the connected socket
- CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll);
-
- //Inc connected count
+ await using CancellationTokenRegistration reg = this.GetPlugin().UnloadToken.Register(wss.CancelAll);
+
Interlocked.Increment(ref _connectedClients);
try
@@ -280,7 +274,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
try
{
//Begin listening for messages with a queue
- await Store.ListenAsync(wss, queue, args);
+ await Listener.ListenAsync(wss, queue, args);
}
finally
{
@@ -291,7 +285,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
else
{
//Begin listening for messages without a queue
- await Store.ListenAsync(wss, null!, args);
+ await Listener.ListenAsync(wss, null!, args);
}
}
catch (OperationCanceledException)
@@ -303,15 +297,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
}
catch (Exception ex)
{
- Log.Debug(ex);
+ //If debug logging is enabled print a more detailed error message
+ Log.Error("An error occured on websocket connection: node {con} -> {error}", state.NodeId, ex.Message);
+ Log.Debug("Websocket connection error: node {con}\n{error}", state.NodeId, ex);
}
-
- //Dec connected count
+
Interlocked.Decrement(ref _connectedClients);
- //Unregister the token
- reg.Unregister();
-
//Notify monitor of disconnect
Peers.OnPeerDisconnected(state);
diff --git a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
index 7d376b8..8038b70 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/PeerDiscoveryEndpoint.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -40,25 +40,27 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
{
internal sealed class PeerDiscoveryEndpoint : ResourceEndpointBase
{
- private readonly IPeerMonitor PeerMonitor;
- private readonly NodeConfig Config;
+ private readonly ObjectCacheSystemState _sysState;
+
+ private CacheAuthKeyStore KeyStore => _sysState.KeyStore;
+
+ private CachePeerMonitor PeerMonitor => _sysState.PeerMonitor;
- //Loosen up protection settings
///<inheritdoc/>
protected override ProtectionSettings EndpointProtectionSettings { get; } = new()
{
- DisableSessionsRequired = true
+ /*
+ * Sessions will not be used or required for this endpoint.
+ * We should also assume the session system is not even loaded
+ */
+ DisableSessionsRequired = true
};
public PeerDiscoveryEndpoint(PluginBase plugin)
{
- //Get the peer monitor
- PeerMonitor = plugin.GetOrCreateSingleton<CachePeerMonitor>();
+ _sysState = plugin.GetOrCreateSingleton<ObjectCacheSystemState>();
- //Get the node config
- Config = plugin.GetOrCreateSingleton<NodeConfig>();
-
- InitPathAndLog(Config.DiscoveryPath, plugin.Log);
+ InitPathAndLog(_sysState.ClusterConfig.DiscoveryPath!, plugin.Log);
}
protected override VfReturnType Get(HttpEntity entity)
@@ -68,36 +70,41 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
if(string.IsNullOrWhiteSpace(authToken))
{
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
+ return VirtualClose(entity, HttpStatusCode.Unauthorized);
}
string subject = string.Empty;
string challenge = string.Empty;
- //Parse auth token
- using(JsonWebToken jwt = JsonWebToken.Parse(authToken))
+ try
{
+ //Parse auth token
+ using JsonWebToken jwt = JsonWebToken.Parse(authToken);
+
//try to verify against cache node first
- if (!Config.KeyStore.VerifyJwt(jwt, true))
+ if (!KeyStore.VerifyJwt(jwt, true))
{
//failed...
//try to verify against client key
- if (!Config.KeyStore.VerifyJwt(jwt, false))
+ if (!KeyStore.VerifyJwt(jwt, false))
{
//invalid token
- entity.CloseResponse(HttpStatusCode.Unauthorized);
- return VfReturnType.VirtualSkip;
+ return VirtualClose(entity, HttpStatusCode.Unauthorized);
}
}
using JsonDocument payload = jwt.GetPayload();
//Get client info to pass back
- subject = payload.RootElement.TryGetProperty("sub", out JsonElement subEl) ? subEl.GetString() ?? string.Empty : string.Empty;
+ subject = payload.RootElement.TryGetProperty("sub", out JsonElement subEl) ? subEl.GetString() ?? string.Empty : string.Empty;
challenge = payload.RootElement.GetProperty("chl").GetString() ?? string.Empty;
}
+ catch (FormatException)
+ {
+ //If tokens are invalid format, let the client know instead of a server error
+ return VfReturnType.BadRequest;
+ }
//Valid key, get peer list to send to client
CacheNodeAdvertisment[] peers = PeerMonitor.GetAllPeers()
@@ -109,10 +116,10 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
using JsonWebToken response = new();
//set header from cache config
- response.WriteHeader(Config.KeyStore.GetJwtHeader());
+ response.WriteHeader(KeyStore.GetJwtHeader());
response.InitPayloadClaim()
- .AddClaim("iss", Config.Config.NodeId)
+ .AddClaim("iss", _sysState.NodeConfig.NodeId)
//Audience is the requestor id
.AddClaim("sub", subject)
.AddClaim("iat", entity.RequestedTimeUtc.ToUnixTimeSeconds())
@@ -122,10 +129,9 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
.AddClaim("chl", challenge)
.CommitClaims();
- //Sign the response
- Config.KeyStore.SignJwt(response);
-
- //Send response to client
+
+ KeyStore.SignJwt(response);
+
entity.CloseResponse(HttpStatusCode.OK, Net.Http.ContentType.Text, response.DataBuffer);
return VfReturnType.VirtualSkip;
}
diff --git a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs
index 87a471b..18855e3 100644
--- a/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs
+++ b/plugins/ObjectCacheServer/src/Endpoints/WellKnownEndpoint.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -59,13 +59,13 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Endpoints
public WellKnownEndpoint(PluginBase plugin)
{
//Get the node config
- NodeConfig nodeConfig = plugin.GetOrCreateSingleton<NodeConfig>();
+ ObjectCacheSystemState conf = plugin.GetOrCreateSingleton<ObjectCacheSystemState>();
//serialize the config, discovery may not be enabled
- _advertisment = nodeConfig.Config.Advertisment;
- _keyStore = nodeConfig.KeyStore;
+ _advertisment = conf.NodeConfig.Advertisment;
+ _keyStore = conf.KeyStore;
- InitPathAndLog(nodeConfig.WellKnownPath, plugin.Log);
+ InitPathAndLog(conf.ClusterConfig.WellKnownPath, plugin.Log);
}
protected override VfReturnType Get(HttpEntity entity)
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
index aada787..42bd0c7 100644
--- a/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
+++ b/plugins/ObjectCacheServer/src/ObjectCacheServerEntry.cs
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
@@ -23,13 +23,11 @@
*/
using System;
-using System.Threading;
using System.Collections.Generic;
using VNLib.Plugins;
-using VNLib.Utils.Memory;
+using VNLib.Utils;
using VNLib.Utils.Logging;
-using VNLib.Utils.Memory.Diagnostics;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Plugins.Extensions.Loading.Routing;
@@ -43,38 +41,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server
{
public override string PluginName => "ObjectCache.Service";
- private readonly Lazy<IUnmangedHeap> _cacheHeap;
-
- internal IUnmangedHeap ListenerHeap => _cacheHeap.Value;
-
- public ObjectCacheServerEntry()
- {
- //Init heap
- _cacheHeap = new Lazy<IUnmangedHeap>(InitializeHeap, LazyThreadSafetyMode.PublicationOnly);
- }
-
- internal IUnmangedHeap InitializeHeap()
- {
- //Create default heap
- IUnmangedHeap _heap = MemoryUtil.InitializeNewHeapForProcess();
- try
- {
- //If the plugin is in debug mode enable heap tracking
- return this.IsDebug() ? new TrackedHeapWrapper(_heap, true) : _heap;
- }
- catch
- {
- _heap.Dispose();
- throw;
- }
- }
+ ObjectCacheSystemState? sysState;
protected override void OnLoad()
{
try
{
- //Get the node configuration first
- NodeConfig config = this.GetOrCreateSingleton<NodeConfig>();
+ //Initialize the cache node builder
+ sysState = this.GetOrCreateSingleton<ObjectCacheSystemState>();
+ sysState.Initialize();
//Route well-known endpoint
this.Route<WellKnownEndpoint>();
@@ -85,8 +60,8 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//We must initialize the replication manager
_ = this.GetOrCreateSingleton<CacheNodeReplicationMaanger>();
- //Setup discovery endpoint
- if(!string.IsNullOrWhiteSpace(config.DiscoveryPath))
+ //Setup discovery endpoint only if the user enabled clustering
+ if(!string.IsNullOrWhiteSpace(sysState.ClusterConfig.DiscoveryPath))
{
this.Route<PeerDiscoveryEndpoint>();
}
@@ -101,18 +76,34 @@ namespace VNLib.Data.Caching.ObjectCache.Server
protected override void OnUnLoad()
{
- //dispose heap if initialized
- if(_cacheHeap.IsValueCreated)
- {
- _cacheHeap.Value.Dispose();
- }
-
Log.Information("Plugin unloaded");
}
protected override void ProcessHostCommand(string cmd)
{
- throw new NotImplementedException();
+ if(string.IsNullOrWhiteSpace(cmd))
+ {
+ return;
+ }
+
+ ArgumentList al = new(cmd.Split(" "));
+
+ if(al.Count == 0)
+ {
+ Log.Warn("Invalid command");
+ return;
+ }
+
+ switch (al[0].ToLower(null))
+ {
+ case "memstats":
+ sysState?.LogMemoryStats();
+ break;
+
+ default:
+ Log.Warn("Invalid command");
+ break;
+ }
}
}
}
diff --git a/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs
new file mode 100644
index 0000000..cd5bf1b
--- /dev/null
+++ b/plugins/ObjectCacheServer/src/ObjectCacheSystemState.cs
@@ -0,0 +1,331 @@
+/*
+* Copyright (c) 2024 Vaughn Nugent
+*
+* Library: VNLib
+* Package: ObjectCacheServer
+* File: ObjectCacheSystemState.cs
+*
+* ObjectCacheSystemState.cs is part of ObjectCacheServer which is
+* part of the larger VNLib collection of libraries and utilities.
+*
+* ObjectCacheServer is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as
+* published by the Free Software Foundation, either version 3 of the
+* License, or (at your option) any later version.
+*
+* ObjectCacheServer is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see https://www.gnu.org/licenses/.
+*/
+
+using System;
+using System.Linq;
+using System.Net.Http;
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+
+using VNLib.Utils.Logging;
+using VNLib.Utils.Memory;
+using VNLib.Utils.Memory.Diagnostics;
+using VNLib.Net.Messaging.FBM;
+using VNLib.Plugins;
+using VNLib.Plugins.Extensions.Loading;
+
+using VNLib.Data.Caching.Extensions.Clustering;
+using VNLib.Data.Caching.ObjectCache.Server.Cache;
+using VNLib.Data.Caching.ObjectCache.Server.Clustering;
+
+namespace VNLib.Data.Caching.ObjectCache.Server
+{
+ /*
+ * The purpose of this class is to manage the state of the entire cache server.
+ * All configuration and state should be creatd and managed by this class. To make it
+ * easier to manage.
+ */
+ [ConfigurationName("cache")]
+ internal sealed class ObjectCacheSystemState(PluginBase plugin, IConfigScope config) : IDisposable
+ {
+ public BlobCacheListener<IPeerEventQueue> Listener { get; private set; } = null!;
+
+ public ICacheStore InternalStore { get; private set; } = null!;
+
+ /// <summary>
+ /// Used for miscellaneous shared memory allocations (like the cache listener)
+ /// </summary>
+ public IUnmangedHeap SharedCacheHeap { get; private set; } = null!;
+
+ /// <summary>
+ /// The plugin-wide, shared node configuration
+ /// </summary>
+ public ServerClusterConfig ClusterConfig { get; } = plugin.GetOrCreateSingleton<ServerClusterConfig>();
+
+ /// <summary>
+ /// The system wide cache authenticator
+ /// </summary>
+ public CacheAuthKeyStore KeyStore { get; } = new(plugin);
+
+ /// <summary>
+ /// The system cache node configuration
+ /// </summary>
+ public CacheNodeConfiguration NodeConfig { get; private set; }
+
+ /// <summary>
+ /// The peer discovery manager
+ /// </summary>
+ public PeerDiscoveryManager PeerDiscovery { get; private set; } = null!;
+
+ /// <summary>
+ /// System wide peer monitor
+ /// </summary>
+ public CachePeerMonitor PeerMonitor { get; } = new();
+
+ public CacheMemoryConfiguration MemoryConfiguration { get; } = config.Deserialze<CacheMemoryConfiguration>();
+
+ /// <summary>
+ /// The system wide peer event queue manager
+ /// </summary>
+ public PeerEventQueueManager PeerEventQueue { get; private set; }
+
+ private ICacheMemoryManagerFactory _cacheMemManager;
+
+ void IDisposable.Dispose()
+ {
+ SharedCacheHeap.Dispose();
+ Listener.Dispose();
+ }
+
+ /// <summary>
+ /// Initializes the cache node state
+ /// </summary>
+ public void Initialize()
+ {
+ CacheMemoryConfiguration cacheConf = MemoryConfiguration;
+
+ ArgumentOutOfRangeException.ThrowIfLessThan(cacheConf.MaxCacheEntries, 2u);
+
+ //Suggestion
+ if (cacheConf.MaxCacheEntries < 200)
+ {
+ plugin.Log.Information("Suggestion: You may want a larger cache size, you have less than 200 items in cache");
+ }
+
+ LogMemConfiguration();
+
+ PeerEventQueue = new(plugin, ClusterConfig);
+
+ //If the plugin is in debug mode enable heap tracking
+ SharedCacheHeap = plugin.IsDebug() ?
+ new TrackedHeapWrapper(MemoryUtil.InitializeNewHeapForProcess(), true)
+ : MemoryUtil.InitializeNewHeapForProcess();
+
+ //Load node configuration first
+ (NodeConfig = ClusterConfig.BuildNodeConfig())
+ .WithAuthenticator(KeyStore); //Also pass the key store to the node config
+
+ ConfigurePeerDiscovery();
+
+ ConfigureCacheListener();
+ }
+
+ private void ConfigurePeerDiscovery()
+ {
+ //Get the known peers array from config, its allowed to be null for master nodes
+ IConfigScope? config = plugin.TryGetConfig("known_peers");
+ string[] kownPeers = config?.Deserialze<string[]>() ?? [];
+
+ ILogProvider discLogger = plugin.Log.CreateScope(CacheConstants.LogScopes.PeerDiscovery);
+
+ //Allow just origin nodes to be used as known peers
+ IEnumerable<Uri> peerUris = kownPeers.Select(static p =>
+ {
+ Uri bUri = new(p, UriKind.Absolute);
+ return bUri.LocalPath == "/" ? new Uri(bUri, CacheConstants.DefaultWellKnownPath) : bUri;
+ });
+
+ NodeConfig.WithInitialPeers(peerUris)
+ .WithErrorHandler(new ErrorHandler(discLogger));
+
+ discLogger.Information("Inital peer nodes: {nodes}", kownPeers);
+
+ PeerDiscovery = new PeerDiscoveryManager(
+ NodeConfig,
+ ClusterConfig,
+ PeerMonitor,
+ discLogger,
+ plugin.IsDebug(),
+ kownPeers.Length > 0
+ );
+
+ //Discovery manager needs to be scheduled for background work to run the discovery loop
+ _ = plugin.ObserveWork(PeerDiscovery, 10);
+ }
+
+ private void ConfigureCacheListener()
+ {
+ /*
+ * Allow loading external managed dll for a bucket-local memory manager
+ */
+ ICacheMemoryManagerFactory manager;
+
+ if (string.IsNullOrWhiteSpace(MemoryConfiguration.ExternLibPath))
+ {
+ //Get the memory manager
+ manager = plugin.GetOrCreateSingleton<BucketLocalManagerFactory>();
+ }
+ else
+ {
+ manager = plugin.CreateServiceExternal<ICacheMemoryManagerFactory>(MemoryConfiguration.ExternLibPath);
+ }
+
+ _cacheMemManager = manager;
+
+ CacheListenerPubQueue queue = new(plugin, PeerEventQueue);
+
+ //Must register background worker to listen for changes
+ _ = plugin.ObserveWork(queue, 150);
+
+ //Endpoint only allows for a single reader
+ Listener = new(
+ plugin.LoadMemoryCacheSystem(config, manager, MemoryConfiguration),
+ queue,
+ plugin.Log.CreateScope(CacheConstants.LogScopes.BlobCacheListener),
+ new SharedHeapFBMMemoryManager(SharedCacheHeap)
+ );
+
+ InternalStore = new CacheStore(Listener.Cache);
+ }
+
+ private void LogMemConfiguration()
+ {
+ const string CacheConfigTemplate =
+@"
+Cache Configuration:
+ Max memory: {max} Mb
+ Buckets: {bc}
+ Entries per-bucket: {mc}
+ HeapTracking: {ht}
+";
+
+ CacheMemoryConfiguration cacheConf = MemoryConfiguration;
+
+ //calculate the max memory usage
+ ulong maxByteSize = cacheConf.MaxCacheEntries * (ulong)cacheConf.BucketCount * (ulong)cacheConf.MaxMessageSize;
+
+ //Log the cache config
+ plugin.Log.Information(
+ CacheConfigTemplate,
+ maxByteSize / (1024 * 1000),
+ cacheConf.BucketCount,
+ cacheConf.MaxCacheEntries,
+ plugin.IsDebug()
+ );
+ }
+
+ public void LogMemoryStats()
+ {
+ if(SharedCacheHeap is TrackedHeapWrapper thw)
+ {
+ const string shStatTemplate =
+@" VNCache shared heap stats:
+ Current: {cur}kB
+ Blocks: {blks}
+ Max size: {max}kB
+";
+ HeapStatistics stats = thw.GetCurrentStats();
+ plugin.Log.Debug(
+ shStatTemplate,
+ stats.AllocatedBytes / 1024,
+ stats.AllocatedBlocks,
+ stats.MaxHeapSize / 1024
+ );
+
+ }
+
+ //Also print logs for the bucket local managers if they are enabled
+ if(_cacheMemManager is BucketLocalManagerFactory blmf)
+ {
+ blmf.LogHeapStats();
+ }
+ }
+
+ private sealed class ErrorHandler(ILogProvider Logger) : ICacheDiscoveryErrorHandler
+ {
+ public void OnDiscoveryError(CacheNodeAdvertisment errorNode, Exception ex)
+ => LogError(ex, errorNode.NodeId, errorNode.ConnectEndpoint);
+
+ public void OnDiscoveryError(Uri errorAddress, Exception ex)
+ => LogError(ex, null, errorAddress);
+
+ private void LogError(Exception ex, string? nodId, Uri? connectAddress)
+ {
+ //For logging purposes, use the node id if its available, otherwise use the address
+ if(nodId == null && connectAddress != null)
+ {
+ nodId = connectAddress.ToString();
+ }
+
+ if (ex is HttpRequestException hre)
+ {
+ if (hre.InnerException is SocketException se)
+ {
+ //transport failed
+ Logger.Warn("Failed to connect to server {serv} because {err}", nodId, se.Message);
+ }
+ else
+ {
+ Logger.Error("Failed to connect to node {n}\n{err}", nodId, hre);
+ }
+ }
+ if (ex is OperationCanceledException)
+ {
+ Logger.Error("Failed to discover nodes from nodeid {nid}, because the operation was canceled");
+ }
+ else if (ex is TimeoutException)
+ {
+ //Only log exception stack when in debug logging mode
+ Logger.Warn("Failed to discover nodes from nodeid {nid}, because a timeout occured", nodId);
+ }
+ else
+ {
+ //Only log exception stack when in debug logging mode
+ if (Logger.IsEnabled(LogLevel.Debug))
+ {
+ Logger.Error("Failed to discover nodes from nodeid {nid}, with error\n{err}", nodId, ex);
+ }
+ else
+ {
+ Logger.Error("Failed to discover nodes from nodeid {nid}, with error: {err}", nodId, ex.Message);
+ }
+ }
+ }
+ }
+
+ internal sealed class CacheStore(IBlobCacheTable table) : ICacheStore
+ {
+
+ ///<inheritdoc/>
+ ValueTask ICacheStore.AddOrUpdateBlobAsync<T>(string objectId, string? alternateId, ObjectDataGet<T> bodyData, T state, CancellationToken token)
+ {
+ return table.AddOrUpdateObjectAsync(objectId, alternateId, bodyData, state, default, token);
+ }
+
+ ///<inheritdoc/>
+ void ICacheStore.Clear()
+ {
+ throw new NotImplementedException();
+ }
+
+ ///<inheritdoc/>
+ ValueTask<bool> ICacheStore.DeleteItemAsync(string id, CancellationToken token)
+ {
+ return table.DeleteObjectAsync(id, token);
+ }
+ }
+ }
+}
diff --git a/plugins/ObjectCacheServer/src/NodeConfig.cs b/plugins/ObjectCacheServer/src/ServerClusterConfig.cs
index 3a2e10e..8e098cd 100644
--- a/plugins/ObjectCacheServer/src/NodeConfig.cs
+++ b/plugins/ObjectCacheServer/src/ServerClusterConfig.cs
@@ -1,11 +1,11 @@
/*
-* Copyright (c) 2023 Vaughn Nugent
+* Copyright (c) 2024 Vaughn Nugent
*
* Library: VNLib
* Package: ObjectCacheServer
-* File: NodeConfig.cs
+* File: ServerClusterConfig.cs
*
-* NodeConfig.cs is part of ObjectCacheServer which is part of the larger
+* ServerClusterConfig.cs is part of ObjectCacheServer which is part of the larger
* VNLib collection of libraries and utilities.
*
* ObjectCacheServer is free software: you can redistribute it and/or modify
@@ -34,40 +34,56 @@ using VNLib.Utils.Extensions;
using VNLib.Plugins.Extensions.Loading;
using VNLib.Data.Caching.Extensions.Clustering;
-
namespace VNLib.Data.Caching.ObjectCache.Server
{
[ConfigurationName("cluster")]
- internal sealed class NodeConfig
+ internal sealed class ServerClusterConfig(PluginBase plugin, IConfigScope config)
{
- //Default path for the well known endpoint
- const string DefaultPath = "/.well-known/vncache";
-
- public CacheNodeConfiguration Config { get; }
-
- public CacheAuthKeyStore KeyStore { get; }
-
- public TimeSpan DiscoveryInterval { get; }
+ public TimeSpan DiscoveryInterval { get; } = config.GetRequiredProperty("discovery_interval_sec", p => p.GetTimeSpan(TimeParseType.Seconds));
- public TimeSpan EventQueuePurgeInterval { get; }
+ public TimeSpan EventQueuePurgeInterval { get; } = config.GetRequiredProperty("queue_purge_interval_sec", p => p.GetTimeSpan(TimeParseType.Seconds));
- public int MaxQueueDepth { get; }
+ public int MaxQueueDepth { get; } = (int)config.GetRequiredProperty("max_queue_depth", p => p.GetUInt32());
- public string? DiscoveryPath { get; }
+ public string? DiscoveryPath { get; } = config.GetValueOrDefault("discovery_path", p => p.GetString(), null);
- public string ConnectPath { get; }
+ public string ConnectPath { get; } = config.GetRequiredProperty("connect_path", p => p.GetString()!);
- public string WellKnownPath { get; }
+ public string WellKnownPath { get; } = config.GetValueOrDefault("well_known_path", p => p.GetString()!, CacheConstants.DefaultWellKnownPath)
+ ?? CacheConstants.DefaultWellKnownPath;
- public bool VerifyIp { get; }
+ public bool VerifyIp { get; } = config.GetRequiredProperty("verify_ip", p => p.GetBoolean());
/// <summary>
/// The maximum number of peer connections to allow
/// </summary>
- public uint MaxPeerConnections { get; } = 10;
+ public uint MaxPeerConnections { get; } = config.GetValueOrDefault("max_peers", p => p.GetUInt32(), 10u);
+
+ /// <summary>
+ /// The maxium number of concurrent client connections to allow
+ /// before rejecting new connections
+ /// </summary>
+ public uint MaxConcurrentConnections { get; } = config.GetValueOrDefault("max_concurrent_connections", p => p.GetUInt32(), 100u);
+
+ const string CacheConfigTemplate =
+@"
+Cluster Configuration:
+ Node Id: {id}
+ TlsEndabled: {tls}
+ Verify Ip: {vi}
+ Well-Known: {wk}
+ Cache Endpoint: {ep}
+ Discovery Endpoint: {dep}
+ Discovery Interval: {di}
+ Max Peer Connections: {mpc}
+ Max Queue Depth: {mqd}
+ Event Queue Purge Interval: {eqpi}
+";
+
+ internal CacheNodeConfiguration BuildNodeConfig()
+ {
+ CacheNodeConfiguration conf = new();
- public NodeConfig(PluginBase plugin, IConfigScope config)
- {
//Get the port of the primary webserver
int port;
bool usingTls;
@@ -88,81 +104,24 @@ namespace VNLib.Data.Caching.ObjectCache.Server
//Server id is just dns name for now
string nodeId = $"{hostname}:{port}";
-
- //Init key store
- KeyStore = new(plugin);
-
-
- DiscoveryInterval = config["discovery_interval_sec"].GetTimeSpan(TimeParseType.Seconds);
-
- //Get the event queue purge interval
- EventQueuePurgeInterval = config["queue_purge_interval_sec"].GetTimeSpan(TimeParseType.Seconds);
-
- //Get the max queue depth
- MaxQueueDepth = (int)config["max_queue_depth"].GetUInt32();
-
-
- //Get the connect path
- ConnectPath = config["connect_path"].GetString() ?? throw new KeyNotFoundException("Missing required key 'connect_path' in cluster config");
-
- //Get the verify ip setting
- VerifyIp = config["verify_ip"].GetBoolean();
Uri connectEp = BuildUri(usingTls, hostname, port, ConnectPath);
Uri? discoveryEp = null;
-
- Config = new();
-
- //Setup cache node config
- Config.WithCacheEndpoint(connectEp)
- .WithNodeId(nodeId)
- .WithAuthenticator(KeyStore)
- .WithTls(usingTls);
+
+
+ conf.WithCacheEndpoint(connectEp)
+ .WithNodeId(nodeId)
+ .WithTls(usingTls);
//Get the discovery path (optional)
- if (config.TryGetValue("discovery_path", out JsonElement discoveryPathEl))
+ if (!string.IsNullOrWhiteSpace(DiscoveryPath))
{
- DiscoveryPath = discoveryPathEl.GetString();
-
- //Enable advertisment if a discovery path is present
- if (!string.IsNullOrEmpty(DiscoveryPath))
- {
- //Build the discovery endpoint, it must be an absolute uri
- discoveryEp = BuildUri(usingTls, hostname, port, DiscoveryPath);
- Config.EnableAdvertisment(discoveryEp);
- }
+ //Build the discovery endpoint, it must be an absolute uri
+ discoveryEp = BuildUri(usingTls, hostname, port, DiscoveryPath);
+ conf.EnableAdvertisment(discoveryEp);
}
- //Allow custom well-known path
- if(config.TryGetValue("well_known_path", out JsonElement wkEl))
- {
- WellKnownPath = wkEl.GetString() ?? DefaultPath;
- }
- //Default if not set
- WellKnownPath ??= DefaultPath;
-
- //Get the max peer connections
- if (config.TryGetValue("max_peers", out JsonElement maxPeerEl))
- {
- MaxPeerConnections = maxPeerEl.GetUInt32();
- }
-
- const string CacheConfigTemplate =
-@"
-Cluster Configuration:
- Node Id: {id}
- TlsEndabled: {tls}
- Verify Ip: {vi}
- Well-Known: {wk}
- Cache Endpoint: {ep}
- Discovery Endpoint: {dep}
- Discovery Interval: {di}
- Max Peer Connections: {mpc}
- Max Queue Depth: {mqd}
- Event Queue Purge Interval: {eqpi}
-";
-
- //log the config
+ //print the cluster configuration to the log
plugin.Log.Information(CacheConfigTemplate,
nodeId,
usingTls,
@@ -175,6 +134,8 @@ Cluster Configuration:
MaxQueueDepth,
EventQueuePurgeInterval
);
+
+ return conf;
}
private static Uri BuildUri(bool tls, string host, int port, string path)