aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorLibravatar Michael J <37635304+buttercat1791@users.noreply.github.com>2024-05-09 08:28:50 -0500
committerLibravatar GitHub <noreply@github.com>2024-05-09 08:28:50 -0500
commit663fb4e7199e1b4318a5bc107096f6a529823e02 (patch)
tree7d92570de852f97550f84140bd794fa5d64c567b /src
parent0d87b4053983ec8edaff5b73491b717866876586 (diff)
parentd6faf6c815611450d1b61045b53525d7f25ac5c9 (diff)
Merge pull request #3 from buttercat1791/relay-readv0.0.2
Full Relay Read/Write Support
Diffstat (limited to 'src')
-rw-r--r--src/client/websocketpp_client.cpp30
-rw-r--r--src/event.cpp154
-rw-r--r--src/filters.cpp67
-rw-r--r--src/nostr_service.cpp504
4 files changed, 672 insertions, 83 deletions
diff --git a/src/client/websocketpp_client.cpp b/src/client/websocketpp_client.cpp
index 1386e1a..baae054 100644
--- a/src/client/websocketpp_client.cpp
+++ b/src/client/websocketpp_client.cpp
@@ -1,9 +1,7 @@
-#include <websocketpp/client.hpp>
-#include <websocketpp/config/asio_client.hpp>
-
#include "web_socket_client.hpp"
using std::error_code;
+using std::function;
using std::lock_guard;
using std::make_tuple;
using std::mutex;
@@ -76,13 +74,33 @@ public:
if (error.value() == -1)
{
- // PLOG_ERROR << "Error publishing event to relay " << relay << ": " << error.message();
return make_tuple(uri, false);
}
return make_tuple(uri, true);
};
+ tuple<string, bool> send(string message, string uri, function<void(const string&)> messageHandler) override
+ {
+ auto successes = this->send(message, uri);
+ this->receive(uri, messageHandler);
+ return successes;
+ };
+
+ void receive(string uri, function<void(const string&)> messageHandler) override
+ {
+ lock_guard<mutex> lock(this->_propertyMutex);
+ auto connectionHandle = this->_connectionHandles[uri];
+ auto connection = this->_client.get_con_from_hdl(connectionHandle);
+
+ connection->set_message_handler([messageHandler](
+ websocketpp::connection_hdl connectionHandle,
+ websocketpp_client::message_ptr message)
+ {
+ messageHandler(message->get_payload());
+ });
+ };
+
void closeConnection(string uri) override
{
lock_guard<mutex> lock(this->_propertyMutex);
@@ -103,5 +121,9 @@ private:
websocketpp_client _client;
unordered_map<string, websocketpp::connection_hdl> _connectionHandles;
mutex _propertyMutex;
+
+ void onMessage(websocketpp::connection_hdl handle, websocketpp_client::message_ptr message)
+ {
+ };
};
} // namespace client
diff --git a/src/event.cpp b/src/event.cpp
index 75f2ee8..703efae 100644
--- a/src/event.cpp
+++ b/src/event.cpp
@@ -1,38 +1,126 @@
-#pragma once
-
-#include <string>
-#include <vector>
-#include <nlohmann/json.hpp>
+#include <ctime>
#include "nostr.hpp"
-using std::string;
+using namespace nlohmann;
+using namespace std;
+
+namespace nostr
+{
+string Event::serialize()
+{
+ try
+ {
+ this->validate();
+ }
+ catch (const invalid_argument& e)
+ {
+ throw e;
+ }
+
+ json j = {
+ {"pubkey", this->pubkey},
+ {"created_at", this->createdAt},
+ {"kind", this->kind},
+ {"tags", this->tags},
+ {"content", this->content},
+ {"sig", this->sig}};
+
+ j["id"] = this->generateId(j.dump());
+
+ return j.dump();
+};
+
+Event Event::fromString(string jstr)
+{
+ json j = json::parse(jstr);
+ Event event;
+
+ try
+ {
+ event = Event::fromJson(j);
+ }
+ catch (const invalid_argument& e)
+ {
+ throw e;
+ }
+
+ return event;
+};
+
+Event Event::fromJson(json j)
+{
+ Event event;
+
+ try {
+ event.id = j.at("id");
+ event.pubkey = j.at("pubkey");
+ event.createdAt = j.at("created_at");
+ event.kind = j.at("kind");
+ event.tags = j.at("tags");
+ event.content = j.at("content");
+ event.sig = j.at("sig");
+ } catch (const json::out_of_range& e) {
+ ostringstream oss;
+ oss << "Event::fromJson: Tried to access an out-of-range element: " << e.what();
+ throw invalid_argument(oss.str());
+ }
-namespace nostr
+ return event;
+};
+
+void Event::validate()
+{
+ bool hasPubkey = this->pubkey.length() > 0;
+ if (!hasPubkey)
+ {
+ throw std::invalid_argument("Event::validate: The pubkey of the event author is required.");
+ }
+
+ bool hasCreatedAt = this->createdAt > 0;
+ if (!hasCreatedAt)
+ {
+ this->createdAt = time(nullptr);
+ }
+
+ bool hasKind = this->kind >= 0 && this->kind < 40000;
+ if (!hasKind)
+ {
+ throw std::invalid_argument("Event::validate: A valid event kind is required.");
+ }
+
+ bool hasSignature = this->sig.length() > 0;
+ if (!hasSignature)
+ {
+ throw std::invalid_argument("Event::validate: The event must be signed.");
+ }
+};
+
+string Event::generateId(string serializedData) const
{
- nlohmann::json Event::serialize() const
- {
- nlohmann::json j = {
- {"id", this->id},
- {"pubkey", this->pubkey},
- {"created_at", this->created_at},
- {"kind", this->kind},
- {"tags", this->tags},
- {"content", this->content},
- {"sig", this->sig}
- };
- return j.dump();
- };
-
- void Event::deserialize(string jsonString)
- {
- nlohmann::json j = nlohmann::json::parse(jsonString);
- this->id = j["id"];
- this->pubkey = j["pubkey"];
- this->created_at = j["created_at"];
- this->kind = j["kind"];
- this->tags = j["tags"];
- this->content = j["content"];
- this->sig = j["sig"];
- };
-}
+ unsigned char hash[SHA256_DIGEST_LENGTH];
+ EVP_Digest(serializedData.c_str(), serializedData.length(), hash, NULL, EVP_sha256(), NULL);
+
+ stringstream ss;
+ for (int i = 0; i < SHA256_DIGEST_LENGTH; i++)
+ {
+ ss << hex << setw(2) << setfill('0') << (int)hash[i];
+ }
+
+ return ss.str();
+};
+
+bool Event::operator==(const Event& other) const
+{
+ if (this->id.empty())
+ {
+ throw invalid_argument("Event::operator==: Cannot check equality, the left-side argument is undefined.");
+ }
+ if (other.id.empty())
+ {
+ throw invalid_argument("Event::operator==: Cannot check equality, the right-side argument is undefined.");
+ }
+
+ return this->id == other.id;
+};
+} // namespace nostr
diff --git a/src/filters.cpp b/src/filters.cpp
new file mode 100644
index 0000000..40596eb
--- /dev/null
+++ b/src/filters.cpp
@@ -0,0 +1,67 @@
+#include "nostr.hpp"
+
+using namespace nlohmann;
+using namespace std;
+
+namespace nostr
+{
+string Filters::serialize(string& subscriptionId)
+{
+ try
+ {
+ this->validate();
+ }
+ catch (const invalid_argument& e)
+ {
+ throw e;
+ }
+
+ json j = {
+ {"ids", this->ids},
+ {"authors", this->authors},
+ {"kinds", this->kinds},
+ {"since", this->since},
+ {"until", this->until},
+ {"limit", this->limit}};
+
+ for (auto& tag : this->tags)
+ {
+ stringstream ss;
+ ss << "#" << tag.first;
+ string tagname = ss.str();
+
+ j[tagname] = tag.second;
+ }
+
+ json jarr = json::array({ "REQ", subscriptionId, j });
+
+ return jarr.dump();
+};
+
+void Filters::validate()
+{
+ bool hasLimit = this->limit > 0;
+ if (!hasLimit)
+ {
+ throw invalid_argument("Filters::validate: The limit must be greater than 0.");
+ }
+
+ bool hasUntil = this->until > 0;
+ if (!hasUntil)
+ {
+ this->until = time(nullptr);
+ }
+
+ bool hasIds = this->ids.size() > 0;
+ bool hasAuthors = this->authors.size() > 0;
+ bool hasKinds = this->kinds.size() > 0;
+ bool hasTags = this->tags.size() > 0;
+
+ bool hasFilter = hasIds || hasAuthors || hasKinds || hasTags;
+
+ if (!hasFilter)
+ {
+ throw invalid_argument("Filters::validate: At least one filter must be set.");
+ }
+};
+} // namespace nostr
diff --git a/src/nostr_service.cpp b/src/nostr_service.cpp
index 09be6e3..664243f 100644
--- a/src/nostr_service.cpp
+++ b/src/nostr_service.cpp
@@ -1,53 +1,48 @@
-#include <plog/Init.h>
-#include <plog/Log.h>
-#include <websocketpp/client.hpp>
-#include <websocketpp/config/asio_client.hpp>
-
#include "nostr.hpp"
#include "client/web_socket_client.hpp"
-using std::async;
-using std::future;
-using std::lock_guard;
-using std::make_tuple;
-using std::move;
-using std::mutex;
-using std::string;
-using std::thread;
-using std::tuple;
-using std::vector;
+using namespace nlohmann;
+using namespace std;
namespace nostr
{
-NostrService::NostrService(plog::IAppender* appender, client::IWebSocketClient* client)
- : NostrService(appender, client, {}) { };
-
-NostrService::NostrService(plog::IAppender* appender, client::IWebSocketClient* client, RelayList relays)
- : _defaultRelays(relays), _client(client)
-{
- plog::init(plog::debug, appender);
+NostrService::NostrService(
+ shared_ptr<plog::IAppender> appender,
+ shared_ptr<client::IWebSocketClient> client,
+ shared_ptr<ISigner> signer)
+: NostrService(appender, client, signer, {}) { };
+
+NostrService::NostrService(
+ shared_ptr<plog::IAppender> appender,
+ shared_ptr<client::IWebSocketClient> client,
+ shared_ptr<ISigner> signer,
+ vector<string> relays)
+: _defaultRelays(relays), _client(client), _signer(signer)
+{
+ plog::init(plog::debug, appender.get());
client->start();
};
NostrService::~NostrService()
{
this->_client->stop();
- delete this->_client;
};
-RelayList NostrService::defaultRelays() const { return this->_defaultRelays; };
+vector<string> NostrService::defaultRelays() const { return this->_defaultRelays; };
-RelayList NostrService::activeRelays() const { return this->_activeRelays; };
+vector<string> NostrService::activeRelays() const { return this->_activeRelays; };
-RelayList NostrService::openRelayConnections()
+unordered_map<string, vector<string>> NostrService::subscriptions() const { return this->_subscriptions; };
+
+vector<string> NostrService::openRelayConnections()
{
return this->openRelayConnections(this->_defaultRelays);
};
-RelayList NostrService::openRelayConnections(RelayList relays)
+vector<string> NostrService::openRelayConnections(vector<string> relays)
{
PLOG_INFO << "Attempting to connect to Nostr relays.";
- RelayList unconnectedRelays = this->getUnconnectedRelays(relays);
+ vector<string> unconnectedRelays = this->getUnconnectedRelays(relays);
vector<thread> connectionThreads;
for (string relay : unconnectedRelays)
@@ -82,10 +77,10 @@ void NostrService::closeRelayConnections()
this->closeRelayConnections(this->_activeRelays);
};
-void NostrService::closeRelayConnections(RelayList relays)
+void NostrService::closeRelayConnections(vector<string> relays)
{
PLOG_INFO << "Disconnecting from Nostr relays.";
- RelayList connectedRelays = getConnectedRelays(relays);
+ vector<string> connectedRelays = getConnectedRelays(relays);
vector<thread> disconnectionThreads;
for (string relay : connectedRelays)
@@ -94,6 +89,10 @@ void NostrService::closeRelayConnections(RelayList relays)
this->disconnect(relay);
});
disconnectionThreads.push_back(move(disconnectionThread));
+
+ // TODO: Close subscriptions before disconnecting.
+ lock_guard<mutex> lock(this->_propertyMutex);
+ this->_subscriptions.erase(relay);
}
for (thread& disconnectionThread : disconnectionThreads)
@@ -102,23 +101,64 @@ void NostrService::closeRelayConnections(RelayList relays)
}
};
-tuple<RelayList, RelayList> NostrService::publishEvent(Event event)
+// TODO: Make this method return a promise.
+tuple<vector<string>, vector<string>> NostrService::publishEvent(shared_ptr<Event> event)
{
- // TODO: Add validation function.
-
- RelayList successfulRelays;
- RelayList failedRelays;
+ vector<string> successfulRelays;
+ vector<string> failedRelays;
PLOG_INFO << "Attempting to publish event to Nostr relays.";
+ json message;
+ try
+ {
+ this->_signer->sign(event);
+ message = json::array({ "EVENT", event->serialize() });
+ }
+ catch (const std::invalid_argument& e)
+ {
+ PLOG_ERROR << "Failed to sign event: " << e.what();
+ throw e;
+ }
+ catch (const json::exception& je)
+ {
+ PLOG_ERROR << "Failed to serialize event: " << je.what();
+ throw je;
+ }
+
+ lock_guard<mutex> lock(this->_propertyMutex);
+ vector<string> targetRelays = this->_activeRelays;
vector<future<tuple<string, bool>>> publishFutures;
- for (string relay : this->_activeRelays)
+ for (const string& relay : targetRelays)
{
- future<tuple<string, bool>> publishFuture = async([this, relay, event]() {
- return this->_client->send(event.serialize(), relay);
- });
-
- publishFutures.push_back(move(publishFuture));
+ promise<tuple<string, bool>> publishPromise;
+ publishFutures.push_back(move(publishPromise.get_future()));
+
+ auto [uri, success] = this->_client->send(
+ message.dump(),
+ relay,
+ [this, &relay, &event, &publishPromise](string response)
+ {
+ this->onAcceptance(response, [this, &relay, &event, &publishPromise](bool isAccepted)
+ {
+ if (isAccepted)
+ {
+ PLOG_INFO << "Relay " << relay << " accepted event: " << event->id;
+ publishPromise.set_value(make_tuple(relay, true));
+ }
+ else
+ {
+ PLOG_WARNING << "Relay " << relay << " rejected event: " << event->id;
+ publishPromise.set_value(make_tuple(relay, false));
+ }
+ });
+ });
+
+ if (!success)
+ {
+ PLOG_WARNING << "Failed to send event to relay " << relay;
+ publishPromise.set_value(make_tuple(relay, false));
+ }
}
for (auto& publishFuture : publishFutures)
@@ -134,17 +174,288 @@ tuple<RelayList, RelayList> NostrService::publishEvent(Event event)
}
}
- size_t targetCount = this->_activeRelays.size();
+ size_t targetCount = targetRelays.size();
size_t successfulCount = successfulRelays.size();
PLOG_INFO << "Published event to " << successfulCount << "/" << targetCount << " target relays.";
return make_tuple(successfulRelays, failedRelays);
};
-RelayList NostrService::getConnectedRelays(RelayList relays)
+// TODO: Make this method return a promise.
+// TODO: Add a timeout to this method to prevent hanging while waiting for the relay.
+vector<shared_ptr<Event>> NostrService::queryRelays(shared_ptr<Filters> filters)
+{
+ if (filters->limit > 64 || filters->limit < 1)
+ {
+ PLOG_WARNING << "Filters limit must be between 1 and 64, inclusive. Setting limit to 16.";
+ filters->limit = 16;
+ }
+
+ vector<shared_ptr<Event>> events;
+
+ string subscriptionId = this->generateSubscriptionId();
+ string request;
+
+ try
+ {
+ request = filters->serialize(subscriptionId);
+ }
+ catch (const invalid_argument& e)
+ {
+ PLOG_ERROR << "Failed to serialize filters - invalid object: " << e.what();
+ throw e;
+ }
+ catch (const json::exception& je)
+ {
+ PLOG_ERROR << "Failed to serialize filters - JSON exception: " << je.what();
+ throw je;
+ }
+
+ vector<future<tuple<string, bool>>> requestFutures;
+
+ // Send the same query to each relay. As events trickle in from each relay, they will be added
+ // to the events vector. Multiple copies of an event may be received if the same event is
+ // stored on multiple relays. The function will block until all of the relays send an EOSE or
+ // CLOSE message.
+ for (const string relay : this->_activeRelays)
+ {
+ promise<tuple<string, bool>> eosePromise;
+ requestFutures.push_back(move(eosePromise.get_future()));
+
+ auto [uri, success] = this->_client->send(
+ request,
+ relay,
+ [this, &relay, &events, &eosePromise](string payload)
+ {
+ this->onSubscriptionMessage(
+ payload,
+ [&events](const string&, shared_ptr<Event> event)
+ {
+ events.push_back(event);
+ },
+ [relay, &eosePromise](const string&)
+ {
+ eosePromise.set_value(make_tuple(relay, true));
+ },
+ [relay, &eosePromise](const string&, const string&)
+ {
+ eosePromise.set_value(make_tuple(relay, false));
+ });
+ });
+
+ if (success)
+ {
+ PLOG_INFO << "Sent query to relay " << relay;
+ lock_guard<mutex> lock(this->_propertyMutex);
+ this->_subscriptions[subscriptionId].push_back(relay);
+ }
+ else
+ {
+ PLOG_WARNING << "Failed to send query to relay " << relay;
+ eosePromise.set_value(make_tuple(uri, false));
+ }
+ }
+
+ // Close open subscriptions and disconnect from relays after events are received.
+ for (auto& publishFuture : requestFutures)
+ {
+ auto [relay, isEose] = publishFuture.get();
+ if (isEose)
+ {
+ PLOG_INFO << "Received EOSE message from relay " << relay;
+ }
+ else
+ {
+ PLOG_WARNING << "Received CLOSE message from relay " << relay;
+ this->closeRelayConnections({ relay });
+ }
+ }
+ this->closeSubscription(subscriptionId);
+
+ // TODO: De-duplicate events in the vector before returning.
+
+ return events;
+};
+
+string NostrService::queryRelays(
+ shared_ptr<Filters> filters,
+ function<void(const string&, shared_ptr<Event>)> eventHandler,
+ function<void(const string&)> eoseHandler,
+ function<void(const string&, const string&)> closeHandler)
+{
+ vector<string> successfulRelays;
+ vector<string> failedRelays;
+
+ string subscriptionId = this->generateSubscriptionId();
+ string request = filters->serialize(subscriptionId);
+ vector<future<tuple<string, bool>>> requestFutures;
+ for (const string relay : this->_activeRelays)
+ {
+ unique_lock<mutex> lock(this->_propertyMutex);
+ this->_subscriptions[subscriptionId].push_back(relay);
+ lock.unlock();
+
+ future<tuple<string, bool>> requestFuture = async(
+ [this, &relay, &request, &eventHandler, &eoseHandler, &closeHandler]()
+ {
+ return this->_client->send(
+ request,
+ relay,
+ [this, &eventHandler, &eoseHandler, &closeHandler](string payload)
+ {
+ this->onSubscriptionMessage(payload, eventHandler, eoseHandler, closeHandler);
+ });
+ });
+ requestFutures.push_back(move(requestFuture));
+ }
+
+ for (auto& publishFuture : requestFutures)
+ {
+ auto [relay, isSuccess] = publishFuture.get();
+ if (isSuccess)
+ {
+ successfulRelays.push_back(relay);
+ }
+ else
+ {
+ failedRelays.push_back(relay);
+ }
+ }
+
+ size_t targetCount = this->_activeRelays.size();
+ size_t successfulCount = successfulRelays.size();
+ PLOG_INFO << "Sent query to " << successfulCount << "/" << targetCount << " open relay connections.";
+
+ return subscriptionId;
+};
+
+tuple<vector<string>, vector<string>> NostrService::closeSubscription(string subscriptionId)
+{
+ vector<string> successfulRelays;
+ vector<string> failedRelays;
+
+ vector<string> subscriptionRelays;
+ size_t subscriptionRelayCount;
+ vector<future<tuple<string, bool>>> closeFutures;
+
+ try
+ {
+ unique_lock<mutex> lock(this->_propertyMutex);
+ subscriptionRelays = this->_subscriptions.at(subscriptionId);
+ subscriptionRelayCount = subscriptionRelays.size();
+ lock.unlock();
+ }
+ catch (const out_of_range& oor)
+ {
+ PLOG_WARNING << "Subscription " << subscriptionId << " not found.";
+ return make_tuple(successfulRelays, failedRelays);
+ }
+
+ for (const string relay : subscriptionRelays)
+ {
+ future<tuple<string, bool>> closeFuture = async([this, subscriptionId, relay]()
+ {
+ bool success = this->closeSubscription(subscriptionId, relay);
+
+ return make_tuple(relay, success);
+ });
+ closeFutures.push_back(move(closeFuture));
+ }
+
+ for (auto& closeFuture : closeFutures)
+ {
+ auto [uri, success] = closeFuture.get();
+ if (success)
+ {
+ successfulRelays.push_back(uri);
+ }
+ else
+ {
+ failedRelays.push_back(uri);
+ }
+ }
+
+ size_t successfulCount = successfulRelays.size();
+ PLOG_INFO << "Sent CLOSE request for subscription " << subscriptionId << " to " << successfulCount << "/" << subscriptionRelayCount << " open relay connections.";
+
+ // If there were no failures, and the subscription has been closed on all of its relays, forget
+ // about the subscription.
+ if (failedRelays.empty())
+ {
+ lock_guard<mutex> lock(this->_propertyMutex);
+ this->_subscriptions.erase(subscriptionId);
+ }
+
+ return make_tuple(successfulRelays, failedRelays);
+};
+
+bool NostrService::closeSubscription(string subscriptionId, string relay)
+{
+ if (!this->hasSubscription(subscriptionId, relay))
+ {
+ PLOG_WARNING << "Subscription " << subscriptionId << " not found on relay " << relay;
+ return false;
+ }
+
+ if (!this->isConnected(relay))
+ {
+ PLOG_WARNING << "Relay " << relay << " is not connected.";
+ return false;
+ }
+
+ string request = this->generateCloseRequest(subscriptionId);
+ auto [uri, success] = this->_client->send(request, relay);
+
+ if (success)
+ {
+ lock_guard<mutex> lock(this->_propertyMutex);
+ auto it = find(
+ this->_subscriptions[subscriptionId].begin(),
+ this->_subscriptions[subscriptionId].end(),
+ relay);
+
+ if (it != this->_subscriptions[subscriptionId].end())
+ {
+ this->_subscriptions[subscriptionId].erase(it);
+ }
+
+ PLOG_INFO << "Sent close request for subscription " << subscriptionId << " to relay " << relay;
+ }
+ else
+ {
+ PLOG_WARNING << "Failed to send close request to relay " << relay;
+ }
+
+ return success;
+};
+
+vector<string> NostrService::closeSubscriptions()
+{
+ unique_lock<mutex> lock(this->_propertyMutex);
+ vector<string> subscriptionIds;
+ for (auto& [subscriptionId, relays] : this->_subscriptions)
+ {
+ subscriptionIds.push_back(subscriptionId);
+ }
+ lock.unlock();
+
+ vector<string> remainingSubscriptions;
+ for (const string& subscriptionId : subscriptionIds)
+ {
+ auto [successes, failures] = this->closeSubscription(subscriptionId);
+ if (!failures.empty())
+ {
+ remainingSubscriptions.push_back(subscriptionId);
+ }
+ }
+
+ return remainingSubscriptions;
+};
+
+vector<string> NostrService::getConnectedRelays(vector<string> relays)
{
PLOG_VERBOSE << "Identifying connected relays.";
- RelayList connectedRelays;
+ vector<string> connectedRelays;
for (string relay : relays)
{
bool isActive = find(this->_activeRelays.begin(), this->_activeRelays.end(), relay)
@@ -169,10 +480,10 @@ RelayList NostrService::getConnectedRelays(RelayList relays)
return connectedRelays;
};
-RelayList NostrService::getUnconnectedRelays(RelayList relays)
+vector<string> NostrService::getUnconnectedRelays(vector<string> relays)
{
PLOG_VERBOSE << "Identifying unconnected relays.";
- RelayList unconnectedRelays;
+ vector<string> unconnectedRelays;
for (string relay : relays)
{
bool isActive = find(this->_activeRelays.begin(), this->_activeRelays.end(), relay)
@@ -245,4 +556,105 @@ void NostrService::disconnect(string relay)
lock_guard<mutex> lock(this->_propertyMutex);
this->eraseActiveRelay(relay);
};
+
+string NostrService::generateSubscriptionId()
+{
+ UUIDv4::UUIDGenerator<std::mt19937_64> uuidGenerator;
+ UUIDv4::UUID uuid = uuidGenerator.getUUID();
+ return uuid.str();
+};
+
+string NostrService::generateCloseRequest(string subscriptionId)
+{
+ json jarr = json::array({ "CLOSE", subscriptionId });
+ return jarr.dump();
+};
+
+bool NostrService::hasSubscription(string subscriptionId)
+{
+ lock_guard<mutex> lock(this->_propertyMutex);
+ auto it = this->_subscriptions.find(subscriptionId);
+
+ return it != this->_subscriptions.end();
+};
+
+bool NostrService::hasSubscription(string subscriptionId, string relay)
+{
+ lock_guard<mutex> lock(this->_propertyMutex);
+ auto subscriptionIt = this->_subscriptions.find(subscriptionId);
+
+ if (subscriptionIt == this->_subscriptions.end())
+ {
+ return false;
+ }
+
+ auto relays = this->_subscriptions[subscriptionId];
+ auto relayIt = find(relays.begin(), relays.end(), relay);
+
+ return relayIt != relays.end();
+};
+
+void NostrService::onSubscriptionMessage(
+ string message,
+ function<void(const string&, shared_ptr<Event>)> eventHandler,
+ function<void(const string&)> eoseHandler,
+ function<void(const string&, const string&)> closeHandler)
+{
+ try
+ {
+ json jMessage = json::parse(message);
+ string messageType = jMessage.at(0);
+ if (messageType == "EVENT")
+ {
+ string subscriptionId = jMessage.at(1);
+ Event event = Event::fromString(jMessage.at(2));
+ eventHandler(subscriptionId, make_shared<Event>(event));
+ }
+ else if (messageType == "EOSE")
+ {
+ string subscriptionId = jMessage.at(1);
+ eoseHandler(subscriptionId);
+ }
+ else if (messageType == "CLOSE")
+ {
+ string subscriptionId = jMessage.at(1);
+ string reason = jMessage.at(2);
+ closeHandler(subscriptionId, reason);
+ }
+ }
+ catch (const json::out_of_range& joor)
+ {
+ PLOG_ERROR << "JSON out-of-range exception: " << joor.what();
+ throw joor;
+ }
+ catch (const json::exception& je)
+ {
+ PLOG_ERROR << "JSON handling exception: " << je.what();
+ throw je;
+ }
+ catch (const invalid_argument& ia)
+ {
+ PLOG_ERROR << "Invalid argument exception: " << ia.what();
+ throw ia;
+ }
+};
+
+void NostrService::onAcceptance(string message, function<void(const bool)> acceptanceHandler)
+{
+ try
+ {
+ json jMessage = json::parse(message);
+ string messageType = jMessage[0];
+ if (messageType == "OK")
+ {
+ bool isAccepted = jMessage[2];
+ acceptanceHandler(isAccepted);
+ }
+ }
+ catch (const json::exception& je)
+ {
+ PLOG_ERROR << "JSON handling exception: " << je.what();
+ throw je;
+ }
+};
} // namespace nostr