aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorLibravatar buttercat1791 <mjjurkoic@gmail.com>2024-05-07 09:07:25 -0500
committerLibravatar buttercat1791 <mjjurkoic@gmail.com>2024-05-07 09:07:25 -0500
commit14ba707c600f13012b3b7f441541f9a6db8ddb8a (patch)
treebe59ac35575f8a6e98e6af4473ba08493469fc38 /src
parent80885e4f6b83a3a63b9a74640a13a66cc27d7933 (diff)
Update and test methods for closing subscriptions
Diffstat (limited to 'src')
-rw-r--r--src/nostr_service.cpp177
1 files changed, 108 insertions, 69 deletions
diff --git a/src/nostr_service.cpp b/src/nostr_service.cpp
index 6ffb06d..5443aac 100644
--- a/src/nostr_service.cpp
+++ b/src/nostr_service.cpp
@@ -90,6 +90,7 @@ void NostrService::closeRelayConnections(RelayList relays)
});
disconnectionThreads.push_back(move(disconnectionThread));
+ // TODO: Close subscriptions before disconnecting.
lock_guard<mutex> lock(this->_propertyMutex);
this->_subscriptions.erase(relay);
}
@@ -155,7 +156,7 @@ tuple<RelayList, RelayList> NostrService::publishEvent(shared_ptr<Event> event)
if (!success)
{
- PLOG_WARNING << "Failed to send event to relay: " << relay;
+ PLOG_WARNING << "Failed to send event to relay " << relay;
publishPromise.set_value(make_tuple(relay, false));
}
}
@@ -244,13 +245,13 @@ vector<shared_ptr<Event>> NostrService::queryRelays(shared_ptr<Filters> filters)
if (success)
{
- PLOG_INFO << "Sent query to relay: " << relay;
+ PLOG_INFO << "Sent query to relay " << relay;
lock_guard<mutex> lock(this->_propertyMutex);
- this->_subscriptions[relay].push_back(subscriptionId);
+ this->_subscriptions[subscriptionId].push_back(relay);
}
else
{
- PLOG_WARNING << "Failed to send query to relay: " << relay;
+ PLOG_WARNING << "Failed to send query to relay " << relay;
eosePromise.set_value(make_tuple(uri, false));
}
}
@@ -261,16 +262,15 @@ vector<shared_ptr<Event>> NostrService::queryRelays(shared_ptr<Filters> filters)
auto [relay, isEose] = publishFuture.get();
if (isEose)
{
- PLOG_INFO << "Received EOSE message from relay: " << relay;
+ PLOG_INFO << "Received EOSE message from relay " << relay;
}
else
{
- PLOG_WARNING << "Received CLOSE message from relay: " << relay;
+ PLOG_WARNING << "Received CLOSE message from relay " << relay;
this->closeRelayConnections({ relay });
}
}
this->closeSubscription(subscriptionId);
- this->closeRelayConnections(this->_activeRelays);
// TODO: De-duplicate events in the vector before returning.
@@ -292,7 +292,7 @@ string NostrService::queryRelays(
for (const string relay : this->_activeRelays)
{
unique_lock<mutex> lock(this->_propertyMutex);
- this->_subscriptions[relay].push_back(subscriptionId);
+ this->_subscriptions[subscriptionId].push_back(relay);
lock.unlock();
future<tuple<string, bool>> requestFuture = async(
@@ -333,97 +333,123 @@ tuple<RelayList, RelayList> NostrService::closeSubscription(string subscriptionI
{
RelayList successfulRelays;
RelayList failedRelays;
+
+ vector<string> subscriptionRelays;
+ size_t subscriptionRelayCount;
vector<future<tuple<string, bool>>> closeFutures;
+
+ try
+ {
+ unique_lock<mutex> lock(this->_propertyMutex);
+ subscriptionRelays = this->_subscriptions.at(subscriptionId);
+ subscriptionRelayCount = subscriptionRelays.size();
+ lock.unlock();
+ }
+ catch (const out_of_range& oor)
+ {
+ PLOG_WARNING << "Subscription " << subscriptionId << " not found.";
+ return make_tuple(successfulRelays, failedRelays);
+ }
- for (const string relay : this->_activeRelays)
+ for (const string relay : subscriptionRelays)
{
- if (!this->hasSubscription(relay, subscriptionId))
- {
- continue;
- }
-
- string request = this->generateCloseRequest(subscriptionId);
- future<tuple<string, bool>> closeFuture = async([this, relay, request]()
+ future<tuple<string, bool>> closeFuture = async([this, subscriptionId, relay]()
{
- PLOG_INFO << "Sending " << request << " to relay " << relay;
- return this->_client->send(request, relay);
+ bool success = this->closeSubscription(subscriptionId, relay);
+
+ return make_tuple(relay, success);
});
closeFutures.push_back(move(closeFuture));
}
for (auto& closeFuture : closeFutures)
{
- auto [relay, isSuccess] = closeFuture.get();
- if (isSuccess)
+ auto [uri, success] = closeFuture.get();
+ if (success)
{
- successfulRelays.push_back(relay);
-
- lock_guard<mutex> lock(this->_propertyMutex);
- auto it = find(
- this->_subscriptions[relay].begin(),
- this->_subscriptions[relay].end(),
- subscriptionId);
- this->_subscriptions[relay].erase(it);
+ successfulRelays.push_back(uri);
}
else
{
- failedRelays.push_back(relay);
+ failedRelays.push_back(uri);
}
}
- size_t targetCount = this->_activeRelays.size();
size_t successfulCount = successfulRelays.size();
- PLOG_INFO << "Sent close request to " << successfulCount << "/" << targetCount << " open relay connections.";
+ PLOG_INFO << "Sent CLOSE request for subscription " << subscriptionId << " to " << successfulCount << "/" << subscriptionRelayCount << " open relay connections.";
+
+ // If there were no failures, and the subscription has been closed on all of its relays, forget
+ // about the subscription.
+ if (failedRelays.empty())
+ {
+ lock_guard<mutex> lock(this->_propertyMutex);
+ this->_subscriptions.erase(subscriptionId);
+ }
return make_tuple(successfulRelays, failedRelays);
};
-tuple<RelayList, RelayList> NostrService::closeSubscriptions()
+bool NostrService::closeSubscription(string subscriptionId, string relay)
{
- return this->closeSubscriptions(this->_activeRelays);
-};
+ if (!this->hasSubscription(subscriptionId, relay))
+ {
+ PLOG_WARNING << "Subscription " << subscriptionId << " not found on relay " << relay;
+ return false;
+ }
-tuple<RelayList, RelayList> NostrService::closeSubscriptions(RelayList relays)
-{
- RelayList successfulRelays;
- RelayList failedRelays;
+ if (!this->isConnected(relay))
+ {
+ PLOG_WARNING << "Relay " << relay << " is not connected.";
+ return false;
+ }
+
+ string request = this->generateCloseRequest(subscriptionId);
+ auto [uri, success] = this->_client->send(request, relay);
- vector<future<tuple<RelayList, RelayList>>> closeFutures;
- for (const string relay : relays)
+ if (success)
{
- future<tuple<RelayList, RelayList>> closeFuture = async([this, &relay]()
+ lock_guard<mutex> lock(this->_propertyMutex);
+ auto it = find(
+ this->_subscriptions[subscriptionId].begin(),
+ this->_subscriptions[subscriptionId].end(),
+ relay);
+
+ if (it != this->_subscriptions[subscriptionId].end())
{
- RelayList successfulRelays;
- RelayList failedRelays;
+ this->_subscriptions[subscriptionId].erase(it);
+ }
- unique_lock<mutex> lock(this->_propertyMutex);
- auto subscriptionIds = this->_subscriptions[relay];
- lock.unlock();
+ PLOG_INFO << "Sent close request for subscription " << subscriptionId << " to relay " << relay;
+ }
+ else
+ {
+ PLOG_WARNING << "Failed to send close request to relay " << relay;
+ }
- for (const string& subscriptionId : subscriptionIds)
- {
- auto [successes, failures] = this->closeSubscription(subscriptionId);
- successfulRelays.insert(successfulRelays.end(), successes.begin(), successes.end());
- failedRelays.insert(failedRelays.end(), failures.begin(), failures.end());
- }
+ return success;
+};
- return make_tuple(successfulRelays, failedRelays);
- });
- closeFutures.push_back(move(closeFuture));
+vector<string> NostrService::closeSubscriptions()
+{
+ unique_lock<mutex> lock(this->_propertyMutex);
+ vector<string> subscriptionIds;
+ for (auto& [subscriptionId, relays] : this->_subscriptions)
+ {
+ subscriptionIds.push_back(subscriptionId);
}
+ lock.unlock();
- for (auto& closeFuture : closeFutures)
+ vector<string> remainingSubscriptions;
+ for (const string& subscriptionId : subscriptionIds)
{
- auto [successes, failures] = closeFuture.get();
- successfulRelays.insert(successfulRelays.end(), successes.begin(), successes.end());
- failedRelays.insert(failedRelays.end(), failures.begin(), failures.end());
+ auto [successes, failures] = this->closeSubscription(subscriptionId);
+ if (!failures.empty())
+ {
+ remainingSubscriptions.push_back(subscriptionId);
+ }
}
- size_t targetCount = relays.size();
- size_t successfulCount = successfulRelays.size();
- PLOG_INFO << "Sent close requests to " << successfulCount << "/" << targetCount << " open relay connections.";
-
- return make_tuple(successfulRelays, failedRelays);
+ return remainingSubscriptions;
};
RelayList NostrService::getConnectedRelays(RelayList relays)
@@ -544,15 +570,28 @@ string NostrService::generateCloseRequest(string subscriptionId)
return jarr.dump();
};
-bool NostrService::hasSubscription(string relay, string subscriptionId)
+bool NostrService::hasSubscription(string subscriptionId)
+{
+ lock_guard<mutex> lock(this->_propertyMutex);
+ auto it = this->_subscriptions.find(subscriptionId);
+
+ return it != this->_subscriptions.end();
+};
+
+bool NostrService::hasSubscription(string subscriptionId, string relay)
{
lock_guard<mutex> lock(this->_propertyMutex);
- auto it = find(this->_subscriptions[relay].begin(), this->_subscriptions[relay].end(), subscriptionId);
- if (it != this->_subscriptions[relay].end()) // If the subscription is in this->_subscriptions[relay]
+ auto subscriptionIt = this->_subscriptions.find(subscriptionId);
+
+ if (subscriptionIt == this->_subscriptions.end())
{
- return true;
+ return false;
}
- return false;
+
+ auto relays = this->_subscriptions[subscriptionId];
+ auto relayIt = find(relays.begin(), relays.end(), relay);
+
+ return relayIt != relays.end();
};
void NostrService::onSubscriptionMessage(