aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/nostr_service.cpp170
-rw-r--r--test/nostr_service_test.cpp4
2 files changed, 90 insertions, 84 deletions
diff --git a/src/nostr_service.cpp b/src/nostr_service.cpp
index 83555eb..2c1f9a8 100644
--- a/src/nostr_service.cpp
+++ b/src/nostr_service.cpp
@@ -182,104 +182,108 @@ tuple<vector<string>, vector<string>> NostrService::publishEvent(shared_ptr<Even
return make_tuple(successfulRelays, failedRelays);
};
-// TODO: Make this method return a promise.
// TODO: Add a timeout to this method to prevent hanging while waiting for the relay.
-std::future<vector<shared_ptr<Event>> NostrService::queryRelays(shared_ptr<Filters> filters)
+std::future<vector<shared_ptr<Event>>> NostrService::queryRelays(shared_ptr<Filters> filters)
{
- if (filters->limit > 64 || filters->limit < 1)
- {
- PLOG_WARNING << "Filters limit must be between 1 and 64, inclusive. Setting limit to 16.";
- filters->limit = 16;
- }
-
- vector<shared_ptr<Event>> events;
-
- string subscriptionId = this->generateSubscriptionId();
- string request;
-
- try
- {
- request = filters->serialize(subscriptionId);
- }
- catch (const invalid_argument& e)
- {
- PLOG_ERROR << "Failed to serialize filters - invalid object: " << e.what();
- throw e;
- }
- catch (const json::exception& je)
+ return std::async(std::launch::async, [this, filters]() -> vector<shared_ptr<Event>>
{
- PLOG_ERROR << "Failed to serialize filters - JSON exception: " << je.what();
- throw je;
- }
+ if (filters->limit > 64 || filters->limit < 1)
+ {
+ PLOG_WARNING << "Filters limit must be between 1 and 64, inclusive. Setting limit to 16.";
+ filters->limit = 16;
+ }
- vector<future<tuple<string, bool>>> requestFutures;
+ vector<shared_ptr<Event>> events;
- unordered_set<string> uniqueEventIds;
+ string subscriptionId = this->generateSubscriptionId();
+ string request;
- // Send the same query to each relay. As events trickle in from each relay, they will be added
- // to the events vector. Duplicate copies of the same event will be ignored, as events are
- // stored on multiple relays. The function will block until all of the relays send an EOSE or
- // CLOSE message.
- for (const string relay : this->_activeRelays)
- {
- promise<tuple<string, bool>> eosePromise;
- requestFutures.push_back(move(eosePromise.get_future()));
-
- auto [uri, success] = this->_client->send(
- request,
- relay,
- [this, &relay, &events, &eosePromise, &uniqueEventIds](string payload)
- {
- this->onSubscriptionMessage(
- payload,
- [&events, &uniqueEventIds](const string&, shared_ptr<Event> event)
- {
- // Check if the event is unique before adding.
- if (uniqueEventIds.insert(event->id).second)
- {
- events.push_back(event);
- }
- },
- [relay, &eosePromise](const string&)
- {
- eosePromise.set_value(make_tuple(relay, true));
- },
- [relay, &eosePromise](const string&, const string&)
- {
- eosePromise.set_value(make_tuple(relay, false));
- });
- });
-
- if (success)
+ try
{
- PLOG_INFO << "Sent query to relay " << relay;
- lock_guard<mutex> lock(this->_propertyMutex);
- this->_subscriptions[subscriptionId].push_back(relay);
+ request = filters->serialize(subscriptionId);
}
- else
+ catch (const invalid_argument& e)
{
- PLOG_WARNING << "Failed to send query to relay " << relay;
- eosePromise.set_value(make_tuple(uri, false));
+ PLOG_ERROR << "Failed to serialize filters - invalid object: " << e.what();
+ throw e;
}
- }
-
- // Close open subscriptions and disconnect from relays after events are received.
- for (auto& publishFuture : requestFutures)
- {
- auto [relay, isEose] = publishFuture.get();
- if (isEose)
+ catch (const json::exception& je)
{
- PLOG_INFO << "Received EOSE message from relay " << relay;
+ PLOG_ERROR << "Failed to serialize filters - JSON exception: " << je.what();
+ throw je;
}
- else
+
+ vector<future<tuple<string, bool>>> requestFutures;
+
+ unordered_set<string> uniqueEventIds;
+
+ // Send the same query to each relay. As events trickle in from each relay, they will be added
+ // to the events vector. Duplicate copies of the same event will be ignored, as events are
+ // stored on multiple relays. The function will block until all of the relays send an EOSE or
+ // CLOSE message.
+ for (const string relay : this->_activeRelays)
{
- PLOG_WARNING << "Received CLOSE message from relay " << relay;
- this->closeRelayConnections({ relay });
+ promise<tuple<string, bool>> eosePromise;
+ requestFutures.push_back(move(eosePromise.get_future()));
+
+ auto [uri, success] = this->_client->send(
+ request,
+ relay,
+ [this, &relay, &events, &eosePromise, &uniqueEventIds](string payload)
+ {
+ this->onSubscriptionMessage(
+ payload,
+ [&events, &uniqueEventIds](const string&, shared_ptr<Event> event)
+ {
+ // Check if the event is unique before adding.
+ if (uniqueEventIds.insert(event->id).second)
+ {
+ events.push_back(event);
+ }
+ },
+ [relay, &eosePromise](const string&)
+ {
+ eosePromise.set_value(make_tuple(relay, true));
+ },
+ [relay, &eosePromise](const string&, const string&)
+ {
+ eosePromise.set_value(make_tuple(relay, false));
+ });
+ });
+
+ if (success)
+ {
+ PLOG_INFO << "Sent query to relay " << relay;
+ lock_guard<mutex> lock(this->_propertyMutex);
+ this->_subscriptions[subscriptionId].push_back(relay);
+ }
+ else
+ {
+ PLOG_WARNING << "Failed to send query to relay " << relay;
+ eosePromise.set_value(make_tuple(uri, false));
+ }
}
- }
- this->closeSubscription(subscriptionId);
- return events;
+
+ // Close open subscriptions and disconnect from relays after events are received.
+
+ for (auto& publishFuture : requestFutures)
+ {
+ auto [relay, isEose] = publishFuture.get();
+ if (isEose)
+ {
+ PLOG_INFO << "Received EOSE message from relay " << relay;
+ }
+ else
+ {
+ PLOG_WARNING << "Received CLOSE message from relay " << relay;
+ this->closeRelayConnections({ relay });
+ }
+ }
+ this->closeSubscription(subscriptionId);
+
+ return events;
+ });
};
string NostrService::queryRelays(
diff --git a/test/nostr_service_test.cpp b/test/nostr_service_test.cpp
index 4e29960..6000423 100644
--- a/test/nostr_service_test.cpp
+++ b/test/nostr_service_test.cpp
@@ -771,7 +771,9 @@ TEST_F(NostrServiceTest, QueryRelays_ReturnsEvents_UpToEOSE)
}));
auto filters = make_shared<nostr::Filters>(getKind0And1TestFilters());
- auto results = nostrService->queryRelays(filters);
+
+ // await the future and get the results
+ auto results = nostrService->queryRelays(filters).get();
// Check results size to ensure there are no duplicates.
ASSERT_EQ(results.size(), testEvents.size());