From aaba3db6976f9bb8e92ae7ff1075f9719f8936c1 Mon Sep 17 00:00:00 2001 From: Michael Jurkoic Date: Mon, 18 Mar 2024 20:00:17 -0500 Subject: Provide option to store events for async retrieval --- src/nostr_service.cpp | 56 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) (limited to 'src') diff --git a/src/nostr_service.cpp b/src/nostr_service.cpp index 13d5ff5..50609b4 100644 --- a/src/nostr_service.cpp +++ b/src/nostr_service.cpp @@ -21,6 +21,7 @@ using std::lock_guard; using std::make_tuple; using std::move; using std::mutex; +using std::out_of_range; using std::string; using std::thread; using std::tuple; @@ -151,6 +152,7 @@ tuple NostrService::publishEvent(Event event) string NostrService::queryRelays(Filters filters) { return this->queryRelays(filters, [this](string subscriptionId, Event event) { + this->_eventIterators[subscriptionId] = this->_events[subscriptionId].begin(); this->onEvent(subscriptionId, event); }); }; @@ -199,6 +201,46 @@ string NostrService::queryRelays(Filters filters, function return subscriptionId; }; +vector NostrService::getNewEvents() +{ + vector newEvents; + + for (auto& [subscriptionId, events] : this->_events) + { + vector subscriptionEvents = this->getNewEvents(subscriptionId); + newEvents.insert(newEvents.end(), subscriptionEvents.begin(), subscriptionEvents.end()); + } + + return newEvents; +}; + +vector NostrService::getNewEvents(string subscriptionId) +{ + if (this->_events.find(subscriptionId) == this->_events.end()) + { + PLOG_ERROR << "No events found for subscription: " << subscriptionId; + throw out_of_range("No events found for subscription: " + subscriptionId); + } + + if (this->_eventIterators.find(subscriptionId) == this->_eventIterators.end()) + { + PLOG_ERROR << "No event iterator found for subscription: " << subscriptionId; + throw out_of_range("No event iterator found for subscription: " + subscriptionId); + } + + vector newEvents; + vector receivedEvents = this->_events[subscriptionId]; + vector::iterator eventIt = this->_eventIterators[subscriptionId]; + + while (eventIt != receivedEvents.end()) + { + newEvents.push_back(move(*eventIt)); + eventIt++; + } + + return newEvents; +}; + tuple NostrService::closeSubscription(string subscriptionId) { RelayList successfulRelays; @@ -413,5 +455,19 @@ void NostrService::onEvent(string subscriptionId, Event event) { _events[subscriptionId].push_back(event); PLOG_INFO << "Received event for subscription: " << subscriptionId; + + // To protect memory, only keep a limited number of events per subscription. + while (_events[subscriptionId].size() > NostrService::MAX_EVENTS_PER_SUBSCRIPTION) + { + auto startIt = _events[subscriptionId].begin(); + auto eventIt = _eventIterators[subscriptionId]; + + if (eventIt == startIt) + { + eventIt++; + } + + _events[subscriptionId].erase(_events[subscriptionId].begin()); + } }; } // namespace nostr -- cgit