diff --git a/libraries/app/database_api.cpp b/libraries/app/database_api.cpp index 9bf3bfee..356da5d0 100644 --- a/libraries/app/database_api.cpp +++ b/libraries/app/database_api.cpp @@ -41,6 +41,8 @@ #define GET_REQUIRED_FEES_MAX_RECURSION 4 +typedef std::map< std::pair, std::vector > market_queue_type; + namespace graphene { namespace app { class database_api_impl; @@ -56,7 +58,7 @@ class database_api_impl : public std::enable_shared_from_this fc::variants get_objects(const vector& ids)const; // Subscriptions - void set_subscribe_callback( std::function cb, bool clear_filter ); + void set_subscribe_callback( std::function cb, bool notify_remove_create ); void set_pending_transaction_callback( std::function cb ); void set_block_applied_callback( std::function cb ); void cancel_all_subscriptions(); @@ -167,22 +169,41 @@ class database_api_impl : public std::enable_shared_from_this { if( !_subscribe_callback ) return false; - return true; + return _subscribe_filter.contains( i ); } - void broadcast_updates( const vector& updates ); + template + void enqueue_if_subscribed_to_market(const object* obj, market_queue_type& queue, bool full_object=true) + { + const T* order = dynamic_cast(obj); + FC_ASSERT( order != nullptr); + auto market = order->get_market(); + + auto sub = _market_subscriptions.find( market ); + if( sub != _market_subscriptions.end() ) { + queue[market].emplace_back( full_object ? obj->to_variant() : fc::variant(obj->id) ); + } + } + + void broadcast_updates( const vector& updates ); + void broadcast_market_updates( const market_queue_type& queue); + void check_for_market_objects(const vector& ids); + /** called every time a block is applied to report the objects that were changed */ + void on_objects_new(const vector& ids); void on_objects_changed(const vector& ids); void on_objects_removed(const vector& objs); void on_applied_block(); - mutable fc::bloom_filter _subscribe_filter; + bool _notify_remove_create = false; + mutable fc::bloom_filter _subscribe_filter; std::function _subscribe_callback; std::function _pending_trx_callback; std::function _block_applied_callback; + boost::signals2::scoped_connection _new_connection; boost::signals2::scoped_connection _change_connection; boost::signals2::scoped_connection _removed_connection; boost::signals2::scoped_connection _applied_block_connection; @@ -205,6 +226,9 @@ database_api::~database_api() {} database_api_impl::database_api_impl( graphene::chain::database& db ):_db(db) { wlog("creating database api ${x}", ("x",int64_t(this)) ); + _new_connection = _db.new_objects.connect([this](const vector& ids) { + on_objects_new(ids); + }); _change_connection = _db.changed_objects.connect([this](const vector& ids) { on_objects_changed(ids); }); @@ -265,24 +289,23 @@ fc::variants database_api_impl::get_objects(const vector& ids)co // // ////////////////////////////////////////////////////////////////////// -void database_api::set_subscribe_callback( std::function cb, bool clear_filter ) +void database_api::set_subscribe_callback( std::function cb, bool notify_remove_create ) { - my->set_subscribe_callback( cb, clear_filter ); + my->set_subscribe_callback( cb, notify_remove_create ); } -void database_api_impl::set_subscribe_callback( std::function cb, bool clear_filter ) +void database_api_impl::set_subscribe_callback( std::function cb, bool notify_remove_create ) { - edump((clear_filter)); + //edump((clear_filter)); _subscribe_callback = cb; - if( clear_filter || !cb ) - { - static fc::bloom_parameters param; - param.projected_element_count = 10000; - param.false_positive_probability = 1.0/10000; - param.maximum_size = 1024*8*8*2; - param.compute_optimal_parameters(); - _subscribe_filter = fc::bloom_filter(param); - } + _notify_remove_create = notify_remove_create; + + static fc::bloom_parameters param; + param.projected_element_count = 10000; + param.false_positive_probability = 1.0/10000; + param.maximum_size = 1024*8*8*2; + param.compute_optimal_parameters(); + _subscribe_filter = fc::bloom_filter(param); } void database_api::set_pending_transaction_callback( std::function cb ) @@ -586,7 +609,6 @@ std::map database_api_impl::get_full_accounts( const if( subscribe ) { - ilog( "subscribe to ${id}", ("id",account->name) ); subscribe_to_item( account->id ); } @@ -648,7 +670,7 @@ std::map database_api_impl::get_full_accounts( const [&acnt] (const call_order_object& call) { acnt.call_orders.emplace_back(call); }); - + // get assets issued by user auto asset_range = _db.get_index_type().indices().get().equal_range(account->id); std::for_each(asset_range.first, asset_range.second, @@ -1867,10 +1889,27 @@ vector database_api_impl::get_blinded_balances( const fl void database_api_impl::broadcast_updates( const vector& updates ) { - if( updates.size() ) { + if( updates.size() && _subscribe_callback ) { auto capture_this = shared_from_this(); fc::async([capture_this,updates](){ - capture_this->_subscribe_callback( fc::variant(updates) ); + if(capture_this->_subscribe_callback) + capture_this->_subscribe_callback( fc::variant(updates) ); + }); + } +} + +void database_api_impl::broadcast_market_updates( const market_queue_type& queue) +{ + if( queue.size() ) + { + auto capture_this = shared_from_this(); + fc::async([capture_this, this, queue](){ + for( const auto& item : queue ) + { + auto sub = _market_subscriptions.find(item.first); + if( sub != _market_subscriptions.end() ) + sub->second( fc::variant(item.second ) ); + } }); } } @@ -1880,95 +1919,89 @@ void database_api_impl::on_objects_removed( const vector& objs ) /// we need to ensure the database_api is not deleted for the life of the async operation if( _subscribe_callback ) { - vector updates; - updates.reserve(objs.size()); + vector updates; for( auto obj : objs ) - updates.emplace_back( obj->id ); + { + if ( is_subscribed_to_item(obj->id) ) + { + updates.push_back( fc::variant(obj->id) ); + } + } + broadcast_updates( updates ); } if( _market_subscriptions.size() ) { - map< pair, vector > broadcast_queue; + market_queue_type broadcast_queue; + for( const auto& obj : objs ) { - const limit_order_object* order = dynamic_cast(obj); - if( order ) + if( obj->id.is() ) { - auto sub = _market_subscriptions.find( order->get_market() ); - if( sub != _market_subscriptions.end() ) - broadcast_queue[order->get_market()].emplace_back( order->id ); + enqueue_if_subscribed_to_market( obj, broadcast_queue, false ); + } + else if( obj->id.is() ) + { + enqueue_if_subscribed_to_market( obj, broadcast_queue, false ); } } - if( broadcast_queue.size() ) - { - auto capture_this = shared_from_this(); - fc::async([capture_this,this,broadcast_queue](){ - for( const auto& item : broadcast_queue ) - { - auto sub = _market_subscriptions.find(item.first); - if( sub != _market_subscriptions.end() ) - sub->second( fc::variant(item.second ) ); - } - }); - } + + broadcast_market_updates(broadcast_queue); } } +void database_api_impl::check_for_market_objects(const vector& ids) +{ + if( _market_subscriptions.size() ) + { + market_queue_type broadcast_queue; + + for(auto id : ids) + { + if( id.is() ) + { + enqueue_if_subscribed_to_market( _db.find_object(id), broadcast_queue ); + } + else if( id.is() ) + { + enqueue_if_subscribed_to_market( _db.find_object(id), broadcast_queue ); + } + } + + broadcast_market_updates(broadcast_queue); + } +} + +void database_api_impl::on_objects_new(const vector& ids) +{ + check_for_market_objects(ids); +} + void database_api_impl::on_objects_changed(const vector& ids) { - vector updates; - map< pair, vector > market_broadcast_queue; - - for(auto id : ids) + if( _subscribe_callback ) { - const object* obj = nullptr; - if( _subscribe_callback ) + vector updates; + + for(auto id : ids) { - obj = _db.find_object( id ); - if( obj ) + const object* obj = nullptr; + if( is_subscribed_to_item(id) ) { - updates.emplace_back( obj->to_variant() ); - } - else - { - updates.emplace_back(id); // send just the id to indicate removal - } - } - - if( _market_subscriptions.size() ) - { - if( !_subscribe_callback ) obj = _db.find_object( id ); - if( obj ) - { - const limit_order_object* order = dynamic_cast(obj); - if( order ) + if( obj ) { - auto sub = _market_subscriptions.find( order->get_market() ); - if( sub != _market_subscriptions.end() ) - market_broadcast_queue[order->get_market()].emplace_back( order->id ); + updates.emplace_back( obj->to_variant() ); } } } + + broadcast_updates(updates); } - auto capture_this = shared_from_this(); - - /// pushing the future back / popping the prior future if it is complete. - /// if a connection hangs then this could get backed up and result in - /// a failure to exit cleanly. - fc::async([capture_this,this,updates,market_broadcast_queue](){ - if( _subscribe_callback ) _subscribe_callback( updates ); - - for( const auto& item : market_broadcast_queue ) - { - auto sub = _market_subscriptions.find(item.first); - if( sub != _market_subscriptions.end() ) - sub->second( fc::variant(item.second ) ); - } - }); + check_for_market_objects(ids); } /** note: this method cannot yield because it is called in the middle of diff --git a/libraries/chain/db_block.cpp b/libraries/chain/db_block.cpp index 63306e66..f55f0396 100644 --- a/libraries/chain/db_block.cpp +++ b/libraries/chain/db_block.cpp @@ -241,7 +241,7 @@ processed_transaction database::_push_transaction( const signed_transaction& trx auto processed_trx = _apply_transaction( trx ); _pending_tx.push_back(processed_trx); - notify_changed_objects(); + // notify_changed_objects(); // The transaction applied successfully. Merge its changes into the pending block session. temp_session.merge(); @@ -549,19 +549,26 @@ void database::notify_changed_objects() if( _undo_db.enabled() ) { const auto& head_undo = _undo_db.head(); + + vector new_ids; new_ids.reserve(head_undo.new_ids.size()); + for( const auto& item : head_undo.new_ids ) new_ids.push_back(item); + vector changed_ids; changed_ids.reserve(head_undo.old_values.size()); for( const auto& item : head_undo.old_values ) changed_ids.push_back(item.first); - for( const auto& item : head_undo.new_ids ) changed_ids.push_back(item); - vector removed; - removed.reserve( head_undo.removed.size() ); + + vector removed_ids; removed_ids.reserve( head_undo.removed.size() ); + vector removed; removed.reserve( head_undo.removed.size() ); for( const auto& item : head_undo.removed ) { - changed_ids.push_back( item.first ); - removed.emplace_back( item.second.get() ); + removed_ids.emplace_back( item.first ); + removed.emplace_back( item.second.get() ); } + + new_objects(new_ids); changed_objects(changed_ids); + removed_objects(removed_ids, removed); } -} FC_CAPTURE_AND_RETHROW() } +} FC_CAPTURE_AND_LOG( () ) } processed_transaction database::apply_transaction(const signed_transaction& trx, uint32_t skip) { diff --git a/libraries/chain/include/graphene/chain/database.hpp b/libraries/chain/include/graphene/chain/database.hpp index 595e254c..217bc7ff 100644 --- a/libraries/chain/include/graphene/chain/database.hpp +++ b/libraries/chain/include/graphene/chain/database.hpp @@ -189,6 +189,12 @@ namespace graphene { namespace chain { */ fc::signal on_pending_transaction; + /** + * Emitted After a block has been applied and committed. The callback + * should not yield and should execute quickly. + */ + fc::signal&)> new_objects; + /** * Emitted After a block has been applied and committed. The callback * should not yield and should execute quickly. @@ -198,7 +204,7 @@ namespace graphene { namespace chain { /** this signal is emitted any time an object is removed and contains a * pointer to the last value of every object that was removed. */ - fc::signal&)> removed_objects; + fc::signal&, const vector&)> removed_objects; //////////////////// db_witness_schedule.cpp //////////////////// diff --git a/libraries/chain/include/graphene/chain/market_object.hpp b/libraries/chain/include/graphene/chain/market_object.hpp index c41def13..b56f4e9c 100644 --- a/libraries/chain/include/graphene/chain/market_object.hpp +++ b/libraries/chain/include/graphene/chain/market_object.hpp @@ -120,6 +120,13 @@ class call_order_object : public abstract_object share_type collateral; ///< call_price.base.asset_id, access via get_collateral share_type debt; ///< call_price.quote.asset_id, access via get_collateral price call_price; ///< Debt / Collateral + + pair get_market()const + { + auto tmp = std::make_pair( call_price.base.asset_id, call_price.quote.asset_id ); + if( tmp.first > tmp.second ) std::swap( tmp.first, tmp.second ); + return tmp; + } }; /**