aboutsummaryrefslogtreecommitdiff
path: root/src/nostr_service.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/nostr_service.cpp')
-rw-r--r--src/nostr_service.cpp74
1 files changed, 48 insertions, 26 deletions
diff --git a/src/nostr_service.cpp b/src/nostr_service.cpp
index 73ce95e..d1744e3 100644
--- a/src/nostr_service.cpp
+++ b/src/nostr_service.cpp
@@ -130,16 +130,21 @@ tuple<RelayList, RelayList> NostrService::publishEvent(shared_ptr<Event> event)
PLOG_INFO << "Attempting to publish event to Nostr relays.";
- string serializedEvent;
+ json message;
try
{
this->_signer->sign(event);
- serializedEvent = event->serialize();
+ message = json::array({ "EVENT", event->serialize() });
}
- catch (const std::invalid_argument& error)
+ catch (const std::invalid_argument& e)
{
- PLOG_ERROR << "Failed to sign event: " << error.what();
- throw error;
+ PLOG_ERROR << "Failed to sign event: " << e.what();
+ throw e;
+ }
+ catch (const json::exception& je)
+ {
+ PLOG_ERROR << "Failed to serialize event: " << je.what();
+ throw je;
}
lock_guard<mutex> lock(this->_propertyMutex);
@@ -147,8 +152,9 @@ tuple<RelayList, RelayList> NostrService::publishEvent(shared_ptr<Event> event)
for (const string& relay : this->_activeRelays)
{
PLOG_INFO << "Entering lambda.";
- future<tuple<string, bool>> publishFuture = async([this, relay, serializedEvent]() {
- return this->_client->send(serializedEvent, relay);
+ future<tuple<string, bool>> publishFuture = async([this, relay, message]()
+ {
+ return this->_client->send(message.dump(), relay);
});
publishFutures.push_back(move(publishFuture));
}
@@ -176,7 +182,6 @@ tuple<RelayList, RelayList> NostrService::publishEvent(shared_ptr<Event> event)
string NostrService::queryRelays(shared_ptr<Filters> filters)
{
return this->queryRelays(filters, [this](string subscriptionId, shared_ptr<Event> event) {
- lock_guard<mutex> lock(this->_propertyMutex);
this->_lastRead[subscriptionId] = event->id;
this->onEvent(subscriptionId, event);
});
@@ -190,21 +195,26 @@ string NostrService::queryRelays(
RelayList failedRelays;
string subscriptionId = this->generateSubscriptionId();
+ string request = filters->serialize(subscriptionId);
vector<future<tuple<string, bool>>> requestFutures;
+ vector<future<void>> receiveFutures;
for (const string relay : this->_activeRelays)
{
- lock_guard<mutex> lock(this->_propertyMutex);
this->_subscriptions[relay].push_back(subscriptionId);
- string request = filters->serialize(subscriptionId);
-
- future<tuple<string, bool>> requestFuture = async([this, &relay, &request]() {
+
+ future<tuple<string, bool>> requestFuture = async([this, &relay, &request]()
+ {
return this->_client->send(request, relay);
});
requestFutures.push_back(move(requestFuture));
- this->_client->receive(relay, [this, responseHandler](string payload) {
- this->onMessage(payload, responseHandler);
+ auto receiveFuture = async([this, &relay, &responseHandler]()
+ {
+ this->_client->receive(relay, [this, responseHandler](string payload) {
+ this->onMessage(payload, responseHandler);
+ });
});
+ receiveFutures.push_back(move(receiveFuture));
}
for (auto& publishFuture : requestFutures)
@@ -220,6 +230,11 @@ string NostrService::queryRelays(
}
}
+ for (auto& receiveFuture : receiveFutures)
+ {
+ receiveFuture.get();
+ }
+
size_t targetCount = this->_activeRelays.size();
size_t successfulCount = successfulRelays.size();
PLOG_INFO << "Sent query to " << successfulCount << "/" << targetCount << " open relay connections.";
@@ -287,7 +302,8 @@ tuple<RelayList, RelayList> NostrService::closeSubscription(string subscriptionI
}
string request = this->generateCloseRequest(subscriptionId);
- future<tuple<string, bool>> closeFuture = async([this, &relay, &request]() {
+ future<tuple<string, bool>> closeFuture = async([this, &relay, &request]()
+ {
return this->_client->send(request, relay);
});
closeFutures.push_back(move(closeFuture));
@@ -326,7 +342,8 @@ tuple<RelayList, RelayList> NostrService::closeSubscriptions(RelayList relays)
vector<future<tuple<RelayList, RelayList>>> closeFutures;
for (const string relay : relays)
{
- future<tuple<RelayList, RelayList>> closeFuture = async([this, &relay]() {
+ future<tuple<RelayList, RelayList>> closeFuture = async([this, &relay]()
+ {
RelayList successfulRelays;
RelayList failedRelays;
@@ -487,19 +504,24 @@ void NostrService::onMessage(
string message,
function<void(const string&, shared_ptr<Event>)> eventHandler)
{
- json jarr = json::array();
- jarr = json::parse(message);
-
- string messageType = jarr[0];
+ try
+ {
+ json jMessage = json::parse(message);
+ string messageType = jMessage[0];
+ if (messageType == "EVENT")
+ {
+ string subscriptionId = jMessage[1];
+ Event event = Event::fromString(jMessage[2]);
+ eventHandler(subscriptionId, make_shared<Event>(event));
+ }
- if (messageType == "EVENT")
+ // Support other message types here, if necessary.
+ }
+ catch (const json::exception& je)
{
- string subscriptionId = jarr[1];
- Event event = Event::fromJson(jarr[2]);
- eventHandler(subscriptionId, make_shared<Event>(event));
+ PLOG_ERROR << "JSON handling exception: " << je.what();
+ throw je;
}
-
- // Support other message types here, if necessary.
};
void NostrService::onEvent(string subscriptionId, shared_ptr<Event> event)