From aab5fddb2f6521fbc0720eea45392d7c7361e42d Mon Sep 17 00:00:00 2001 From: elmato Date: Wed, 8 Mar 2017 01:50:01 +0000 Subject: [PATCH] Keep track of subscribed accounts and check for events that reference them --- libraries/app/database_api.cpp | 85 +++++++++++++++++++++------------- 1 file changed, 53 insertions(+), 32 deletions(-) diff --git a/libraries/app/database_api.cpp b/libraries/app/database_api.cpp index 3d464dfa..a09c5f76 100644 --- a/libraries/app/database_api.cpp +++ b/libraries/app/database_api.cpp @@ -166,6 +166,16 @@ class database_api_impl : public std::enable_shared_from_this return _subscribe_filter.contains( i ); } + bool is_impacted_account( const flat_set& accounts) + { + if( !_subscribed_accounts.size() || !accounts.size() ) + return false; + + return std::any_of(accounts.begin(), accounts.end(), [this](const account_id_type& account) { + return _subscribed_accounts.find(account) != _subscribed_accounts.end(); + }); + } + template void enqueue_if_subscribed_to_market(const object* obj, market_queue_type& queue, bool full_object=true) { @@ -182,16 +192,17 @@ class database_api_impl : public std::enable_shared_from_this void broadcast_updates( const vector& updates ); void broadcast_market_updates( const market_queue_type& queue); - void handle_object_changed(bool force_notify, const vector& ids, std::function find_object); + void handle_object_changed(bool force_notify, bool full_object, const vector& ids, const flat_set& impacted_accounts, std::function find_object); /** called every time a block is applied to report the objects that were changed */ - void on_objects_new(const vector& ids); - void on_objects_changed(const vector& ids); - void on_objects_removed(const vector& ids, const vector& objs); + void on_objects_new(const vector& ids, const flat_set& impacted_accounts); + void on_objects_changed(const vector& ids, const flat_set& impacted_accounts); + void on_objects_removed(const vector& ids, const vector& objs, const flat_set& impacted_accounts); void on_applied_block(); bool _notify_remove_create = false; mutable fc::bloom_filter _subscribe_filter; + std::set _subscribed_accounts; std::function _subscribe_callback; std::function _pending_trx_callback; std::function _block_applied_callback; @@ -219,14 +230,14 @@ database_api::~database_api() {} database_api_impl::database_api_impl( graphene::chain::database& db ):_db(db) { wlog("creating database api ${x}", ("x",int64_t(this)) ); - _new_connection = _db.new_objects.connect([this](const vector& ids) { - on_objects_new(ids); + _new_connection = _db.new_objects.connect([this](const vector& ids, const flat_set& impacted_accounts) { + on_objects_new(ids, impacted_accounts); }); - _change_connection = _db.changed_objects.connect([this](const vector& ids) { - on_objects_changed(ids); + _change_connection = _db.changed_objects.connect([this](const vector& ids, const flat_set& impacted_accounts) { + on_objects_changed(ids, impacted_accounts); }); - _removed_connection = _db.removed_objects.connect([this](const vector& ids, const vector& objs) { - on_objects_removed(ids, objs); + _removed_connection = _db.removed_objects.connect([this](const vector& ids, const vector& objs, const flat_set& impacted_accounts) { + on_objects_removed(ids, objs, impacted_accounts); }); _applied_block_connection = _db.applied_block.connect([this](const signed_block&){ on_applied_block(); }); @@ -292,6 +303,7 @@ void database_api_impl::set_subscribe_callback( std::function database_api_impl::get_full_accounts( const if( subscribe ) { + FC_ASSERT( std::distance(_subscribed_accounts.begin(), _subscribed_accounts.end()) < 100 ); + _subscribed_accounts.insert( account->get_id() ); subscribe_to_item( account->id ); } @@ -1849,36 +1863,37 @@ void database_api_impl::broadcast_market_updates( const market_queue_type& queue } } -void database_api_impl::on_objects_removed( const vector& ids, const vector& objs ) +void database_api_impl::on_objects_removed( const vector& ids, const vector& objs, const flat_set& impacted_accounts) { - handle_object_changed(_notify_remove_create, ids, [objs](object_id_type id) -> const object* { + handle_object_changed(_notify_remove_create, false, ids, impacted_accounts, + [objs](object_id_type id) -> const object* { + auto it = std::find_if( + objs.begin(), objs.end(), + [id](const object* o) {return o != nullptr && o->id == id;}); - auto it = std::find_if( - objs.begin(), objs.end(), - [id](const object* o) {return o != nullptr && o->id == id;}); + if (it != objs.end()) + return *it; - if (it != objs.end()) - return *it; - - return nullptr; - }); + return nullptr; + } + ); } -void database_api_impl::on_objects_new(const vector& ids) +void database_api_impl::on_objects_new(const vector& ids, const flat_set& impacted_accounts) { - handle_object_changed(_notify_remove_create, ids, + handle_object_changed(_notify_remove_create, true, ids, impacted_accounts, std::bind(&object_database::find_object, &_db, std::placeholders::_1) ); } -void database_api_impl::on_objects_changed(const vector& ids) +void database_api_impl::on_objects_changed(const vector& ids, const flat_set& impacted_accounts) { - handle_object_changed(false, ids, + handle_object_changed(false, true, ids, impacted_accounts, std::bind(&object_database::find_object, &_db, std::placeholders::_1) ); } -void database_api_impl::handle_object_changed(bool force_notify, const vector& ids, std::function find_object) +void database_api_impl::handle_object_changed(bool force_notify, bool full_object, const vector& ids, const flat_set& impacted_accounts, std::function find_object) { if( _subscribe_callback ) { @@ -1886,13 +1901,19 @@ void database_api_impl::handle_object_changed(bool force_notify, const vectorto_variant() ); + auto obj = find_object(id); + if( obj ) + { + updates.emplace_back( obj->to_variant() ); + } + } + else + { + updates.emplace_back( id ); } } } @@ -1908,11 +1929,11 @@ void database_api_impl::handle_object_changed(bool force_notify, const vector() ) { - enqueue_if_subscribed_to_market( find_object(id), broadcast_queue ); + enqueue_if_subscribed_to_market( find_object(id), broadcast_queue, full_object ); } else if( id.is() ) { - enqueue_if_subscribed_to_market( find_object(id), broadcast_queue ); + enqueue_if_subscribed_to_market( find_object(id), broadcast_queue, full_object ); } }