diff --git a/libraries/app/api.cpp b/libraries/app/api.cpp index fc8baa19..f16c91d4 100644 --- a/libraries/app/api.cpp +++ b/libraries/app/api.cpp @@ -21,6 +21,9 @@ #include #include #include +#include +#include +#include #include #include @@ -32,6 +35,9 @@ namespace graphene { namespace app { _change_connection = _db.changed_objects.connect([this](const vector& ids) { on_objects_changed(ids); }); + _removed_connection = _db.removed_objects.connect([this](const vector& objs) { + on_objects_removed(objs); + }); _applied_block_connection = _db.applied_block.connect([this](const signed_block&){ on_applied_block(); }); } @@ -158,8 +164,8 @@ namespace graphene { namespace app { std::map database_api::get_full_accounts(std::function callback, const vector& names_or_ids) { + FC_ASSERT( _account_subscriptions.size() < 1024 ); std::map results; - std::set ids_to_subscribe; for (const std::string& account_name_or_id : names_or_ids) { @@ -176,7 +182,7 @@ namespace graphene { namespace app { if (account == nullptr) continue; - ids_to_subscribe.insert({account->id, account->statistics}); + _account_subscriptions[account->id] = callback; // fc::mutable_variant_object full_account; full_account acnt; @@ -194,7 +200,6 @@ namespace graphene { namespace app { */ if (account->cashback_vb) { - ids_to_subscribe.insert(*account->cashback_vb); acnt.cashback_balance = account->cashback_balance(_db); } @@ -202,36 +207,31 @@ namespace graphene { namespace app { auto balance_range = _db.get_index_type().indices().get().equal_range(account->id); //vector balances; std::for_each(balance_range.first, balance_range.second, - [&acnt, &ids_to_subscribe](const account_balance_object& balance) { + [&acnt](const account_balance_object& balance) { acnt.balances.emplace_back(balance); - ids_to_subscribe.insert(balance.id); }); // Add the account's vesting balances auto vesting_range = _db.get_index_type().indices().get().equal_range(account->id); std::for_each(vesting_range.first, vesting_range.second, - [&acnt, &ids_to_subscribe](const vesting_balance_object& balance) { + [&acnt](const vesting_balance_object& balance) { acnt.vesting_balances.emplace_back(balance); - ids_to_subscribe.insert(balance.id); }); // Add the account's orders auto order_range = _db.get_index_type().indices().get().equal_range(account->id); std::for_each(order_range.first, order_range.second, - [&acnt, &ids_to_subscribe] (const limit_order_object& order) { + [&acnt] (const limit_order_object& order) { acnt.limit_orders.emplace_back(order); - ids_to_subscribe.insert(order.id); }); auto call_range = _db.get_index_type().indices().get().equal_range(account->id); std::for_each(call_range.first, call_range.second, - [&acnt, &ids_to_subscribe] (const call_order_object& call) { + [&acnt] (const call_order_object& call) { acnt.call_orders.emplace_back(call); - ids_to_subscribe.insert(call.id); }); results[account_name_or_id] = acnt; } wdump((results)); - subscribe_to_objects(callback, vector(ids_to_subscribe.begin(), ids_to_subscribe.end())); return results; } @@ -554,24 +554,205 @@ namespace graphene { namespace app { return *_history_api; } + vector get_relevant_accounts( const object* obj ) + { + vector result; + if( obj->id.space() == protocol_ids ) + { + switch( (object_type)obj->id.type() ) + { + case null_object_type: + case base_object_type: + case OBJECT_TYPE_COUNT: + return result; + case account_object_type:{ + result.push_back( obj->id ); + break; + } case asset_object_type:{ + const auto& aobj = dynamic_cast(obj); + assert( aobj != nullptr ); + result.push_back( aobj->issuer ); + break; + } case force_settlement_object_type:{ + const auto& aobj = dynamic_cast(obj); + assert( aobj != nullptr ); + result.push_back( aobj->owner ); + break; + } case committee_member_object_type:{ + const auto& aobj = dynamic_cast(obj); + assert( aobj != nullptr ); + result.push_back( aobj->committee_member_account ); + break; + } case witness_object_type:{ + const auto& aobj = dynamic_cast(obj); + assert( aobj != nullptr ); + result.push_back( aobj->witness_account ); + break; + } case limit_order_object_type:{ + const auto& aobj = dynamic_cast(obj); + assert( aobj != nullptr ); + result.push_back( aobj->seller ); + break; + } case call_order_object_type:{ + const auto& aobj = dynamic_cast(obj); + assert( aobj != nullptr ); + result.push_back( aobj->borrower ); + break; + } case custom_object_type:{ + } case proposal_object_type:{ + const auto& aobj = dynamic_cast(obj); + assert( aobj != nullptr ); + flat_set impacted; + aobj->proposed_transaction.get_impacted_accounts( impacted ); + result.reserve( impacted.size() ); + for( auto& item : impacted ) result.emplace_back(item); + break; + } case operation_history_object_type:{ + const auto& aobj = dynamic_cast(obj); + assert( aobj != nullptr ); + flat_set impacted; + operation_get_impacted_accounts( aobj->op, impacted ); + result.reserve( impacted.size() ); + for( auto& item : impacted ) result.emplace_back(item); + break; + } case withdraw_permission_object_type:{ + const auto& aobj = dynamic_cast(obj); + assert( aobj != nullptr ); + result.push_back( aobj->withdraw_from_account ); + result.push_back( aobj->authorized_account ); + break; + } case vesting_balance_object_type:{ + const auto& aobj = dynamic_cast(obj); + assert( aobj != nullptr ); + result.push_back( aobj->owner ); + break; + } case worker_object_type:{ + const auto& aobj = dynamic_cast(obj); + assert( aobj != nullptr ); + result.push_back( aobj->worker_account ); + break; + } case balance_object_type:{ + /** these are free from any accounts */ + } + } + } + else if( obj->id.space() == implementation_ids ) + { + switch( (impl_object_type)obj->id.type() ) + { + case impl_global_property_object_type:{ + } case impl_dynamic_global_property_object_type:{ + } case impl_index_meta_object_type:{ + } case impl_asset_dynamic_data_type:{ + } case impl_asset_bitasset_data_type:{ + break; + } case impl_account_balance_object_type:{ + const auto& aobj = dynamic_cast(obj); + assert( aobj != nullptr ); + result.push_back( aobj->owner ); + break; + } case impl_account_statistics_object_type:{ + const auto& aobj = dynamic_cast(obj); + assert( aobj != nullptr ); + result.push_back( aobj->owner ); + break; + } case impl_transaction_object_type:{ + const auto& aobj = dynamic_cast(obj); + assert( aobj != nullptr ); + flat_set impacted; + aobj->trx.get_impacted_accounts( impacted ); + result.reserve( impacted.size() ); + for( auto& item : impacted ) result.emplace_back(item); + break; + } case impl_block_summary_object_type:{ + } case impl_account_transaction_history_object_type:{ + } case impl_witness_schedule_object_type: { + } + } + } + return result; + } // end get_relevant_accounts( obj ) + + + void database_api::on_objects_removed( const vector& objs ) + { + if( _account_subscriptions.size() ) + { + map > broadcast_queue; + for( const auto& obj : objs ) + { + auto relevant = get_relevant_accounts( obj ); + for( const auto& r : relevant ) + { + auto sub = _account_subscriptions.find(r); + if( sub != _account_subscriptions.end() ) + broadcast_queue[r].emplace_back(obj->to_variant()); + } + } + + _broadcast_removed_complete = fc::async([=](){ + for( const auto& item : broadcast_queue ) + { + auto sub = _account_subscriptions.find(item.first); + if( sub != _account_subscriptions.end() ) + sub->second( fc::variant(item.second ) ); + } + }); + } + } + void database_api::on_objects_changed(const vector& ids) { - vector my_objects; + vector my_objects; + map > broadcast_queue; for(auto id : ids) + { if(_subscriptions.find(id) != _subscriptions.end()) my_objects.push_back(id); + if( _account_subscriptions.size() ) + { + const object* obj = _db.find_object( id ); + if( obj ) + { + vector relevant = get_relevant_accounts( obj ); + for( const auto& r : relevant ) + { + auto sub = _account_subscriptions.find(r); + if( sub != _account_subscriptions.end() ) + broadcast_queue[r].emplace_back(obj->to_variant()); + } + } + } + } + + + /// TODO: consider making _broadcast_changes_complete a deque and + /// pushing the future back / popping the prior future if it is complete. + /// if a connection hangs then this could get backed up and result in + /// a failure to exit cleanly. _broadcast_changes_complete = fc::async([=](){ + for( const auto& item : broadcast_queue ) + { + auto sub = _account_subscriptions.find(item.first); + if( sub != _account_subscriptions.end() ) + sub->second( fc::variant(item.second ) ); + } for(auto id : my_objects) { - const object* obj = _db.find_object(id); - if(obj) + // just incase _usbscriptions changed between filter and broadcast + auto itr = _subscriptions.find( id ); + if( itr != _subscriptions.end() ) { - _subscriptions[id](obj->to_variant()); - } - else - { - _subscriptions[id](fc::variant(id)); + const object* obj = _db.find_object( id ); + if( obj != nullptr ) + { + itr->second(obj->to_variant()); + } + else + { + itr->second(fc::variant(id)); + } } } }); @@ -624,6 +805,11 @@ namespace graphene { namespace app { _broadcast_changes_complete.cancel(); _broadcast_changes_complete.wait(); } + if(_broadcast_removed_complete.valid()) + { + _broadcast_removed_complete.cancel(); + _broadcast_removed_complete.wait(); + } } catch (const fc::exception& e) { wlog("${e}", ("e",e.to_detail_string())); @@ -632,6 +818,7 @@ namespace graphene { namespace app { bool database_api::subscribe_to_objects( const std::function& callback, const vector& ids) { + FC_ASSERT( _subscriptions.size() < 1024 ); for(auto id : ids) _subscriptions[id] = callback; return true; } diff --git a/libraries/app/include/graphene/app/api.hpp b/libraries/app/include/graphene/app/api.hpp index 99d707c9..42be161c 100644 --- a/libraries/app/include/graphene/app/api.hpp +++ b/libraries/app/include/graphene/app/api.hpp @@ -318,12 +318,16 @@ namespace graphene { namespace app { private: /** called every time a block is applied to report the objects that were changed */ void on_objects_changed(const vector& ids); + void on_objects_removed(const vector& objs); void on_applied_block(); fc::future _broadcast_changes_complete; + fc::future _broadcast_removed_complete; boost::signals2::scoped_connection _change_connection; + boost::signals2::scoped_connection _removed_connection; boost::signals2::scoped_connection _applied_block_connection; map > _subscriptions; + map > _account_subscriptions; map< pair, std::function > _market_subscriptions; graphene::chain::database& _db; }; diff --git a/libraries/chain/db_block.cpp b/libraries/chain/db_block.cpp index dc27e290..2afb64e3 100644 --- a/libraries/chain/db_block.cpp +++ b/libraries/chain/db_block.cpp @@ -443,6 +443,14 @@ void database::notify_changed_objects() const auto& head_undo = _undo_db.head(); vector changed_ids; changed_ids.reserve(head_undo.old_values.size()); for( const auto& item : head_undo.old_values ) changed_ids.push_back(item.first); + for( const auto& item : head_undo.new_ids ) changed_ids.push_back(item); + vector removed; + removed.reserve( head_undo.removed.size() ); + for( const auto& item : head_undo.removed ) + { + changed_ids.push_back( item.first ); + removed.emplace_back( item.second.get() ); + } changed_objects(changed_ids); } diff --git a/libraries/chain/include/graphene/chain/account_object.hpp b/libraries/chain/include/graphene/chain/account_object.hpp index f8f1d08e..cb4f61c7 100644 --- a/libraries/chain/include/graphene/chain/account_object.hpp +++ b/libraries/chain/include/graphene/chain/account_object.hpp @@ -38,6 +38,8 @@ namespace graphene { namespace chain { static const uint8_t space_id = implementation_ids; static const uint8_t type_id = impl_account_statistics_object_type; + account_id_type owner; + /** * Keep the most recent operation as a root pointer to a linked list of the transaction history. This field is * not required by core validation and could in theory be made an annotation on the account object, but @@ -335,6 +337,7 @@ FC_REFLECT_DERIVED( graphene::chain::meta_account_object, (memo_key)(committee_member_id) ) FC_REFLECT_DERIVED( graphene::chain::account_statistics_object, (graphene::chain::object), + (owner) (most_recent_op) (total_core_in_orders) (lifetime_fees_paid) diff --git a/libraries/chain/include/graphene/chain/database.hpp b/libraries/chain/include/graphene/chain/database.hpp index f89346a4..cb68f9f0 100644 --- a/libraries/chain/include/graphene/chain/database.hpp +++ b/libraries/chain/include/graphene/chain/database.hpp @@ -198,6 +198,11 @@ namespace graphene { namespace chain { */ fc::signal&)> changed_objects; + /** this signal is emitted any time an object is removed and contains a + * pointer to the last value of every object that was removed. + */ + fc::signal&)> removed_objects; + //////////////////// db_witness_schedule.cpp //////////////////// /** diff --git a/libraries/chain/include/graphene/chain/protocol/transaction.hpp b/libraries/chain/include/graphene/chain/protocol/transaction.hpp index 4ada2b34..b3896218 100644 --- a/libraries/chain/include/graphene/chain/protocol/transaction.hpp +++ b/libraries/chain/include/graphene/chain/protocol/transaction.hpp @@ -100,6 +100,7 @@ namespace graphene { namespace chain { } void get_required_authorities( flat_set& active, flat_set& owner, vector& other )const; + void get_impacted_accounts( flat_set& )const; }; /** diff --git a/libraries/chain/include/graphene/chain/protocol/types.hpp b/libraries/chain/include/graphene/chain/protocol/types.hpp index 64d49325..f78b93cf 100644 --- a/libraries/chain/include/graphene/chain/protocol/types.hpp +++ b/libraries/chain/include/graphene/chain/protocol/types.hpp @@ -133,10 +133,8 @@ namespace graphene { namespace chain { impl_index_meta_object_type, impl_asset_dynamic_data_type, impl_asset_bitasset_data_type, - impl_committee_member_feeds_object_type, impl_account_balance_object_type, impl_account_statistics_object_type, - impl_account_debt_object_type, impl_transaction_object_type, impl_block_summary_object_type, impl_account_transaction_history_object_type, @@ -193,7 +191,6 @@ namespace graphene { namespace chain { class asset_bitasset_data_object; class account_balance_object; class account_statistics_object; - class account_debt_object; class transaction_object; class block_summary_object; class account_transaction_history_object; @@ -204,7 +201,6 @@ namespace graphene { namespace chain { typedef object_id< implementation_ids, impl_asset_bitasset_data_type, asset_bitasset_data_object> asset_bitasset_data_id_type; typedef object_id< implementation_ids, impl_account_balance_object_type, account_balance_object> account_balance_id_type; typedef object_id< implementation_ids, impl_account_statistics_object_type,account_statistics_object> account_statistics_id_type; - typedef object_id< implementation_ids, impl_account_debt_object_type, account_debt_object> account_debt_id_type; typedef object_id< implementation_ids, impl_transaction_object_type, transaction_object> transaction_obj_id_type; typedef object_id< implementation_ids, impl_block_summary_object_type, block_summary_object> block_summary_id_type; @@ -384,10 +380,8 @@ FC_REFLECT_ENUM( graphene::chain::impl_object_type, (impl_index_meta_object_type) (impl_asset_dynamic_data_type) (impl_asset_bitasset_data_type) - (impl_committee_member_feeds_object_type) (impl_account_balance_object_type) (impl_account_statistics_object_type) - (impl_account_debt_object_type) (impl_transaction_object_type) (impl_block_summary_object_type) (impl_account_transaction_history_object_type) @@ -417,7 +411,6 @@ FC_REFLECT_TYPENAME( graphene::chain::asset_dynamic_data_id_type ) FC_REFLECT_TYPENAME( graphene::chain::asset_bitasset_data_id_type ) FC_REFLECT_TYPENAME( graphene::chain::account_balance_id_type ) FC_REFLECT_TYPENAME( graphene::chain::account_statistics_id_type ) -FC_REFLECT_TYPENAME( graphene::chain::account_debt_id_type ) FC_REFLECT_TYPENAME( graphene::chain::transaction_obj_id_type ) FC_REFLECT_TYPENAME( graphene::chain::block_summary_id_type ) FC_REFLECT_TYPENAME( graphene::chain::account_transaction_history_id_type ) diff --git a/libraries/chain/protocol/operations.cpp b/libraries/chain/protocol/operations.cpp index 7dadf232..9743b686 100644 --- a/libraries/chain/protocol/operations.cpp +++ b/libraries/chain/protocol/operations.cpp @@ -133,6 +133,23 @@ struct required_owner_visitor }; +struct get_impacted_account_visitor +{ + flat_set& _impacted; + get_impacted_account_visitor( flat_set& impact ):_impacted(impact) {} + typedef void result_type; + + template + void operator()( const T& o )const + { + o.get_impacted_accounts( _impacted ); + } +}; +void operation_get_impacted_accounts( const operation& op, flat_set& result ) +{ + op.visit( get_impacted_account_visitor( result ) ); +} + void operation_get_required_authorities( const operation& op, vector& result ) { op.visit( required_auth_visitor( result ) ); diff --git a/libraries/chain/protocol/transaction.cpp b/libraries/chain/protocol/transaction.cpp index 39d43c5e..e8eb8e7f 100644 --- a/libraries/chain/protocol/transaction.cpp +++ b/libraries/chain/protocol/transaction.cpp @@ -275,4 +275,10 @@ void signed_transaction::verify_authority( const std::function& impacted ) const +{ + for( const auto& op : operations ) + operation_get_impacted_accounts( op, impacted ); +} + } } // graphene::chain diff --git a/libraries/plugins/account_history/account_history_plugin.cpp b/libraries/plugins/account_history/account_history_plugin.cpp index 3e5146d6..24d5b0f4 100644 --- a/libraries/plugins/account_history/account_history_plugin.cpp +++ b/libraries/plugins/account_history/account_history_plugin.cpp @@ -68,12 +68,6 @@ struct operation_get_impacted_accounts {} typedef void result_type; - void add_authority( const authority& a )const - { - for( auto& item : a.account_auths ) - _impacted.insert( item.first ); - } - void operator()( const account_create_operation& o )const { _impacted.insert( _op_history.result.get() ); }