aboutsummaryrefslogtreecommitdiff
path: root/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs')
-rw-r--r--plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs16
1 files changed, 10 insertions, 6 deletions
diff --git a/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs
index b453dcc..a55e8e2 100644
--- a/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs
+++ b/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs
@@ -58,7 +58,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
//Get peer adapter
PeerAdapter = plugin.GetOrCreateSingleton<PeerDiscoveryManager>();
-
+ CacheStore = plugin.GetOrCreateSingleton<CacheStore>();
}
public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken)
@@ -70,15 +70,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
while (true)
{
//Get all new peers
- ICachePeerAdvertisment[] peers = PeerAdapter.GetNewPeers();
+ ICacheNodeAdvertisment[] peers = PeerAdapter.GetNewPeers();
if (peers.Length == 0)
{
pluginLog.Verbose("[REPL] No new peers to connect to");
}
- //Connect to each peer
- foreach (ICachePeerAdvertisment peer in peers)
+ //Connect to each peer as a background task
+ foreach (ICacheNodeAdvertisment peer in peers)
{
_ = Plugin.ObserveWork(() => OnNewPeerDoWorkAsync(peer, pluginLog, exitToken));
}
@@ -104,13 +104,16 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
pluginLog.Information("[REPL] Node replication worker exited");
}
- private async Task OnNewPeerDoWorkAsync(ICachePeerAdvertisment newPeer, ILogProvider log, CancellationToken exitToken)
+ private async Task OnNewPeerDoWorkAsync(ICacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken)
{
_ = newPeer ?? throw new ArgumentNullException(nameof(newPeer));
//Setup client
FBMClient client = new(ClientConfig);
+ //Add peer to monitor
+ PeerAdapter.OnPeerListenerAttached(newPeer);
+
try
{
log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId);
@@ -198,7 +201,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
return;
case "deleted":
//Delete the object from the store
- await CacheStore.DeleteItemAsync(changedObject.CurrentId);
+ await CacheStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None);
break;
case "modified":
//Reload the record from the store
@@ -226,6 +229,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution
//Check response code
string status = response.Headers.First(static s => s.Header == HeaderCommand.Status).Value.ToString();
+
if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal))
{
//Update the record