aboutsummaryrefslogtreecommitdiff
path: root/src/service/nostr_service_base.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/service/nostr_service_base.cpp')
-rw-r--r--src/service/nostr_service_base.cpp667
1 files changed, 667 insertions, 0 deletions
diff --git a/src/service/nostr_service_base.cpp b/src/service/nostr_service_base.cpp
new file mode 100644
index 0000000..ec847b3
--- /dev/null
+++ b/src/service/nostr_service_base.cpp
@@ -0,0 +1,667 @@
+#include <exception>
+#include <future>
+#include <stdexcept>
+#include <thread>
+
+#include <uuid_v4.h>
+
+#include "service/nostr_service_base.hpp"
+
+using namespace nlohmann;
+using namespace std;
+
+nostr::service::NostrServiceBase::NostrServiceBase(
+ shared_ptr<plog::IAppender> appender,
+ shared_ptr<client::IWebSocketClient> client)
+: NostrServiceBase(appender, client, {}) { };
+
+nostr::service::NostrServiceBase::NostrServiceBase(
+ shared_ptr<plog::IAppender> appender,
+ shared_ptr<client::IWebSocketClient> client,
+ vector<string> relays)
+: _defaultRelays(relays), _client(client)
+{
+ plog::init(plog::debug, appender.get());
+ client->start();
+};
+
+nostr::service::NostrServiceBase::~NostrServiceBase()
+{
+ this->_client->stop();
+};
+
+vector<string> nostr::service::NostrServiceBase::defaultRelays() const
+{ return this->_defaultRelays; };
+
+vector<string> nostr::service::NostrServiceBase::activeRelays() const
+{ return this->_activeRelays; };
+
+unordered_map<string, vector<string>> nostr::service::NostrServiceBase::subscriptions() const
+{ return this->_subscriptions; };
+
+vector<string> nostr::service::NostrServiceBase::openRelayConnections()
+{
+ return this->openRelayConnections(this->_defaultRelays);
+};
+
+vector<string> nostr::service::NostrServiceBase::openRelayConnections(vector<string> relays)
+{
+ PLOG_INFO << "Attempting to connect to Nostr relays.";
+ vector<string> unconnectedRelays = this->_getUnconnectedRelays(relays);
+
+ vector<thread> connectionThreads;
+ for (string relay : unconnectedRelays)
+ {
+ thread connectionThread([this, relay]() {
+ this->_connect(relay);
+ });
+ connectionThreads.push_back(move(connectionThread));
+ }
+
+ for (thread& connectionThread : connectionThreads)
+ {
+ connectionThread.join();
+ }
+
+ std::size_t targetCount = relays.size();
+ std::size_t activeCount = this->_activeRelays.size();
+ PLOG_INFO << "Connected to " << activeCount << "/" << targetCount << " target relays.";
+
+ // This property should only contain successful relays at this point.
+ return this->_activeRelays;
+};
+
+void nostr::service::NostrServiceBase::closeRelayConnections()
+{
+ if (this->_activeRelays.size() == 0)
+ {
+ PLOG_INFO << "No active relay connections to close.";
+ return;
+ }
+
+ this->closeRelayConnections(this->_activeRelays);
+};
+
+void nostr::service::NostrServiceBase::closeRelayConnections(vector<string> relays)
+{
+ PLOG_INFO << "Disconnecting from Nostr relays.";
+ vector<string> connectedRelays = this->_getConnectedRelays(relays);
+
+ vector<thread> disconnectionThreads;
+ for (string relay : connectedRelays)
+ {
+ thread disconnectionThread([this, relay]() {
+ this->_disconnect(relay);
+ });
+ disconnectionThreads.push_back(move(disconnectionThread));
+
+ // TODO: Close subscriptions before disconnecting.
+ lock_guard<mutex> lock(this->_propertyMutex);
+ this->_subscriptions.erase(relay);
+ }
+
+ for (thread& disconnectionThread : disconnectionThreads)
+ {
+ disconnectionThread.join();
+ }
+};
+
+// TODO: Make this method return a promise.
+tuple<vector<string>, vector<string>> nostr::service::NostrServiceBase::publishEvent(
+ shared_ptr<nostr::data::Event> event)
+{
+ vector<string> successfulRelays;
+ vector<string> failedRelays;
+
+ PLOG_INFO << "Attempting to publish event to Nostr relays.";
+
+ json message;
+ try
+ {
+ message = json::array({ "EVENT", event->serialize() });
+ }
+ catch (const std::invalid_argument& e)
+ {
+ PLOG_ERROR << "Failed to sign event: " << e.what();
+ throw e;
+ }
+ catch (const json::exception& je)
+ {
+ PLOG_ERROR << "Failed to serialize event: " << je.what();
+ throw je;
+ }
+
+ lock_guard<mutex> lock(this->_propertyMutex);
+ vector<string> targetRelays = this->_activeRelays;
+ vector<future<tuple<string, bool>>> publishFutures;
+ for (const string& relay : targetRelays)
+ {
+ promise<tuple<string, bool>> publishPromise;
+ publishFutures.push_back(move(publishPromise.get_future()));
+
+ auto [uri, success] = this->_client->send(
+ message.dump(),
+ relay,
+ [this, &relay, &event, &publishPromise](string response)
+ {
+ this->_onAcceptance(response, [this, &relay, &event, &publishPromise](bool isAccepted)
+ {
+ if (isAccepted)
+ {
+ PLOG_INFO << "Relay " << relay << " accepted event: " << event->id;
+ publishPromise.set_value(make_tuple(relay, true));
+ }
+ else
+ {
+ PLOG_WARNING << "Relay " << relay << " rejected event: " << event->id;
+ publishPromise.set_value(make_tuple(relay, false));
+ }
+ });
+ });
+
+ if (!success)
+ {
+ PLOG_WARNING << "Failed to send event to relay " << relay;
+ publishPromise.set_value(make_tuple(relay, false));
+ }
+ }
+
+ for (auto& publishFuture : publishFutures)
+ {
+ auto [relay, isSuccess] = publishFuture.get();
+ if (isSuccess)
+ {
+ successfulRelays.push_back(relay);
+ }
+ else
+ {
+ failedRelays.push_back(relay);
+ }
+ }
+
+ std::size_t targetCount = targetRelays.size();
+ std::size_t successfulCount = successfulRelays.size();
+ PLOG_INFO << "Published event to " << successfulCount << "/" << targetCount << " target relays.";
+
+ return make_tuple(successfulRelays, failedRelays);
+};
+
+// TODO: Make this method return a promise.
+// TODO: Add a timeout to this method to prevent hanging while waiting for the relay.
+vector<shared_ptr<nostr::data::Event>> nostr::service::NostrServiceBase::queryRelays(
+ shared_ptr<nostr::data::Filters> filters)
+{
+ if (filters->limit > 64 || filters->limit < 1)
+ {
+ PLOG_WARNING << "Filters limit must be between 1 and 64, inclusive. Setting limit to 16.";
+ filters->limit = 16;
+ }
+
+ vector<shared_ptr<nostr::data::Event>> events;
+
+ string subscriptionId = this->_generateSubscriptionId();
+ string request;
+
+ try
+ {
+ request = filters->serialize(subscriptionId);
+ }
+ catch (const invalid_argument& e)
+ {
+ PLOG_ERROR << "Failed to serialize filters - invalid object: " << e.what();
+ throw e;
+ }
+ catch (const json::exception& je)
+ {
+ PLOG_ERROR << "Failed to serialize filters - JSON exception: " << je.what();
+ throw je;
+ }
+
+ vector<future<tuple<string, bool>>> requestFutures;
+
+ // Send the same query to each relay. As events trickle in from each relay, they will be added
+ // to the events vector. Multiple copies of an event may be received if the same event is
+ // stored on multiple relays. The function will block until all of the relays send an EOSE or
+ // CLOSE message.
+ for (const string relay : this->_activeRelays)
+ {
+ promise<tuple<string, bool>> eosePromise;
+ requestFutures.push_back(move(eosePromise.get_future()));
+
+ auto [uri, success] = this->_client->send(
+ request,
+ relay,
+ [this, &relay, &events, &eosePromise](string payload)
+ {
+ this->_onSubscriptionMessage(
+ payload,
+ [&events](const string&, shared_ptr<nostr::data::Event> event)
+ {
+ events.push_back(event);
+ },
+ [relay, &eosePromise](const string&)
+ {
+ eosePromise.set_value(make_tuple(relay, true));
+ },
+ [relay, &eosePromise](const string&, const string&)
+ {
+ eosePromise.set_value(make_tuple(relay, false));
+ });
+ });
+
+ if (success)
+ {
+ PLOG_INFO << "Sent query to relay " << relay;
+ lock_guard<mutex> lock(this->_propertyMutex);
+ this->_subscriptions[subscriptionId].push_back(relay);
+ }
+ else
+ {
+ PLOG_WARNING << "Failed to send query to relay " << relay;
+ eosePromise.set_value(make_tuple(uri, false));
+ }
+ }
+
+ // Close open subscriptions and disconnect from relays after events are received.
+ for (auto& publishFuture : requestFutures)
+ {
+ auto [relay, isEose] = publishFuture.get();
+ if (isEose)
+ {
+ PLOG_INFO << "Received EOSE message from relay " << relay;
+ }
+ else
+ {
+ PLOG_WARNING << "Received CLOSE message from relay " << relay;
+ this->closeRelayConnections({ relay });
+ }
+ }
+ this->closeSubscription(subscriptionId);
+
+ // TODO: De-duplicate events in the vector before returning.
+
+ return events;
+};
+
+string nostr::service::NostrServiceBase::queryRelays(
+ shared_ptr<nostr::data::Filters> filters,
+ function<void(const string&, shared_ptr<nostr::data::Event>)> eventHandler,
+ function<void(const string&)> eoseHandler,
+ function<void(const string&, const string&)> closeHandler)
+{
+ vector<string> successfulRelays;
+ vector<string> failedRelays;
+
+ string subscriptionId = this->_generateSubscriptionId();
+ string request = filters->serialize(subscriptionId);
+ vector<future<tuple<string, bool>>> requestFutures;
+ for (const string relay : this->_activeRelays)
+ {
+ unique_lock<mutex> lock(this->_propertyMutex);
+ this->_subscriptions[subscriptionId].push_back(relay);
+ lock.unlock();
+
+ future<tuple<string, bool>> requestFuture = async(
+ [this, &relay, &request, &eventHandler, &eoseHandler, &closeHandler]()
+ {
+ return this->_client->send(
+ request,
+ relay,
+ [this, &eventHandler, &eoseHandler, &closeHandler](string payload)
+ {
+ this->_onSubscriptionMessage(payload, eventHandler, eoseHandler, closeHandler);
+ });
+ });
+ requestFutures.push_back(move(requestFuture));
+ }
+
+ for (auto& publishFuture : requestFutures)
+ {
+ auto [relay, isSuccess] = publishFuture.get();
+ if (isSuccess)
+ {
+ successfulRelays.push_back(relay);
+ }
+ else
+ {
+ failedRelays.push_back(relay);
+ }
+ }
+
+ std::size_t targetCount = this->_activeRelays.size();
+ std::size_t successfulCount = successfulRelays.size();
+ PLOG_INFO << "Sent query to " << successfulCount << "/" << targetCount << " open relay connections.";
+
+ return subscriptionId;
+};
+
+tuple<vector<string>, vector<string>> nostr::service::NostrServiceBase::closeSubscription(string subscriptionId)
+{
+ vector<string> successfulRelays;
+ vector<string> failedRelays;
+
+ vector<string> subscriptionRelays;
+ std::size_t subscriptionRelayCount;
+ vector<future<tuple<string, bool>>> closeFutures;
+
+ try
+ {
+ unique_lock<mutex> lock(this->_propertyMutex);
+ subscriptionRelays = this->_subscriptions.at(subscriptionId);
+ subscriptionRelayCount = subscriptionRelays.size();
+ lock.unlock();
+ }
+ catch (const out_of_range& oor)
+ {
+ PLOG_WARNING << "Subscription " << subscriptionId << " not found.";
+ return make_tuple(successfulRelays, failedRelays);
+ }
+
+ for (const string relay : subscriptionRelays)
+ {
+ future<tuple<string, bool>> closeFuture = async([this, subscriptionId, relay]()
+ {
+ bool success = this->closeSubscription(subscriptionId, relay);
+
+ return make_tuple(relay, success);
+ });
+ closeFutures.push_back(move(closeFuture));
+ }
+
+ for (auto& closeFuture : closeFutures)
+ {
+ auto [uri, success] = closeFuture.get();
+ if (success)
+ {
+ successfulRelays.push_back(uri);
+ }
+ else
+ {
+ failedRelays.push_back(uri);
+ }
+ }
+
+ std::size_t successfulCount = successfulRelays.size();
+ PLOG_INFO << "Sent CLOSE request for subscription " << subscriptionId << " to " << successfulCount << "/" << subscriptionRelayCount << " open relay connections.";
+
+ // If there were no failures, and the subscription has been closed on all of its relays, forget
+ // about the subscription.
+ if (failedRelays.empty())
+ {
+ lock_guard<mutex> lock(this->_propertyMutex);
+ this->_subscriptions.erase(subscriptionId);
+ }
+
+ return make_tuple(successfulRelays, failedRelays);
+};
+
+bool nostr::service::NostrServiceBase::closeSubscription(string subscriptionId, string relay)
+{
+ if (!this->_hasSubscription(subscriptionId, relay))
+ {
+ PLOG_WARNING << "Subscription " << subscriptionId << " not found on relay " << relay;
+ return false;
+ }
+
+ if (!this->_isConnected(relay))
+ {
+ PLOG_WARNING << "Relay " << relay << " is not connected.";
+ return false;
+ }
+
+ string request = this->_generateCloseRequest(subscriptionId);
+ auto [uri, success] = this->_client->send(request, relay);
+
+ if (success)
+ {
+ lock_guard<mutex> lock(this->_propertyMutex);
+ auto it = find(
+ this->_subscriptions[subscriptionId].begin(),
+ this->_subscriptions[subscriptionId].end(),
+ relay);
+
+ if (it != this->_subscriptions[subscriptionId].end())
+ {
+ this->_subscriptions[subscriptionId].erase(it);
+ }
+
+ PLOG_INFO << "Sent close request for subscription " << subscriptionId << " to relay " << relay;
+ }
+ else
+ {
+ PLOG_WARNING << "Failed to send close request to relay " << relay;
+ }
+
+ return success;
+};
+
+vector<string> nostr::service::NostrServiceBase::closeSubscriptions()
+{
+ unique_lock<mutex> lock(this->_propertyMutex);
+ vector<string> subscriptionIds;
+ for (auto& [subscriptionId, relays] : this->_subscriptions)
+ {
+ subscriptionIds.push_back(subscriptionId);
+ }
+ lock.unlock();
+
+ vector<string> remainingSubscriptions;
+ for (const string& subscriptionId : subscriptionIds)
+ {
+ auto [successes, failures] = this->closeSubscription(subscriptionId);
+ if (!failures.empty())
+ {
+ remainingSubscriptions.push_back(subscriptionId);
+ }
+ }
+
+ return remainingSubscriptions;
+};
+
+vector<string> nostr::service::NostrServiceBase::_getConnectedRelays(vector<string> relays)
+{
+ PLOG_VERBOSE << "Identifying connected relays.";
+ vector<string> connectedRelays;
+ for (string relay : relays)
+ {
+ bool isActive = find(this->_activeRelays.begin(), this->_activeRelays.end(), relay)
+ != this->_activeRelays.end();
+ bool isConnected = this->_client->isConnected(relay);
+ PLOG_VERBOSE << "Relay " << relay << " is active: " << isActive << ", is connected: " << isConnected;
+
+ if (isActive && isConnected)
+ {
+ connectedRelays.push_back(relay);
+ }
+ else if (isActive && !isConnected)
+ {
+ this->_eraseActiveRelay(relay);
+ }
+ else if (!isActive && isConnected)
+ {
+ this->_activeRelays.push_back(relay);
+ connectedRelays.push_back(relay);
+ }
+ }
+ return connectedRelays;
+};
+
+vector<string> nostr::service::NostrServiceBase::_getUnconnectedRelays(vector<string> relays)
+{
+ PLOG_VERBOSE << "Identifying unconnected relays.";
+ vector<string> unconnectedRelays;
+ for (string relay : relays)
+ {
+ bool isActive = find(this->_activeRelays.begin(), this->_activeRelays.end(), relay)
+ != this->_activeRelays.end();
+ bool isConnected = this->_client->isConnected(relay);
+ PLOG_VERBOSE << "Relay " << relay << " is active: " << isActive << ", is connected: " << isConnected;
+
+ if (!isActive && !isConnected)
+ {
+ PLOG_VERBOSE << "Relay " << relay << " is not active and not connected.";
+ unconnectedRelays.push_back(relay);
+ }
+ else if (isActive && !isConnected)
+ {
+ PLOG_VERBOSE << "Relay " << relay << " is active but not connected. Removing from active relays list.";
+ this->_eraseActiveRelay(relay);
+ unconnectedRelays.push_back(relay);
+ }
+ else if (!isActive && isConnected)
+ {
+ PLOG_VERBOSE << "Relay " << relay << " is connected but not active. Adding to active relays list.";
+ this->_activeRelays.push_back(relay);
+ }
+ }
+ return unconnectedRelays;
+};
+
+bool nostr::service::NostrServiceBase::_isConnected(string relay)
+{
+ auto it = find(this->_activeRelays.begin(), this->_activeRelays.end(), relay);
+ if (it != this->_activeRelays.end()) // If the relay is in this->_activeRelays
+ {
+ return true;
+ }
+ return false;
+};
+
+void nostr::service::NostrServiceBase::_eraseActiveRelay(string relay)
+{
+ auto it = find(this->_activeRelays.begin(), this->_activeRelays.end(), relay);
+ if (it != this->_activeRelays.end()) // If the relay is in this->_activeRelays
+ {
+ this->_activeRelays.erase(it);
+ }
+};
+
+void nostr::service::NostrServiceBase::_connect(string relay)
+{
+ PLOG_VERBOSE << "Connecting to relay " << relay;
+ this->_client->openConnection(relay);
+
+ lock_guard<mutex> lock(this->_propertyMutex);
+ bool isConnected = this->_client->isConnected(relay);
+
+ if (isConnected)
+ {
+ PLOG_VERBOSE << "Connected to relay " << relay << ": " << isConnected;
+ this->_activeRelays.push_back(relay);
+ }
+ else
+ {
+ PLOG_ERROR << "Failed to connect to relay " << relay;
+ }
+};
+
+void nostr::service::NostrServiceBase::_disconnect(string relay)
+{
+ this->_client->closeConnection(relay);
+
+ lock_guard<mutex> lock(this->_propertyMutex);
+ this->_eraseActiveRelay(relay);
+};
+
+string nostr::service::NostrServiceBase::_generateSubscriptionId()
+{
+ UUIDv4::UUIDGenerator<std::mt19937_64> uuidGenerator;
+ UUIDv4::UUID uuid = uuidGenerator.getUUID();
+ return uuid.str();
+};
+
+string nostr::service::NostrServiceBase::_generateCloseRequest(string subscriptionId)
+{
+ json jarr = json::array({ "CLOSE", subscriptionId });
+ return jarr.dump();
+};
+
+bool nostr::service::NostrServiceBase::_hasSubscription(string subscriptionId)
+{
+ lock_guard<mutex> lock(this->_propertyMutex);
+ auto it = this->_subscriptions.find(subscriptionId);
+
+ return it != this->_subscriptions.end();
+};
+
+bool nostr::service::NostrServiceBase::_hasSubscription(string subscriptionId, string relay)
+{
+ lock_guard<mutex> lock(this->_propertyMutex);
+ auto subscriptionIt = this->_subscriptions.find(subscriptionId);
+
+ if (subscriptionIt == this->_subscriptions.end())
+ {
+ return false;
+ }
+
+ auto relays = this->_subscriptions[subscriptionId];
+ auto relayIt = find(relays.begin(), relays.end(), relay);
+
+ return relayIt != relays.end();
+};
+
+void nostr::service::NostrServiceBase::_onSubscriptionMessage(
+ string message,
+ function<void(const string&, shared_ptr<nostr::data::Event>)> eventHandler,
+ function<void(const string&)> eoseHandler,
+ function<void(const string&, const string&)> closeHandler)
+{
+ try
+ {
+ json jMessage = json::parse(message);
+ string messageType = jMessage.at(0);
+ if (messageType == "EVENT")
+ {
+ string subscriptionId = jMessage.at(1);
+ nostr::data::Event event = nostr::data::Event::fromString(jMessage.at(2));
+ eventHandler(subscriptionId, make_shared<nostr::data::Event>(event));
+ }
+ else if (messageType == "EOSE")
+ {
+ string subscriptionId = jMessage.at(1);
+ eoseHandler(subscriptionId);
+ }
+ else if (messageType == "CLOSE")
+ {
+ string subscriptionId = jMessage.at(1);
+ string reason = jMessage.at(2);
+ closeHandler(subscriptionId, reason);
+ }
+ }
+ catch (const json::out_of_range& joor)
+ {
+ PLOG_ERROR << "JSON out-of-range exception: " << joor.what();
+ throw joor;
+ }
+ catch (const json::exception& je)
+ {
+ PLOG_ERROR << "JSON handling exception: " << je.what();
+ throw je;
+ }
+ catch (const invalid_argument& ia)
+ {
+ PLOG_ERROR << "Invalid argument exception: " << ia.what();
+ throw ia;
+ }
+};
+
+void nostr::service::NostrServiceBase::_onAcceptance(
+ string message,
+ function<void(const bool)> acceptanceHandler)
+{
+ try
+ {
+ json jMessage = json::parse(message);
+ string messageType = jMessage[0];
+ if (messageType == "OK")
+ {
+ bool isAccepted = jMessage[2];
+ acceptanceHandler(isAccepted);
+ }
+ }
+ catch (const json::exception& je)
+ {
+ PLOG_ERROR << "JSON handling exception: " << je.what();
+ throw je;
+ }
+};