From fea9005732607ee58a4bcb113b1805028954498a Mon Sep 17 00:00:00 2001 From: Michael Jurkoic Date: Sun, 17 Mar 2024 18:32:45 -0500 Subject: Add service methods to close filter subscriptions --- include/nostr.hpp | 32 ++++++++++++++++++++++++-- src/nostr_service.cpp | 64 ++++++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 91 insertions(+), 5 deletions(-) diff --git a/include/nostr.hpp b/include/nostr.hpp index 22d9956..c410046 100644 --- a/include/nostr.hpp +++ b/include/nostr.hpp @@ -151,13 +151,28 @@ public: std::tuple queryRelays(Filters filters); // TODO: Write a method that receives events for an active subscription. - // TODO: Write a method that closes active subscriptions. + + /** + * @brief Closes the subscription with the given ID on all open relay connections. + * @returns A tuple of `RelayList` objects, of the form ``, indicating + * to which relays the message was sent successfully, and which relays failed to receive the + * message. + */ + std::tuple closeSubscription(std::string subscriptionId); + + /** + * @brief Closes all open subscriptions on the given relays. + * @returns A tuple of `RelayList` objects, of the form ``, indicating + * to which relays the message was sent successfully, and which relays failed to receive the + * message. + */ + std::tuple closeSubscriptions(RelayList relays); private: std::mutex _propertyMutex; RelayList _defaultRelays; RelayList _activeRelays; - std::unordered_map> _subscriptionIds; + std::unordered_map> _subscriptions; client::IWebSocketClient* _client; /** @@ -198,5 +213,18 @@ private: * @returns A stringified UUID. */ std::string generateSubscriptionId(); + + /** + * @brief Generates a message requesting a relay to close the subscription with the given ID. + * @returns A stringified JSON object representing the close request. + */ + std::string generateCloseRequest(std::string subscriptionId); + + /** + * @brief Indicates whether the connection to the given relay has a subscription with the given + * ID. + * @returns True if the relay has the subscription, false otherwise. + */ + bool hasSubscription(std::string relay, std::string subscriptionId); }; } // namespace nostr diff --git a/src/nostr_service.cpp b/src/nostr_service.cpp index 3025b96..e3b1f19 100644 --- a/src/nostr_service.cpp +++ b/src/nostr_service.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -9,10 +10,11 @@ #include "nostr.hpp" #include "client/web_socket_client.hpp" -using std::async; using boost::uuids::random_generator; using boost::uuids::to_string; using boost::uuids::uuid; +using nlohmann::json; +using std::async; using std::future; using std::lock_guard; using std::make_tuple; @@ -154,7 +156,7 @@ tuple NostrService::queryRelays(Filters filters) for (const string relay : this->_activeRelays) { string subscriptionId = this->generateSubscriptionId(); - this->_subscriptionIds[relay].push_back(subscriptionId); + this->_subscriptions[relay].push_back(subscriptionId); string request = filters.serialize(subscriptionId); future> requestFuture = async([this, &relay, &request]() { @@ -178,7 +180,47 @@ tuple NostrService::queryRelays(Filters filters) size_t targetCount = this->_activeRelays.size(); size_t successfulCount = successfulRelays.size(); - PLOG_INFO << "Published event to " << successfulCount << "/" << targetCount << " target relays."; + PLOG_INFO << "Sent query to " << successfulCount << "/" << targetCount << " open relay connections."; + + return make_tuple(successfulRelays, failedRelays); +}; + +tuple NostrService::closeSubscription(string subscriptionId) +{ + RelayList successfulRelays; + RelayList failedRelays; + + vector>> closeFutures; + for (const string relay : this->_activeRelays) + { + if (!this->hasSubscription(relay, subscriptionId)) + { + continue; + } + + string request = this->generateCloseRequest(subscriptionId); + future> closeFuture = async([this, &relay, &request]() { + return this->_client->send(request, relay); + }); + closeFutures.push_back(move(closeFuture)); + } + + for (auto& closeFuture : closeFutures) + { + auto [relay, isSuccess] = closeFuture.get(); + if (isSuccess) + { + successfulRelays.push_back(relay); + } + else + { + failedRelays.push_back(relay); + } + } + + size_t targetCount = this->_activeRelays.size(); + size_t successfulCount = successfulRelays.size(); + PLOG_INFO << "Sent close request to " << successfulCount << "/" << targetCount << " open relay connections."; return make_tuple(successfulRelays, failedRelays); }; @@ -293,4 +335,20 @@ string NostrService::generateSubscriptionId() uuid uuid = random_generator()(); return to_string(uuid); }; + +string NostrService::generateCloseRequest(string subscriptionId) +{ + json jarr = json::array({ "CLOSE", subscriptionId }); + return jarr.dump(); +}; + +bool NostrService::hasSubscription(string relay, string subscriptionId) +{ + auto it = find(this->_subscriptions[relay].begin(), this->_subscriptions[relay].end(), subscriptionId); + if (it != this->_subscriptions[relay].end()) // If the subscription is in this->_subscriptions[relay] + { + return true; + } + return false; +}; } // namespace nostr -- cgit