diff options
author | Michael Jurkoic <mjjurkoic@gmail.com> | 2024-04-10 21:33:45 -0500 |
---|---|---|
committer | Michael Jurkoic <mjjurkoic@gmail.com> | 2024-04-10 21:33:45 -0500 |
commit | c8bb6c8f56e0c6d93c8623722ab932c04de882b5 (patch) | |
tree | b61f055748e8c8058f9d27c1441bf68d3bca030d /src/nostr_service.cpp | |
parent | 0a185a13aa4c202ad8d76ac3e62a878dc5f06619 (diff) |
Handle relay response messages
These changes do not yet have unit tests.
Diffstat (limited to 'src/nostr_service.cpp')
-rw-r--r-- | src/nostr_service.cpp | 164 |
1 files changed, 137 insertions, 27 deletions
diff --git a/src/nostr_service.cpp b/src/nostr_service.cpp index 614e64f..e8f14f6 100644 --- a/src/nostr_service.cpp +++ b/src/nostr_service.cpp @@ -25,6 +25,7 @@ using std::make_tuple; using std::move; using std::mutex; using std::out_of_range; +using std::promise; using std::shared_ptr; using std::string; using std::thread; @@ -123,6 +124,7 @@ void NostrService::closeRelayConnections(RelayList relays) } }; +// TODO: Make this method return a promise. tuple<RelayList, RelayList> NostrService::publishEvent(shared_ptr<Event> event) { RelayList successfulRelays; @@ -151,12 +153,34 @@ tuple<RelayList, RelayList> NostrService::publishEvent(shared_ptr<Event> event) vector<future<tuple<string, bool>>> publishFutures; for (const string& relay : this->_activeRelays) { - PLOG_INFO << "Entering lambda."; - future<tuple<string, bool>> publishFuture = async([this, relay, message]() + promise<tuple<string, bool>> publishPromise; + publishFutures.push_back(move(publishPromise.get_future())); + + auto [targetRelay, isSuccess] = this->_client->send( + message.dump(), + relay, + [this, &relay, &event, &publishPromise](string response) + { + this->onAcceptance(response, [this, &relay, &event, &publishPromise](bool isAccepted) + { + if (isAccepted) + { + PLOG_INFO << "Relay " << relay << " accepted event: " << event->id; + publishPromise.set_value(make_tuple(relay, true)); + } + else + { + PLOG_WARNING << "Relay " << relay << " rejected event: " << event->id; + publishPromise.set_value(make_tuple(relay, false)); + } + }); + }); + + if (!isSuccess) { - return this->_client->send(message.dump(), relay); - }); - publishFutures.push_back(move(publishFuture)); + PLOG_WARNING << "Failed to send event to relay: " << relay; + publishPromise.set_value(make_tuple(relay, false)); + } } for (auto& publishFuture : publishFutures) @@ -179,9 +203,72 @@ tuple<RelayList, RelayList> NostrService::publishEvent(shared_ptr<Event> event) return make_tuple(successfulRelays, failedRelays); }; +// TODO: Make this method return a promise. +// TODO: Add a timeout to this method to prevent hanging while waiting for the relay. +vector<shared_ptr<Event>> NostrService::queryRelays(shared_ptr<Filters> filters) +{ + if (filters->limit > 64 || filters->limit < 1) + { + PLOG_WARNING << "Filters limit must be between 1 and 64, inclusive. Setting limit to 16."; + filters->limit = 16; + } + + vector<shared_ptr<Event>> events; + + string subscriptionId = this->generateSubscriptionId(); + string request = filters->serialize(subscriptionId); + vector<future<tuple<string, bool>>> requestFutures; + + // Send the same query to each relay. As events trickle in from each relay, they will be added + // to the events vector. Multiple copies of an event may be received if the same event is + // stored on multiple relays. The function will block until all of the relays send an EOSE or + // CLOSE message. + for (const string relay : this->_activeRelays) + { + promise<tuple<string, bool>> eosePromise; + requestFutures.push_back(move(eosePromise.get_future())); + + this->_client->send( + request, + relay, + [this, &relay, &events, &eosePromise](string payload) + { + this->onSubscriptionMessage( + payload, + [&events](const string&, shared_ptr<Event> event) + { + events.push_back(event); + }, + [relay, &eosePromise](const string&) + { + eosePromise.set_value(make_tuple(relay, true)); + }, + [relay, &eosePromise](const string&, const string&) + { + eosePromise.set_value(make_tuple(relay, false)); + }); + }); + } + + for (auto& publishFuture : requestFutures) + { + auto [relay, isEose] = publishFuture.get(); + if (!isEose) + { + PLOG_WARNING << "Receive CLOSE message from relay: " << relay; + } + } + + // TODO: De-duplicate events in the vector before returning. + + return events; +}; + string NostrService::queryRelays( shared_ptr<Filters> filters, - function<void(const string&, shared_ptr<Event>)> responseHandler) + function<void(const string&, shared_ptr<Event>)> eventHandler, + function<void(const string&)> eoseHandler, + function<void(const string&, const string&)> closeHandler) { RelayList successfulRelays; RelayList failedRelays; @@ -189,24 +276,22 @@ string NostrService::queryRelays( string subscriptionId = this->generateSubscriptionId(); string request = filters->serialize(subscriptionId); vector<future<tuple<string, bool>>> requestFutures; - vector<future<void>> receiveFutures; for (const string relay : this->_activeRelays) { this->_subscriptions[relay].push_back(subscriptionId); - future<tuple<string, bool>> requestFuture = async([this, &relay, &request]() - { - return this->_client->send(request, relay); - }); - requestFutures.push_back(move(requestFuture)); - - auto receiveFuture = async([this, &relay, &responseHandler]() - { - this->_client->receive(relay, [this, responseHandler](string payload) { - this->onMessage(payload, responseHandler); + future<tuple<string, bool>> requestFuture = async( + [this, &relay, &request, &eventHandler, &eoseHandler, &closeHandler]() + { + return this->_client->send( + request, + relay, + [this, &eventHandler, &eoseHandler, &closeHandler](string payload) + { + this->onSubscriptionMessage(payload, eventHandler, eoseHandler, closeHandler); + }); }); - }); - receiveFutures.push_back(move(receiveFuture)); + requestFutures.push_back(move(requestFuture)); } for (auto& publishFuture : requestFutures) @@ -222,11 +307,6 @@ string NostrService::queryRelays( } } - for (auto& receiveFuture : receiveFutures) - { - receiveFuture.get(); - } - size_t targetCount = this->_activeRelays.size(); size_t successfulCount = successfulRelays.size(); PLOG_INFO << "Sent query to " << successfulCount << "/" << targetCount << " open relay connections."; @@ -446,9 +526,11 @@ bool NostrService::hasSubscription(string relay, string subscriptionId) return false; }; -void NostrService::onMessage( +void NostrService::onSubscriptionMessage( string message, - function<void(const string&, shared_ptr<Event>)> eventHandler) + function<void(const string&, shared_ptr<Event>)> eventHandler, + function<void(const string&)> eoseHandler, + function<void(const string&, const string&)> closeHandler) { try { @@ -460,8 +542,36 @@ void NostrService::onMessage( Event event = Event::fromString(jMessage[2]); eventHandler(subscriptionId, make_shared<Event>(event)); } + else if (messageType == "EOSE") + { + string subscriptionId = jMessage[1]; + eoseHandler(subscriptionId); + } + else if (messageType == "CLOSE") + { + string subscriptionId = jMessage[1]; + string reason = jMessage[2]; + closeHandler(subscriptionId, reason); + } + } + catch (const json::exception& je) + { + PLOG_ERROR << "JSON handling exception: " << je.what(); + throw je; + } +}; - // Support other message types here, if necessary. +void NostrService::onAcceptance(string message, function<void(const bool)> acceptanceHandler) +{ + try + { + json jMessage = json::parse(message); + string messageType = jMessage[0]; + if (messageType == "OK") + { + bool isAccepted = jMessage[2]; + acceptanceHandler(isAccepted); + } } catch (const json::exception& je) { |