diff --git a/libraries/app/application.cpp b/libraries/app/application.cpp index e8a266c4..f27e1c6c 100644 --- a/libraries/app/application.cpp +++ b/libraries/app/application.cpp @@ -27,6 +27,7 @@ #include #include +#include #include @@ -48,6 +49,8 @@ #include #include +#include + namespace graphene { namespace app { using net::item_hash_t; using net::item_id; @@ -380,6 +383,10 @@ namespace detail { ilog("Got block #${n} from network", ("n", blk_msg.block.block_num())); try { + // TODO: in the case where this block is valid but on a fork that's too old for us to switch to, + // you can help the network code out by throwing a block_older_than_undo_history exception. + // when the net code sees that, it will stop trying to push blocks from that chain, but + // leave that peer connected so that they can get sync blocks from us bool result = _chain_db->push_block(blk_msg.block, _is_block_producer ? database::skip_nothing : database::skip_transaction_signatures); // the block was accepted, so we now know all of the transactions contained in the block @@ -422,6 +429,13 @@ namespace detail { FC_THROW( "Invalid Message Type" ); } + bool is_included_block(const block_id_type& block_id) + { + uint32_t block_num = block_header::num_from_id(block_id); + block_id_type block_id_in_preferred_chain = _chain_db->get_block_id_for_num(block_num); + return block_id == block_id_in_preferred_chain; + } + /** * Assuming all data elements are ordered in some way, this method should * return up to limit ids that occur *after* the last ID in synopsis that @@ -431,31 +445,42 @@ namespace detail { * in our blockchain after the last item returned in the result, * or 0 if the result contains the last item in the blockchain */ - virtual std::vector get_item_ids(uint32_t item_type, - const std::vector& blockchain_synopsis, - uint32_t& remaining_item_count, - uint32_t limit) override + virtual std::vector get_block_ids(const std::vector& blockchain_synopsis, + uint32_t& remaining_item_count, + 
uint32_t limit) override { try { - FC_ASSERT( item_type == graphene::net::block_message_type ); - vector result; + vector result; remaining_item_count = 0; if( _chain_db->head_block_num() == 0 ) return result; result.reserve(limit); block_id_type last_known_block_id; - auto itr = blockchain_synopsis.rbegin(); - while( itr != blockchain_synopsis.rend() ) + + if (blockchain_synopsis.empty() || + (blockchain_synopsis.size() == 1 && blockchain_synopsis[0] == block_id_type())) { - if( _chain_db->is_known_block(*itr) || *itr == block_id_type() ) - { - last_known_block_id = *itr; - break; - } - ++itr; - } + // peer has sent us an empty synopsis meaning they have no blocks. + // A bug in old versions would cause them to send a synopsis containing block 000000000 + // when they had an empty blockchain, so pretend they sent the right thing here. - for( auto num = block_header::num_from_id(last_known_block_id); + // do nothing, leave last_known_block_id set to zero + } + else + { + bool found_a_block_in_synopsis = false; + for (const item_hash_t& block_id_in_synopsis : boost::adaptors::reverse(blockchain_synopsis)) + if (block_id_in_synopsis == block_id_type() || + (_chain_db->is_known_block(block_id_in_synopsis) && is_included_block(block_id_in_synopsis))) + { + last_known_block_id = block_id_in_synopsis; + found_a_block_in_synopsis = true; + break; + } + if (!found_a_block_in_synopsis) + FC_THROW_EXCEPTION(graphene::net::peer_is_on_an_unreachable_fork, "Unable to provide a list of blocks starting at any of the blocks in peer's synopsis"); + } + for( uint32_t num = block_header::num_from_id(last_known_block_id); num <= _chain_db->head_block_num() && result.size() < limit; ++num ) if( num > 0 ) @@ -492,38 +517,180 @@ namespace detail { } /** - * Returns a synopsis of the blockchain used for syncing. - * This consists of a list of selected item hashes from our current preferred - * blockchain, exponentially falling off into the past. Horrible explanation. 
+ * Returns a synopsis of the blockchain used for syncing. This consists of a list of + * block hashes at intervals exponentially increasing towards the genesis block. + * When syncing to a peer, the peer uses this data to determine if we're on the same + * fork as they are, and if not, what blocks they need to send us to get us on their + * fork. * - * If the blockchain is empty, it will return the empty list. - * If the blockchain has one block, it will return a list containing just that block. - * If it contains more than one block: - * the first element in the list will be the hash of the genesis block - * the second element will be the hash of an item at the half way point in the blockchain - * the third will be ~3/4 of the way through the block chain - * the fourth will be at ~7/8... - * &c. - * the last item in the list will be the hash of the most recent block on our preferred chain + * In the over-simplified case, this is a straightforward synopsis of our current + * preferred blockchain; when we first connect up to a peer, this is what we will be sending. + * It looks like this: + * If the blockchain is empty, it will return the empty list. + * If the blockchain has one block, it will return a list containing just that block. + * If it contains more than one block: + * the first element in the list will be the hash of the highest numbered block that + * we cannot undo + * the second element will be the hash of an item at the half way point in the undoable + * segment of the blockchain + * the third will be ~3/4 of the way through the undoable segment of the block chain + * the fourth will be at ~7/8... + * &c. + * the last item in the list will be the hash of the most recent block on our preferred chain + * so if the blockchain had 26 blocks labeled a - z, the synopsis would be: + * a n u x z + * the idea being that by sending a small (<30) number of block ids, we can summarize a huge + * blockchain. 
The block ids are more dense near the end of the chain because we are + * more likely to be almost in sync when we first connect, and forks are likely to be short. + * If the peer we're syncing with in our example is on a fork that started at block 'v', + * then they will reply to our synopsis with a list of all blocks starting from block 'u', + * the last block they know that we had in common. + * + * In the real code, there are several complications. + * + * First, as an optimization, we don't usually send a synopsis of the entire blockchain, we + * send a synopsis of only the segment of the blockchain that we have undo data for. If their + * fork doesn't build off of something in our undo history, we would be unable to switch, so there's + * no reason to fetch the blocks. + * + * Second, when a peer replies to our initial synopsis and gives us a list of the blocks they think + * we are missing, they only send a chunk of a few thousand blocks at once. After we get those + * block ids, we need to request more blocks by sending another synopsis (we can't just say "send me + * the next 2000 ids" because they may have switched forks themselves and they don't track what + * they've sent us). For faster performance, we want to get a fairly long list of block ids first, + * then start downloading the blocks. + * The peer doesn't handle these follow-up block id requests any differently from the initial request; + * it treats the synopsis we send as our blockchain and bases its response entirely off that. So to + * get the response we want (the next chunk of block ids following the last one they sent us, or, + * failing that, the shortest fork off of the last list of block ids they sent), we need to construct + * a synopsis as if our blockchain was made up of: + * 1. the blocks in our block chain up to the fork point (if there is a fork) or the head block (if no fork) + * 2. the blocks we've already pushed from their fork (if there's a fork) + * 3. 
the block ids they've previously sent us + * Segment 3 is handled in the p2p code, it just tells us the number of blocks it has (in + * number_of_blocks_after_reference_point) so we can leave space in the synopsis for them. + * We're responsible for constructing the synopsis of Segments 1 and 2 from our active blockchain and + * fork database. The reference_point parameter is the last block from that peer that has been + * successfully pushed to the blockchain, so that tells us whether the peer is on a fork or on + * the main chain. */ - virtual std::vector get_blockchain_synopsis(uint32_t item_type, - const graphene::net::item_hash_t& reference_point, + virtual std::vector get_blockchain_synopsis(const item_hash_t& reference_point, uint32_t number_of_blocks_after_reference_point) override { try { - std::vector result; - result.reserve(30); - uint32_t head_block_num = _chain_db->head_block_num(); - result.push_back(_chain_db->head_block_id()); - uint32_t current = 1; - while( current < head_block_num ) - { - result.push_back(_chain_db->get_block_id_for_num(head_block_num - current)); - current = current*2; - } - std::reverse( result.begin(), result.end() ); - //idump((reference_point)(number_of_blocks_after_reference_point)(result)); - return result; - } FC_CAPTURE_AND_RETHROW( (reference_point)(number_of_blocks_after_reference_point) ) } + std::vector synopsis; + synopsis.reserve(30); + uint32_t high_block_num; + uint32_t non_fork_high_block_num; + uint32_t low_block_num = _chain_db->last_non_undoable_block_num(); + std::vector fork_history; + + if (reference_point != item_hash_t()) + { + // the node is asking for a summary of the block chain up to a specified + // block, which may or may not be on a fork + // for now, assume it's not on a fork + if (is_included_block(reference_point)) + { + // reference_point is a block we know about and is on the main chain + uint32_t reference_point_block_num = block_header::num_from_id(reference_point); + 
assert(reference_point_block_num > 0); + high_block_num = reference_point_block_num; + non_fork_high_block_num = high_block_num; + + if (reference_point_block_num < low_block_num) + { + // we're on the same fork (at least as far as reference_point) but we've passed + // reference point and could no longer undo that far if we diverged after that + // block. This should probably only happen due to a race condition where + // the network thread calls this function, and then immediately pushes a bunch of blocks, + // then the main thread finally processes this function. + // with the current framework, there's not much we can do to tell the network + // thread what our current head block is, so we'll just pretend that + // our head is actually the reference point. + // this *may* enable us to fetch blocks that we're unable to push, but that should + // be a rare case (and correctly handled) + low_block_num = reference_point_block_num; + } + } + else + { + // block is a block we know about, but it is on a fork + try + { + std::vector fork_history = _chain_db->get_block_ids_on_fork(reference_point); + // returns a vector where the first element is the common ancestor with the preferred chain, + // and the last element is the reference point you passed in + assert(fork_history.size() >= 2); + assert(fork_history.back() == reference_point); + block_id_type last_non_fork_block = fork_history.front(); + fork_history.erase(fork_history.begin()); // remove the common ancestor + + if (last_non_fork_block == block_id_type()) // if the fork goes all the way back to genesis (does graphene's fork db allow this?) + non_fork_high_block_num = 0; + else + non_fork_high_block_num = block_header::num_from_id(last_non_fork_block); + + high_block_num = non_fork_high_block_num + fork_history.size(); + assert(high_block_num == block_header::num_from_id(fork_history.back())); + } + catch (const fc::exception& e) + { + // unable to get fork history for some reason. maybe not linked? 
+ // we can't return a synopsis of its chain + elog("Unable to construct a blockchain synopsis for reference hash ${hash}: ${exception}", ("hash", reference_point)("exception", e)); + throw; + } + if (non_fork_high_block_num < low_block_num) + { + wlog("Unable to generate a usable synopsis because the peer we're generating it for forked too long ago " + "(our chains diverge after block #${non_fork_high_block_num} but only undoable to block #${low_block_num})", + ("low_block_num", low_block_num) + ("non_fork_high_block_num", non_fork_high_block_num)); + FC_THROW_EXCEPTION(graphene::net::block_older_than_undo_history, "Peer is on a fork I'm unable to switch to"); + } + } + } + else + { + // no reference point specified, summarize the whole block chain + high_block_num = _chain_db->head_block_num(); + non_fork_high_block_num = high_block_num; + if (high_block_num == 0) + return synopsis; // we have no blocks + } + + // at this point: + // low_block_num is the block before the first block we can undo, + // non_fork_high_block_num is the block before the fork (if the peer is on a fork, or otherwise it is the same as high_block_num) + // high_block_num is the block number of the reference block, or the end of the chain if no reference provided + + // true_high_block_num is the ending block number after the network code appends any item ids it + // knows about that we don't + uint32_t true_high_block_num = high_block_num + number_of_blocks_after_reference_point; + do + { + // for each block in the synopsis, figure out where to pull the block id from. 
+ // if it's <= non_fork_high_block_num, we grab it from the main blockchain; + // if it's not, we pull it from the fork history + if (low_block_num <= non_fork_high_block_num) + synopsis.push_back(_chain_db->get_block_id_for_num(low_block_num)); + else + { + // for debugging + int index = low_block_num - non_fork_high_block_num - 1; + if (index < 0 || index > fork_history.size()) + { + int i = 0; + } + synopsis.push_back(fork_history[low_block_num - non_fork_high_block_num - 1]); + } + low_block_num += (true_high_block_num - low_block_num + 2) / 2; + } + while (low_block_num <= high_block_num); + + idump((synopsis)); + return synopsis; + } FC_CAPTURE_AND_RETHROW() } /** * Call this after the call to handle_message succeeds. diff --git a/libraries/chain/db_block.cpp b/libraries/chain/db_block.cpp index 7bed189d..9014300e 100644 --- a/libraries/chain/db_block.cpp +++ b/libraries/chain/db_block.cpp @@ -76,6 +76,16 @@ const signed_transaction& database::get_recent_transaction(const transaction_id_ return itr->trx; } +std::vector database::get_block_ids_on_fork(block_id_type head_of_fork) const +{ + pair branches = _fork_db.fetch_branch_from(head_block_id(), head_of_fork); + assert(branches.first.back()->id == branches.second.back()->id); + std::vector result; + for (const item_ptr& fork_block : branches.second) + result.emplace_back(fork_block->id); + return result; +} + /** * Push block "may fail" in which case every partial change is unwound. After * push block is successful the block is appended to the chain database on disk. 
diff --git a/libraries/chain/db_getter.cpp b/libraries/chain/db_getter.cpp index d9a08105..4cd842dc 100644 --- a/libraries/chain/db_getter.cpp +++ b/libraries/chain/db_getter.cpp @@ -84,4 +84,10 @@ node_property_object& database::node_properties() return _node_property_object; } +uint32_t database::last_non_undoable_block_num() const +{ + return head_block_num() - _undo_db.size(); +} + + } } diff --git a/libraries/chain/include/graphene/chain/database.hpp b/libraries/chain/include/graphene/chain/database.hpp index abfd1936..4445654a 100644 --- a/libraries/chain/include/graphene/chain/database.hpp +++ b/libraries/chain/include/graphene/chain/database.hpp @@ -112,6 +112,7 @@ namespace graphene { namespace chain { optional fetch_block_by_id( const block_id_type& id )const; optional fetch_block_by_number( uint32_t num )const; const signed_transaction& get_recent_transaction( const transaction_id_type& trx_id )const; + std::vector get_block_ids_on_fork(block_id_type head_of_fork) const; /** * Calculate the percent of block production slots that were missed in the @@ -245,6 +246,8 @@ namespace graphene { namespace chain { node_property_object& node_properties(); + + uint32_t last_non_undoable_block_num() const; //////////////////// db_init.cpp //////////////////// void initialize_evaluators(); diff --git a/libraries/net/include/graphene/net/exceptions.hpp b/libraries/net/include/graphene/net/exceptions.hpp index 7985ebc0..7750455d 100644 --- a/libraries/net/include/graphene/net/exceptions.hpp +++ b/libraries/net/include/graphene/net/exceptions.hpp @@ -26,5 +26,6 @@ namespace graphene { namespace net { FC_DECLARE_DERIVED_EXCEPTION( insufficient_relay_fee, graphene::net::net_exception, 90002, "insufficient relay fee" ); FC_DECLARE_DERIVED_EXCEPTION( already_connected_to_requested_peer, graphene::net::net_exception, 90003, "already connected to requested peer" ); FC_DECLARE_DERIVED_EXCEPTION( block_older_than_undo_history, graphene::net::net_exception, 90004, "block is 
older than our undo history allows us to process" ); + FC_DECLARE_DERIVED_EXCEPTION( peer_is_on_an_unreachable_fork, graphene::net::net_exception, 90005, "peer is on another fork" ); } } diff --git a/libraries/net/include/graphene/net/node.hpp b/libraries/net/include/graphene/net/node.hpp index c8e53f00..71ff0c55 100644 --- a/libraries/net/include/graphene/net/node.hpp +++ b/libraries/net/include/graphene/net/node.hpp @@ -99,10 +99,9 @@ namespace graphene { namespace net { * in our blockchain after the last item returned in the result, * or 0 if the result contains the last item in the blockchain */ - virtual std::vector get_item_ids(uint32_t item_type, - const std::vector& blockchain_synopsis, - uint32_t& remaining_item_count, - uint32_t limit = 2000) = 0; + virtual std::vector get_block_ids(const std::vector& blockchain_synopsis, + uint32_t& remaining_item_count, + uint32_t limit = 2000) = 0; /** * Given the hash of the requested data, fetch the body. @@ -119,14 +118,17 @@ namespace graphene { namespace net { * If the blockchain is empty, it will return the empty list. * If the blockchain has one block, it will return a list containing just that block. * If it contains more than one block: - * the first element in the list will be the hash of the genesis block - * the second element will be the hash of an item at the half way point in the blockchain - * the third will be ~3/4 of the way through the block chain + * the first element in the list will be the hash of the highest numbered block that + * we cannot undo + * the second element will be the hash of an item at the half way point in the undoable + * segment of the blockchain + * the third will be ~3/4 of the way through the undoable segment of the block chain * the fourth will be at ~7/8... * &c. 
* the last item in the list will be the hash of the most recent block on our preferred chain */ - virtual std::vector get_blockchain_synopsis(uint32_t item_type, const graphene::net::item_hash_t& reference_point = graphene::net::item_hash_t(), uint32_t number_of_blocks_after_reference_point = 0) = 0; + virtual std::vector get_blockchain_synopsis(const item_hash_t& reference_point, + uint32_t number_of_blocks_after_reference_point) = 0; /** * Call this after the call to handle_message succeeds. diff --git a/libraries/net/include/graphene/net/peer_connection.hpp b/libraries/net/include/graphene/net/peer_connection.hpp index ed706b8a..4e506026 100644 --- a/libraries/net/include/graphene/net/peer_connection.hpp +++ b/libraries/net/include/graphene/net/peer_connection.hpp @@ -219,7 +219,7 @@ namespace graphene { namespace net uint32_t number_of_unfetched_item_ids; /// number of items in the blockchain that follow ids_of_items_to_get but the peer hasn't yet told us their ids bool peer_needs_sync_items_from_us; bool we_need_sync_items_from_peer; - fc::optional > item_ids_requested_from_peer; /// we check this to detect a timed-out request and in busy() + fc::optional, fc::time_point> > item_ids_requested_from_peer; /// we check this to detect a timed-out request and in busy() item_to_time_map_type sync_items_requested_from_peer; /// ids of blocks we've requested from this peer during sync. 
fetch from another peer if this peer disconnects uint32_t last_block_number_delegate_has_seen; /// the number of the last block this peer has told us about that the delegate knows (ids_of_items_to_get[0] should be the id of block [this value + 1]) item_hash_t last_block_delegate_has_seen; /// the hash of the last block this peer has told us about that the peer knows diff --git a/libraries/net/node.cpp b/libraries/net/node.cpp index 6a8ef753..c109b434 100644 --- a/libraries/net/node.cpp +++ b/libraries/net/node.cpp @@ -37,6 +37,7 @@ #include #include #include +#include #include #include @@ -279,7 +280,7 @@ namespace graphene { namespace net { namespace detail { (handle_message) \ (handle_block) \ (handle_transaction) \ - (get_item_ids) \ + (get_block_ids) \ (get_item) \ (get_chain_id) \ (get_blockchain_synopsis) \ @@ -375,15 +376,13 @@ namespace graphene { namespace net { namespace detail { void handle_message( const message& ) override; bool handle_block( const graphene::net::block_message& block_message, bool sync_mode, std::vector& contained_transaction_message_ids ) override; void handle_transaction( const graphene::net::trx_message& transaction_message ) override; - std::vector get_item_ids(uint32_t item_type, - const std::vector& blockchain_synopsis, - uint32_t& remaining_item_count, - uint32_t limit = 2000) override; + std::vector get_block_ids(const std::vector& blockchain_synopsis, + uint32_t& remaining_item_count, + uint32_t limit = 2000) override; message get_item( const item_id& id ) override; chain_id_type get_chain_id() const override; - std::vector get_blockchain_synopsis(uint32_t item_type, - const graphene::net::item_hash_t& reference_point = graphene::net::item_hash_t(), - uint32_t number_of_blocks_after_reference_point = 0) override; + std::vector get_blockchain_synopsis(const item_hash_t& reference_point, + uint32_t number_of_blocks_after_reference_point) override; void sync_status( uint32_t item_type, uint32_t item_count ) override; void 
connection_count_changed( uint32_t c ) override; uint32_t get_block_number(const item_hash_t& block_id) override; @@ -1322,9 +1321,9 @@ namespace graphene { namespace net { namespace detail { active_peer->item_ids_requested_from_peer && active_peer->item_ids_requested_from_peer->get<1>() < active_ignored_request_threshold) { - wlog("Disconnecting peer ${peer} because they didn't respond to my request for sync item ids after ${id}", + wlog("Disconnecting peer ${peer} because they didn't respond to my request for sync item ids after ${synopsis}", ("peer", active_peer->get_remote_endpoint()) - ("id", active_peer->item_ids_requested_from_peer->get<0>().item_hash)); + ("synopsis", active_peer->item_ids_requested_from_peer->get<0>())); disconnect_due_to_request_timeout = true; } if (!disconnect_due_to_request_timeout) @@ -2145,20 +2144,38 @@ namespace graphene { namespace net { namespace detail { const fetch_blockchain_item_ids_message& fetch_blockchain_item_ids_message_received) { VERIFY_CORRECT_THREAD(); - item_id peers_last_item_seen; - if( !fetch_blockchain_item_ids_message_received.blockchain_synopsis.empty() ) - peers_last_item_seen = item_id( fetch_blockchain_item_ids_message_received.item_type, - fetch_blockchain_item_ids_message_received.blockchain_synopsis.back() ); - dlog( "sync: received a request for item ids after ${last_item_seen} from peer ${peer_endpoint} (full request: ${synopsis})", - ( "last_item_seen", peers_last_item_seen ) - ( "peer_endpoint", originating_peer->get_remote_endpoint() ) - ( "synopsis", fetch_blockchain_item_ids_message_received.blockchain_synopsis ) ); + item_id peers_last_item_seen = item_id(fetch_blockchain_item_ids_message_received.item_type, item_hash_t()); + if (fetch_blockchain_item_ids_message_received.blockchain_synopsis.empty()) + { + dlog("sync: received a request for item ids starting at the beginning of the chain from peer ${peer_endpoint} (full request: ${synopsis})", + ("peer_endpoint", 
originating_peer->get_remote_endpoint()) + ("synopsis", fetch_blockchain_item_ids_message_received.blockchain_synopsis)); + } + else + { + item_hash_t peers_last_item_hash_seen = fetch_blockchain_item_ids_message_received.blockchain_synopsis.back(); + dlog("sync: received a request for item ids after ${last_item_seen} from peer ${peer_endpoint} (full request: ${synopsis})", + ("last_item_seen", peers_last_item_hash_seen) + ("peer_endpoint", originating_peer->get_remote_endpoint()) + ("synopsis", fetch_blockchain_item_ids_message_received.blockchain_synopsis)); + peers_last_item_seen.item_hash = peers_last_item_hash_seen; + } blockchain_item_ids_inventory_message reply_message; - reply_message.item_hashes_available = _delegate->get_item_ids( fetch_blockchain_item_ids_message_received.item_type, - fetch_blockchain_item_ids_message_received.blockchain_synopsis, - reply_message.total_remaining_item_count ); reply_message.item_type = fetch_blockchain_item_ids_message_received.item_type; + reply_message.total_remaining_item_count = 0; + try + { + reply_message.item_hashes_available = _delegate->get_block_ids(fetch_blockchain_item_ids_message_received.blockchain_synopsis, + reply_message.total_remaining_item_count); + } + catch (const peer_is_on_an_unreachable_fork&) + { + dlog("Peer is on a fork and there's no set of blocks we can provide to switch them to our fork"); + // we reply with an empty list as if we had an empty blockchain; + // we don't want to disconnect because they may be able to provide + // us with blocks on their chain + } bool disconnect_from_inhibited_peer = false; // if our client doesn't have any items after the item the peer requested, it will send back @@ -2170,8 +2187,7 @@ namespace graphene { namespace net { namespace detail { reply_message.item_hashes_available.size() == 1 && std::find(fetch_blockchain_item_ids_message_received.blockchain_synopsis.begin(), fetch_blockchain_item_ids_message_received.blockchain_synopsis.end(), - 
reply_message.item_hashes_available.back() ) != fetch_blockchain_item_ids_message_received.blockchain_synopsis.end() - ) + reply_message.item_hashes_available.back() ) != fetch_blockchain_item_ids_message_received.blockchain_synopsis.end() ) { /* the last item in the peer's list matches the last item in our list */ originating_peer->peer_needs_sync_items_from_us = false; @@ -2197,7 +2213,6 @@ namespace graphene { namespace net { namespace detail { } else { - //dlog( "sync: peer is out of sync, sending peer ${count} items ids: ${item_ids}", ("count", reply_message.item_hashes_available.size() )("item_ids", reply_message.item_hashes_available ) ); dlog("sync: peer is out of sync, sending peer ${count} items ids: first: ${first_item_id}, last: ${last_item_id}", ("count", reply_message.item_hashes_available.size()) ("first_item_id", reply_message.item_hashes_available.front()) @@ -2270,24 +2285,35 @@ namespace graphene { namespace net { namespace detail { // This is pretty expensive, we should find a better way to do this std::unique_ptr > original_ids_of_items_to_get(new std::vector(peer->ids_of_items_to_get.begin(), peer->ids_of_items_to_get.end())); - std::vector synopsis = _delegate->get_blockchain_synopsis(_sync_item_type, reference_point, number_of_blocks_after_reference_point); - assert(reference_point == item_hash_t() || !synopsis.empty()); + std::vector synopsis = _delegate->get_blockchain_synopsis(reference_point, number_of_blocks_after_reference_point); - // if we passed in a reference point, we believe it is one the client has already accepted and should - // be able to generate a synopsis based on it - if( reference_point != item_hash_t() && synopsis.empty() ) - synopsis = _delegate->get_blockchain_synopsis( _sync_item_type, reference_point, number_of_blocks_after_reference_point ); +#if 0 + // just for debugging, enable this and set a breakpoint to step through + if (synopsis.empty()) + synopsis = _delegate->get_blockchain_synopsis(reference_point, 
number_of_blocks_after_reference_point); + + // TODO: it's possible that the returned synopsis is empty if the blockchain is empty (that's fine) + // or if the reference point is now past our undo history (that's not). + // in the second case, we should mark this peer as one we're unable to sync with and + // disconnect them. + if (reference_point != item_hash_t() && synopsis.empty()) + FC_THROW_EXCEPTION(block_older_than_undo_history, "You are on a fork I'm unable to switch to"); +#endif if( number_of_blocks_after_reference_point ) { // then the synopsis is incomplete, add the missing elements from ids_of_items_to_get uint32_t true_high_block_num = reference_point_block_num + number_of_blocks_after_reference_point; - uint32_t low_block_num = 1; + + // in order to generate a seamless synopsis, we need to be using the same low_block_num as the + // backend code; the first block in the synopsis will be the low block number it used + uint32_t low_block_num = synopsis.empty() ? 1 : _delegate->get_block_number(synopsis.front()); + do { if( low_block_num > reference_point_block_num ) - synopsis.push_back( (*original_ids_of_items_to_get)[low_block_num - reference_point_block_num - 1] ); - low_block_num += ( (true_high_block_num - low_block_num + 2 ) / 2 ); + synopsis.push_back((*original_ids_of_items_to_get)[low_block_num - reference_point_block_num - 1]); + low_block_num += (true_high_block_num - low_block_num + 2 ) / 2; } while ( low_block_num <= true_high_block_num ); assert(synopsis.back() == original_ids_of_items_to_get->back()); @@ -2305,14 +2331,25 @@ namespace graphene { namespace net { namespace detail { peer->last_block_time_delegate_has_seen = _delegate->get_block_time(item_hash_t()); } - std::vector blockchain_synopsis = create_blockchain_synopsis_for_peer( peer ); - item_hash_t last_item_seen = blockchain_synopsis.empty() ? 
item_hash_t() : blockchain_synopsis.back(); - dlog( "sync: sending a request for the next items after ${last_item_seen} to peer ${peer}, (full request is ${blockchain_synopsis})", - ( "last_item_seen", last_item_seen ) - ( "peer", peer->get_remote_endpoint() ) - ( "blockchain_synopsis", blockchain_synopsis ) ); - peer->item_ids_requested_from_peer = boost::make_tuple( item_id(_sync_item_type, last_item_seen ), fc::time_point::now() ); - peer->send_message( fetch_blockchain_item_ids_message(_sync_item_type, blockchain_synopsis ) ); + fc::oexception synopsis_exception; + try + { + std::vector blockchain_synopsis = create_blockchain_synopsis_for_peer( peer ); + + item_hash_t last_item_seen = blockchain_synopsis.empty() ? item_hash_t() : blockchain_synopsis.back(); + dlog( "sync: sending a request for the next items after ${last_item_seen} to peer ${peer}, (full request is ${blockchain_synopsis})", + ( "last_item_seen", last_item_seen ) + ( "peer", peer->get_remote_endpoint() ) + ( "blockchain_synopsis", blockchain_synopsis ) ); + peer->item_ids_requested_from_peer = boost::make_tuple( blockchain_synopsis, fc::time_point::now() ); + peer->send_message( fetch_blockchain_item_ids_message(_sync_item_type, blockchain_synopsis ) ); + } + catch (const block_older_than_undo_history& e) + { + synopsis_exception = e; + } + if (synopsis_exception) + disconnect_from_peer(peer, "You are on a fork I'm unable to switch to"); } void node_impl::on_blockchain_item_ids_inventory_message(peer_connection* originating_peer, @@ -2322,6 +2359,58 @@ namespace graphene { namespace net { namespace detail { // ignore unless we asked for the data if( originating_peer->item_ids_requested_from_peer ) { + // verify that the peer's the block ids the peer sent is a valid response to our request; + // It should either be an empty list of blocks, or a list of blocks that builds off of one of + // the blocks in the synopsis we sent + if 
(!blockchain_item_ids_inventory_message_received.item_hashes_available.empty()) + { + const std::vector& synopsis_sent_in_request = originating_peer->item_ids_requested_from_peer->get<0>(); + const item_hash_t& first_item_hash = blockchain_item_ids_inventory_message_received.item_hashes_available.front(); + + if (synopsis_sent_in_request.empty()) + { + // if we sent an empty synopsis, we were asking for all blocks, so the first block should be block 1 + if (_delegate->get_block_number(first_item_hash) != 1) + { + wlog("Invalid response from peer ${peer_endpoint}. We requested a list of sync blocks starting from the beginning of the chain, " + "but they provided a list of blocks starting with ${first_block}", + ("peer_endpoint", originating_peer->get_remote_endpoint()) + ("first_block", first_item_hash)); + // TODO: enable these once committed + //fc::exception error_for_peer(FC_LOG_MESSAGE(error, "You gave an invalid response for my request for sync blocks. I asked for blocks starting from the beginning of the chain, " + // "but you returned a list of blocks starting with ${first_block}", + // ("first_block", first_item_hash))); + //disconnect_from_peer(originating_peer, + // "You gave an invalid response to my request for sync blocks", + // true, error_for_peer); + disconnect_from_peer(originating_peer, + "You gave an invalid response to my request for sync blocks"); + return; + } + } + else // synopsis was not empty, we expect a response building off one of the blocks we sent + { + if (boost::range::find(synopsis_sent_in_request, first_item_hash) == synopsis_sent_in_request.end()) + { + wlog("Invalid response from peer ${peer_endpoint}. 
We requested a list of sync blocks based on the synopsis ${synopsis}, but they " + "provided a list of blocks starting with ${first_block}", + ("peer_endpoint", originating_peer->get_remote_endpoint()) + ("synopsis", synopsis_sent_in_request) + ("first_block", first_item_hash)); + // TODO: enable these once committed + //fc::exception error_for_peer(FC_LOG_MESSAGE(error, "You gave an invalid response for my request for sync blocks. I asked for blocks following something in " + // "${synopsis}, but you returned a list of blocks starting with ${first_block} which wasn't one of your choices", + // ("synopsis", synopsis_sent_in_request) + // ("first_block", first_item_hash))); + //disconnect_from_peer(originating_peer, + // "You gave an invalid response to my request for sync blocks", + // true, error_for_peer); + disconnect_from_peer(originating_peer, + "You gave an invalid response to my request for sync blocks"); + return; + } + } + } originating_peer->item_ids_requested_from_peer.reset(); dlog( "sync: received a list of ${count} available items from ${peer_endpoint}", @@ -5185,12 +5274,11 @@ namespace graphene { namespace net { namespace detail { INVOKE_AND_COLLECT_STATISTICS(handle_transaction, transaction_message); } - std::vector statistics_gathering_node_delegate_wrapper::get_item_ids(uint32_t item_type, - const std::vector& blockchain_synopsis, - uint32_t& remaining_item_count, - uint32_t limit /* = 2000 */) + std::vector statistics_gathering_node_delegate_wrapper::get_block_ids(const std::vector& blockchain_synopsis, + uint32_t& remaining_item_count, + uint32_t limit /* = 2000 */) { - INVOKE_AND_COLLECT_STATISTICS(get_item_ids, item_type, blockchain_synopsis, remaining_item_count, limit); + INVOKE_AND_COLLECT_STATISTICS(get_block_ids, blockchain_synopsis, remaining_item_count, limit); } message statistics_gathering_node_delegate_wrapper::get_item( const item_id& id ) @@ -5203,11 +5291,9 @@ namespace graphene { namespace net { namespace detail { 
INVOKE_AND_COLLECT_STATISTICS(get_chain_id); } - std::vector statistics_gathering_node_delegate_wrapper::get_blockchain_synopsis(uint32_t item_type, - const graphene::net::item_hash_t& reference_point /* = graphene::net::item_hash_t() */, - uint32_t number_of_blocks_after_reference_point /* = 0 */) + std::vector statistics_gathering_node_delegate_wrapper::get_blockchain_synopsis(const item_hash_t& reference_point, uint32_t number_of_blocks_after_reference_point) { - INVOKE_AND_COLLECT_STATISTICS(get_blockchain_synopsis, item_type, reference_point, number_of_blocks_after_reference_point); + INVOKE_AND_COLLECT_STATISTICS(get_blockchain_synopsis, reference_point, number_of_blocks_after_reference_point); } void statistics_gathering_node_delegate_wrapper::sync_status( uint32_t item_type, uint32_t item_count ) diff --git a/libraries/net/peer_connection.cpp b/libraries/net/peer_connection.cpp index b912eeb2..b0df01b7 100644 --- a/libraries/net/peer_connection.cpp +++ b/libraries/net/peer_connection.cpp @@ -276,8 +276,8 @@ namespace graphene { namespace net #ifndef NDEBUG struct counter { unsigned& _send_message_queue_tasks_counter; - counter(unsigned& var) : _send_message_queue_tasks_counter(var) { dlog("entering peer_connection::send_queued_messages_task()"); assert(_send_message_queue_tasks_counter == 0); ++_send_message_queue_tasks_counter; } - ~counter() { assert(_send_message_queue_tasks_counter == 1); --_send_message_queue_tasks_counter; dlog("leaving peer_connection::send_queued_messages_task()"); } + counter(unsigned& var) : _send_message_queue_tasks_counter(var) { /* dlog("entering peer_connection::send_queued_messages_task()"); */ assert(_send_message_queue_tasks_counter == 0); ++_send_message_queue_tasks_counter; } + ~counter() { assert(_send_message_queue_tasks_counter == 1); --_send_message_queue_tasks_counter; /* dlog("leaving peer_connection::send_queued_messages_task()"); */ } } concurrent_invocation_counter(_send_message_queue_tasks_running); #endif while 
(!_queued_messages.empty()) @@ -286,12 +286,12 @@ namespace graphene { namespace net message message_to_send = _queued_messages.front()->get_message(_node); try { - dlog("peer_connection::send_queued_messages_task() calling message_oriented_connection::send_message() " - "to send message of type ${type} for peer ${endpoint}", - ("type", message_to_send.msg_type)("endpoint", get_remote_endpoint())); + //dlog("peer_connection::send_queued_messages_task() calling message_oriented_connection::send_message() " + // "to send message of type ${type} for peer ${endpoint}", + // ("type", message_to_send.msg_type)("endpoint", get_remote_endpoint())); _message_connection.send_message(message_to_send); - dlog("peer_connection::send_queued_messages_task()'s call to message_oriented_connection::send_message() completed normally for peer ${endpoint}", - ("endpoint", get_remote_endpoint())); + //dlog("peer_connection::send_queued_messages_task()'s call to message_oriented_connection::send_message() completed normally for peer ${endpoint}", + // ("endpoint", get_remote_endpoint())); } catch (const fc::canceled_exception&) { @@ -323,7 +323,7 @@ namespace graphene { namespace net _total_queued_messages_size -= _queued_messages.front()->get_size_in_queue(); _queued_messages.pop(); } - dlog("leaving peer_connection::send_queued_messages_task() due to queue exhaustion"); + //dlog("leaving peer_connection::send_queued_messages_task() due to queue exhaustion"); } void peer_connection::send_queueable_message(std::unique_ptr&& message_to_send) @@ -351,18 +351,18 @@ namespace graphene { namespace net if (!_send_queued_messages_done.valid() || _send_queued_messages_done.ready()) { - dlog("peer_connection::send_message() is firing up send_queued_message_task"); + //dlog("peer_connection::send_message() is firing up send_queued_message_task"); _send_queued_messages_done = fc::async([this](){ send_queued_messages_task(); }, "send_queued_messages_task"); } - else - 
dlog("peer_connection::send_message() doesn't need to fire up send_queued_message_task, it's already running"); + //else + // dlog("peer_connection::send_message() doesn't need to fire up send_queued_message_task, it's already running"); } void peer_connection::send_message(const message& message_to_send, size_t message_send_time_field_offset) { VERIFY_CORRECT_THREAD(); - dlog("peer_connection::send_message() enqueueing message of type ${type} for peer ${endpoint}", - ("type", message_to_send.msg_type)("endpoint", get_remote_endpoint())); + //dlog("peer_connection::send_message() enqueueing message of type ${type} for peer ${endpoint}", + // ("type", message_to_send.msg_type)("endpoint", get_remote_endpoint())); std::unique_ptr message_to_enqueue(new real_queued_message(message_to_send, message_send_time_field_offset)); send_queueable_message(std::move(message_to_enqueue)); } @@ -370,8 +370,8 @@ namespace graphene { namespace net void peer_connection::send_item(const item_id& item_to_send) { VERIFY_CORRECT_THREAD(); - dlog("peer_connection::send_item() enqueueing message of type ${type} for peer ${endpoint}", - ("type", item_to_send.item_type)("endpoint", get_remote_endpoint())); + //dlog("peer_connection::send_item() enqueueing message of type ${type} for peer ${endpoint}", + // ("type", item_to_send.item_type)("endpoint", get_remote_endpoint())); std::unique_ptr message_to_enqueue(new virtual_queued_message(item_to_send)); send_queueable_message(std::move(message_to_enqueue)); } diff --git a/tests/tests/block_tests.cpp b/tests/tests/block_tests.cpp index 4a904390..f0830d48 100644 --- a/tests/tests/block_tests.cpp +++ b/tests/tests/block_tests.cpp @@ -1076,7 +1076,7 @@ BOOST_FIXTURE_TEST_CASE( transaction_invalidated_in_cache, database_fixture ) } }; - auto generate_xfer_tx = [&]( account_id_type from, account_id_type to, share_type amount, int blocks_to_expire=10 ) -> signed_transaction + auto generate_xfer_tx = [&]( account_id_type from, account_id_type to, 
share_type amount, int blocks_to_expire ) -> signed_transaction { signed_transaction tx; transfer_operation xfer_op; @@ -1128,8 +1128,8 @@ BOOST_FIXTURE_TEST_CASE( transaction_invalidated_in_cache, database_fixture ) // signed_transaction tx_a = generate_xfer_tx( bob_id, alice_id, 1000, 3 ); - signed_transaction tx_b = generate_xfer_tx( alice_id, bob_id, 2000 ); - signed_transaction tx_c = generate_xfer_tx( alice_id, bob_id, 500 ); + signed_transaction tx_b = generate_xfer_tx( alice_id, bob_id, 2000, 10 ); + signed_transaction tx_c = generate_xfer_tx( alice_id, bob_id, 500, 10 ); generate_block( db );