diff options
-rw-r--r-- | include/client/web_socket_client.hpp | 13 | ||||
-rw-r--r-- | include/nostr.hpp | 44 | ||||
-rw-r--r-- | src/client/websocketpp_client.cpp | 8 | ||||
-rw-r--r-- | src/nostr_service.cpp | 164 | ||||
-rw-r--r-- | test/nostr_service_test.cpp | 69 |
5 files changed, 199 insertions, 99 deletions
diff --git a/include/client/web_socket_client.hpp b/include/client/web_socket_client.hpp index 3ef2b86..6fbede6 100644 --- a/include/client/web_socket_client.hpp +++ b/include/client/web_socket_client.hpp @@ -43,6 +43,19 @@ public: virtual std::tuple<std::string, bool> send(std::string message, std::string uri) = 0; /** + * @brief Sends the given message to the given server and sets up a message handler for + * messages received from the server. + * @returns A tuple indicating the server URI and whether the message was successfully + * sent. + * @remark Use this method to send a message and set up a message handler for responses in the + * same call. + */ + virtual std::tuple<std::string, bool> send( + std::string message, + std::string uri, + std::function<void(const std::string&)> messageHandler) = 0; + + /** * @brief Sets up a message handler for the given server. * @param uri The URI of the server to which the message handler should be attached. * @param messageHandler A callable object that will be invoked with the payload the client diff --git a/include/nostr.hpp b/include/nostr.hpp index e450505..62eceff 100644 --- a/include/nostr.hpp +++ b/include/nostr.hpp @@ -157,10 +157,24 @@ public: * @returns A tuple of `RelayList` objects, of the form `<successes, failures>`, indicating * to which relays the event was published successfully, and to which relays the event failed * to publish. - */ + */ std::tuple<RelayList, RelayList> publishEvent(std::shared_ptr<Event> event); /** + * @brief Queries all open relay connections for events matching the given set of filters, and + * returns all stored matching events returned by the relays. + * @param filters The filters to use for the query. + * @returns A vector of all events matching the filters from all open relay connections. + * @remark This method runs until the relays send an EOSE message, indicating they have no more + * stored events matching the given filters. When the EOSE message is received, the method + * will close the subscription for each relay and return the received events. + * @remark Use this method to fetch a batch of events from the relays. A `limit` value must be + * set on the filters in the range 1-64, inclusive. If no valid limit is given, it will be + * defaulted to 16. + */ + std::vector<std::shared_ptr<Event>> queryRelays(std::shared_ptr<Filters> filters); + + /** * @brief Queries all open relay connections for events matching the given set of filters. * @param filters The filters to use for the query. * @param responseHandler A callable object that will be invoked each time the client receives @@ -172,7 +186,9 @@ public: */ std::string queryRelays( std::shared_ptr<Filters> filters, - std::function<void(const std::string&, std::shared_ptr<Event>)> responseHandler); + std::function<void(const std::string&, std::shared_ptr<Event>)> eventHandler, + std::function<void(const std::string&)> eoseHandler, + std::function<void(const std::string&, const std::string&)> closeHandler); /** * @brief Closes the subscription with the given ID on all open relay connections. @@ -269,11 +285,29 @@ private: bool hasSubscription(std::string relay, std::string subscriptionId); /** - * @brief Parses messages received from the relay and invokes the appropriate message handler. + * @brief Parses EVENT messages received from the relay and invokes the given event handler. + * @param message The raw message received from the relay. + * @param eventHandler A callable object that will be invoked with the subscription ID and the + * payload of the event. + * @param eoseHandler A callable object that will be invoked with the subscription ID when the + * relay sends an EOSE message, indicating it has reached the end of stored events for the + * given query. + * @param closeHandler A callable object that will be invoked with the subscription ID and the + * message sent by the relay if the subscription is ended by the relay. */ - void onMessage( + void onSubscriptionMessage( std::string message, - std::function<void(const std::string&, std::shared_ptr<Event>)> eventHandler); + std::function<void(const std::string&, std::shared_ptr<Event>)> eventHandler, + std::function<void(const std::string&)> eoseHandler, + std::function<void(const std::string&, const std::string&)> closeHandler); + + /** + * @brief Parses OK messages received from the relay and invokes the given acceptance handler. + * @remark The OK message type is sent to indicate whether the relay has accepted an event sent + * by the client. Note that this is distinct from whether the message was successfully sent to + * the relay over the WebSocket connection. + */ + void onAcceptance(std::string message, std::function<void(const bool)> acceptanceHandler); }; class ISigner diff --git a/src/client/websocketpp_client.cpp b/src/client/websocketpp_client.cpp index 981d4ec..276c5dd 100644 --- a/src/client/websocketpp_client.cpp +++ b/src/client/websocketpp_client.cpp @@ -77,13 +77,19 @@ public: if (error.value() == -1) { - // PLOG_ERROR << "Error publishing event to relay " << relay << ": " << error.message(); return make_tuple(uri, false); } return make_tuple(uri, true); }; + tuple<string, bool> send(string message, string uri, function<void(const string&)> messageHandler) override + { + auto successes = this->send(message, uri); + this->receive(uri, messageHandler); + return successes; + }; + void receive(string uri, function<void(const string&)> messageHandler) override { lock_guard<mutex> lock(this->_propertyMutex); diff --git a/src/nostr_service.cpp b/src/nostr_service.cpp index 614e64f..e8f14f6 100644 --- a/src/nostr_service.cpp +++ b/src/nostr_service.cpp @@ -25,6 +25,7 @@ using std::make_tuple; using std::move; using std::mutex; using std::out_of_range; +using std::promise; using std::shared_ptr; using std::string; using std::thread; @@ -123,6 +124,7 @@ void NostrService::closeRelayConnections(RelayList relays) } }; +// TODO: Make this method return a promise. tuple<RelayList, RelayList> NostrService::publishEvent(shared_ptr<Event> event) { RelayList successfulRelays; @@ -151,12 +153,34 @@ tuple<RelayList, RelayList> NostrService::publishEvent(shared_ptr<Event> event) vector<future<tuple<string, bool>>> publishFutures; for (const string& relay : this->_activeRelays) { - PLOG_INFO << "Entering lambda."; - future<tuple<string, bool>> publishFuture = async([this, relay, message]() + promise<tuple<string, bool>> publishPromise; + publishFutures.push_back(move(publishPromise.get_future())); + + auto [targetRelay, isSuccess] = this->_client->send( + message.dump(), + relay, + [this, &relay, &event, &publishPromise](string response) + { + this->onAcceptance(response, [this, &relay, &event, &publishPromise](bool isAccepted) + { + if (isAccepted) + { + PLOG_INFO << "Relay " << relay << " accepted event: " << event->id; + publishPromise.set_value(make_tuple(relay, true)); + } + else + { + PLOG_WARNING << "Relay " << relay << " rejected event: " << event->id; + publishPromise.set_value(make_tuple(relay, false)); + } + }); + }); + + if (!isSuccess) { - return this->_client->send(message.dump(), relay); - }); - publishFutures.push_back(move(publishFuture)); + PLOG_WARNING << "Failed to send event to relay: " << relay; + publishPromise.set_value(make_tuple(relay, false)); + } } for (auto& publishFuture : publishFutures) @@ -179,9 +203,72 @@ tuple<RelayList, RelayList> NostrService::publishEvent(shared_ptr<Event> event) return make_tuple(successfulRelays, failedRelays); }; +// TODO: Make this method return a promise. +// TODO: Add a timeout to this method to prevent hanging while waiting for the relay. +vector<shared_ptr<Event>> NostrService::queryRelays(shared_ptr<Filters> filters) +{ + if (filters->limit > 64 || filters->limit < 1) + { + PLOG_WARNING << "Filters limit must be between 1 and 64, inclusive. Setting limit to 16."; + filters->limit = 16; + } + + vector<shared_ptr<Event>> events; + + string subscriptionId = this->generateSubscriptionId(); + string request = filters->serialize(subscriptionId); + vector<future<tuple<string, bool>>> requestFutures; + + // Send the same query to each relay. As events trickle in from each relay, they will be added + // to the events vector. Multiple copies of an event may be received if the same event is + // stored on multiple relays. The function will block until all of the relays send an EOSE or + // CLOSE message. + for (const string relay : this->_activeRelays) + { + promise<tuple<string, bool>> eosePromise; + requestFutures.push_back(move(eosePromise.get_future())); + + this->_client->send( + request, + relay, + [this, &relay, &events, &eosePromise](string payload) + { + this->onSubscriptionMessage( + payload, + [&events](const string&, shared_ptr<Event> event) + { + events.push_back(event); + }, + [relay, &eosePromise](const string&) + { + eosePromise.set_value(make_tuple(relay, true)); + }, + [relay, &eosePromise](const string&, const string&) + { + eosePromise.set_value(make_tuple(relay, false)); + }); + }); + } + + for (auto& publishFuture : requestFutures) + { + auto [relay, isEose] = publishFuture.get(); + if (!isEose) + { + PLOG_WARNING << "Receive CLOSE message from relay: " << relay; + } + } + + // TODO: De-duplicate events in the vector before returning. + + return events; +}; + string NostrService::queryRelays( shared_ptr<Filters> filters, - function<void(const string&, shared_ptr<Event>)> responseHandler) + function<void(const string&, shared_ptr<Event>)> eventHandler, + function<void(const string&)> eoseHandler, + function<void(const string&, const string&)> closeHandler) { RelayList successfulRelays; RelayList failedRelays; @@ -189,24 +276,22 @@ string NostrService::queryRelays( string subscriptionId = this->generateSubscriptionId(); string request = filters->serialize(subscriptionId); vector<future<tuple<string, bool>>> requestFutures; - vector<future<void>> receiveFutures; for (const string relay : this->_activeRelays) { this->_subscriptions[relay].push_back(subscriptionId); - future<tuple<string, bool>> requestFuture = async([this, &relay, &request]() - { - return this->_client->send(request, relay); - }); - requestFutures.push_back(move(requestFuture)); - - auto receiveFuture = async([this, &relay, &responseHandler]() - { - this->_client->receive(relay, [this, responseHandler](string payload) { - this->onMessage(payload, responseHandler); + future<tuple<string, bool>> requestFuture = async( + [this, &relay, &request, &eventHandler, &eoseHandler, &closeHandler]() + { + return this->_client->send( + request, + relay, + [this, &eventHandler, &eoseHandler, &closeHandler](string payload) + { + this->onSubscriptionMessage(payload, eventHandler, eoseHandler, closeHandler); + }); }); - }); - receiveFutures.push_back(move(receiveFuture)); + requestFutures.push_back(move(requestFuture)); } for (auto& publishFuture : requestFutures) @@ -222,11 +307,6 @@ string NostrService::queryRelays( } } - for (auto& receiveFuture : receiveFutures) - { - receiveFuture.get(); - } - size_t targetCount = this->_activeRelays.size(); size_t successfulCount = successfulRelays.size(); PLOG_INFO << "Sent query to " << successfulCount << "/" << targetCount << " open relay connections."; @@ -446,9 +526,11 @@ bool NostrService::hasSubscription(string relay, string subscriptionId) return false; }; -void NostrService::onMessage( +void NostrService::onSubscriptionMessage( string message, - function<void(const string&, shared_ptr<Event>)> eventHandler) + function<void(const string&, shared_ptr<Event>)> eventHandler, + function<void(const string&)> eoseHandler, + function<void(const string&, const string&)> closeHandler) { try { @@ -460,8 +542,36 @@ void NostrService::onMessage( Event event = Event::fromString(jMessage[2]); eventHandler(subscriptionId, make_shared<Event>(event)); } + else if (messageType == "EOSE") + { + string subscriptionId = jMessage[1]; + eoseHandler(subscriptionId); + } + else if (messageType == "CLOSE") + { + string subscriptionId = jMessage[1]; + string reason = jMessage[2]; + closeHandler(subscriptionId, reason); + } + } + catch (const json::exception& je) + { + PLOG_ERROR << "JSON handling exception: " << je.what(); + throw je; + } +}; - // Support other message types here, if necessary. +void NostrService::onAcceptance(string message, function<void(const bool)> acceptanceHandler) +{ + try + { + json jMessage = json::parse(message); + string messageType = jMessage[0]; + if (messageType == "OK") + { + bool isAccepted = jMessage[2]; + acceptanceHandler(isAccepted); + } } catch (const json::exception& je) { diff --git a/test/nostr_service_test.cpp b/test/nostr_service_test.cpp index 7adda7e..854de78 100644 --- a/test/nostr_service_test.cpp +++ b/test/nostr_service_test.cpp @@ -20,8 +20,10 @@ using std::string; using std::tuple; using std::unordered_map; using ::testing::_; +using ::testing::Args; using ::testing::Invoke; using ::testing::Return; +using ::testing::Truly; namespace nostr_test { @@ -32,6 +34,7 @@ public: MOCK_METHOD(void, openConnection, (string uri), (override)); MOCK_METHOD(bool, isConnected, (string uri), (override)); MOCK_METHOD((tuple<string, bool>), send, (string message, string uri), (override)); + MOCK_METHOD((tuple<string, bool>), send, (string message, string uri, function<void(const string&)> messageHandler), (override)); MOCK_METHOD(void, receive, (string uri, function<void(const string&)> messageHandler), (override)); MOCK_METHOD(void, closeConnection, (string uri), (override)); }; @@ -531,70 +534,4 @@ TEST_F(NostrServiceTest, PublishEvent_CorrectlyIndicates_MixedSuccessesAndFailur ASSERT_EQ(failures.size(), 1); ASSERT_EQ(failures[0], defaultTestRelays[1]); }; - -TEST_F(NostrServiceTest, QueryRelays_UsesProvidedHandler_AndReturnsSubscriptionId) -{ - mutex connectionStatusMutex; - auto connectionStatus = make_shared<unordered_map<string, bool>>(); - connectionStatus->insert({ defaultTestRelays[0], false }); - connectionStatus->insert({ defaultTestRelays[1], false }); - - EXPECT_CALL(*mockClient, isConnected(_)) - .WillRepeatedly(Invoke([connectionStatus, &connectionStatusMutex](string uri) - { - lock_guard<mutex> lock(connectionStatusMutex); - bool status = connectionStatus->at(uri); - if (status == false) - { - connectionStatus->at(uri) = true; - } - return status; - })); - - auto nostrService = make_unique<nostr::NostrService>( - testAppender, - mockClient, - fakeSigner, - defaultTestRelays); - nostrService->openRelayConnections(); - - auto sentSubscriptionId = make_shared<string>(); - EXPECT_CALL(*mockClient, send(_, _)) - .Times(2) - .WillRepeatedly(Invoke([sentSubscriptionId](string message, string uri) - { - json jarr = json::array(); - jarr = json::parse(message); - - string temp = jarr[1].dump(); - if (!temp.empty() && temp[0] == '\"' && temp[temp.size() - 1] == '\"') - { - *sentSubscriptionId = temp.substr(1, temp.size() - 2); - } - - return make_tuple(uri, true); - })); - EXPECT_CALL(*mockClient, receive(_, _)) - .Times(2) - .WillRepeatedly(Invoke([sentSubscriptionId](string _, function<void(const string&)> messageHandler) - { - auto event = make_shared<nostr::Event>(getTextNoteTestEvent()); - messageHandler(getTestEventMessage(event, *sentSubscriptionId)); - })); - - auto filters = make_shared<nostr::Filters>(getKind0And1TestFilters()); - nostr::Event expectedEvent = getTextNoteTestEvent(); - auto receivedSubscriptionId = nostrService->queryRelays( - filters, - [expectedEvent](const string& subscriptionId, shared_ptr<nostr::Event> event) - { - ASSERT_STREQ(event->pubkey.c_str(), expectedEvent.pubkey.c_str()); - ASSERT_EQ(event->kind, expectedEvent.kind); - ASSERT_EQ(event->tags.size(), expectedEvent.tags.size()); - ASSERT_STREQ(event->content.c_str(), expectedEvent.content.c_str()); - ASSERT_GT(event->sig.size(), 0); - }); - - ASSERT_STREQ(receivedSubscriptionId.c_str(), sentSubscriptionId->c_str()); -}; } // namespace nostr_test |