aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLibravatar Michael Jurkoic <mjjurkoic@gmail.com>2024-03-18 20:00:17 -0500
committerLibravatar Michael Jurkoic <mjjurkoic@gmail.com>2024-03-18 20:00:17 -0500
commitaaba3db6976f9bb8e92ae7ff1075f9719f8936c1 (patch)
tree1f60c3778ca8da02205b93b4a1f50e305463b690
parent299a2567430dd96800d6b3ca81a3d198be4a18fd (diff)
Provide option to store events for async retrieval
-rw-r--r--include/nostr.hpp21
-rw-r--r--src/nostr_service.cpp56
2 files changed, 74 insertions, 3 deletions
diff --git a/include/nostr.hpp b/include/nostr.hpp
index 448ad64..1a4e33c 100644
--- a/include/nostr.hpp
+++ b/include/nostr.hpp
@@ -147,6 +147,8 @@ public:
* @brief Queries all open relay connections for events matching the given set of filters.
* @param filters The filters to use for the query.
* @returns The ID of the subscription created for the query.
+ * @remarks The service will store a limited number of events returned from the relay for the
+ * given filters. These events may be retrieved via `getNewEvents`.
*/
std::string queryRelays(Filters filters);
@@ -156,6 +158,9 @@ public:
* @param responseHandler A callable object that will be invoked each time the client receives
* an event matching the filters.
* @returns The ID of the subscription created for the query.
+ * @remark By providing a response handler, the caller assumes responsibility for handling all
+ * events returned from the relay for the given filters. The service will not store the
+ * events, and they will not be accessible via `getNewEvents`.
*/
std::string queryRelays(Filters filters, std::function<void(std::string, Event)> responseHandler);
@@ -164,14 +169,14 @@ public:
* subscriptions.
* @returns A pointer to a vector of new events.
*/
- std::unique_ptr<std::vector<Event>> getNewEvents();
+ std::vector<Event> getNewEvents();
/**
* @brief Get any new events received since the last call to this method, for the given
* subscription.
* @returns A pointer to a vector of new events.
*/
- std::unique_ptr<std::vector<Event>> getNewEvents(std::string subscriptionId);
+ std::vector<Event> getNewEvents(std::string subscriptionId);
/**
* @brief Closes the subscription with the given ID on all open relay connections.
@@ -198,12 +203,22 @@ public:
std::tuple<RelayList, RelayList> closeSubscriptions(RelayList relays);
private:
+ ///< The maximum number of events the service will store for each subscription.
+ const int MAX_EVENTS_PER_SUBSCRIPTION = 128;
+
+ ///< The WebSocket client used to communicate with relays.
client::IWebSocketClient* _client;
+ ///< A mutex to protect the instance properties.
std::mutex _propertyMutex;
+ ///< The default set of Nostr relays to which the service will attempt to connect.
RelayList _defaultRelays;
- RelayList _activeRelays;
+ ///< The set of Nostr relays to which the service is currently connected.
+ RelayList _activeRelays;
+ ///< A map from relay URIs to the subscription IDs open on each relay.
std::unordered_map<std::string, std::vector<std::string>> _subscriptions;
+ ///< A map from subscription IDs to the events returned by the relays for each subscription.
std::unordered_map<std::string, std::vector<Event>> _events;
+ ///< A map from the subscription IDs to the latest read event for each subscription.
std::unordered_map<std::string, std::vector<Event>::iterator> _eventIterators;
/**
diff --git a/src/nostr_service.cpp b/src/nostr_service.cpp
index 13d5ff5..50609b4 100644
--- a/src/nostr_service.cpp
+++ b/src/nostr_service.cpp
@@ -21,6 +21,7 @@ using std::lock_guard;
using std::make_tuple;
using std::move;
using std::mutex;
+using std::out_of_range;
using std::string;
using std::thread;
using std::tuple;
@@ -151,6 +152,7 @@ tuple<RelayList, RelayList> NostrService::publishEvent(Event event)
string NostrService::queryRelays(Filters filters)
{
return this->queryRelays(filters, [this](string subscriptionId, Event event) {
+ this->_eventIterators[subscriptionId] = this->_events[subscriptionId].begin();
this->onEvent(subscriptionId, event);
});
};
@@ -199,6 +201,46 @@ string NostrService::queryRelays(Filters filters, function<void(string, Event)>
return subscriptionId;
};
+vector<Event> NostrService::getNewEvents()
+{
+ vector<Event> newEvents;
+
+ for (auto& [subscriptionId, events] : this->_events)
+ {
+ vector<Event> subscriptionEvents = this->getNewEvents(subscriptionId);
+ newEvents.insert(newEvents.end(), subscriptionEvents.begin(), subscriptionEvents.end());
+ }
+
+ return newEvents;
+};
+
+vector<Event> NostrService::getNewEvents(string subscriptionId)
+{
+ if (this->_events.find(subscriptionId) == this->_events.end())
+ {
+ PLOG_ERROR << "No events found for subscription: " << subscriptionId;
+ throw out_of_range("No events found for subscription: " + subscriptionId);
+ }
+
+ if (this->_eventIterators.find(subscriptionId) == this->_eventIterators.end())
+ {
+ PLOG_ERROR << "No event iterator found for subscription: " << subscriptionId;
+ throw out_of_range("No event iterator found for subscription: " + subscriptionId);
+ }
+
+ vector<Event> newEvents;
+ vector<Event> receivedEvents = this->_events[subscriptionId];
+ vector<Event>::iterator eventIt = this->_eventIterators[subscriptionId];
+
+ while (eventIt != receivedEvents.end())
+ {
+ newEvents.push_back(move(*eventIt));
+ eventIt++;
+ }
+
+ return newEvents;
+};
+
tuple<RelayList, RelayList> NostrService::closeSubscription(string subscriptionId)
{
RelayList successfulRelays;
@@ -413,5 +455,19 @@ void NostrService::onEvent(string subscriptionId, Event event)
{
_events[subscriptionId].push_back(event);
PLOG_INFO << "Received event for subscription: " << subscriptionId;
+
+ // To protect memory, only keep a limited number of events per subscription.
+ while (_events[subscriptionId].size() > NostrService::MAX_EVENTS_PER_SUBSCRIPTION)
+ {
+ auto startIt = _events[subscriptionId].begin();
+ auto eventIt = _eventIterators[subscriptionId];
+
+ if (eventIt == startIt)
+ {
+ eventIt++;
+ }
+
+ _events[subscriptionId].erase(_events[subscriptionId].begin());
+ }
};
} // namespace nostr