aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLibravatar Michael Jurkoic <mjjurkoic@gmail.com>2024-04-10 21:33:45 -0500
committerLibravatar Michael Jurkoic <mjjurkoic@gmail.com>2024-04-10 21:33:45 -0500
commitc8bb6c8f56e0c6d93c8623722ab932c04de882b5 (patch)
treeb61f055748e8c8058f9d27c1441bf68d3bca030d
parent0a185a13aa4c202ad8d76ac3e62a878dc5f06619 (diff)
Handle relay response messages
These changes do not yet have unit tests.
-rw-r--r--include/client/web_socket_client.hpp13
-rw-r--r--include/nostr.hpp44
-rw-r--r--src/client/websocketpp_client.cpp8
-rw-r--r--src/nostr_service.cpp164
-rw-r--r--test/nostr_service_test.cpp69
5 files changed, 199 insertions, 99 deletions
diff --git a/include/client/web_socket_client.hpp b/include/client/web_socket_client.hpp
index 3ef2b86..6fbede6 100644
--- a/include/client/web_socket_client.hpp
+++ b/include/client/web_socket_client.hpp
@@ -43,6 +43,19 @@ public:
virtual std::tuple<std::string, bool> send(std::string message, std::string uri) = 0;
/**
+ * @brief Sends the given message to the given server and sets up a message handler for
+ * messages received from the server.
+ * @returns A tuple indicating the server URI and whether the message was successfully
+ * sent.
+ * @remark Use this method to send a message and set up a message handler for responses in the
+ * same call.
+ */
+ virtual std::tuple<std::string, bool> send(
+ std::string message,
+ std::string uri,
+ std::function<void(const std::string&)> messageHandler) = 0;
+
+ /**
* @brief Sets up a message handler for the given server.
* @param uri The URI of the server to which the message handler should be attached.
* @param messageHandler A callable object that will be invoked with the payload the client
diff --git a/include/nostr.hpp b/include/nostr.hpp
index e450505..62eceff 100644
--- a/include/nostr.hpp
+++ b/include/nostr.hpp
@@ -157,10 +157,24 @@ public:
* @returns A tuple of `RelayList` objects, of the form `<successes, failures>`, indicating
* to which relays the event was published successfully, and to which relays the event failed
* to publish.
- */
+ */
std::tuple<RelayList, RelayList> publishEvent(std::shared_ptr<Event> event);
/**
+ * @brief Queries all open relay connections for events matching the given set of filters, and
+ * returns all stored matching events returned by the relays.
+ * @param filters The filters to use for the query.
+ * @returns A vector of all events matching the filters from all open relay connections.
+ * @remark This method runs until the relays send an EOSE message, indicating they have no more
+ * stored events matching the given filters. When the EOSE message is received, the method
+ * will close the subscription for each relay and return the received events.
+ * @remark Use this method to fetch a batch of events from the relays. A `limit` value must be
+ * set on the filters in the range 1-64, inclusive. If no valid limit is given, it will be
+ * defaulted to 16.
+ */
+ std::vector<std::shared_ptr<Event>> queryRelays(std::shared_ptr<Filters> filters);
+
+ /**
* @brief Queries all open relay connections for events matching the given set of filters.
* @param filters The filters to use for the query.
* @param responseHandler A callable object that will be invoked each time the client receives
@@ -172,7 +186,9 @@ public:
*/
std::string queryRelays(
std::shared_ptr<Filters> filters,
- std::function<void(const std::string&, std::shared_ptr<Event>)> responseHandler);
+ std::function<void(const std::string&, std::shared_ptr<Event>)> eventHandler,
+ std::function<void(const std::string&)> eoseHandler,
+ std::function<void(const std::string&, const std::string&)> closeHandler);
/**
* @brief Closes the subscription with the given ID on all open relay connections.
@@ -269,11 +285,29 @@ private:
bool hasSubscription(std::string relay, std::string subscriptionId);
/**
- * @brief Parses messages received from the relay and invokes the appropriate message handler.
+ * @brief Parses EVENT messages received from the relay and invokes the given event handler.
+ * @param message The raw message received from the relay.
+ * @param eventHandler A callable object that will be invoked with the subscription ID and the
+ * payload of the event.
+ * @param eoseHandler A callable object that will be invoked with the subscription ID when the
+ * relay sends an EOSE message, indicating it has reached the end of stored events for the
+ * given query.
+ * @param closeHandler A callable object that will be invoked with the subscription ID and the
+ * message sent by the relay if the subscription is ended by the relay.
*/
- void onMessage(
+ void onSubscriptionMessage(
std::string message,
- std::function<void(const std::string&, std::shared_ptr<Event>)> eventHandler);
+ std::function<void(const std::string&, std::shared_ptr<Event>)> eventHandler,
+ std::function<void(const std::string&)> eoseHandler,
+ std::function<void(const std::string&, const std::string&)> closeHandler);
+
+ /**
+ * @brief Parses OK messages received from the relay and invokes the given acceptance handler.
+ * @remark The OK message type is sent to indicate whether the relay has accepted an event sent
+ * by the client. Note that this is distinct from whether the message was successfully sent to
+ * the relay over the WebSocket connection.
+ */
+ void onAcceptance(std::string message, std::function<void(const bool)> acceptanceHandler);
};
class ISigner
diff --git a/src/client/websocketpp_client.cpp b/src/client/websocketpp_client.cpp
index 981d4ec..276c5dd 100644
--- a/src/client/websocketpp_client.cpp
+++ b/src/client/websocketpp_client.cpp
@@ -77,13 +77,19 @@ public:
if (error.value() == -1)
{
- // PLOG_ERROR << "Error publishing event to relay " << relay << ": " << error.message();
return make_tuple(uri, false);
}
return make_tuple(uri, true);
};
+ tuple<string, bool> send(string message, string uri, function<void(const string&)> messageHandler) override
+ {
+ auto successes = this->send(message, uri);
+ this->receive(uri, messageHandler);
+ return successes;
+ };
+
void receive(string uri, function<void(const string&)> messageHandler) override
{
lock_guard<mutex> lock(this->_propertyMutex);
diff --git a/src/nostr_service.cpp b/src/nostr_service.cpp
index 614e64f..e8f14f6 100644
--- a/src/nostr_service.cpp
+++ b/src/nostr_service.cpp
@@ -25,6 +25,7 @@ using std::make_tuple;
using std::move;
using std::mutex;
using std::out_of_range;
+using std::promise;
using std::shared_ptr;
using std::string;
using std::thread;
@@ -123,6 +124,7 @@ void NostrService::closeRelayConnections(RelayList relays)
}
};
+// TODO: Make this method return a promise.
tuple<RelayList, RelayList> NostrService::publishEvent(shared_ptr<Event> event)
{
RelayList successfulRelays;
@@ -151,12 +153,34 @@ tuple<RelayList, RelayList> NostrService::publishEvent(shared_ptr<Event> event)
vector<future<tuple<string, bool>>> publishFutures;
for (const string& relay : this->_activeRelays)
{
- PLOG_INFO << "Entering lambda.";
- future<tuple<string, bool>> publishFuture = async([this, relay, message]()
+ promise<tuple<string, bool>> publishPromise;
+ publishFutures.push_back(move(publishPromise.get_future()));
+
+ auto [targetRelay, isSuccess] = this->_client->send(
+ message.dump(),
+ relay,
+ [this, &relay, &event, &publishPromise](string response)
+ {
+ this->onAcceptance(response, [this, &relay, &event, &publishPromise](bool isAccepted)
+ {
+ if (isAccepted)
+ {
+ PLOG_INFO << "Relay " << relay << " accepted event: " << event->id;
+ publishPromise.set_value(make_tuple(relay, true));
+ }
+ else
+ {
+ PLOG_WARNING << "Relay " << relay << " rejected event: " << event->id;
+ publishPromise.set_value(make_tuple(relay, false));
+ }
+ });
+ });
+
+ if (!isSuccess)
{
- return this->_client->send(message.dump(), relay);
- });
- publishFutures.push_back(move(publishFuture));
+ PLOG_WARNING << "Failed to send event to relay: " << relay;
+ publishPromise.set_value(make_tuple(relay, false));
+ }
}
for (auto& publishFuture : publishFutures)
@@ -179,9 +203,72 @@ tuple<RelayList, RelayList> NostrService::publishEvent(shared_ptr<Event> event)
return make_tuple(successfulRelays, failedRelays);
};
+// TODO: Make this method return a promise.
+// TODO: Add a timeout to this method to prevent hanging while waiting for the relay.
+vector<shared_ptr<Event>> NostrService::queryRelays(shared_ptr<Filters> filters)
+{
+ if (filters->limit > 64 || filters->limit < 1)
+ {
+ PLOG_WARNING << "Filters limit must be between 1 and 64, inclusive. Setting limit to 16.";
+ filters->limit = 16;
+ }
+
+ vector<shared_ptr<Event>> events;
+
+ string subscriptionId = this->generateSubscriptionId();
+ string request = filters->serialize(subscriptionId);
+ vector<future<tuple<string, bool>>> requestFutures;
+
+ // Send the same query to each relay. As events trickle in from each relay, they will be added
+ // to the events vector. Multiple copies of an event may be received if the same event is
+ // stored on multiple relays. The function will block until all of the relays send an EOSE or
+ // CLOSE message.
+ for (const string relay : this->_activeRelays)
+ {
+ promise<tuple<string, bool>> eosePromise;
+ requestFutures.push_back(move(eosePromise.get_future()));
+
+ this->_client->send(
+ request,
+ relay,
+ [this, &relay, &events, &eosePromise](string payload)
+ {
+ this->onSubscriptionMessage(
+ payload,
+ [&events](const string&, shared_ptr<Event> event)
+ {
+ events.push_back(event);
+ },
+ [relay, &eosePromise](const string&)
+ {
+ eosePromise.set_value(make_tuple(relay, true));
+ },
+ [relay, &eosePromise](const string&, const string&)
+ {
+ eosePromise.set_value(make_tuple(relay, false));
+ });
+ });
+ }
+
+ for (auto& publishFuture : requestFutures)
+ {
+ auto [relay, isEose] = publishFuture.get();
+ if (!isEose)
+ {
+ PLOG_WARNING << "Receive CLOSE message from relay: " << relay;
+ }
+ }
+
+ // TODO: De-duplicate events in the vector before returning.
+
+ return events;
+};
+
string NostrService::queryRelays(
shared_ptr<Filters> filters,
- function<void(const string&, shared_ptr<Event>)> responseHandler)
+ function<void(const string&, shared_ptr<Event>)> eventHandler,
+ function<void(const string&)> eoseHandler,
+ function<void(const string&, const string&)> closeHandler)
{
RelayList successfulRelays;
RelayList failedRelays;
@@ -189,24 +276,22 @@ string NostrService::queryRelays(
string subscriptionId = this->generateSubscriptionId();
string request = filters->serialize(subscriptionId);
vector<future<tuple<string, bool>>> requestFutures;
- vector<future<void>> receiveFutures;
for (const string relay : this->_activeRelays)
{
this->_subscriptions[relay].push_back(subscriptionId);
- future<tuple<string, bool>> requestFuture = async([this, &relay, &request]()
- {
- return this->_client->send(request, relay);
- });
- requestFutures.push_back(move(requestFuture));
-
- auto receiveFuture = async([this, &relay, &responseHandler]()
- {
- this->_client->receive(relay, [this, responseHandler](string payload) {
- this->onMessage(payload, responseHandler);
+ future<tuple<string, bool>> requestFuture = async(
+ [this, &relay, &request, &eventHandler, &eoseHandler, &closeHandler]()
+ {
+ return this->_client->send(
+ request,
+ relay,
+ [this, &eventHandler, &eoseHandler, &closeHandler](string payload)
+ {
+ this->onSubscriptionMessage(payload, eventHandler, eoseHandler, closeHandler);
+ });
});
- });
- receiveFutures.push_back(move(receiveFuture));
+ requestFutures.push_back(move(requestFuture));
}
for (auto& publishFuture : requestFutures)
@@ -222,11 +307,6 @@ string NostrService::queryRelays(
}
}
- for (auto& receiveFuture : receiveFutures)
- {
- receiveFuture.get();
- }
-
size_t targetCount = this->_activeRelays.size();
size_t successfulCount = successfulRelays.size();
PLOG_INFO << "Sent query to " << successfulCount << "/" << targetCount << " open relay connections.";
@@ -446,9 +526,11 @@ bool NostrService::hasSubscription(string relay, string subscriptionId)
return false;
};
-void NostrService::onMessage(
+void NostrService::onSubscriptionMessage(
string message,
- function<void(const string&, shared_ptr<Event>)> eventHandler)
+ function<void(const string&, shared_ptr<Event>)> eventHandler,
+ function<void(const string&)> eoseHandler,
+ function<void(const string&, const string&)> closeHandler)
{
try
{
@@ -460,8 +542,36 @@ void NostrService::onMessage(
Event event = Event::fromString(jMessage[2]);
eventHandler(subscriptionId, make_shared<Event>(event));
}
+ else if (messageType == "EOSE")
+ {
+ string subscriptionId = jMessage[1];
+ eoseHandler(subscriptionId);
+ }
+ else if (messageType == "CLOSE")
+ {
+ string subscriptionId = jMessage[1];
+ string reason = jMessage[2];
+ closeHandler(subscriptionId, reason);
+ }
+ }
+ catch (const json::exception& je)
+ {
+ PLOG_ERROR << "JSON handling exception: " << je.what();
+ throw je;
+ }
+};
- // Support other message types here, if necessary.
+void NostrService::onAcceptance(string message, function<void(const bool)> acceptanceHandler)
+{
+ try
+ {
+ json jMessage = json::parse(message);
+ string messageType = jMessage[0];
+ if (messageType == "OK")
+ {
+ bool isAccepted = jMessage[2];
+ acceptanceHandler(isAccepted);
+ }
}
catch (const json::exception& je)
{
diff --git a/test/nostr_service_test.cpp b/test/nostr_service_test.cpp
index 7adda7e..854de78 100644
--- a/test/nostr_service_test.cpp
+++ b/test/nostr_service_test.cpp
@@ -20,8 +20,10 @@ using std::string;
using std::tuple;
using std::unordered_map;
using ::testing::_;
+using ::testing::Args;
using ::testing::Invoke;
using ::testing::Return;
+using ::testing::Truly;
namespace nostr_test
{
@@ -32,6 +34,7 @@ public:
MOCK_METHOD(void, openConnection, (string uri), (override));
MOCK_METHOD(bool, isConnected, (string uri), (override));
MOCK_METHOD((tuple<string, bool>), send, (string message, string uri), (override));
+ MOCK_METHOD((tuple<string, bool>), send, (string message, string uri, function<void(const string&)> messageHandler), (override));
MOCK_METHOD(void, receive, (string uri, function<void(const string&)> messageHandler), (override));
MOCK_METHOD(void, closeConnection, (string uri), (override));
};
@@ -531,70 +534,4 @@ TEST_F(NostrServiceTest, PublishEvent_CorrectlyIndicates_MixedSuccessesAndFailur
ASSERT_EQ(failures.size(), 1);
ASSERT_EQ(failures[0], defaultTestRelays[1]);
};
-
-TEST_F(NostrServiceTest, QueryRelays_UsesProvidedHandler_AndReturnsSubscriptionId)
-{
- mutex connectionStatusMutex;
- auto connectionStatus = make_shared<unordered_map<string, bool>>();
- connectionStatus->insert({ defaultTestRelays[0], false });
- connectionStatus->insert({ defaultTestRelays[1], false });
-
- EXPECT_CALL(*mockClient, isConnected(_))
- .WillRepeatedly(Invoke([connectionStatus, &connectionStatusMutex](string uri)
- {
- lock_guard<mutex> lock(connectionStatusMutex);
- bool status = connectionStatus->at(uri);
- if (status == false)
- {
- connectionStatus->at(uri) = true;
- }
- return status;
- }));
-
- auto nostrService = make_unique<nostr::NostrService>(
- testAppender,
- mockClient,
- fakeSigner,
- defaultTestRelays);
- nostrService->openRelayConnections();
-
- auto sentSubscriptionId = make_shared<string>();
- EXPECT_CALL(*mockClient, send(_, _))
- .Times(2)
- .WillRepeatedly(Invoke([sentSubscriptionId](string message, string uri)
- {
- json jarr = json::array();
- jarr = json::parse(message);
-
- string temp = jarr[1].dump();
- if (!temp.empty() && temp[0] == '\"' && temp[temp.size() - 1] == '\"')
- {
- *sentSubscriptionId = temp.substr(1, temp.size() - 2);
- }
-
- return make_tuple(uri, true);
- }));
- EXPECT_CALL(*mockClient, receive(_, _))
- .Times(2)
- .WillRepeatedly(Invoke([sentSubscriptionId](string _, function<void(const string&)> messageHandler)
- {
- auto event = make_shared<nostr::Event>(getTextNoteTestEvent());
- messageHandler(getTestEventMessage(event, *sentSubscriptionId));
- }));
-
- auto filters = make_shared<nostr::Filters>(getKind0And1TestFilters());
- nostr::Event expectedEvent = getTextNoteTestEvent();
- auto receivedSubscriptionId = nostrService->queryRelays(
- filters,
- [expectedEvent](const string& subscriptionId, shared_ptr<nostr::Event> event)
- {
- ASSERT_STREQ(event->pubkey.c_str(), expectedEvent.pubkey.c_str());
- ASSERT_EQ(event->kind, expectedEvent.kind);
- ASSERT_EQ(event->tags.size(), expectedEvent.tags.size());
- ASSERT_STREQ(event->content.c_str(), expectedEvent.content.c_str());
- ASSERT_GT(event->sig.size(), 0);
- });
-
- ASSERT_STREQ(receivedSubscriptionId.c_str(), sentSubscriptionId->c_str());
-};
} // namespace nostr_test