From e5106c15a376c3ee71d72aa7a19278ba5b43a4b0 Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Wed, 26 Aug 2015 18:01:48 -0400 Subject: [PATCH] update subscribe callback --- libraries/app/api.cpp | 164 ++++++++---------- libraries/app/include/graphene/app/api.hpp | 42 ++--- .../delayed_node/delayed_node_plugin.cpp | 20 ++- libraries/wallet/wallet.cpp | 15 +- programs/CMakeLists.txt | 2 +- 5 files changed, 100 insertions(+), 143 deletions(-) diff --git a/libraries/app/api.cpp b/libraries/app/api.cpp index ef7f3a13..e851258c 100644 --- a/libraries/app/api.cpp +++ b/libraries/app/api.cpp @@ -52,8 +52,44 @@ namespace graphene { namespace app { elog("freeing database api ${x}", ("x",int64_t(this)) ); } + void database_api::set_subscribe_callback( std::function cb, bool clear_filter ) + { + edump((clear_filter)); + _subscribe_callback = cb; + if( clear_filter || !cb ) + { + static fc::bloom_parameters param; + param.projected_element_count = 10000; + param.false_positive_probability = 1.0/10000; + param.maximum_size = 1024*8*8*2; + _subscribe_filter = fc::bloom_filter(param); + } + } + + void database_api::subscribe_to_id( object_id_type id )const + { + idump((id)); + if( _subscribe_callback ) + _subscribe_filter.insert( (const unsigned char*)&id, sizeof(id) ); + else + elog( "unable to subscribe to id because there is no subscribe callback set" ); + } fc::variants database_api::get_objects(const vector& ids)const { + if( _subscribe_callback ) { + for( auto id : ids ) + { + if( id.type() == operation_history_object_type && id.space() == protocol_ids ) continue; + if( id.type() == impl_account_transaction_history_object_type && id.space() == implementation_ids ) continue; + + subscribe_to_id( id ); + } + } + else + { + elog( "getObjects without subscribe callback??" ); + } + fc::variants result; result.reserve(ids.size()); @@ -150,7 +186,10 @@ namespace graphene { namespace app { std::transform(account_ids.begin(), account_ids.end(), std::back_inserter(result), [this](account_id_type id) -> optional { if(auto o = _db.find(id)) + { + subscribe_to_id( id ); return *o; + } return {}; }); return result; @@ -162,7 +201,10 @@ namespace graphene { namespace app { std::transform(asset_ids.begin(), asset_ids.end(), std::back_inserter(result), [this](asset_id_type id) -> optional { if(auto o = _db.find(id)) + { + subscribe_to_id( id ); return *o; + } return {}; }); return result; @@ -182,35 +224,17 @@ namespace graphene { namespace app { for( auto itr = accounts_by_name.lower_bound(lower_bound_name); limit-- && itr != accounts_by_name.end(); ++itr ) + { result.insert(make_pair(itr->name, itr->get_id())); + if( limit == 1 ) + subscribe_to_id( itr->get_id() ); + } return result; } - void database_api::unsubscribe_from_accounts( const vector& names_or_ids ) - { - for (const std::string& account_name_or_id : names_or_ids) - { - const account_object* account = nullptr; - if (std::isdigit(account_name_or_id[0])) - account = _db.find(fc::variant(account_name_or_id).as()); - else - { - const auto& idx = _db.get_index_type().indices().get(); - auto itr = idx.find(account_name_or_id); - if (itr != idx.end()) - account = &*itr; - } - if (account == nullptr) - continue; - _account_subscriptions.erase(account->id); - } - } - - std::map database_api::get_full_accounts(std::function callback, - const vector& names_or_ids, bool subscribe) + std::map database_api::get_full_accounts( const vector& names_or_ids, bool subscribe) { - FC_ASSERT( _account_subscriptions.size() < 1024 ); std::map results; for (const std::string& account_name_or_id : names_or_ids) @@ -231,12 +255,7 @@ namespace graphene { namespace app { if( subscribe ) { ilog( "subscribe to ${id}", ("id",account->name) ); - _account_subscriptions[account->id] = callback; - } - else - { - wlog( "unsubscribe to ${id}", ("id",account->name) ); - _account_subscriptions.erase(account->id); + subscribe_to_id( account->id ); } // fc::mutable_variant_object full_account; @@ -795,7 +814,7 @@ namespace graphene { namespace app { /// we need to ensure the database_api is not deleted for the life of the async operation auto capture_this = shared_from_this(); - if( _account_subscriptions.size() ) + if( _subscribe_callback ) { map > broadcast_queue; for( const auto& obj : objs ) @@ -803,24 +822,21 @@ namespace graphene { namespace app { auto relevant = get_relevant_accounts( obj ); for( const auto& r : relevant ) { - auto sub = _account_subscriptions.find(r); - if( sub != _account_subscriptions.end() ) + if( _subscribe_filter.contains(r) ) broadcast_queue[r].emplace_back(obj->to_variant()); } + if( relevant.size() == 0 && _subscribe_filter.contains(obj->id) ) + broadcast_queue[account_id_type()].emplace_back(obj->to_variant()); } if( broadcast_queue.size() ) { fc::async([capture_this,broadcast_queue,this](){ - for( const auto& item : broadcast_queue ) - { - auto sub = _account_subscriptions.find(item.first); - if( sub != _account_subscriptions.end() ) - sub->second( fc::variant(item.second ) ); - } + _subscribe_callback( fc::variant(broadcast_queue) ); }); } } + if( _market_subscriptions.size() ) { map< pair, vector > broadcast_queue; @@ -850,16 +866,14 @@ namespace graphene { namespace app { void database_api::on_objects_changed(const vector& ids) { - vector my_objects; - map > broadcast_queue; + vector updates; map< pair, vector > market_broadcast_queue; + + idump((ids)); for(auto id : ids) { - if(_subscriptions.find(id) != _subscriptions.end()) - my_objects.push_back(id); - const object* obj = nullptr; - if( _account_subscriptions.size() ) + if( _subscribe_callback ) { obj = _db.find_object( id ); if( obj ) @@ -867,18 +881,23 @@ namespace graphene { namespace app { vector relevant = get_relevant_accounts( obj ); for( const auto& r : relevant ) { - auto sub = _account_subscriptions.find(r); - if( sub != _account_subscriptions.end() ) - broadcast_queue[r].emplace_back(obj->to_variant()); + if( _subscribe_filter.contains(r) ) + updates.emplace_back(obj->to_variant()); } + if( relevant.size() == 0 && _subscribe_filter.contains(obj->id) ) + updates.emplace_back(obj->to_variant()); } else - elog( "unable to find object ${id}", ("id",id) ); + { + if( _subscribe_filter.contains(id) ) + updates.emplace_back(id); // send just the id to indicate removal + } } if( _market_subscriptions.size() ) { - if( !_account_subscriptions.size() ) obj = _db.find_object( id ); + if( !_subscribe_callback ) + obj = _db.find_object( id ); if( obj ) { const limit_order_object* order = dynamic_cast(obj); @@ -897,42 +916,15 @@ namespace graphene { namespace app { /// pushing the future back / popping the prior future if it is complete. /// if a connection hangs then this could get backed up and result in /// a failure to exit cleanly. - fc::async([capture_this,this,broadcast_queue,market_broadcast_queue,my_objects](){ - for( const auto& item : broadcast_queue ) - { - edump( (item) ); - try { - auto sub = _account_subscriptions.find(item.first); - if( sub != _account_subscriptions.end() ) - sub->second( fc::variant(item.second ) ); - } catch ( const fc::exception& e ) - { - edump((e.to_detail_string())); - } - } + fc::async([capture_this,this,updates,market_broadcast_queue](){ + if( _subscribe_callback ) _subscribe_callback( updates ); + for( const auto& item : market_broadcast_queue ) { auto sub = _market_subscriptions.find(item.first); if( sub != _market_subscriptions.end() ) sub->second( fc::variant(item.second ) ); } - for(auto id : my_objects) - { - // just incase _usbscriptions changed between filter and broadcast - auto itr = _subscriptions.find( id ); - if( itr != _subscriptions.end() ) - { - const object* obj = _db.find_object( id ); - if( obj != nullptr ) - { - itr->second(obj->to_variant()); - } - else - { - itr->second(fc::variant(id)); - } - } - } }); } @@ -980,20 +972,6 @@ namespace graphene { namespace app { }); } - - vector database_api::subscribe_to_objects( const std::function& callback, const vector& ids) - { - FC_ASSERT( _subscriptions.size() < 1024 ); - for(auto id : ids) _subscriptions[id] = callback; - return get_objects( ids ); - } - - bool database_api::unsubscribe_from_objects(const vector& ids) - { - for(auto id : ids) _subscriptions.erase(id); - return true; - } - void database_api::subscribe_to_market(std::function callback, asset_id_type a, asset_id_type b) { if(a > b) std::swap(a,b); diff --git a/libraries/app/include/graphene/app/api.hpp b/libraries/app/include/graphene/app/api.hpp index 04ad5cfb..0d459bf5 100644 --- a/libraries/app/include/graphene/app/api.hpp +++ b/libraries/app/include/graphene/app/api.hpp @@ -40,6 +40,7 @@ #include #include +#include namespace graphene { namespace app { using namespace graphene::chain; @@ -176,13 +177,8 @@ namespace graphene { namespace app { * ignored. All other accounts will be retrieved and subscribed. * */ - std::map get_full_accounts(std::function callback, - const vector& names_or_ids, bool subscribe ); + std::map get_full_accounts( const vector& names_or_ids, bool subscribe ); - /** - * Stop receiving updates generated by get_full_accounts() - */ - void unsubscribe_from_accounts( const vector& names_or_ids ); /** * @brief Get limit orders in a given market @@ -277,24 +273,6 @@ namespace graphene { namespace app { */ vector> get_committee_members(const vector& committee_member_ids)const; - /** - * @group Push Notification Methods - * These methods may be used to get push notifications whenever an object or market is changed - * @{ - */ - /** - * @brief Request notifications when some object(s) change - * @param callback Callback method which is called with the new version of a changed object - * @param ids The set of object IDs to watch - * @return get_objects(ids) - */ - vector subscribe_to_objects(const std::function& callback, - const vector& ids); - /** - * @brief Stop receiving notifications for some object(s) - * @param ids The set of object IDs to stop watching - */ - bool unsubscribe_from_objects(const vector& ids); /** * @brief Request notification when the active orders in the market between two assets changes * @param callback Callback method which is called when the market changes @@ -318,7 +296,7 @@ namespace graphene { namespace app { * This unsubscribes from all subscribed markets and objects. */ void cancel_all_subscriptions() - { _subscriptions.clear(); _market_subscriptions.clear(); } + { set_subscribe_callback( std::function(), true); _market_subscriptions.clear(); } ///@} /// @brief Get a hexdump of the serialized binary form of a transaction @@ -377,18 +355,22 @@ namespace graphene { namespace app { */ vector get_required_fees( const vector& ops, asset_id_type id = asset_id_type() )const; + void set_subscribe_callback( std::function cb, bool clear_filter ); private: + void subscribe_to_id( object_id_type id )const; + /** called every time a block is applied to report the objects that were changed */ void on_objects_changed(const vector& ids); void on_objects_removed(const vector& objs); void on_applied_block(); + mutable fc::bloom_filter _subscribe_filter; + std::function _subscribe_callback; + boost::signals2::scoped_connection _change_connection; boost::signals2::scoped_connection _removed_connection; boost::signals2::scoped_connection _applied_block_connection; - map > _subscriptions; - map > _account_subscriptions; - map< pair, std::function > _market_subscriptions; + map< pair, std::function > _market_subscriptions; graphene::chain::database& _db; }; @@ -561,7 +543,6 @@ FC_API(graphene::app::database_api, (get_account_count) (lookup_accounts) (get_full_accounts) - (unsubscribe_from_accounts) (get_account_balances) (get_named_account_balances) (lookup_asset_symbols) @@ -577,8 +558,6 @@ FC_API(graphene::app::database_api, (get_witness_count) (lookup_witness_accounts) (lookup_committee_member_accounts) - (subscribe_to_objects) - (unsubscribe_from_objects) (subscribe_to_market) (unsubscribe_from_market) (cancel_all_subscriptions) @@ -594,6 +573,7 @@ FC_API(graphene::app::database_api, (verify_authority) (get_blinded_balances) (get_required_fees) + (set_subscribe_callback) ) FC_API(graphene::app::history_api, (get_account_history) diff --git a/libraries/plugins/delayed_node/delayed_node_plugin.cpp b/libraries/plugins/delayed_node/delayed_node_plugin.cpp index d0461a39..be5cbe2a 100644 --- a/libraries/plugins/delayed_node/delayed_node_plugin.cpp +++ b/libraries/plugins/delayed_node/delayed_node_plugin.cpp @@ -97,14 +97,26 @@ void delayed_node_plugin::plugin_startup() try { connect(); + my->database_api->set_subscribe_callback([this] (const fc::variant& v) { + auto& updates = v.get_array(); + for( const auto& v : updates ) + { + if( v.is_object() ) + { + auto& obj = v.get_object(); + if( obj["id"].as() == graphene::chain::dynamic_global_property_id_type() ) + { + auto props = v.as(); + sync_with_trusted_node(props.head_block_number); + } + } + } + }, true); + // Go ahead and get in sync now, before subscribing chain::dynamic_global_property_object props = my->database_api->get_dynamic_global_properties(); sync_with_trusted_node(props.head_block_number); - my->database_api->subscribe_to_objects([this] (const fc::variant& v) { - auto props = v.as(); - sync_with_trusted_node(props.head_block_number); - }, {graphene::chain::dynamic_global_property_id_type()}); return; } catch (const fc::exception& e) { elog("Error during connection: ${e}", ("e", e.to_detail_string())); diff --git a/libraries/wallet/wallet.cpp b/libraries/wallet/wallet.cpp index b61c752c..cf9ab236 100644 --- a/libraries/wallet/wallet.cpp +++ b/libraries/wallet/wallet.cpp @@ -375,10 +375,6 @@ public: ("chain_id", _chain_id) ); } init_prototype_ops(); - _remote_db->subscribe_to_objects( [=]( const fc::variant& obj ) - { - fc::async([this]{resync();}, "Resync after block"); - }, {dynamic_global_property_id_type()} ); _wallet.chain_id = _chain_id; _wallet.ws_server = initial_data.ws_server; _wallet.ws_user = initial_data.ws_user; @@ -629,10 +625,7 @@ public: _keys[wif_pub_key] = wif_key; - if( _wallet.update_account(account) ) - _remote_db->subscribe_to_objects([this](const fc::variant& v) { - _wallet.update_account(v.as()); - }, {account.id}); + _wallet.update_account(account); _wallet.extra_keys[account.id].insert(wif_pub_key); @@ -649,17 +642,11 @@ public: if( ! fc::exists( wallet_filename ) ) return false; - if( !_wallet.my_accounts.empty() ) - _remote_db->unsubscribe_from_objects(_wallet.my_account_ids()); _wallet = fc::json::from_file( wallet_filename ).as< wallet_data >(); if( _wallet.chain_id != _chain_id ) FC_THROW( "Wallet chain ID does not match", ("wallet.chain_id", _wallet.chain_id) ("chain_id", _chain_id) ); - if( !_wallet.my_accounts.empty() ) - _remote_db->subscribe_to_objects([this](const fc::variant& v) { - _wallet.update_account(v.as()); - }, _wallet.my_account_ids()); return true; } void save_wallet_file(string wallet_filename = "") diff --git a/programs/CMakeLists.txt b/programs/CMakeLists.txt index 75bb6af3..ab267cb1 100644 --- a/programs/CMakeLists.txt +++ b/programs/CMakeLists.txt @@ -6,7 +6,7 @@ add_subdirectory( size_checker ) set(BUILD_QT_GUI FALSE CACHE BOOL "Build the Qt-based light client GUI") if(BUILD_QT_GUI) - add_subdirectory(light_client) +# add_subdirectory(light_client) endif() set(BUILD_WEB_NODE FALSE CACHE BOOL "Build the Qt-based full node with web GUI") if(BUILD_WEB_NODE)