aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/nostr.hpp5
-rw-r--r--src/nostr_service.cpp22
-rw-r--r--test/nostr_service_test.cpp44
3 files changed, 63 insertions, 8 deletions
diff --git a/include/nostr.hpp b/include/nostr.hpp
index 5e7dbfe..326a637 100644
--- a/include/nostr.hpp
+++ b/include/nostr.hpp
@@ -22,7 +22,6 @@
namespace nostr
{
typedef std::vector<std::string> RelayList;
-typedef std::unordered_map<std::string, std::vector<std::string>> TagMap;
class ISigner;
class NostrService;
@@ -101,7 +100,7 @@ struct Filters
std::vector<std::string> ids; ///< Event IDs.
std::vector<std::string> authors; ///< Event author npubs.
std::vector<int> kinds; ///< Kind numbers.
- TagMap tags; ///< Tag names mapped to lists of tag values.
+ std::unordered_map<std::string, std::vector<std::string>> tags; ///< Tag names mapped to lists of tag values.
std::time_t since; ///< Unix timestamp. Matching events must be newer than this.
std::time_t until; ///< Unix timestamp. Matching events must be older than this.
int limit; ///< The maximum number of events the relay should return on the initial query.
@@ -143,6 +142,8 @@ public:
RelayList activeRelays() const;
+ std::unordered_map<std::string, std::vector<std::string>> subscriptions() const;
+
/**
* @brief Opens connections to the default Nostr relays of the instance, as specified in
* the constructor.
diff --git a/src/nostr_service.cpp b/src/nostr_service.cpp
index 94904ac..6ffb06d 100644
--- a/src/nostr_service.cpp
+++ b/src/nostr_service.cpp
@@ -32,6 +32,8 @@ RelayList NostrService::defaultRelays() const { return this->_defaultRelays; };
RelayList NostrService::activeRelays() const { return this->_activeRelays; };
+unordered_map<string, vector<string>> NostrService::subscriptions() const { return this->_subscriptions; };
+
RelayList NostrService::openRelayConnections()
{
return this->openRelayConnections(this->_defaultRelays);
@@ -87,6 +89,9 @@ void NostrService::closeRelayConnections(RelayList relays)
this->disconnect(relay);
});
disconnectionThreads.push_back(move(disconnectionThread));
+
+ lock_guard<mutex> lock(this->_propertyMutex);
+ this->_subscriptions.erase(relay);
}
for (thread& disconnectionThread : disconnectionThreads)
@@ -286,8 +291,10 @@ string NostrService::queryRelays(
vector<future<tuple<string, bool>>> requestFutures;
for (const string relay : this->_activeRelays)
{
- lock_guard<mutex> lock(this->_propertyMutex);
+ unique_lock<mutex> lock(this->_propertyMutex);
this->_subscriptions[relay].push_back(subscriptionId);
+ lock.unlock();
+
future<tuple<string, bool>> requestFuture = async(
[this, &relay, &request, &eventHandler, &eoseHandler, &closeHandler]()
{
@@ -350,6 +357,13 @@ tuple<RelayList, RelayList> NostrService::closeSubscription(string subscriptionI
if (isSuccess)
{
successfulRelays.push_back(relay);
+
+ lock_guard<mutex> lock(this->_propertyMutex);
+ auto it = find(
+ this->_subscriptions[relay].begin(),
+ this->_subscriptions[relay].end(),
+ subscriptionId);
+ this->_subscriptions[relay].erase(it);
}
else
{
@@ -382,7 +396,11 @@ tuple<RelayList, RelayList> NostrService::closeSubscriptions(RelayList relays)
RelayList successfulRelays;
RelayList failedRelays;
- for (const string& subscriptionId : this->_subscriptions[relay])
+ unique_lock<mutex> lock(this->_propertyMutex);
+ auto subscriptionIds = this->_subscriptions[relay];
+ lock.unlock();
+
+ for (const string& subscriptionId : subscriptionIds)
{
auto [successes, failures] = this->closeSubscription(subscriptionId);
successfulRelays.insert(successfulRelays.end(), successes.begin(), successes.end());
diff --git a/test/nostr_service_test.cpp b/test/nostr_service_test.cpp
index 14eb048..460be73 100644
--- a/test/nostr_service_test.cpp
+++ b/test/nostr_service_test.cpp
@@ -778,8 +778,12 @@ TEST_F(NostrServiceTest, QueryRelays_ReturnsEvents_UpToEOSE)
return make_tuple(uri, true);
}));
// Expect the close subscription messages after the client receives events.
- // TODO: Expect close message.
- EXPECT_CALL(*mockClient, send(HasSubstr("CLOSE"), _)).Times(2);
+ EXPECT_CALL(*mockClient, send(HasSubstr("CLOSE"), _))
+ .Times(2)
+ .WillRepeatedly(Invoke([](string message, string uri)
+ {
+ return make_tuple(uri, true);
+ }));
auto filters = make_shared<nostr::Filters>(getKind0And1TestFilters());
auto results = nostrService->queryRelays(filters);
@@ -843,7 +847,7 @@ TEST_F(NostrServiceTest, QueryRelays_CallsHandler_WithReturnedEvents)
signedTestEvents.push_back(signedEvent);
}
- EXPECT_CALL(*mockClient, send(_, _, _))
+ EXPECT_CALL(*mockClient, send(HasSubstr("REQ"), _, _))
.Times(2)
.WillRepeatedly(Invoke([&testEvents, &signer](
string message,
@@ -900,8 +904,40 @@ TEST_F(NostrServiceTest, QueryRelays_CallsHandler_WithReturnedEvents)
[](const string&, const string&) {});
eoseReceivedFuture.wait();
+
+ // Check that the service is keeping track of its active subscriptions.
+ auto subscriptions = nostrService->subscriptions();
+ for (string uri : nostrService->activeRelays())
+ {
+ ASSERT_NO_THROW(subscriptions.at(uri));
+ ASSERT_EQ(subscriptions.at(uri).size(), 1);
+ ASSERT_NE(
+ find_if(
+ subscriptions[uri].begin(),
+ subscriptions[uri].end(),
+ [&generatedSubscriptionId](const string& subscriptionId)
+ {
+ return subscriptionId == generatedSubscriptionId;
+ }),
+ subscriptions[uri].end());
+ }
+
+ EXPECT_CALL(*mockClient, send(HasSubstr("CLOSE"), _))
+ .Times(2)
+ .WillRepeatedly(Invoke([](string message, string uri)
+ {
+ return make_tuple(uri, true);
+ }));
+
+ nostrService->closeSubscription(generatedSubscriptionId);
+
+ // Check that the service has forgotten about the subscriptions after closing them.
+ subscriptions = nostrService->subscriptions();
+ for (string uri : nostrService->activeRelays())
+ {
+ ASSERT_EQ(subscriptions.at(uri).size(), 0);
+ }
};
// TODO: Add unit tests for closing subscriptions.
-
} // namespace nostr_test