aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorLibravatar Michael Jurkoic <mjjurkoic@gmail.com>2024-04-10 21:33:45 -0500
committerLibravatar Michael Jurkoic <mjjurkoic@gmail.com>2024-04-10 21:33:45 -0500
commitc8bb6c8f56e0c6d93c8623722ab932c04de882b5 (patch)
treeb61f055748e8c8058f9d27c1441bf68d3bca030d /src
parent0a185a13aa4c202ad8d76ac3e62a878dc5f06619 (diff)
Handle relay response messages
These changes do not yet have unit tests.
Diffstat (limited to 'src')
-rw-r--r--src/client/websocketpp_client.cpp8
-rw-r--r--src/nostr_service.cpp164
2 files changed, 144 insertions, 28 deletions
diff --git a/src/client/websocketpp_client.cpp b/src/client/websocketpp_client.cpp
index 981d4ec..276c5dd 100644
--- a/src/client/websocketpp_client.cpp
+++ b/src/client/websocketpp_client.cpp
@@ -77,13 +77,19 @@ public:
if (error.value() == -1)
{
- // PLOG_ERROR << "Error publishing event to relay " << relay << ": " << error.message();
return make_tuple(uri, false);
}
return make_tuple(uri, true);
};
+ tuple<string, bool> send(string message, string uri, function<void(const string&)> messageHandler) override
+ {
+ auto successes = this->send(message, uri);
+ this->receive(uri, messageHandler);
+ return successes;
+ };
+
void receive(string uri, function<void(const string&)> messageHandler) override
{
lock_guard<mutex> lock(this->_propertyMutex);
diff --git a/src/nostr_service.cpp b/src/nostr_service.cpp
index 614e64f..e8f14f6 100644
--- a/src/nostr_service.cpp
+++ b/src/nostr_service.cpp
@@ -25,6 +25,7 @@ using std::make_tuple;
using std::move;
using std::mutex;
using std::out_of_range;
+using std::promise;
using std::shared_ptr;
using std::string;
using std::thread;
@@ -123,6 +124,7 @@ void NostrService::closeRelayConnections(RelayList relays)
}
};
+// TODO: Make this method return a promise.
tuple<RelayList, RelayList> NostrService::publishEvent(shared_ptr<Event> event)
{
RelayList successfulRelays;
@@ -151,12 +153,34 @@ tuple<RelayList, RelayList> NostrService::publishEvent(shared_ptr<Event> event)
vector<future<tuple<string, bool>>> publishFutures;
for (const string& relay : this->_activeRelays)
{
- PLOG_INFO << "Entering lambda.";
- future<tuple<string, bool>> publishFuture = async([this, relay, message]()
+ promise<tuple<string, bool>> publishPromise;
+ publishFutures.push_back(move(publishPromise.get_future()));
+
+ auto [targetRelay, isSuccess] = this->_client->send(
+ message.dump(),
+ relay,
+ [this, &relay, &event, &publishPromise](string response)
+ {
+ this->onAcceptance(response, [this, &relay, &event, &publishPromise](bool isAccepted)
+ {
+ if (isAccepted)
+ {
+ PLOG_INFO << "Relay " << relay << " accepted event: " << event->id;
+ publishPromise.set_value(make_tuple(relay, true));
+ }
+ else
+ {
+ PLOG_WARNING << "Relay " << relay << " rejected event: " << event->id;
+ publishPromise.set_value(make_tuple(relay, false));
+ }
+ });
+ });
+
+ if (!isSuccess)
{
- return this->_client->send(message.dump(), relay);
- });
- publishFutures.push_back(move(publishFuture));
+ PLOG_WARNING << "Failed to send event to relay: " << relay;
+ publishPromise.set_value(make_tuple(relay, false));
+ }
}
for (auto& publishFuture : publishFutures)
@@ -179,9 +203,72 @@ tuple<RelayList, RelayList> NostrService::publishEvent(shared_ptr<Event> event)
return make_tuple(successfulRelays, failedRelays);
};
+// TODO: Make this method return a promise.
+// TODO: Add a timeout to this method to prevent hanging while waiting for the relay.
+vector<shared_ptr<Event>> NostrService::queryRelays(shared_ptr<Filters> filters)
+{
+ if (filters->limit > 64 || filters->limit < 1)
+ {
+ PLOG_WARNING << "Filters limit must be between 1 and 64, inclusive. Setting limit to 16.";
+ filters->limit = 16;
+ }
+
+ vector<shared_ptr<Event>> events;
+
+ string subscriptionId = this->generateSubscriptionId();
+ string request = filters->serialize(subscriptionId);
+ vector<future<tuple<string, bool>>> requestFutures;
+
+ // Send the same query to each relay. As events trickle in from each relay, they will be added
+ // to the events vector. Multiple copies of an event may be received if the same event is
+ // stored on multiple relays. The function will block until all of the relays send an EOSE or
+ // CLOSE message.
+ for (const string relay : this->_activeRelays)
+ {
+ promise<tuple<string, bool>> eosePromise;
+ requestFutures.push_back(move(eosePromise.get_future()));
+
+ this->_client->send(
+ request,
+ relay,
+ [this, &relay, &events, &eosePromise](string payload)
+ {
+ this->onSubscriptionMessage(
+ payload,
+ [&events](const string&, shared_ptr<Event> event)
+ {
+ events.push_back(event);
+ },
+ [relay, &eosePromise](const string&)
+ {
+ eosePromise.set_value(make_tuple(relay, true));
+ },
+ [relay, &eosePromise](const string&, const string&)
+ {
+ eosePromise.set_value(make_tuple(relay, false));
+ });
+ });
+ }
+
+ for (auto& publishFuture : requestFutures)
+ {
+ auto [relay, isEose] = publishFuture.get();
+ if (!isEose)
+ {
+ PLOG_WARNING << "Receive CLOSE message from relay: " << relay;
+ }
+ }
+
+ // TODO: De-duplicate events in the vector before returning.
+
+ return events;
+};
+
string NostrService::queryRelays(
shared_ptr<Filters> filters,
- function<void(const string&, shared_ptr<Event>)> responseHandler)
+ function<void(const string&, shared_ptr<Event>)> eventHandler,
+ function<void(const string&)> eoseHandler,
+ function<void(const string&, const string&)> closeHandler)
{
RelayList successfulRelays;
RelayList failedRelays;
@@ -189,24 +276,22 @@ string NostrService::queryRelays(
string subscriptionId = this->generateSubscriptionId();
string request = filters->serialize(subscriptionId);
vector<future<tuple<string, bool>>> requestFutures;
- vector<future<void>> receiveFutures;
for (const string relay : this->_activeRelays)
{
this->_subscriptions[relay].push_back(subscriptionId);
- future<tuple<string, bool>> requestFuture = async([this, &relay, &request]()
- {
- return this->_client->send(request, relay);
- });
- requestFutures.push_back(move(requestFuture));
-
- auto receiveFuture = async([this, &relay, &responseHandler]()
- {
- this->_client->receive(relay, [this, responseHandler](string payload) {
- this->onMessage(payload, responseHandler);
+ future<tuple<string, bool>> requestFuture = async(
+ [this, &relay, &request, &eventHandler, &eoseHandler, &closeHandler]()
+ {
+ return this->_client->send(
+ request,
+ relay,
+ [this, &eventHandler, &eoseHandler, &closeHandler](string payload)
+ {
+ this->onSubscriptionMessage(payload, eventHandler, eoseHandler, closeHandler);
+ });
});
- });
- receiveFutures.push_back(move(receiveFuture));
+ requestFutures.push_back(move(requestFuture));
}
for (auto& publishFuture : requestFutures)
@@ -222,11 +307,6 @@ string NostrService::queryRelays(
}
}
- for (auto& receiveFuture : receiveFutures)
- {
- receiveFuture.get();
- }
-
size_t targetCount = this->_activeRelays.size();
size_t successfulCount = successfulRelays.size();
PLOG_INFO << "Sent query to " << successfulCount << "/" << targetCount << " open relay connections.";
@@ -446,9 +526,11 @@ bool NostrService::hasSubscription(string relay, string subscriptionId)
return false;
};
-void NostrService::onMessage(
+void NostrService::onSubscriptionMessage(
string message,
- function<void(const string&, shared_ptr<Event>)> eventHandler)
+ function<void(const string&, shared_ptr<Event>)> eventHandler,
+ function<void(const string&)> eoseHandler,
+ function<void(const string&, const string&)> closeHandler)
{
try
{
@@ -460,8 +542,36 @@ void NostrService::onMessage(
Event event = Event::fromString(jMessage[2]);
eventHandler(subscriptionId, make_shared<Event>(event));
}
+ else if (messageType == "EOSE")
+ {
+ string subscriptionId = jMessage[1];
+ eoseHandler(subscriptionId);
+ }
+ else if (messageType == "CLOSE")
+ {
+ string subscriptionId = jMessage[1];
+ string reason = jMessage[2];
+ closeHandler(subscriptionId, reason);
+ }
+ }
+ catch (const json::exception& je)
+ {
+ PLOG_ERROR << "JSON handling exception: " << je.what();
+ throw je;
+ }
+};
- // Support other message types here, if necessary.
+void NostrService::onAcceptance(string message, function<void(const bool)> acceptanceHandler)
+{
+ try
+ {
+ json jMessage = json::parse(message);
+ string messageType = jMessage[0];
+ if (messageType == "OK")
+ {
+ bool isAccepted = jMessage[2];
+ acceptanceHandler(isAccepted);
+ }
}
catch (const json::exception& je)
{