From 11a5d2b620f05c7134dc4d3423b47ba19be9c2af Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Thu, 16 Jul 2015 15:30:05 -0400 Subject: [PATCH] When the p2p code processes a block that contains transactions we haven't seen, avoid fetching those transactions separately --- libraries/app/application.cpp | 35 +++++++-- libraries/chain/db_block.cpp | 4 +- libraries/net/include/graphene/net/node.hpp | 36 +++++----- libraries/net/node.cpp | 80 +++++++++++++++------ libraries/wallet/wallet.cpp | 5 +- 5 files changed, 111 insertions(+), 49 deletions(-) diff --git a/libraries/app/application.cpp b/libraries/app/application.cpp index 305b689a..227a83db 100644 --- a/libraries/app/application.cpp +++ b/libraries/app/application.cpp @@ -312,11 +312,29 @@ namespace detail { * * @throws exception if error validating the item, otherwise the item is safe to broadcast on. */ - virtual bool handle_block(const graphene::net::block_message& blk_msg, bool sync_mode) override + virtual bool handle_block(const graphene::net::block_message& blk_msg, bool sync_mode, + std::vector& contained_transaction_message_ids) override { try { ilog("Got block #${n} from network", ("n", blk_msg.block.block_num())); try { - return _chain_db->push_block(blk_msg.block, _is_block_producer? database::skip_nothing : database::skip_transaction_signatures); + bool result = _chain_db->push_block(blk_msg.block, _is_block_producer ? database::skip_nothing : database::skip_transaction_signatures); + + // the block was accepted, so we now know all of the transactions contained in the block + if (!sync_mode) + { + // if we're not in sync mode, there's a chance we will be seeing some transactions + // included in blocks before we see the free-floating transaction itself. If that + // happens, there's no reason to fetch the transactions, so construct a list of the + // transaction message ids we no longer need. + // during sync, it is unlikely that we'll see any old + for (const processed_transaction& transaction : blk_msg.block.transactions) + { + graphene::net::trx_message transaction_message(transaction); + contained_transaction_message_ids.push_back(graphene::net::message(transaction_message).id()); + } + } + + return result; } catch( const fc::exception& e ) { elog("Error when pushing block:\n${e}", ("e", e.to_detail_string())); throw; @@ -329,12 +347,17 @@ namespace detail { } } FC_CAPTURE_AND_RETHROW( (blk_msg)(sync_mode) ) } - virtual bool handle_transaction(const graphene::net::trx_message& trx_msg, bool sync_mode) override + virtual void handle_transaction(const graphene::net::trx_message& transaction_message) override { try { ilog("Got transaction from network"); - _chain_db->push_transaction( trx_msg.trx ); - return false; - } FC_CAPTURE_AND_RETHROW( (trx_msg)(sync_mode) ) } + _chain_db->push_transaction( transaction_message.trx ); + } FC_CAPTURE_AND_RETHROW( (transaction_message) ) } + + virtual void handle_message(const message& message_to_process) override + { + // not a transaction, not a block + FC_THROW( "Invalid Message Type" ); + } /** * Assuming all data elements are ordered in some way, this method should diff --git a/libraries/chain/db_block.cpp b/libraries/chain/db_block.cpp index dccabbb0..b08b0959 100644 --- a/libraries/chain/db_block.cpp +++ b/libraries/chain/db_block.cpp @@ -97,7 +97,7 @@ bool database::_push_block(const signed_block& new_block) uint32_t skip = get_node_properties().skip_flags; if( !(skip&skip_fork_db) ) { - auto new_head = _fork_db.push_block(new_block); + shared_ptr new_head = _fork_db.push_block(new_block); //If the head block from the longest chain does not build off of the current head, we need to switch forks. if( new_head->data.previous != head_block_id() ) { @@ -116,7 +116,7 @@ bool database::_push_block(const signed_block& new_block) { optional except; try { - auto session = _undo_db.start_undo_session(); + undo_database::session session = _undo_db.start_undo_session(); apply_block( (*ritr)->data, skip ); _block_id_to_block.store( (*ritr)->id, (*ritr)->data ); session.commit(); diff --git a/libraries/net/include/graphene/net/node.hpp b/libraries/net/include/graphene/net/node.hpp index b5bfdeb7..22ef496e 100644 --- a/libraries/net/include/graphene/net/node.hpp +++ b/libraries/net/include/graphene/net/node.hpp @@ -62,8 +62,7 @@ namespace graphene { namespace net { virtual bool has_item( const net::item_id& id ) = 0; /** - * @brief allows the application to validate an item prior to - * broadcasting to peers. + * @brief Called when a new block comes in from the network * * @param sync_mode true if the message was fetched through the sync process, false during normal operation * @returns true if this message caused the blockchain to switch forks, false if it did not @@ -71,21 +70,26 @@ namespace graphene { namespace net { * @throws exception if error validating the item, otherwise the item is * safe to broadcast on. */ - virtual bool handle_block( const graphene::net::block_message& blk_msg, bool syncmode ) = 0; - virtual bool handle_transaction( const graphene::net::trx_message& trx_msg, bool syncmode ) = 0; + virtual bool handle_block( const graphene::net::block_message& blk_msg, bool sync_mode, + std::vector& contained_transaction_message_ids ) = 0; + + /** + * @brief Called when a new transaction comes in from the network + * + * @throws exception if error validating the item, otherwise the item is + * safe to broadcast on. + */ + virtual void handle_transaction( const graphene::net::trx_message& trx_msg ) = 0; - virtual bool handle_message( const message& message_to_process, bool sync_mode ) - { - switch( message_to_process.msg_type ) - { - case block_message_type: - return handle_block(message_to_process.as(), sync_mode); - case trx_message_type: - return handle_transaction(message_to_process.as(), sync_mode); - default: - FC_THROW( "Invalid Message Type" ); - }; - } + /** + * @brief Called when a new message comes in from the network other than a + * block or a transaction. Currently there are no other possible + * messages, so this should never be called. + * + * @throws exception if error validating the item, otherwise the item is + * safe to broadcast on. + */ + virtual void handle_message( const message& message_to_process ) = 0; /** * Assuming all data elements are ordered in some way, this method should diff --git a/libraries/net/node.cpp b/libraries/net/node.cpp index 9e84907b..4c568d76 100644 --- a/libraries/net/node.cpp +++ b/libraries/net/node.cpp @@ -276,6 +276,8 @@ namespace graphene { namespace net { namespace detail { boost::accumulators::tag::count> > call_stats_accumulator; #define NODE_DELEGATE_METHOD_NAMES (has_item) \ (handle_message) \ + (handle_block) \ + (handle_transaction) \ (get_item_ids) \ (get_item) \ (get_chain_id) \ @@ -367,9 +369,9 @@ namespace graphene { namespace net { namespace detail { fc::variant_object get_call_statistics(); bool has_item( const net::item_id& id ) override; - bool handle_message( const message&, bool sync_mode ) override; - bool handle_block( const graphene::net::block_message& blk_msg, bool syncmode ) override; - bool handle_transaction( const graphene::net::trx_message& trx_msg, bool syncmode ) override; + void handle_message( const message& ) override; + bool handle_block( const graphene::net::block_message& block_message, bool sync_mode, std::vector& contained_transaction_message_ids ) override; + void handle_transaction( const graphene::net::trx_message& transaction_message ) override; std::vector get_item_ids(uint32_t item_type, const std::vector& blockchain_synopsis, uint32_t& remaining_item_count, @@ -448,9 +450,11 @@ namespace graphene { namespace net { namespace detail { bool _items_to_fetch_updated; fc::future _fetch_item_loop_done; + struct item_id_index{}; typedef boost::multi_index_container >, - boost::multi_index::hashed_unique, + boost::multi_index::hashed_unique, + boost::multi_index::member, std::hash > > > items_to_fetch_set_type; unsigned _items_to_fetch_sequence_counter; @@ -2777,7 +2781,8 @@ namespace graphene { namespace net { namespace detail { try { - _delegate->handle_block(block_message_to_send, true); + std::vector contained_transaction_message_ids; + _delegate->handle_block(block_message_to_send, true, contained_transaction_message_ids); ilog("Successfully pushed sync block ${num} (id:${id})", ("num", block_message_to_send.block.block_num()) ("id", block_message_to_send.block_id)); @@ -3095,12 +3100,31 @@ namespace graphene { namespace net { namespace detail { if (std::find(_most_recent_blocks_accepted.begin(), _most_recent_blocks_accepted.end(), block_message_to_process.block_id) == _most_recent_blocks_accepted.end()) { - _delegate->handle_block(block_message_to_process, false); + std::vector contained_transaction_message_ids; + _delegate->handle_block(block_message_to_process, false, contained_transaction_message_ids); message_validated_time = fc::time_point::now(); ilog("Successfully pushed block ${num} (id:${id})", ("num", block_message_to_process.block.block_num()) ("id", block_message_to_process.block_id)); _most_recent_blocks_accepted.push_back(block_message_to_process.block_id); + + bool new_transaction_discovered = false; + for (const item_hash_t& transaction_message_hash : contained_transaction_message_ids) + { + size_t items_erased = _items_to_fetch.get().erase(item_id(trx_message_type, transaction_message_hash)); + // there are two ways we could behave here: we could either act as if we received + // the transaction outside the block and offer it to our peers, or we could just + // forget about it (we would still advertise this block to our peers so they should + // get the transaction through that mechanism). + // We take the second approach, bring in the next if block to try the first approach + //if (items_erased) + //{ + // new_transaction_discovered = true; + // _new_inventory.insert(item_id(trx_message_type, transaction_message_hash)); + //} + } + if (new_transaction_discovered) + trigger_advertise_inventory_loop(); } else dlog( "Already received and accepted this block (presumably through sync mechanism), treating it as accepted" ); @@ -3532,7 +3556,10 @@ namespace graphene { namespace net { namespace detail { fc::time_point message_validated_time; try { - _delegate->handle_message( message_to_process, false ); + if (message_to_process.msg_type == trx_message_type) + _delegate->handle_transaction( message_to_process.as() ); + else + _delegate->handle_message( message_to_process ); message_validated_time = fc::time_point::now(); } catch ( const insufficient_relay_fee& ) @@ -4914,8 +4941,8 @@ namespace graphene { namespace net { namespace detail { INVOKE_IN_IMPL(clear_peer_database); } - void node::set_total_bandwidth_limit( uint32_t upload_bytes_per_second, - uint32_t download_bytes_per_second ) + void node::set_total_bandwidth_limit(uint32_t upload_bytes_per_second, + uint32_t download_bytes_per_second) { INVOKE_IN_IMPL(set_total_bandwidth_limit, upload_bytes_per_second, download_bytes_per_second); } @@ -4968,7 +4995,16 @@ namespace graphene { namespace net { namespace detail { { try { - destination_node->delegate->handle_message(destination_node->messages_to_deliver.front(), false); + const message& message_to_deliver = destination_node->messages_to_deliver.front(); + if (message_to_deliver.msg_type == trx_message_type) + destination_node->delegate->handle_transaction(message_to_deliver.as()); + else if (message_to_deliver.msg_type == block_message_type) + { + std::vector contained_transaction_message_ids; + destination_node->delegate->handle_block(message_to_deliver.as(), false, contained_transaction_message_ids); + } + else + destination_node->delegate->handle_message(message_to_deliver); } catch ( const fc::exception& e ) { @@ -5098,27 +5134,25 @@ namespace graphene { namespace net { namespace detail { INVOKE_AND_COLLECT_STATISTICS(has_item, id); } - bool statistics_gathering_node_delegate_wrapper::handle_message( const message& message_to_handle, bool sync_mode ) + void statistics_gathering_node_delegate_wrapper::handle_message( const message& message_to_handle ) { - INVOKE_AND_COLLECT_STATISTICS(handle_message, message_to_handle, sync_mode); + INVOKE_AND_COLLECT_STATISTICS(handle_message, message_to_handle); } - bool statistics_gathering_node_delegate_wrapper::handle_block( const graphene::net::block_message& blk_msg, bool syncmode ) + bool statistics_gathering_node_delegate_wrapper::handle_block( const graphene::net::block_message& block_message, bool sync_mode, std::vector& contained_transaction_message_ids) { - if (_thread->is_current()) { return _node_delegate->handle_block(blk_msg,syncmode); } - else return _thread->async([&](){ return _node_delegate->handle_block(blk_msg,syncmode); }, "invoke handle_block").wait(); + INVOKE_AND_COLLECT_STATISTICS(handle_block, block_message, sync_mode, contained_transaction_message_ids); } - bool statistics_gathering_node_delegate_wrapper::handle_transaction( const graphene::net::trx_message& trx_msg, bool syncmode ) + void statistics_gathering_node_delegate_wrapper::handle_transaction( const graphene::net::trx_message& transaction_message ) { - if (_thread->is_current()) { return _node_delegate->handle_transaction(trx_msg,syncmode); } - else return _thread->async([&](){ return _node_delegate->handle_transaction(trx_msg,syncmode); }, "invoke handle_transaction").wait(); + INVOKE_AND_COLLECT_STATISTICS(handle_transaction, transaction_message); } std::vector statistics_gathering_node_delegate_wrapper::get_item_ids(uint32_t item_type, - const std::vector& blockchain_synopsis, - uint32_t& remaining_item_count, - uint32_t limit /* = 2000 */) + const std::vector& blockchain_synopsis, + uint32_t& remaining_item_count, + uint32_t limit /* = 2000 */) { INVOKE_AND_COLLECT_STATISTICS(get_item_ids, item_type, blockchain_synopsis, remaining_item_count, limit); } @@ -5134,8 +5168,8 @@ namespace graphene { namespace net { namespace detail { } std::vector statistics_gathering_node_delegate_wrapper::get_blockchain_synopsis(uint32_t item_type, - const graphene::net::item_hash_t& reference_point /* = graphene::net::item_hash_t() */, - uint32_t number_of_blocks_after_reference_point /* = 0 */) + const graphene::net::item_hash_t& reference_point /* = graphene::net::item_hash_t() */, + uint32_t number_of_blocks_after_reference_point /* = 0 */) { INVOKE_AND_COLLECT_STATISTICS(get_blockchain_synopsis, item_type, reference_point, number_of_blocks_after_reference_point); } diff --git a/libraries/wallet/wallet.cpp b/libraries/wallet/wallet.cpp index c3e5b43c..03968f22 100644 --- a/libraries/wallet/wallet.cpp +++ b/libraries/wallet/wallet.cpp @@ -1707,8 +1707,9 @@ public: dbg_make_uia(master.name, "SHILL"); } catch(...) {/* Ignore; the asset probably already exists.*/} - fc::time_point start = fc::time_point::now(); for( int i = 0; i < number_of_accounts; ++i ) - create_account_with_private_key(key, prefix + fc::to_string(i), master.name, master.name, true, false); + fc::time_point start = fc::time_point::now(); + for( int i = 0; i < number_of_accounts; ++i ) + create_account_with_private_key(key, prefix + fc::to_string(i), master.name, master.name, /* broadcast = */ true, /* save wallet = */ false); fc::time_point end = fc::time_point::now(); ilog("Created ${n} accounts in ${time} milliseconds", ("n", number_of_accounts)("time", (end - start).count() / 1000));