diff --git a/libraries/CMakeLists.txt b/libraries/CMakeLists.txt index 6af806f9..be71012d 100644 --- a/libraries/CMakeLists.txt +++ b/libraries/CMakeLists.txt @@ -4,6 +4,7 @@ add_subdirectory( deterministic_openssl_rand ) add_subdirectory( chain ) add_subdirectory( egenesis ) add_subdirectory( net ) +#add_subdirectory( p2p ) add_subdirectory( time ) add_subdirectory( utilities ) add_subdirectory( app ) diff --git a/libraries/app/api.cpp b/libraries/app/api.cpp index a946491d..b0fab7fe 100644 --- a/libraries/app/api.cpp +++ b/libraries/app/api.cpp @@ -52,8 +52,45 @@ namespace graphene { namespace app { elog("freeing database api ${x}", ("x",int64_t(this)) ); } + void database_api::set_subscribe_callback( std::function cb, bool clear_filter ) + { + edump((clear_filter)); + _subscribe_callback = cb; + if( clear_filter || !cb ) + { + static fc::bloom_parameters param; + param.projected_element_count = 10000; + param.false_positive_probability = 1.0/10000; + param.maximum_size = 1024*8*8*2; + param.compute_optimal_parameters(); + _subscribe_filter = fc::bloom_filter(param); + } + } + + void database_api::subscribe_to_id( object_id_type id )const + { + idump((id)); + if( _subscribe_callback ) + _subscribe_filter.insert( (const unsigned char*)&id, sizeof(id) ); + else + elog( "unable to subscribe to id because there is no subscribe callback set" ); + } fc::variants database_api::get_objects(const vector& ids)const { + if( _subscribe_callback ) { + for( auto id : ids ) + { + if( id.type() == operation_history_object_type && id.space() == protocol_ids ) continue; + if( id.type() == impl_account_transaction_history_object_type && id.space() == implementation_ids ) continue; + + subscribe_to_id( id ); + } + } + else + { + elog( "getObjects without subscribe callback??" ); + } + fc::variants result; result.reserve(ids.size()); @@ -150,7 +187,10 @@ namespace graphene { namespace app { std::transform(account_ids.begin(), account_ids.end(), std::back_inserter(result), [this](account_id_type id) -> optional { if(auto o = _db.find(id)) + { + subscribe_to_id( id ); return *o; + } return {}; }); return result; @@ -162,7 +202,10 @@ namespace graphene { namespace app { std::transform(asset_ids.begin(), asset_ids.end(), std::back_inserter(result), [this](asset_id_type id) -> optional { if(auto o = _db.find(id)) + { + subscribe_to_id( id ); return *o; + } return {}; }); return result; @@ -182,35 +225,17 @@ namespace graphene { namespace app { for( auto itr = accounts_by_name.lower_bound(lower_bound_name); limit-- && itr != accounts_by_name.end(); ++itr ) + { result.insert(make_pair(itr->name, itr->get_id())); + if( limit == 1 ) + subscribe_to_id( itr->get_id() ); + } return result; } - void database_api::unsubscribe_from_accounts( const vector& names_or_ids ) - { - for (const std::string& account_name_or_id : names_or_ids) - { - const account_object* account = nullptr; - if (std::isdigit(account_name_or_id[0])) - account = _db.find(fc::variant(account_name_or_id).as()); - else - { - const auto& idx = _db.get_index_type().indices().get(); - auto itr = idx.find(account_name_or_id); - if (itr != idx.end()) - account = &*itr; - } - if (account == nullptr) - continue; - _account_subscriptions.erase(account->id); - } - } - - std::map database_api::get_full_accounts(std::function callback, - const vector& names_or_ids, bool subscribe) + std::map database_api::get_full_accounts( const vector& names_or_ids, bool subscribe) { - FC_ASSERT( _account_subscriptions.size() < 1024 ); std::map results; for (const std::string& account_name_or_id : names_or_ids) @@ -231,12 +256,7 @@ namespace graphene { namespace app { if( subscribe ) { ilog( "subscribe to ${id}", ("id",account->name) ); - _account_subscriptions[account->id] = callback; - } - else - { - wlog( "unsubscribe to ${id}", ("id",account->name) ); - _account_subscriptions.erase(account->id); + subscribe_to_id( account->id ); } // fc::mutable_variant_object full_account; @@ -794,7 +814,7 @@ namespace graphene { namespace app { /// we need to ensure the database_api is not deleted for the life of the async operation auto capture_this = shared_from_this(); - if( _account_subscriptions.size() ) + if( _subscribe_callback ) { map > broadcast_queue; for( const auto& obj : objs ) @@ -802,24 +822,24 @@ namespace graphene { namespace app { auto relevant = get_relevant_accounts( obj ); for( const auto& r : relevant ) { - auto sub = _account_subscriptions.find(r); - if( sub != _account_subscriptions.end() ) + if( _subscribe_filter.contains(r) ) + { broadcast_queue[r].emplace_back(obj->to_variant()); + break; + } } + if( relevant.size() == 0 && _subscribe_filter.contains(obj->id) ) + broadcast_queue[account_id_type()].emplace_back(obj->to_variant()); } if( broadcast_queue.size() ) { fc::async([capture_this,broadcast_queue,this](){ - for( const auto& item : broadcast_queue ) - { - auto sub = _account_subscriptions.find(item.first); - if( sub != _account_subscriptions.end() ) - sub->second( fc::variant(item.second ) ); - } + _subscribe_callback( fc::variant(broadcast_queue) ); }); } } + if( _market_subscriptions.size() ) { map< pair, vector > broadcast_queue; @@ -849,16 +869,14 @@ namespace graphene { namespace app { void database_api::on_objects_changed(const vector& ids) { - vector my_objects; - map > broadcast_queue; + vector updates; map< pair, vector > market_broadcast_queue; + + idump((ids)); for(auto id : ids) { - if(_subscriptions.find(id) != _subscriptions.end()) - my_objects.push_back(id); - const object* obj = nullptr; - if( _account_subscriptions.size() ) + if( _subscribe_callback ) { obj = _db.find_object( id ); if( obj ) @@ -866,18 +884,26 @@ namespace graphene { namespace app { vector relevant = get_relevant_accounts( obj ); for( const auto& r : relevant ) { - auto sub = _account_subscriptions.find(r); - if( sub != _account_subscriptions.end() ) - broadcast_queue[r].emplace_back(obj->to_variant()); + if( _subscribe_filter.contains(r) ) + { + updates.emplace_back(obj->to_variant()); + break; + } } + if( relevant.size() == 0 && _subscribe_filter.contains(obj->id) ) + updates.emplace_back(obj->to_variant()); } else - elog( "unable to find object ${id}", ("id",id) ); + { + if( _subscribe_filter.contains(id) ) + updates.emplace_back(id); // send just the id to indicate removal + } } if( _market_subscriptions.size() ) { - if( !_account_subscriptions.size() ) obj = _db.find_object( id ); + if( !_subscribe_callback ) + obj = _db.find_object( id ); if( obj ) { const limit_order_object* order = dynamic_cast(obj); @@ -896,42 +922,15 @@ namespace graphene { namespace app { /// pushing the future back / popping the prior future if it is complete. /// if a connection hangs then this could get backed up and result in /// a failure to exit cleanly. - fc::async([capture_this,this,broadcast_queue,market_broadcast_queue,my_objects](){ - for( const auto& item : broadcast_queue ) - { - edump( (item) ); - try { - auto sub = _account_subscriptions.find(item.first); - if( sub != _account_subscriptions.end() ) - sub->second( fc::variant(item.second ) ); - } catch ( const fc::exception& e ) - { - edump((e.to_detail_string())); - } - } + fc::async([capture_this,this,updates,market_broadcast_queue](){ + if( _subscribe_callback ) _subscribe_callback( updates ); + for( const auto& item : market_broadcast_queue ) { auto sub = _market_subscriptions.find(item.first); if( sub != _market_subscriptions.end() ) sub->second( fc::variant(item.second ) ); } - for(auto id : my_objects) - { - // just incase _usbscriptions changed between filter and broadcast - auto itr = _subscriptions.find( id ); - if( itr != _subscriptions.end() ) - { - const object* obj = _db.find_object( id ); - if( obj != nullptr ) - { - itr->second(obj->to_variant()); - } - else - { - itr->second(fc::variant(id)); - } - } - } }); } @@ -979,20 +978,6 @@ namespace graphene { namespace app { }); } - - vector database_api::subscribe_to_objects( const std::function& callback, const vector& ids) - { - FC_ASSERT( _subscriptions.size() < 1024 ); - for(auto id : ids) _subscriptions[id] = callback; - return get_objects( ids ); - } - - bool database_api::unsubscribe_from_objects(const vector& ids) - { - for(auto id : ids) _subscriptions.erase(id); - return true; - } - void database_api::subscribe_to_market(std::function callback, asset_id_type a, asset_id_type b) { if(a > b) std::swap(a,b); diff --git a/libraries/app/include/graphene/app/api.hpp b/libraries/app/include/graphene/app/api.hpp index 04ad5cfb..0d459bf5 100644 --- a/libraries/app/include/graphene/app/api.hpp +++ b/libraries/app/include/graphene/app/api.hpp @@ -40,6 +40,7 @@ #include #include +#include namespace graphene { namespace app { using namespace graphene::chain; @@ -176,13 +177,8 @@ namespace graphene { namespace app { * ignored. All other accounts will be retrieved and subscribed. * */ - std::map get_full_accounts(std::function callback, - const vector& names_or_ids, bool subscribe ); + std::map get_full_accounts( const vector& names_or_ids, bool subscribe ); - /** - * Stop receiving updates generated by get_full_accounts() - */ - void unsubscribe_from_accounts( const vector& names_or_ids ); /** * @brief Get limit orders in a given market @@ -277,24 +273,6 @@ namespace graphene { namespace app { */ vector> get_committee_members(const vector& committee_member_ids)const; - /** - * @group Push Notification Methods - * These methods may be used to get push notifications whenever an object or market is changed - * @{ - */ - /** - * @brief Request notifications when some object(s) change - * @param callback Callback method which is called with the new version of a changed object - * @param ids The set of object IDs to watch - * @return get_objects(ids) - */ - vector subscribe_to_objects(const std::function& callback, - const vector& ids); - /** - * @brief Stop receiving notifications for some object(s) - * @param ids The set of object IDs to stop watching - */ - bool unsubscribe_from_objects(const vector& ids); /** * @brief Request notification when the active orders in the market between two assets changes * @param callback Callback method which is called when the market changes @@ -318,7 +296,7 @@ namespace graphene { namespace app { * This unsubscribes from all subscribed markets and objects. */ void cancel_all_subscriptions() - { _subscriptions.clear(); _market_subscriptions.clear(); } + { set_subscribe_callback( std::function(), true); _market_subscriptions.clear(); } ///@} /// @brief Get a hexdump of the serialized binary form of a transaction @@ -377,18 +355,22 @@ namespace graphene { namespace app { */ vector get_required_fees( const vector& ops, asset_id_type id = asset_id_type() )const; + void set_subscribe_callback( std::function cb, bool clear_filter ); private: + void subscribe_to_id( object_id_type id )const; + /** called every time a block is applied to report the objects that were changed */ void on_objects_changed(const vector& ids); void on_objects_removed(const vector& objs); void on_applied_block(); + mutable fc::bloom_filter _subscribe_filter; + std::function _subscribe_callback; + boost::signals2::scoped_connection _change_connection; boost::signals2::scoped_connection _removed_connection; boost::signals2::scoped_connection _applied_block_connection; - map > _subscriptions; - map > _account_subscriptions; - map< pair, std::function > _market_subscriptions; + map< pair, std::function > _market_subscriptions; graphene::chain::database& _db; }; @@ -561,7 +543,6 @@ FC_API(graphene::app::database_api, (get_account_count) (lookup_accounts) (get_full_accounts) - (unsubscribe_from_accounts) (get_account_balances) (get_named_account_balances) (lookup_asset_symbols) @@ -577,8 +558,6 @@ FC_API(graphene::app::database_api, (get_witness_count) (lookup_witness_accounts) (lookup_committee_member_accounts) - (subscribe_to_objects) - (unsubscribe_from_objects) (subscribe_to_market) (unsubscribe_from_market) (cancel_all_subscriptions) @@ -594,6 +573,7 @@ FC_API(graphene::app::database_api, (verify_authority) (get_blinded_balances) (get_required_fees) + (set_subscribe_callback) ) FC_API(graphene::app::history_api, (get_account_history) diff --git a/libraries/chain/db_update.cpp b/libraries/chain/db_update.cpp index 8c7a00ae..11e5c0e0 100644 --- a/libraries/chain/db_update.cpp +++ b/libraries/chain/db_update.cpp @@ -44,7 +44,7 @@ void database::update_global_dynamic_data( const signed_block& b ) modify( _dgp, [&]( dynamic_global_property_object& dgp ){ if( BOOST_UNLIKELY( b.block_num() == 1 ) ) dgp.recently_missed_count = 0; - else if( _checkpoints.size() && _checkpoints.rbegin()->first >= b.block_num() ) + else if( _checkpoints.size() && _checkpoints.rbegin()->first >= b.block_num() ) dgp.recently_missed_count = 0; else if( missed_blocks ) dgp.recently_missed_count += GRAPHENE_RECENTLY_MISSED_COUNT_INCREMENT*missed_blocks; diff --git a/libraries/chain/db_witness_schedule.cpp b/libraries/chain/db_witness_schedule.cpp index d12bf69e..29ffeb7b 100644 --- a/libraries/chain/db_witness_schedule.cpp +++ b/libraries/chain/db_witness_schedule.cpp @@ -45,12 +45,11 @@ witness_id_type database::get_scheduled_witness( uint32_t slot_num )const const flat_set< witness_id_type >& active_witnesses = get_global_properties().active_witnesses; uint32_t n = active_witnesses.size(); - uint64_t min_witness_separation = (n / 2)+1; + uint64_t min_witness_separation = (n / 2); /// should work in cases where n is 0,1, and 2 uint64_t current_aslot = get_dynamic_global_properties().current_aslot + slot_num; uint64_t start_of_current_round_aslot = current_aslot - (current_aslot % n); - uint64_t first_ineligible_aslot = std::min( - start_of_current_round_aslot, current_aslot - min_witness_separation ); + uint64_t first_ineligible_aslot = std::min( start_of_current_round_aslot + 1, current_aslot - min_witness_separation ); // // overflow analysis of above subtraction: // @@ -76,7 +75,9 @@ witness_id_type database::get_scheduled_witness( uint32_t slot_num )const if( wit.last_aslot >= first_ineligible_aslot ) continue; - uint64_t k = now_hi + uint64_t(wit_id); + /// High performance random generator + /// http://xorshift.di.unimi.it/ + uint64_t k = now_hi + uint64_t(wit_id)*2685821657736338717ULL; k ^= (k >> 12); k ^= (k << 25); k ^= (k >> 27); @@ -93,8 +94,19 @@ witness_id_type database::get_scheduled_witness( uint32_t slot_num )const // at most K elements are susceptible to the filter, // otherwise we have an inconsistent database (such as // wit.last_aslot values that are non-unique or in the future) + if( !success ) { + edump((best_k)(slot_num)(first_ineligible_aslot)(current_aslot)(start_of_current_round_aslot)(min_witness_separation)(active_witnesses.size())); + + for( const witness_id_type& wit_id : active_witnesses ) + { + const witness_object& wit = wit_id(*this); + if( wit.last_aslot >= first_ineligible_aslot ) + idump((wit_id)(wit.last_aslot)); + } + + assert( success ); + } - assert( success ); return best_wit; } diff --git a/libraries/chain/include/graphene/chain/witness_object.hpp b/libraries/chain/include/graphene/chain/witness_object.hpp index 8fff82a0..ddee1958 100644 --- a/libraries/chain/include/graphene/chain/witness_object.hpp +++ b/libraries/chain/include/graphene/chain/witness_object.hpp @@ -44,6 +44,7 @@ namespace graphene { namespace chain { struct by_account; struct by_vote_id; + struct by_last_block; using witness_multi_index_type = multi_index_container< witness_object, indexed_by< diff --git a/libraries/db/include/graphene/db/object_id.hpp b/libraries/db/include/graphene/db/object_id.hpp index 84428c7e..eb784f90 100644 --- a/libraries/db/include/graphene/db/object_id.hpp +++ b/libraries/db/include/graphene/db/object_id.hpp @@ -54,6 +54,12 @@ namespace graphene { namespace db { object_id_type& operator++(int) { ++number; return *this; } object_id_type& operator++() { ++number; return *this; } + friend object_id_type operator+(const object_id_type& a, int delta ) { + return object_id_type( a.space(), a.type(), a.instance() + delta ); + } + friend object_id_type operator+(const object_id_type& a, int64_t delta ) { + return object_id_type( a.space(), a.type(), a.instance() + delta ); + } friend size_t hash_value( object_id_type v ) { return std::hash()(v.number); } friend bool operator < ( const object_id_type& a, const object_id_type& b ) @@ -83,6 +89,10 @@ namespace graphene { namespace db { object_id( object_id_type id ):instance(id.instance()) { } + + friend object_id operator+(const object_id a, int64_t delta ) { return object_id( uint64_t(a.instance.value+delta) ); } + friend object_id operator+(const object_id a, int delta ) { return object_id( uint64_t(a.instance.value+delta) ); } + operator object_id_type()const { return object_id_type( SpaceID, TypeID, instance.value ); } operator uint64_t()const { return object_id_type( *this ).number; } diff --git a/libraries/plugins/delayed_node/delayed_node_plugin.cpp b/libraries/plugins/delayed_node/delayed_node_plugin.cpp index d0461a39..be5cbe2a 100644 --- a/libraries/plugins/delayed_node/delayed_node_plugin.cpp +++ b/libraries/plugins/delayed_node/delayed_node_plugin.cpp @@ -97,14 +97,26 @@ void delayed_node_plugin::plugin_startup() try { connect(); + my->database_api->set_subscribe_callback([this] (const fc::variant& v) { + auto& updates = v.get_array(); + for( const auto& v : updates ) + { + if( v.is_object() ) + { + auto& obj = v.get_object(); + if( obj["id"].as() == graphene::chain::dynamic_global_property_id_type() ) + { + auto props = v.as(); + sync_with_trusted_node(props.head_block_number); + } + } + } + }, true); + // Go ahead and get in sync now, before subscribing chain::dynamic_global_property_object props = my->database_api->get_dynamic_global_properties(); sync_with_trusted_node(props.head_block_number); - my->database_api->subscribe_to_objects([this] (const fc::variant& v) { - auto props = v.as(); - sync_with_trusted_node(props.head_block_number); - }, {graphene::chain::dynamic_global_property_id_type()}); return; } catch (const fc::exception& e) { elog("Error during connection: ${e}", ("e", e.to_detail_string())); diff --git a/libraries/wallet/wallet.cpp b/libraries/wallet/wallet.cpp index 1b17b8d5..44c0a7c7 100644 --- a/libraries/wallet/wallet.cpp +++ b/libraries/wallet/wallet.cpp @@ -375,10 +375,6 @@ public: ("chain_id", _chain_id) ); } init_prototype_ops(); - _remote_db->subscribe_to_objects( [=]( const fc::variant& obj ) - { - fc::async([this]{resync();}, "Resync after block"); - }, {dynamic_global_property_id_type()} ); _wallet.chain_id = _chain_id; _wallet.ws_server = initial_data.ws_server; _wallet.ws_user = initial_data.ws_user; @@ -629,10 +625,7 @@ public: _keys[wif_pub_key] = wif_key; - if( _wallet.update_account(account) ) - _remote_db->subscribe_to_objects([this](const fc::variant& v) { - _wallet.update_account(v.as()); - }, {account.id}); + _wallet.update_account(account); _wallet.extra_keys[account.id].insert(wif_pub_key); @@ -649,17 +642,11 @@ public: if( ! fc::exists( wallet_filename ) ) return false; - if( !_wallet.my_accounts.empty() ) - _remote_db->unsubscribe_from_objects(_wallet.my_account_ids()); _wallet = fc::json::from_file( wallet_filename ).as< wallet_data >(); if( _wallet.chain_id != _chain_id ) FC_THROW( "Wallet chain ID does not match", ("wallet.chain_id", _wallet.chain_id) ("chain_id", _chain_id) ); - if( !_wallet.my_accounts.empty() ) - _remote_db->subscribe_to_objects([this](const fc::variant& v) { - _wallet.update_account(v.as()); - }, _wallet.my_account_ids()); return true; } void save_wallet_file(string wallet_filename = "") diff --git a/programs/CMakeLists.txt b/programs/CMakeLists.txt index 75bb6af3..ab267cb1 100644 --- a/programs/CMakeLists.txt +++ b/programs/CMakeLists.txt @@ -6,7 +6,7 @@ add_subdirectory( size_checker ) set(BUILD_QT_GUI FALSE CACHE BOOL "Build the Qt-based light client GUI") if(BUILD_QT_GUI) - add_subdirectory(light_client) +# add_subdirectory(light_client) endif() set(BUILD_WEB_NODE FALSE CACHE BOOL "Build the Qt-based full node with web GUI") if(BUILD_WEB_NODE) diff --git a/programs/js_operation_serializer/main.cpp b/programs/js_operation_serializer/main.cpp index 8f01f70c..57bfe0f3 100644 --- a/programs/js_operation_serializer/main.cpp +++ b/programs/js_operation_serializer/main.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include diff --git a/programs/size_checker/main.cpp b/programs/size_checker/main.cpp index 9ef4420a..0d3fc7f1 100644 --- a/programs/size_checker/main.cpp +++ b/programs/size_checker/main.cpp @@ -64,6 +64,20 @@ int main( int argc, char** argv ) { graphene::chain::operation op; + + vector witnesses; witnesses.resize(50); + for( uint32_t i = 0; i < 60*60*24*30; ++i ) + { + witnesses[ rand() % 50 ]++; + } + + std::sort( witnesses.begin(), witnesses.end() ); + idump((witnesses.back() - witnesses.front()) ); + idump((60*60*24*30/50)); + idump(("deviation: ")((60*60*24*30/50-witnesses.front())/(60*60*24*30/50.0))); + + idump( (witnesses) ); + for( int32_t i = 0; i < op.count(); ++i ) { op.set_which(i); diff --git a/tests/intense/block_tests.cpp b/tests/intense/block_tests.cpp index 7e975537..be26b9ce 100644 --- a/tests/intense/block_tests.cpp +++ b/tests/intense/block_tests.cpp @@ -313,93 +313,6 @@ BOOST_FIXTURE_TEST_CASE( witness_order_mc_test, database_fixture ) } } -/** - * To have a secure random number we need to ensure that the same - * witness does not get to produce two blocks in a row. There is - * always a chance that the last witness of one round will be the - * first witness of the next round. - * - * This means that when we shuffle witness we need to make sure - * that there is at least N/2 witness between consecutive turns - * of the same witness. This means that durring the random - * shuffle we need to restrict the placement of witness to maintain - * this invariant. - * - * This test checks the requirement using Monte Carlo approach - * (produce lots of blocks and check the invariant holds). - */ -BOOST_FIXTURE_TEST_CASE( generic_scheduler_mc_test, database_fixture ) -{ - try { - size_t num_witnesses = db.get_global_properties().active_witnesses.size(); - size_t dmin = num_witnesses >> 1; - witness_scheduler_rng rng( - // - - - - + - - - - 1 - - - - + - - - - 2 - - - - + - - - - "\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0" - ); - witness_scheduler scheduler; - vector< witness_id_type > witness_ids; - - witness_ids.reserve( num_witnesses ); - for( size_t i=0; i cur_round; - vector< witness_id_type > full_schedule; - // if we make the maximum witness count testable, - // we'll need to enlarge this. - std::bitset< 0x40 > witness_seen; - size_t total_blocks = 1000000; - - cur_round.reserve( num_witnesses ); - full_schedule.reserve( total_blocks ); - - // we assert so the test doesn't continue, which would - // corrupt memory - assert( num_witnesses <= witness_seen.size() ); - - while( full_schedule.size() < total_blocks ) - { - scheduler.produce_schedule( rng ); - witness_id_type wid = scheduler.consume_schedule(); - full_schedule.push_back( wid ); - cur_round.push_back( wid ); - if( cur_round.size() == num_witnesses ) - { - // check that the current round contains exactly 1 copy - // of each witness - witness_seen.reset(); - for( const witness_id_type& w : cur_round ) - { - uint64_t inst = w.instance.value; - BOOST_CHECK( !witness_seen.test( inst ) ); - assert( !witness_seen.test( inst ) ); - witness_seen.set( inst ); - } - cur_round.clear(); - } - } - - for( size_t i=0,m=full_schedule.size(); i