aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLibravatar buttercat1791 <mjjurkoic@gmail.com>2024-05-07 09:07:25 -0500
committerLibravatar buttercat1791 <mjjurkoic@gmail.com>2024-05-07 09:07:25 -0500
commit14ba707c600f13012b3b7f441541f9a6db8ddb8a (patch)
treebe59ac35575f8a6e98e6af4473ba08493469fc38
parent80885e4f6b83a3a63b9a74640a13a66cc27d7933 (diff)
Update and test methods for closing subscriptions
-rw-r--r--include/nostr.hpp36
-rw-r--r--src/nostr_service.cpp177
-rw-r--r--test/nostr_service_test.cpp195
3 files changed, 307 insertions, 101 deletions
diff --git a/include/nostr.hpp b/include/nostr.hpp
index 326a637..e76d1e5 100644
--- a/include/nostr.hpp
+++ b/include/nostr.hpp
@@ -218,20 +218,24 @@ public:
std::tuple<RelayList, RelayList> closeSubscription(std::string subscriptionId);
/**
+ * @brief Closes the subscription with the given ID on the given relay.
+ * @returns True if the relay received the CLOSE message, false otherwise.
+ * @remark If the subscription does not exist on the given relay, or if the relay is not
+ * connected, the method will do nothing and return false.
+ */
+ bool closeSubscription(std::string subscriptionId, std::string relay);
+
+ /**
* @brief Closes all open subscriptions on all open relay connections.
- * @returns A tuple of `RelayList` objects, of the form `<successes, failures>`, indicating
- * to which relays the message was sent successfully, and which relays failed to receive the
- * message.
+ * @returns A list of any subscription IDs that failed to close.
*/
- std::tuple<RelayList, RelayList> closeSubscriptions();
+ std::vector<std::string> closeSubscriptions();
/**
* @brief Closes all open subscriptions on the given relays.
- * @returns A tuple of `RelayList` objects, of the form `<successes, failures>`, indicating
- * to which relays the message was sent successfully, and which relays failed to receive the
- * message.
+ * @returns A list of any subscription IDs that failed to close.
*/
- std::tuple<RelayList, RelayList> closeSubscriptions(RelayList relays);
+ std::vector<std::string> closeSubscriptions(RelayList relays);
private:
///< The maximum number of events the service will store for each subscription.
@@ -248,7 +252,7 @@ private:
RelayList _defaultRelays;
///< The set of Nostr relays to which the service is currently connected.
RelayList _activeRelays;
- ///< A map from relay URIs to the subscription IDs open on each relay.
+ ///< A map from subscription IDs to the relays on which each subscription is open.
std::unordered_map<std::string, std::vector<std::string>> _subscriptions;
/**
@@ -297,11 +301,17 @@ private:
std::string generateCloseRequest(std::string subscriptionId);
/**
- * @brief Indicates whether the connection to the given relay has a subscription with the given
- * ID.
- * @returns True if the relay has the subscription, false otherwise.
+ * @brief Indicates whether the the service has an open subscription with the given ID.
+ * @returns True if the service has the subscription, false otherwise.
+ */
+ bool hasSubscription(std::string subscriptionId);
+
+ /**
+ * @brief Indicates whether the service has an open subscription with the given ID on the given
+ * relay.
+ * @returns True if the subscription exists on the relay, false otherwise.
*/
- bool hasSubscription(std::string relay, std::string subscriptionId);
+ bool hasSubscription(std::string subscriptionId, std::string relay);
/**
* @brief Parses EVENT messages received from the relay and invokes the given event handler.
diff --git a/src/nostr_service.cpp b/src/nostr_service.cpp
index 6ffb06d..5443aac 100644
--- a/src/nostr_service.cpp
+++ b/src/nostr_service.cpp
@@ -90,6 +90,7 @@ void NostrService::closeRelayConnections(RelayList relays)
});
disconnectionThreads.push_back(move(disconnectionThread));
+ // TODO: Close subscriptions before disconnecting.
lock_guard<mutex> lock(this->_propertyMutex);
this->_subscriptions.erase(relay);
}
@@ -155,7 +156,7 @@ tuple<RelayList, RelayList> NostrService::publishEvent(shared_ptr<Event> event)
if (!success)
{
- PLOG_WARNING << "Failed to send event to relay: " << relay;
+ PLOG_WARNING << "Failed to send event to relay " << relay;
publishPromise.set_value(make_tuple(relay, false));
}
}
@@ -244,13 +245,13 @@ vector<shared_ptr<Event>> NostrService::queryRelays(shared_ptr<Filters> filters)
if (success)
{
- PLOG_INFO << "Sent query to relay: " << relay;
+ PLOG_INFO << "Sent query to relay " << relay;
lock_guard<mutex> lock(this->_propertyMutex);
- this->_subscriptions[relay].push_back(subscriptionId);
+ this->_subscriptions[subscriptionId].push_back(relay);
}
else
{
- PLOG_WARNING << "Failed to send query to relay: " << relay;
+ PLOG_WARNING << "Failed to send query to relay " << relay;
eosePromise.set_value(make_tuple(uri, false));
}
}
@@ -261,16 +262,15 @@ vector<shared_ptr<Event>> NostrService::queryRelays(shared_ptr<Filters> filters)
auto [relay, isEose] = publishFuture.get();
if (isEose)
{
- PLOG_INFO << "Received EOSE message from relay: " << relay;
+ PLOG_INFO << "Received EOSE message from relay " << relay;
}
else
{
- PLOG_WARNING << "Received CLOSE message from relay: " << relay;
+ PLOG_WARNING << "Received CLOSE message from relay " << relay;
this->closeRelayConnections({ relay });
}
}
this->closeSubscription(subscriptionId);
- this->closeRelayConnections(this->_activeRelays);
// TODO: De-duplicate events in the vector before returning.
@@ -292,7 +292,7 @@ string NostrService::queryRelays(
for (const string relay : this->_activeRelays)
{
unique_lock<mutex> lock(this->_propertyMutex);
- this->_subscriptions[relay].push_back(subscriptionId);
+ this->_subscriptions[subscriptionId].push_back(relay);
lock.unlock();
future<tuple<string, bool>> requestFuture = async(
@@ -333,97 +333,123 @@ tuple<RelayList, RelayList> NostrService::closeSubscription(string subscriptionI
{
RelayList successfulRelays;
RelayList failedRelays;
+
+ vector<string> subscriptionRelays;
+ size_t subscriptionRelayCount;
vector<future<tuple<string, bool>>> closeFutures;
+
+ try
+ {
+ unique_lock<mutex> lock(this->_propertyMutex);
+ subscriptionRelays = this->_subscriptions.at(subscriptionId);
+ subscriptionRelayCount = subscriptionRelays.size();
+ lock.unlock();
+ }
+ catch (const out_of_range& oor)
+ {
+ PLOG_WARNING << "Subscription " << subscriptionId << " not found.";
+ return make_tuple(successfulRelays, failedRelays);
+ }
- for (const string relay : this->_activeRelays)
+ for (const string relay : subscriptionRelays)
{
- if (!this->hasSubscription(relay, subscriptionId))
- {
- continue;
- }
-
- string request = this->generateCloseRequest(subscriptionId);
- future<tuple<string, bool>> closeFuture = async([this, relay, request]()
+ future<tuple<string, bool>> closeFuture = async([this, subscriptionId, relay]()
{
- PLOG_INFO << "Sending " << request << " to relay " << relay;
- return this->_client->send(request, relay);
+ bool success = this->closeSubscription(subscriptionId, relay);
+
+ return make_tuple(relay, success);
});
closeFutures.push_back(move(closeFuture));
}
for (auto& closeFuture : closeFutures)
{
- auto [relay, isSuccess] = closeFuture.get();
- if (isSuccess)
+ auto [uri, success] = closeFuture.get();
+ if (success)
{
- successfulRelays.push_back(relay);
-
- lock_guard<mutex> lock(this->_propertyMutex);
- auto it = find(
- this->_subscriptions[relay].begin(),
- this->_subscriptions[relay].end(),
- subscriptionId);
- this->_subscriptions[relay].erase(it);
+ successfulRelays.push_back(uri);
}
else
{
- failedRelays.push_back(relay);
+ failedRelays.push_back(uri);
}
}
- size_t targetCount = this->_activeRelays.size();
size_t successfulCount = successfulRelays.size();
- PLOG_INFO << "Sent close request to " << successfulCount << "/" << targetCount << " open relay connections.";
+ PLOG_INFO << "Sent CLOSE request for subscription " << subscriptionId << " to " << successfulCount << "/" << subscriptionRelayCount << " open relay connections.";
+
+ // If there were no failures, and the subscription has been closed on all of its relays, forget
+ // about the subscription.
+ if (failedRelays.empty())
+ {
+ lock_guard<mutex> lock(this->_propertyMutex);
+ this->_subscriptions.erase(subscriptionId);
+ }
return make_tuple(successfulRelays, failedRelays);
};
-tuple<RelayList, RelayList> NostrService::closeSubscriptions()
+bool NostrService::closeSubscription(string subscriptionId, string relay)
{
- return this->closeSubscriptions(this->_activeRelays);
-};
+ if (!this->hasSubscription(subscriptionId, relay))
+ {
+ PLOG_WARNING << "Subscription " << subscriptionId << " not found on relay " << relay;
+ return false;
+ }
-tuple<RelayList, RelayList> NostrService::closeSubscriptions(RelayList relays)
-{
- RelayList successfulRelays;
- RelayList failedRelays;
+ if (!this->isConnected(relay))
+ {
+ PLOG_WARNING << "Relay " << relay << " is not connected.";
+ return false;
+ }
+
+ string request = this->generateCloseRequest(subscriptionId);
+ auto [uri, success] = this->_client->send(request, relay);
- vector<future<tuple<RelayList, RelayList>>> closeFutures;
- for (const string relay : relays)
+ if (success)
{
- future<tuple<RelayList, RelayList>> closeFuture = async([this, &relay]()
+ lock_guard<mutex> lock(this->_propertyMutex);
+ auto it = find(
+ this->_subscriptions[subscriptionId].begin(),
+ this->_subscriptions[subscriptionId].end(),
+ relay);
+
+ if (it != this->_subscriptions[subscriptionId].end())
{
- RelayList successfulRelays;
- RelayList failedRelays;
+ this->_subscriptions[subscriptionId].erase(it);
+ }
- unique_lock<mutex> lock(this->_propertyMutex);
- auto subscriptionIds = this->_subscriptions[relay];
- lock.unlock();
+ PLOG_INFO << "Sent close request for subscription " << subscriptionId << " to relay " << relay;
+ }
+ else
+ {
+ PLOG_WARNING << "Failed to send close request to relay " << relay;
+ }
- for (const string& subscriptionId : subscriptionIds)
- {
- auto [successes, failures] = this->closeSubscription(subscriptionId);
- successfulRelays.insert(successfulRelays.end(), successes.begin(), successes.end());
- failedRelays.insert(failedRelays.end(), failures.begin(), failures.end());
- }
+ return success;
+};
- return make_tuple(successfulRelays, failedRelays);
- });
- closeFutures.push_back(move(closeFuture));
+vector<string> NostrService::closeSubscriptions()
+{
+ unique_lock<mutex> lock(this->_propertyMutex);
+ vector<string> subscriptionIds;
+ for (auto& [subscriptionId, relays] : this->_subscriptions)
+ {
+ subscriptionIds.push_back(subscriptionId);
}
+ lock.unlock();
- for (auto& closeFuture : closeFutures)
+ vector<string> remainingSubscriptions;
+ for (const string& subscriptionId : subscriptionIds)
{
- auto [successes, failures] = closeFuture.get();
- successfulRelays.insert(successfulRelays.end(), successes.begin(), successes.end());
- failedRelays.insert(failedRelays.end(), failures.begin(), failures.end());
+ auto [successes, failures] = this->closeSubscription(subscriptionId);
+ if (!failures.empty())
+ {
+ remainingSubscriptions.push_back(subscriptionId);
+ }
}
- size_t targetCount = relays.size();
- size_t successfulCount = successfulRelays.size();
- PLOG_INFO << "Sent close requests to " << successfulCount << "/" << targetCount << " open relay connections.";
-
- return make_tuple(successfulRelays, failedRelays);
+ return remainingSubscriptions;
};
RelayList NostrService::getConnectedRelays(RelayList relays)
@@ -544,15 +570,28 @@ string NostrService::generateCloseRequest(string subscriptionId)
return jarr.dump();
};
-bool NostrService::hasSubscription(string relay, string subscriptionId)
+bool NostrService::hasSubscription(string subscriptionId)
+{
+ lock_guard<mutex> lock(this->_propertyMutex);
+ auto it = this->_subscriptions.find(subscriptionId);
+
+ return it != this->_subscriptions.end();
+};
+
+bool NostrService::hasSubscription(string subscriptionId, string relay)
{
lock_guard<mutex> lock(this->_propertyMutex);
- auto it = find(this->_subscriptions[relay].begin(), this->_subscriptions[relay].end(), subscriptionId);
- if (it != this->_subscriptions[relay].end()) // If the subscription is in this->_subscriptions[relay]
+ auto subscriptionIt = this->_subscriptions.find(subscriptionId);
+
+ if (subscriptionIt == this->_subscriptions.end())
{
- return true;
+ return false;
}
- return false;
+
+ auto relays = this->_subscriptions[subscriptionId];
+ auto relayIt = find(relays.begin(), relays.end(), relay);
+
+ return relayIt != relays.end();
};
void NostrService::onSubscriptionMessage(
diff --git a/test/nostr_service_test.cpp b/test/nostr_service_test.cpp
index 460be73..0f0d439 100644
--- a/test/nostr_service_test.cpp
+++ b/test/nostr_service_test.cpp
@@ -804,6 +804,9 @@ TEST_F(NostrServiceTest, QueryRelays_ReturnsEvents_UpToEOSE)
}),
signedTestEvents.end());
}
+
+ auto subscriptions = nostrService->subscriptions();
+ ASSERT_TRUE(subscriptions.empty());
};
TEST_F(NostrServiceTest, QueryRelays_CallsHandler_WithReturnedEvents)
@@ -907,20 +910,8 @@ TEST_F(NostrServiceTest, QueryRelays_CallsHandler_WithReturnedEvents)
// Check that the service is keeping track of its active subscriptions.
auto subscriptions = nostrService->subscriptions();
- for (string uri : nostrService->activeRelays())
- {
- ASSERT_NO_THROW(subscriptions.at(uri));
- ASSERT_EQ(subscriptions.at(uri).size(), 1);
- ASSERT_NE(
- find_if(
- subscriptions[uri].begin(),
- subscriptions[uri].end(),
- [&generatedSubscriptionId](const string& subscriptionId)
- {
- return subscriptionId == generatedSubscriptionId;
- }),
- subscriptions[uri].end());
- }
+ ASSERT_NO_THROW(subscriptions.at(generatedSubscriptionId));
+ ASSERT_EQ(subscriptions.at(generatedSubscriptionId).size(), 2);
EXPECT_CALL(*mockClient, send(HasSubstr("CLOSE"), _))
.Times(2)
@@ -929,15 +920,181 @@ TEST_F(NostrServiceTest, QueryRelays_CallsHandler_WithReturnedEvents)
return make_tuple(uri, true);
}));
- nostrService->closeSubscription(generatedSubscriptionId);
+ auto [successes, failures] = nostrService->closeSubscription(generatedSubscriptionId);
+
+ ASSERT_TRUE(failures.empty());
// Check that the service has forgotten about the subscriptions after closing them.
subscriptions = nostrService->subscriptions();
- for (string uri : nostrService->activeRelays())
+ ASSERT_TRUE(subscriptions.empty());
+};
+
+TEST_F(NostrServiceTest, Service_MaintainsMultipleSubscriptions_ThenClosesAll)
+{
+ // Mock connections.
+ mutex connectionStatusMutex;
+ auto connectionStatus = make_shared<unordered_map<string, bool>>();
+ vector<string> testRelays = { "wss://theforest.nostr1.com" };
+ connectionStatus->insert({ testRelays[0], false });
+
+ EXPECT_CALL(*mockClient, isConnected(_))
+ .WillRepeatedly(Invoke([connectionStatus, &connectionStatusMutex](string uri)
+ {
+ lock_guard<mutex> lock(connectionStatusMutex);
+ bool status = connectionStatus->at(uri);
+ if (status == false)
+ {
+ connectionStatus->at(uri) = true;
+ }
+ return status;
+ }));
+
+ auto signer = make_unique<FakeSigner>();
+ auto nostrService = make_unique<nostr::NostrService>(
+ testAppender,
+ mockClient,
+ fakeSigner,
+ testRelays);
+ nostrService->openRelayConnections();
+
+ // Mock relay responses.
+ auto testEvents = getMultipleTextNoteTestEvents();
+ vector<shared_ptr<nostr::Event>> signedTestEvents;
+ for (nostr::Event testEvent : testEvents)
{
- ASSERT_EQ(subscriptions.at(uri).size(), 0);
+ auto signedEvent = make_shared<nostr::Event>(testEvent);
+ signer->sign(signedEvent);
+
+ auto serializedEvent = signedEvent->serialize();
+ auto deserializedEvent = nostr::Event::fromString(serializedEvent);
+
+ signedEvent = make_shared<nostr::Event>(deserializedEvent);
+ signedTestEvents.push_back(signedEvent);
}
-};
-// TODO: Add unit tests for closing subscriptions.
+ vector<string> subscriptionIds;
+ EXPECT_CALL(*mockClient, send(HasSubstr("REQ"), _, _))
+ .Times(2)
+ .WillOnce(Invoke([&testEvents, &signer, &subscriptionIds](
+ string message,
+ string uri,
+ function<void(const string&)> messageHandler)
+ {
+ json messageArr = json::parse(message);
+ subscriptionIds.push_back(messageArr.at(1));
+
+ for (auto event : testEvents)
+ {
+ auto sendableEvent = make_shared<nostr::Event>(event);
+ signer->sign(sendableEvent);
+ json jarr = json::array({ "EVENT", subscriptionIds.at(0), sendableEvent->serialize() });
+ messageHandler(jarr.dump());
+ }
+
+ json jarr = json::array({ "EOSE", subscriptionIds.at(0), });
+ messageHandler(jarr.dump());
+
+ return make_tuple(uri, true);
+ }))
+ .WillOnce(Invoke([&testEvents, &signer, &subscriptionIds](
+ string message,
+ string uri,
+ function<void(const string&)> messageHandler)
+ {
+ json messageArr = json::parse(message);
+ subscriptionIds.push_back(messageArr.at(1));
+
+ for (auto event : testEvents)
+ {
+ auto sendableEvent = make_shared<nostr::Event>(event);
+ signer->sign(sendableEvent);
+ json jarr = json::array({ "EVENT", subscriptionIds.at(1), sendableEvent->serialize() });
+ messageHandler(jarr.dump());
+ }
+
+ json jarr = json::array({ "EOSE", subscriptionIds.at(1), });
+ messageHandler(jarr.dump());
+
+ return make_tuple(uri, true);
+ }));
+
+ // Send queries.
+ auto shortFormFilters = make_shared<nostr::Filters>(getKind0And1TestFilters());
+ auto longFormFilters = make_shared<nostr::Filters>(getKind30023TestFilters());
+ promise<void> shortFormPromise;
+ promise<void> longFormPromise;
+ auto shortFormFuture = shortFormPromise.get_future();
+ auto longFormFuture = longFormPromise.get_future();
+
+ string shortFormSubscriptionId = nostrService->queryRelays(
+ shortFormFilters,
+ [&shortFormSubscriptionId, &signedTestEvents](const string& subscriptionId, shared_ptr<nostr::Event> event)
+ {
+ ASSERT_STREQ(subscriptionId.c_str(), shortFormSubscriptionId.c_str());
+ ASSERT_NE(
+ find_if(
+ signedTestEvents.begin(),
+ signedTestEvents.end(),
+ [&event](shared_ptr<nostr::Event> testEvent)
+ {
+ return *testEvent == *event;
+ }),
+ signedTestEvents.end());
+ },
+ [&shortFormSubscriptionId, &shortFormPromise]
+ (const string& subscriptionId)
+ {
+ ASSERT_STREQ(subscriptionId.c_str(), shortFormSubscriptionId.c_str());
+ shortFormPromise.set_value();
+ },
+ [](const string&, const string&) {});
+ string longFormSubscriptionId = nostrService->queryRelays(
+ shortFormFilters,
+ [&longFormSubscriptionId, &signedTestEvents](const string& subscriptionId, shared_ptr<nostr::Event> event)
+ {
+ ASSERT_STREQ(subscriptionId.c_str(), longFormSubscriptionId.c_str());
+ ASSERT_NE(
+ find_if(
+ signedTestEvents.begin(),
+ signedTestEvents.end(),
+ [&event](shared_ptr<nostr::Event> testEvent)
+ {
+ return *testEvent == *event;
+ }),
+ signedTestEvents.end());
+ },
+ [&longFormSubscriptionId, &longFormPromise]
+ (const string& subscriptionId)
+ {
+ ASSERT_STREQ(subscriptionId.c_str(), longFormSubscriptionId.c_str());
+ longFormPromise.set_value();
+ },
+ [](const string&, const string&) {});
+
+ shortFormFuture.wait();
+ longFormFuture.wait();
+
+ // Check that the service has opened a subscription for each query.
+ auto subscriptions = nostrService->subscriptions();
+ ASSERT_NO_THROW(subscriptions.at(shortFormSubscriptionId));
+ ASSERT_EQ(subscriptions.at(shortFormSubscriptionId).size(), 1);
+ ASSERT_NO_THROW(subscriptions.at(longFormSubscriptionId));
+ ASSERT_EQ(subscriptions.at(longFormSubscriptionId).size(), 1);
+
+ // Mock the relay response for closing subscriptions.
+ EXPECT_CALL(*mockClient, send(HasSubstr("CLOSE"), _))
+ .Times(2)
+ .WillRepeatedly(Invoke([](string message, string uri)
+ {
+ return make_tuple(uri, true);
+ }));
+
+ // Close all subscriptions maintained by the service.
+ auto remainingSubscriptions = nostrService->closeSubscriptions();
+ ASSERT_TRUE(remainingSubscriptions.empty());
+
+ // Check that all subscriptions have been closed.
+ subscriptions = nostrService->subscriptions();
+ ASSERT_TRUE(subscriptions.empty());
+};
} // namespace nostr_test