diff options
-rw-r--r-- | include/nostr.hpp | 21 | ||||
-rw-r--r-- | src/nostr_service.cpp | 56 |
2 files changed, 74 insertions, 3 deletions
diff --git a/include/nostr.hpp b/include/nostr.hpp index 448ad64..1a4e33c 100644 --- a/include/nostr.hpp +++ b/include/nostr.hpp @@ -147,6 +147,8 @@ public: * @brief Queries all open relay connections for events matching the given set of filters. * @param filters The filters to use for the query. * @returns The ID of the subscription created for the query. + * @remarks The service will store a limited number of events returned from the relay for the + * given filters. These events may be retrieved via `getNewEvents`. */ std::string queryRelays(Filters filters); @@ -156,6 +158,9 @@ public: * @param responseHandler A callable object that will be invoked each time the client receives * an event matching the filters. * @returns The ID of the subscription created for the query. + * @remark By providing a response handler, the caller assumes responsibility for handling all + * events returned from the relay for the given filters. The service will not store the + * events, and they will not be accessible via `getNewEvents`. */ std::string queryRelays(Filters filters, std::function<void(std::string, Event)> responseHandler); @@ -164,14 +169,14 @@ public: * subscriptions. * @returns A pointer to a vector of new events. */ - std::unique_ptr<std::vector<Event>> getNewEvents(); + std::vector<Event> getNewEvents(); /** * @brief Get any new events received since the last call to this method, for the given * subscription. * @returns A pointer to a vector of new events. */ - std::unique_ptr<std::vector<Event>> getNewEvents(std::string subscriptionId); + std::vector<Event> getNewEvents(std::string subscriptionId); /** * @brief Closes the subscription with the given ID on all open relay connections. @@ -198,12 +203,22 @@ public: std::tuple<RelayList, RelayList> closeSubscriptions(RelayList relays); private: + ///< The maximum number of events the service will store for each subscription. + const int MAX_EVENTS_PER_SUBSCRIPTION = 128; + + ///< The WebSocket client used to communicate with relays. client::IWebSocketClient* _client; + ///< A mutex to protect the instance properties. std::mutex _propertyMutex; + ///< The default set of Nostr relays to which the service will attempt to connect. RelayList _defaultRelays; - RelayList _activeRelays; + ///< The set of Nostr relays to which the service is currently connected. + RelayList _activeRelays; + ///< A map from relay URIs to the subscription IDs open on each relay. std::unordered_map<std::string, std::vector<std::string>> _subscriptions; + ///< A map from subscription IDs to the events returned by the relays for each subscription. std::unordered_map<std::string, std::vector<Event>> _events; + ///< A map from the subscription IDs to the latest read event for each subscription. std::unordered_map<std::string, std::vector<Event>::iterator> _eventIterators; /** diff --git a/src/nostr_service.cpp b/src/nostr_service.cpp index 13d5ff5..50609b4 100644 --- a/src/nostr_service.cpp +++ b/src/nostr_service.cpp @@ -21,6 +21,7 @@ using std::lock_guard; using std::make_tuple; using std::move; using std::mutex; +using std::out_of_range; using std::string; using std::thread; using std::tuple; @@ -151,6 +152,7 @@ tuple<RelayList, RelayList> NostrService::publishEvent(Event event) string NostrService::queryRelays(Filters filters) { return this->queryRelays(filters, [this](string subscriptionId, Event event) { + this->_eventIterators[subscriptionId] = this->_events[subscriptionId].begin(); this->onEvent(subscriptionId, event); }); }; @@ -199,6 +201,46 @@ string NostrService::queryRelays(Filters filters, function<void(string, Event)> return subscriptionId; }; +vector<Event> NostrService::getNewEvents() +{ + vector<Event> newEvents; + + for (auto& [subscriptionId, events] : this->_events) + { + vector<Event> subscriptionEvents = this->getNewEvents(subscriptionId); + newEvents.insert(newEvents.end(), subscriptionEvents.begin(), subscriptionEvents.end()); + } + + return newEvents; +}; + +vector<Event> NostrService::getNewEvents(string subscriptionId) +{ + if (this->_events.find(subscriptionId) == this->_events.end()) + { + PLOG_ERROR << "No events found for subscription: " << subscriptionId; + throw out_of_range("No events found for subscription: " + subscriptionId); + } + + if (this->_eventIterators.find(subscriptionId) == this->_eventIterators.end()) + { + PLOG_ERROR << "No event iterator found for subscription: " << subscriptionId; + throw out_of_range("No event iterator found for subscription: " + subscriptionId); + } + + vector<Event> newEvents; + vector<Event> receivedEvents = this->_events[subscriptionId]; + vector<Event>::iterator eventIt = this->_eventIterators[subscriptionId]; + + while (eventIt != receivedEvents.end()) + { + newEvents.push_back(move(*eventIt)); + eventIt++; + } + + return newEvents; +}; + tuple<RelayList, RelayList> NostrService::closeSubscription(string subscriptionId) { RelayList successfulRelays; @@ -413,5 +455,19 @@ void NostrService::onEvent(string subscriptionId, Event event) { _events[subscriptionId].push_back(event); PLOG_INFO << "Received event for subscription: " << subscriptionId; + + // To protect memory, only keep a limited number of events per subscription. + while (_events[subscriptionId].size() > NostrService::MAX_EVENTS_PER_SUBSCRIPTION) + { + auto startIt = _events[subscriptionId].begin(); + auto eventIt = _eventIterators[subscriptionId]; + + if (eventIt == startIt) + { + eventIt++; + } + + _events[subscriptionId].erase(_events[subscriptionId].begin()); + } }; } // namespace nostr |