diff options
Diffstat (limited to 'plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs')
-rw-r--r-- | plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs | 16 |
1 files changed, 10 insertions, 6 deletions
diff --git a/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs b/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs index b453dcc..a55e8e2 100644 --- a/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs +++ b/plugins/ObjectCacheServer/src/Distribution/CacheNodeReplicationMaanger.cs @@ -58,7 +58,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution //Get peer adapter PeerAdapter = plugin.GetOrCreateSingleton<PeerDiscoveryManager>(); - + CacheStore = plugin.GetOrCreateSingleton<CacheStore>(); } public async Task DoWorkAsync(ILogProvider pluginLog, CancellationToken exitToken) @@ -70,15 +70,15 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution while (true) { //Get all new peers - ICachePeerAdvertisment[] peers = PeerAdapter.GetNewPeers(); + ICacheNodeAdvertisment[] peers = PeerAdapter.GetNewPeers(); if (peers.Length == 0) { pluginLog.Verbose("[REPL] No new peers to connect to"); } - //Connect to each peer - foreach (ICachePeerAdvertisment peer in peers) + //Connect to each peer as a background task + foreach (ICacheNodeAdvertisment peer in peers) { _ = Plugin.ObserveWork(() => OnNewPeerDoWorkAsync(peer, pluginLog, exitToken)); } @@ -104,13 +104,16 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution pluginLog.Information("[REPL] Node replication worker exited"); } - private async Task OnNewPeerDoWorkAsync(ICachePeerAdvertisment newPeer, ILogProvider log, CancellationToken exitToken) + private async Task OnNewPeerDoWorkAsync(ICacheNodeAdvertisment newPeer, ILogProvider log, CancellationToken exitToken) { _ = newPeer ?? throw new ArgumentNullException(nameof(newPeer)); //Setup client FBMClient client = new(ClientConfig); + //Add peer to monitor + PeerAdapter.OnPeerListenerAttached(newPeer); + try { log.Information("Establishing replication connection to peer {server}...", newPeer.NodeId); @@ -198,7 +201,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution return; case "deleted": //Delete the object from the store - await CacheStore.DeleteItemAsync(changedObject.CurrentId); + await CacheStore.DeleteItemAsync(changedObject.CurrentId, CancellationToken.None); break; case "modified": //Reload the record from the store @@ -226,6 +229,7 @@ namespace VNLib.Data.Caching.ObjectCache.Server.Distribution //Check response code string status = response.Headers.First(static s => s.Header == HeaderCommand.Status).Value.ToString(); + if (ResponseCodes.Okay.Equals(status, StringComparison.Ordinal)) { //Update the record |