aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLibravatar buttercat1791 <mjjurkoic@gmail.com>2024-05-05 12:32:42 -0500
committerLibravatar buttercat1791 <mjjurkoic@gmail.com>2024-05-05 12:32:42 -0500
commit8a170b56b5c53c658af14f82111254e05062a23c (patch)
tree2b6863603174612d508f55591744e1c1bbc4c7d6
parentae458b29b7c5f9124e6cc4499bed60c865d7badd (diff)
Close relays after batch query and update unit test
-rw-r--r--src/nostr_service.cpp27
-rw-r--r--test/nostr_service_test.cpp10
2 files changed, 29 insertions, 8 deletions
diff --git a/src/nostr_service.cpp b/src/nostr_service.cpp
index a1f475c..94904ac 100644
--- a/src/nostr_service.cpp
+++ b/src/nostr_service.cpp
@@ -237,21 +237,35 @@ vector<shared_ptr<Event>> NostrService::queryRelays(shared_ptr<Filters> filters)
});
});
- if (!success)
+ if (success)
+ {
+ PLOG_INFO << "Sent query to relay: " << relay;
+ lock_guard<mutex> lock(this->_propertyMutex);
+ this->_subscriptions[relay].push_back(subscriptionId);
+ }
+ else
{
PLOG_WARNING << "Failed to send query to relay: " << relay;
eosePromise.set_value(make_tuple(uri, false));
}
}
+ // Close open subscriptions and disconnect from relays after events are received.
for (auto& publishFuture : requestFutures)
{
auto [relay, isEose] = publishFuture.get();
- if (!isEose)
+ if (isEose)
+ {
+ PLOG_INFO << "Received EOSE message from relay: " << relay;
+ }
+ else
{
- PLOG_WARNING << "Receive CLOSE message from relay: " << relay;
+ PLOG_WARNING << "Received CLOSE message from relay: " << relay;
+ this->closeRelayConnections({ relay });
}
}
+ this->closeSubscription(subscriptionId);
+ this->closeRelayConnections(this->_activeRelays);
// TODO: De-duplicate events in the vector before returning.
@@ -272,6 +286,7 @@ string NostrService::queryRelays(
vector<future<tuple<string, bool>>> requestFutures;
for (const string relay : this->_activeRelays)
{
+ lock_guard<mutex> lock(this->_propertyMutex);
this->_subscriptions[relay].push_back(subscriptionId);
future<tuple<string, bool>> requestFuture = async(
[this, &relay, &request, &eventHandler, &eoseHandler, &closeHandler]()
@@ -311,8 +326,8 @@ tuple<RelayList, RelayList> NostrService::closeSubscription(string subscriptionI
{
RelayList successfulRelays;
RelayList failedRelays;
-
vector<future<tuple<string, bool>>> closeFutures;
+
for (const string relay : this->_activeRelays)
{
if (!this->hasSubscription(relay, subscriptionId))
@@ -321,8 +336,9 @@ tuple<RelayList, RelayList> NostrService::closeSubscription(string subscriptionI
}
string request = this->generateCloseRequest(subscriptionId);
- future<tuple<string, bool>> closeFuture = async([this, &relay, &request]()
+ future<tuple<string, bool>> closeFuture = async([this, relay, request]()
{
+ PLOG_INFO << "Sending " << request << " to relay " << relay;
return this->_client->send(request, relay);
});
closeFutures.push_back(move(closeFuture));
@@ -512,6 +528,7 @@ string NostrService::generateCloseRequest(string subscriptionId)
bool NostrService::hasSubscription(string relay, string subscriptionId)
{
+ lock_guard<mutex> lock(this->_propertyMutex);
auto it = find(this->_subscriptions[relay].begin(), this->_subscriptions[relay].end(), subscriptionId);
if (it != this->_subscriptions[relay].end()) // If the subscription is in this->_subscriptions[relay]
{
diff --git a/test/nostr_service_test.cpp b/test/nostr_service_test.cpp
index b2f6876..14eb048 100644
--- a/test/nostr_service_test.cpp
+++ b/test/nostr_service_test.cpp
@@ -25,6 +25,7 @@ using std::unordered_map;
using std::vector;
using ::testing::_;
using ::testing::Args;
+using ::testing::HasSubstr;
using ::testing::Invoke;
using ::testing::Return;
using ::testing::Truly;
@@ -711,7 +712,6 @@ TEST_F(NostrServiceTest, PublishEvent_CorrectlyIndicates_EventRejectedBySomeRela
ASSERT_EQ(failures[0], defaultTestRelays[1]);
};
-// TODO: Add unit tests for queries.
TEST_F(NostrServiceTest, QueryRelays_ReturnsEvents_UpToEOSE)
{
mutex connectionStatusMutex;
@@ -753,7 +753,8 @@ TEST_F(NostrServiceTest, QueryRelays_ReturnsEvents_UpToEOSE)
signedTestEvents.push_back(signedEvent);
}
- EXPECT_CALL(*mockClient, send(_, _, _))
+ // Expect the query messages.
+ EXPECT_CALL(*mockClient, send(HasSubstr("REQ"), _, _))
.Times(2)
.WillRepeatedly(Invoke([&testEvents, &signer](
string message,
@@ -776,6 +777,9 @@ TEST_F(NostrServiceTest, QueryRelays_ReturnsEvents_UpToEOSE)
return make_tuple(uri, true);
}));
+ // Expect the close subscription messages after the client receives events.
+ // TODO: Expect close message.
+ EXPECT_CALL(*mockClient, send(HasSubstr("CLOSE"), _)).Times(2);
auto filters = make_shared<nostr::Filters>(getKind0And1TestFilters());
auto results = nostrService->queryRelays(filters);
@@ -886,7 +890,6 @@ TEST_F(NostrServiceTest, QueryRelays_CallsHandler_WithReturnedEvents)
[&generatedSubscriptionId, &eoseReceivedPromise, &eoseCount]
(const string& subscriptionId)
{
- std::cout << "EOSE received for subscription ID: " << subscriptionId << std::endl;
ASSERT_STREQ(subscriptionId.c_str(), generatedSubscriptionId.c_str());
if (++eoseCount == 2)
@@ -900,4 +903,5 @@ TEST_F(NostrServiceTest, QueryRelays_CallsHandler_WithReturnedEvents)
};
// TODO: Add unit tests for closing subscriptions.
+
} // namespace nostr_test