diff options
Diffstat (limited to 'src/nostr_service.cpp')
-rw-r--r-- | src/nostr_service.cpp | 177 |
1 files changed, 108 insertions, 69 deletions
diff --git a/src/nostr_service.cpp b/src/nostr_service.cpp index 6ffb06d..5443aac 100644 --- a/src/nostr_service.cpp +++ b/src/nostr_service.cpp @@ -90,6 +90,7 @@ void NostrService::closeRelayConnections(RelayList relays) }); disconnectionThreads.push_back(move(disconnectionThread)); + // TODO: Close subscriptions before disconnecting. lock_guard<mutex> lock(this->_propertyMutex); this->_subscriptions.erase(relay); } @@ -155,7 +156,7 @@ tuple<RelayList, RelayList> NostrService::publishEvent(shared_ptr<Event> event) if (!success) { - PLOG_WARNING << "Failed to send event to relay: " << relay; + PLOG_WARNING << "Failed to send event to relay " << relay; publishPromise.set_value(make_tuple(relay, false)); } } @@ -244,13 +245,13 @@ vector<shared_ptr<Event>> NostrService::queryRelays(shared_ptr<Filters> filters) if (success) { - PLOG_INFO << "Sent query to relay: " << relay; + PLOG_INFO << "Sent query to relay " << relay; lock_guard<mutex> lock(this->_propertyMutex); - this->_subscriptions[relay].push_back(subscriptionId); + this->_subscriptions[subscriptionId].push_back(relay); } else { - PLOG_WARNING << "Failed to send query to relay: " << relay; + PLOG_WARNING << "Failed to send query to relay " << relay; eosePromise.set_value(make_tuple(uri, false)); } } @@ -261,16 +262,15 @@ vector<shared_ptr<Event>> NostrService::queryRelays(shared_ptr<Filters> filters) auto [relay, isEose] = publishFuture.get(); if (isEose) { - PLOG_INFO << "Received EOSE message from relay: " << relay; + PLOG_INFO << "Received EOSE message from relay " << relay; } else { - PLOG_WARNING << "Received CLOSE message from relay: " << relay; + PLOG_WARNING << "Received CLOSE message from relay " << relay; this->closeRelayConnections({ relay }); } } this->closeSubscription(subscriptionId); - this->closeRelayConnections(this->_activeRelays); // TODO: De-duplicate events in the vector before returning. @@ -292,7 +292,7 @@ string NostrService::queryRelays( for (const string relay : this->_activeRelays) { unique_lock<mutex> lock(this->_propertyMutex); - this->_subscriptions[relay].push_back(subscriptionId); + this->_subscriptions[subscriptionId].push_back(relay); lock.unlock(); future<tuple<string, bool>> requestFuture = async( @@ -333,97 +333,123 @@ tuple<RelayList, RelayList> NostrService::closeSubscription(string subscriptionI { RelayList successfulRelays; RelayList failedRelays; + + vector<string> subscriptionRelays; + size_t subscriptionRelayCount; vector<future<tuple<string, bool>>> closeFutures; + + try + { + unique_lock<mutex> lock(this->_propertyMutex); + subscriptionRelays = this->_subscriptions.at(subscriptionId); + subscriptionRelayCount = subscriptionRelays.size(); + lock.unlock(); + } + catch (const out_of_range& oor) + { + PLOG_WARNING << "Subscription " << subscriptionId << " not found."; + return make_tuple(successfulRelays, failedRelays); + } - for (const string relay : this->_activeRelays) + for (const string relay : subscriptionRelays) { - if (!this->hasSubscription(relay, subscriptionId)) - { - continue; - } - - string request = this->generateCloseRequest(subscriptionId); - future<tuple<string, bool>> closeFuture = async([this, relay, request]() + future<tuple<string, bool>> closeFuture = async([this, subscriptionId, relay]() { - PLOG_INFO << "Sending " << request << " to relay " << relay; - return this->_client->send(request, relay); + bool success = this->closeSubscription(subscriptionId, relay); + + return make_tuple(relay, success); }); closeFutures.push_back(move(closeFuture)); } for (auto& closeFuture : closeFutures) { - auto [relay, isSuccess] = closeFuture.get(); - if (isSuccess) + auto [uri, success] = closeFuture.get(); + if (success) { - successfulRelays.push_back(relay); - - lock_guard<mutex> lock(this->_propertyMutex); - auto it = find( - this->_subscriptions[relay].begin(), - this->_subscriptions[relay].end(), - subscriptionId); - this->_subscriptions[relay].erase(it); + successfulRelays.push_back(uri); } else { - failedRelays.push_back(relay); + failedRelays.push_back(uri); } } - size_t targetCount = this->_activeRelays.size(); size_t successfulCount = successfulRelays.size(); - PLOG_INFO << "Sent close request to " << successfulCount << "/" << targetCount << " open relay connections."; + PLOG_INFO << "Sent CLOSE request for subscription " << subscriptionId << " to " << successfulCount << "/" << subscriptionRelayCount << " open relay connections."; + + // If there were no failures, and the subscription has been closed on all of its relays, forget + // about the subscription. + if (failedRelays.empty()) + { + lock_guard<mutex> lock(this->_propertyMutex); + this->_subscriptions.erase(subscriptionId); + } return make_tuple(successfulRelays, failedRelays); }; -tuple<RelayList, RelayList> NostrService::closeSubscriptions() +bool NostrService::closeSubscription(string subscriptionId, string relay) { - return this->closeSubscriptions(this->_activeRelays); -}; + if (!this->hasSubscription(subscriptionId, relay)) + { + PLOG_WARNING << "Subscription " << subscriptionId << " not found on relay " << relay; + return false; + } -tuple<RelayList, RelayList> NostrService::closeSubscriptions(RelayList relays) -{ - RelayList successfulRelays; - RelayList failedRelays; + if (!this->isConnected(relay)) + { + PLOG_WARNING << "Relay " << relay << " is not connected."; + return false; + } + + string request = this->generateCloseRequest(subscriptionId); + auto [uri, success] = this->_client->send(request, relay); - vector<future<tuple<RelayList, RelayList>>> closeFutures; - for (const string relay : relays) + if (success) { - future<tuple<RelayList, RelayList>> closeFuture = async([this, &relay]() + lock_guard<mutex> lock(this->_propertyMutex); + auto it = find( + this->_subscriptions[subscriptionId].begin(), + this->_subscriptions[subscriptionId].end(), + relay); + + if (it != this->_subscriptions[subscriptionId].end()) { - RelayList successfulRelays; - RelayList failedRelays; + this->_subscriptions[subscriptionId].erase(it); + } - unique_lock<mutex> lock(this->_propertyMutex); - auto subscriptionIds = this->_subscriptions[relay]; - lock.unlock(); + PLOG_INFO << "Sent close request for subscription " << subscriptionId << " to relay " << relay; + } + else + { + PLOG_WARNING << "Failed to send close request to relay " << relay; + } - for (const string& subscriptionId : subscriptionIds) - { - auto [successes, failures] = this->closeSubscription(subscriptionId); - successfulRelays.insert(successfulRelays.end(), successes.begin(), successes.end()); - failedRelays.insert(failedRelays.end(), failures.begin(), failures.end()); - } + return success; +}; - return make_tuple(successfulRelays, failedRelays); - }); - closeFutures.push_back(move(closeFuture)); +vector<string> NostrService::closeSubscriptions() +{ + unique_lock<mutex> lock(this->_propertyMutex); + vector<string> subscriptionIds; + for (auto& [subscriptionId, relays] : this->_subscriptions) + { + subscriptionIds.push_back(subscriptionId); } + lock.unlock(); - for (auto& closeFuture : closeFutures) + vector<string> remainingSubscriptions; + for (const string& subscriptionId : subscriptionIds) { - auto [successes, failures] = closeFuture.get(); - successfulRelays.insert(successfulRelays.end(), successes.begin(), successes.end()); - failedRelays.insert(failedRelays.end(), failures.begin(), failures.end()); + auto [successes, failures] = this->closeSubscription(subscriptionId); + if (!failures.empty()) + { + remainingSubscriptions.push_back(subscriptionId); + } } - size_t targetCount = relays.size(); - size_t successfulCount = successfulRelays.size(); - PLOG_INFO << "Sent close requests to " << successfulCount << "/" << targetCount << " open relay connections."; - - return make_tuple(successfulRelays, failedRelays); + return remainingSubscriptions; }; RelayList NostrService::getConnectedRelays(RelayList relays) @@ -544,15 +570,28 @@ string NostrService::generateCloseRequest(string subscriptionId) return jarr.dump(); }; -bool NostrService::hasSubscription(string relay, string subscriptionId) +bool NostrService::hasSubscription(string subscriptionId) +{ + lock_guard<mutex> lock(this->_propertyMutex); + auto it = this->_subscriptions.find(subscriptionId); + + return it != this->_subscriptions.end(); +}; + +bool NostrService::hasSubscription(string subscriptionId, string relay) { lock_guard<mutex> lock(this->_propertyMutex); - auto it = find(this->_subscriptions[relay].begin(), this->_subscriptions[relay].end(), subscriptionId); - if (it != this->_subscriptions[relay].end()) // If the subscription is in this->_subscriptions[relay] + auto subscriptionIt = this->_subscriptions.find(subscriptionId); + + if (subscriptionIt == this->_subscriptions.end()) { - return true; + return false; } - return false; + + auto relays = this->_subscriptions[subscriptionId]; + auto relayIt = find(relays.begin(), relays.end(), relay); + + return relayIt != relays.end(); }; void NostrService::onSubscriptionMessage( |