From 0d87b4053983ec8edaff5b73491b717866876586 Mon Sep 17 00:00:00 2001 From: Michael J <37635304+buttercat1791@users.noreply.github.com> Date: Sun, 3 Mar 2024 11:56:37 -0600 Subject: Create Nostr Service and Add Write Capabilities (#1) --- src/client/websocketpp_client.cpp | 107 ++++++++++++++++ src/event.cpp | 38 ++++++ src/nostr_service.cpp | 248 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 393 insertions(+) create mode 100644 src/client/websocketpp_client.cpp create mode 100644 src/event.cpp create mode 100644 src/nostr_service.cpp (limited to 'src') diff --git a/src/client/websocketpp_client.cpp b/src/client/websocketpp_client.cpp new file mode 100644 index 0000000..1386e1a --- /dev/null +++ b/src/client/websocketpp_client.cpp @@ -0,0 +1,107 @@ +#include +#include + +#include "web_socket_client.hpp" + +using std::error_code; +using std::lock_guard; +using std::make_tuple; +using std::mutex; +using std::string; +using std::tuple; +using std::unordered_map; + +namespace client +{ +/** + * @brief An implementation of the `IWebSocketClient` interface that uses the WebSocket++ library. + */ +class WebsocketppClient : public IWebSocketClient +{ +public: + void start() override + { + this->_client.init_asio(); + this->_client.start_perpetual(); + }; + + void stop() override + { + this->_client.stop_perpetual(); + this->_client.stop(); + }; + + void openConnection(string uri) override + { + error_code error; + websocketpp_client::connection_ptr connection = this->_client.get_connection(uri, error); + + if (error.value() == -1) + { + // PLOG_ERROR << "Error connecting to relay " << relay << ": " << error.message(); + } + + // Configure the connection here via the connection pointer. + connection->set_fail_handler([this, uri](auto handle) { + // PLOG_ERROR << "Error connecting to relay " << relay << ": Handshake failed."; + lock_guard lock(this->_propertyMutex); + if (this->isConnected(uri)) + { + this->_connectionHandles.erase(uri); + } + }); + + lock_guard lock(this->_propertyMutex); + this->_connectionHandles[uri] = connection->get_handle(); + this->_client.connect(connection); + }; + + bool isConnected(string uri) override + { + lock_guard lock(this->_propertyMutex); + return this->_connectionHandles.find(uri) != this->_connectionHandles.end(); + }; + + tuple send(string message, string uri) override + { + error_code error; + + // Make sure the connection isn't closed from under us. + lock_guard lock(this->_propertyMutex); + this->_client.send( + this->_connectionHandles[uri], + message, + websocketpp::frame::opcode::text, + error); + + if (error.value() == -1) + { + // PLOG_ERROR << "Error publishing event to relay " << relay << ": " << error.message(); + return make_tuple(uri, false); + } + + return make_tuple(uri, true); + }; + + void closeConnection(string uri) override + { + lock_guard lock(this->_propertyMutex); + + websocketpp::connection_hdl handle = this->_connectionHandles[uri]; + this->_client.close( + handle, + websocketpp::close::status::going_away, + "_client requested close."); + + this->_connectionHandles.erase(uri); + }; + +private: + typedef websocketpp::client websocketpp_client; + typedef unordered_map::iterator connection_hdl_iterator; + + websocketpp_client _client; + unordered_map _connectionHandles; + mutex _propertyMutex; +}; +} // namespace client diff --git a/src/event.cpp b/src/event.cpp new file mode 100644 index 0000000..75f2ee8 --- /dev/null +++ b/src/event.cpp @@ -0,0 +1,38 @@ +#pragma once + +#include +#include +#include + +#include "nostr.hpp" + +using std::string; + +namespace nostr +{ + nlohmann::json Event::serialize() const + { + nlohmann::json j = { + {"id", this->id}, + {"pubkey", this->pubkey}, + {"created_at", this->created_at}, + {"kind", this->kind}, + {"tags", this->tags}, + {"content", this->content}, + {"sig", this->sig} + }; + return j.dump(); + }; + + void Event::deserialize(string jsonString) + { + nlohmann::json j = nlohmann::json::parse(jsonString); + this->id = j["id"]; + this->pubkey = j["pubkey"]; + this->created_at = j["created_at"]; + this->kind = j["kind"]; + this->tags = j["tags"]; + this->content = j["content"]; + this->sig = j["sig"]; + }; +} diff --git a/src/nostr_service.cpp b/src/nostr_service.cpp new file mode 100644 index 0000000..09be6e3 --- /dev/null +++ b/src/nostr_service.cpp @@ -0,0 +1,248 @@ +#include +#include +#include +#include + +#include "nostr.hpp" +#include "client/web_socket_client.hpp" + +using std::async; +using std::future; +using std::lock_guard; +using std::make_tuple; +using std::move; +using std::mutex; +using std::string; +using std::thread; +using std::tuple; +using std::vector; + +namespace nostr +{ +NostrService::NostrService(plog::IAppender* appender, client::IWebSocketClient* client) + : NostrService(appender, client, {}) { }; + +NostrService::NostrService(plog::IAppender* appender, client::IWebSocketClient* client, RelayList relays) + : _defaultRelays(relays), _client(client) +{ + plog::init(plog::debug, appender); + client->start(); +}; + +NostrService::~NostrService() +{ + this->_client->stop(); + delete this->_client; +}; + +RelayList NostrService::defaultRelays() const { return this->_defaultRelays; }; + +RelayList NostrService::activeRelays() const { return this->_activeRelays; }; + +RelayList NostrService::openRelayConnections() +{ + return this->openRelayConnections(this->_defaultRelays); +}; + +RelayList NostrService::openRelayConnections(RelayList relays) +{ + PLOG_INFO << "Attempting to connect to Nostr relays."; + RelayList unconnectedRelays = this->getUnconnectedRelays(relays); + + vector connectionThreads; + for (string relay : unconnectedRelays) + { + thread connectionThread([this, relay]() { + this->connect(relay); + }); + connectionThreads.push_back(move(connectionThread)); + } + + for (thread& connectionThread : connectionThreads) + { + connectionThread.join(); + } + + size_t targetCount = relays.size(); + size_t activeCount = this->_activeRelays.size(); + PLOG_INFO << "Connected to " << activeCount << "/" << targetCount << " target relays."; + + // This property should only contain successful relays at this point. + return this->_activeRelays; +}; + +void NostrService::closeRelayConnections() +{ + if (this->_activeRelays.size() == 0) + { + PLOG_INFO << "No active relay connections to close."; + return; + } + + this->closeRelayConnections(this->_activeRelays); +}; + +void NostrService::closeRelayConnections(RelayList relays) +{ + PLOG_INFO << "Disconnecting from Nostr relays."; + RelayList connectedRelays = getConnectedRelays(relays); + + vector disconnectionThreads; + for (string relay : connectedRelays) + { + thread disconnectionThread([this, relay]() { + this->disconnect(relay); + }); + disconnectionThreads.push_back(move(disconnectionThread)); + } + + for (thread& disconnectionThread : disconnectionThreads) + { + disconnectionThread.join(); + } +}; + +tuple NostrService::publishEvent(Event event) +{ + // TODO: Add validation function. + + RelayList successfulRelays; + RelayList failedRelays; + + PLOG_INFO << "Attempting to publish event to Nostr relays."; + + vector>> publishFutures; + for (string relay : this->_activeRelays) + { + future> publishFuture = async([this, relay, event]() { + return this->_client->send(event.serialize(), relay); + }); + + publishFutures.push_back(move(publishFuture)); + } + + for (auto& publishFuture : publishFutures) + { + auto [relay, isSuccess] = publishFuture.get(); + if (isSuccess) + { + successfulRelays.push_back(relay); + } + else + { + failedRelays.push_back(relay); + } + } + + size_t targetCount = this->_activeRelays.size(); + size_t successfulCount = successfulRelays.size(); + PLOG_INFO << "Published event to " << successfulCount << "/" << targetCount << " target relays."; + + return make_tuple(successfulRelays, failedRelays); +}; + +RelayList NostrService::getConnectedRelays(RelayList relays) +{ + PLOG_VERBOSE << "Identifying connected relays."; + RelayList connectedRelays; + for (string relay : relays) + { + bool isActive = find(this->_activeRelays.begin(), this->_activeRelays.end(), relay) + != this->_activeRelays.end(); + bool isConnected = this->_client->isConnected(relay); + PLOG_VERBOSE << "Relay " << relay << " is active: " << isActive << ", is connected: " << isConnected; + + if (isActive && isConnected) + { + connectedRelays.push_back(relay); + } + else if (isActive && !isConnected) + { + this->eraseActiveRelay(relay); + } + else if (!isActive && isConnected) + { + this->_activeRelays.push_back(relay); + connectedRelays.push_back(relay); + } + } + return connectedRelays; +}; + +RelayList NostrService::getUnconnectedRelays(RelayList relays) +{ + PLOG_VERBOSE << "Identifying unconnected relays."; + RelayList unconnectedRelays; + for (string relay : relays) + { + bool isActive = find(this->_activeRelays.begin(), this->_activeRelays.end(), relay) + != this->_activeRelays.end(); + bool isConnected = this->_client->isConnected(relay); + PLOG_VERBOSE << "Relay " << relay << " is active: " << isActive << ", is connected: " << isConnected; + + if (!isActive && !isConnected) + { + PLOG_VERBOSE << "Relay " << relay << " is not active and not connected."; + unconnectedRelays.push_back(relay); + } + else if (isActive && !isConnected) + { + PLOG_VERBOSE << "Relay " << relay << " is active but not connected. Removing from active relays list."; + this->eraseActiveRelay(relay); + unconnectedRelays.push_back(relay); + } + else if (!isActive && isConnected) + { + PLOG_VERBOSE << "Relay " << relay << " is connected but not active. Adding to active relays list."; + this->_activeRelays.push_back(relay); + } + } + return unconnectedRelays; +}; + +bool NostrService::isConnected(string relay) +{ + auto it = find(this->_activeRelays.begin(), this->_activeRelays.end(), relay); + if (it != this->_activeRelays.end()) // If the relay is in this->_activeRelays + { + return true; + } + return false; +}; + +void NostrService::eraseActiveRelay(string relay) +{ + auto it = find(this->_activeRelays.begin(), this->_activeRelays.end(), relay); + if (it != this->_activeRelays.end()) // If the relay is in this->_activeRelays + { + this->_activeRelays.erase(it); + } +}; + +void NostrService::connect(string relay) +{ + PLOG_VERBOSE << "Connecting to relay " << relay; + this->_client->openConnection(relay); + + lock_guard lock(this->_propertyMutex); + bool isConnected = this->_client->isConnected(relay); + + if (isConnected) + { + PLOG_VERBOSE << "Connected to relay " << relay << ": " << isConnected; + this->_activeRelays.push_back(relay); + } + else + { + PLOG_ERROR << "Failed to connect to relay " << relay; + } +}; + +void NostrService::disconnect(string relay) +{ + this->_client->closeConnection(relay); + + lock_guard lock(this->_propertyMutex); + this->eraseActiveRelay(relay); +}; +} // namespace nostr -- cgit