From f66eeeb73b75c6cbd7ec704a70958415a22e3d04 Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Tue, 8 Aug 2017 12:05:57 -0400 Subject: [PATCH] Fix bug where peers could get stuck in sync mode. There was a case where we had requested a block through the sync mechanism and also received it through the normal inventory mechanism where we would leave the peer in a sync state, but never ask them for more sync blocks. This commit fixes the bug that put us into that stuck state, and also adds code to disconnect peers if we ever manage to get into that stalled state. --- .../include/graphene/net/peer_connection.hpp | 6 +- libraries/net/node.cpp | 450 +++++++++++------- libraries/net/peer_connection.cpp | 23 +- 3 files changed, 289 insertions(+), 190 deletions(-) diff --git a/libraries/net/include/graphene/net/peer_connection.hpp b/libraries/net/include/graphene/net/peer_connection.hpp index 7cfa316a..8c738af1 100644 --- a/libraries/net/include/graphene/net/peer_connection.hpp +++ b/libraries/net/include/graphene/net/peer_connection.hpp @@ -269,6 +269,7 @@ namespace graphene { namespace net fc::thread* _thread; unsigned _send_message_queue_tasks_running; // temporary debugging #endif + bool _currently_handling_message; // true while we're in the middle of handling a message from the remote system private: peer_connection(peer_connection_delegate* delegate); void destroy(); @@ -299,8 +300,9 @@ namespace graphene { namespace net fc::ip::endpoint get_local_endpoint(); void set_remote_endpoint(fc::optional new_remote_endpoint); - bool busy(); - bool idle(); + bool busy() const; + bool idle() const; + bool is_currently_handling_message() const; bool is_transaction_fetching_inhibited() const; fc::sha512 get_shared_secret() const; diff --git a/libraries/net/node.cpp b/libraries/net/node.cpp index 7d639878..fce8b326 100644 --- a/libraries/net/node.cpp +++ b/libraries/net/node.cpp @@ -1421,6 +1421,19 @@ namespace graphene { namespace net { namespace detail { wlog( "Sending a 
keepalive message to peer ${peer} who hasn't sent us any messages in the last ${timeout} seconds", ( "peer", active_peer->get_remote_endpoint() )("timeout", active_send_keepalive_timeout ) ); peers_to_send_keep_alive.push_back(active_peer); + } + else if (active_peer->we_need_sync_items_from_peer && + !active_peer->is_currently_handling_message() && + !active_peer->item_ids_requested_from_peer && + active_peer->ids_of_items_to_get.empty()) + { + // This is a state we should never get into in the first place, but if we do, we should disconnect the peer + // to re-establish the connection. + fc_wlog(fc::logger::get("sync"), "Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.", + ("peer", active_peer->get_remote_endpoint())); + wlog("Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.", + ("peer", active_peer->get_remote_endpoint())); + peers_to_disconnect_forcibly.push_back(active_peer); } } } @@ -2452,24 +2465,24 @@ namespace graphene { namespace net { namespace detail { uint32_t expected_num = first_block_number_in_reponse + i; if (actual_num != expected_num) { - wlog("Invalid response from peer ${peer_endpoint}. The list of blocks they provided is not sequential, " - "the ${position}th block in their reply was block number ${actual_num}, " - "but it should have been number ${expected_num}", - ("peer_endpoint", originating_peer->get_remote_endpoint()) - ("position", i) - ("actual_num", actual_num) - ("expected_num", expected_num)); - fc::exception error_for_peer(FC_LOG_MESSAGE(error, - "You gave an invalid response to my request for sync blocks. 
The list of blocks you provided is not sequential, " - "the ${position}th block in their reply was block number ${actual_num}, " - "but it should have been number ${expected_num}", - ("position", i) - ("actual_num", actual_num) - ("expected_num", expected_num))); - disconnect_from_peer(originating_peer, - "You gave an invalid response to my request for sync blocks", - true, error_for_peer); - return; + wlog("Invalid response from peer ${peer_endpoint}. The list of blocks they provided is not sequential, " + "the ${position}th block in their reply was block number ${actual_num}, " + "but it should have been number ${expected_num}", + ("peer_endpoint", originating_peer->get_remote_endpoint()) + ("position", i) + ("actual_num", actual_num) + ("expected_num", expected_num)); + fc::exception error_for_peer(FC_LOG_MESSAGE(error, + "You gave an invalid response to my request for sync blocks. The list of blocks you provided is not sequential, " + "the ${position}th block in their reply was block number ${actual_num}, " + "but it should have been number ${expected_num}", + ("position", i) + ("actual_num", actual_num) + ("expected_num", expected_num))); + disconnect_from_peer(originating_peer, + "You gave an invalid response to my request for sync blocks", + true, error_for_peer); + return; } } @@ -2516,181 +2529,205 @@ namespace graphene { namespace net { namespace detail { } originating_peer->item_ids_requested_from_peer.reset(); - dlog( "sync: received a list of ${count} available items from ${peer_endpoint}", - ( "count", blockchain_item_ids_inventory_message_received.item_hashes_available.size() ) - ( "peer_endpoint", originating_peer->get_remote_endpoint() ) ); - //for( const item_hash_t& item_hash : blockchain_item_ids_inventory_message_received.item_hashes_available ) - //{ - // dlog( "sync: ${hash}", ("hash", item_hash ) ); - //} - - // if the peer doesn't have any items after the one we asked for - if( 
blockchain_item_ids_inventory_message_received.total_remaining_item_count == 0 && - ( blockchain_item_ids_inventory_message_received.item_hashes_available.empty() || // there are no items in the peer's blockchain. this should only happen if our blockchain was empty when we requested, might want to verify that. - ( blockchain_item_ids_inventory_message_received.item_hashes_available.size() == 1 && - _delegate->has_item( item_id(blockchain_item_ids_inventory_message_received.item_type, - blockchain_item_ids_inventory_message_received.item_hashes_available.front() ) ) ) ) && // we've already seen the last item in the peer's blockchain - originating_peer->ids_of_items_to_get.empty() && - originating_peer->number_of_unfetched_item_ids == 0 ) // <-- is the last check necessary? + // if exceptions are thrown after clearing the item_ids_requested_from_peer (above), + // it could leave our sync in a stalled state. Wrap a try/catch around the rest + // of the function so we can log if this ever happens.
+ try { - dlog( "sync: peer said we're up-to-date, entering normal operation with this peer" ); - originating_peer->we_need_sync_items_from_peer = false; + dlog( "sync: received a list of ${count} available items from ${peer_endpoint}", + ( "count", blockchain_item_ids_inventory_message_received.item_hashes_available.size() ) + ( "peer_endpoint", originating_peer->get_remote_endpoint() ) ); + //for( const item_hash_t& item_hash : blockchain_item_ids_inventory_message_received.item_hashes_available ) + //{ + // dlog( "sync: ${hash}", ("hash", item_hash ) ); + //} - uint32_t new_number_of_unfetched_items = calculate_unsynced_block_count_from_all_peers(); - _total_number_of_unfetched_items = new_number_of_unfetched_items; - if( new_number_of_unfetched_items == 0 ) - _delegate->sync_status( blockchain_item_ids_inventory_message_received.item_type, 0 ); - - return; - } - - std::deque item_hashes_received( blockchain_item_ids_inventory_message_received.item_hashes_available.begin(), - blockchain_item_ids_inventory_message_received.item_hashes_available.end() ); - originating_peer->number_of_unfetched_item_ids = blockchain_item_ids_inventory_message_received.total_remaining_item_count; - // flush any items this peer sent us that we've already received and processed from another peer - if (!item_hashes_received.empty() && - originating_peer->ids_of_items_to_get.empty()) - { - bool is_first_item_for_other_peer = false; - for (const peer_connection_ptr& peer : _active_connections) - if (peer != originating_peer->shared_from_this() && - !peer->ids_of_items_to_get.empty() && - peer->ids_of_items_to_get.front() == blockchain_item_ids_inventory_message_received.item_hashes_available.front()) - { - dlog("The item ${newitem} is the first item for peer ${peer}", - ("newitem", blockchain_item_ids_inventory_message_received.item_hashes_available.front()) - ("peer", peer->get_remote_endpoint())); - is_first_item_for_other_peer = true; - break; - } - dlog("is_first_item_for_other_peer: 
${is_first}. item_hashes_received.size() = ${size}", - ("is_first", is_first_item_for_other_peer)("size", item_hashes_received.size())); - if (!is_first_item_for_other_peer) + // if the peer doesn't have any items after the one we asked for + if( blockchain_item_ids_inventory_message_received.total_remaining_item_count == 0 && + ( blockchain_item_ids_inventory_message_received.item_hashes_available.empty() || // there are no items in the peer's blockchain. this should only happen if our blockchain was empty when we requested, might want to verify that. + ( blockchain_item_ids_inventory_message_received.item_hashes_available.size() == 1 && + _delegate->has_item( item_id(blockchain_item_ids_inventory_message_received.item_type, + blockchain_item_ids_inventory_message_received.item_hashes_available.front() ) ) ) ) && // we've already seen the last item in the peer's blockchain + originating_peer->ids_of_items_to_get.empty() && + originating_peer->number_of_unfetched_item_ids == 0 ) // <-- is the last check necessary? 
{ - while (!item_hashes_received.empty() && - _delegate->has_item(item_id(blockchain_item_ids_inventory_message_received.item_type, - item_hashes_received.front()))) + dlog( "sync: peer said we're up-to-date, entering normal operation with this peer" ); + originating_peer->we_need_sync_items_from_peer = false; + + uint32_t new_number_of_unfetched_items = calculate_unsynced_block_count_from_all_peers(); + _total_number_of_unfetched_items = new_number_of_unfetched_items; + if( new_number_of_unfetched_items == 0 ) + _delegate->sync_status( blockchain_item_ids_inventory_message_received.item_type, 0 ); + + return; + } + + std::deque item_hashes_received( blockchain_item_ids_inventory_message_received.item_hashes_available.begin(), + blockchain_item_ids_inventory_message_received.item_hashes_available.end() ); + originating_peer->number_of_unfetched_item_ids = blockchain_item_ids_inventory_message_received.total_remaining_item_count; + // flush any items this peer sent us that we've already received and processed from another peer + if (!item_hashes_received.empty() && + originating_peer->ids_of_items_to_get.empty()) + { + bool is_first_item_for_other_peer = false; + for (const peer_connection_ptr& peer : _active_connections) + if (peer != originating_peer->shared_from_this() && + !peer->ids_of_items_to_get.empty() && + peer->ids_of_items_to_get.front() == blockchain_item_ids_inventory_message_received.item_hashes_available.front()) + { + dlog("The item ${newitem} is the first item for peer ${peer}", + ("newitem", blockchain_item_ids_inventory_message_received.item_hashes_available.front()) + ("peer", peer->get_remote_endpoint())); + is_first_item_for_other_peer = true; + break; + } + dlog("is_first_item_for_other_peer: ${is_first}. 
item_hashes_received.size() = ${size}", + ("is_first", is_first_item_for_other_peer)("size", item_hashes_received.size())); + if (!is_first_item_for_other_peer) { - assert(item_hashes_received.front() != item_hash_t()); + while (!item_hashes_received.empty() && + _delegate->has_item(item_id(blockchain_item_ids_inventory_message_received.item_type, + item_hashes_received.front()))) + { + assert(item_hashes_received.front() != item_hash_t()); + originating_peer->last_block_delegate_has_seen = item_hashes_received.front(); + originating_peer->last_block_time_delegate_has_seen = _delegate->get_block_time(item_hashes_received.front()); + dlog("popping item because delegate has already seen it. peer ${peer}'s last block the delegate has seen is now ${block_id} (actual block #${actual_block_num})", + ("peer", originating_peer->get_remote_endpoint()) + ("block_id", originating_peer->last_block_delegate_has_seen) + ("actual_block_num", _delegate->get_block_number(item_hashes_received.front()))); + + item_hashes_received.pop_front(); + } + dlog("after removing all items we have already seen, item_hashes_received.size() = ${size}", ("size", item_hashes_received.size())); + } + } + else if (!item_hashes_received.empty()) + { + // we received a list of items and we already have a list of items to fetch from this peer. + // In the normal case, this list will immediately follow the existing list, meaning the + // last hash of our existing list will match the first hash of the new list. + + // In the much less likely case, we've received a partial list of items from the peer, then + // the peer switched forks before sending us the remaining list. In this case, the first + // hash in the new list may not be the last hash in the existing list (it may be earlier, or + // it may not exist at all. + + // In either case, pop items off the back of our existing list until we find our first + // item, then append our list. 
+ while (!originating_peer->ids_of_items_to_get.empty()) + { + if (item_hashes_received.front() != originating_peer->ids_of_items_to_get.back()) + originating_peer->ids_of_items_to_get.pop_back(); + else + break; + } + if (originating_peer->ids_of_items_to_get.empty()) + { + // this happens when the peer has switched forks between the last inventory message and + // this one, and there weren't any unfetched items in common + // We don't know where in the blockchain the new front() actually falls, all we can + // expect is that it is a block that we knew about because it should be one of the + // blocks we sent in the initial synopsis. + assert(_delegate->has_item(item_id(_sync_item_type, item_hashes_received.front()))); originating_peer->last_block_delegate_has_seen = item_hashes_received.front(); originating_peer->last_block_time_delegate_has_seen = _delegate->get_block_time(item_hashes_received.front()); - dlog("popping item because delegate has already seen it. peer ${peer}'s last block the delegate has seen is now ${block_id} (actual block #${actual_block_num})", - ("peer", originating_peer->get_remote_endpoint()) - ("block_id", originating_peer->last_block_delegate_has_seen) - ("actual_block_num", _delegate->get_block_number(item_hashes_received.front()))); - item_hashes_received.pop_front(); } - dlog("after removing all items we have already seen, item_hashes_received.size() = ${size}", ("size", item_hashes_received.size())); - } - } - else if (!item_hashes_received.empty()) - { - // we received a list of items and we already have a list of items to fetch from this peer. - // In the normal case, this list will immediately follow the existing list, meaning the - // last hash of our existing list will match the first hash of the new list. - - // In the much less likely case, we've received a partial list of items from the peer, then - // the peer switched forks before sending us the remaining list. 
In this case, the first - // hash in the new list may not be the last hash in the existing list (it may be earlier, or - // it may not exist at all. - - // In either case, pop items off the back of our existing list until we find our first - // item, then append our list. - while (!originating_peer->ids_of_items_to_get.empty()) - { - if (item_hashes_received.front() != originating_peer->ids_of_items_to_get.back()) - originating_peer->ids_of_items_to_get.pop_back(); else - break; + { + // the common simple case: the new list extends the old. pop off the duplicate element + originating_peer->ids_of_items_to_get.pop_back(); + } } - if (originating_peer->ids_of_items_to_get.empty()) + + if (!item_hashes_received.empty() && !originating_peer->ids_of_items_to_get.empty()) + assert(item_hashes_received.front() != originating_peer->ids_of_items_to_get.back()); + + // append the remaining items to the peer's list + boost::push_back(originating_peer->ids_of_items_to_get, item_hashes_received); + + originating_peer->number_of_unfetched_item_ids = blockchain_item_ids_inventory_message_received.total_remaining_item_count; + + // at any given time, there's a maximum number of blocks that can possibly be out there + // [(now - genesis time) / block interval]. If they offer us more blocks than that, + // they must be an attacker or have a buggy client. 
+ fc::time_point_sec minimum_time_of_last_offered_block = + originating_peer->last_block_time_delegate_has_seen + // timestamp of the block immediately before the first unfetched block + originating_peer->number_of_unfetched_item_ids * GRAPHENE_MIN_BLOCK_INTERVAL; + fc::time_point_sec now = fc::time_point::now(); + if (minimum_time_of_last_offered_block > now + GRAPHENE_NET_FUTURE_SYNC_BLOCKS_GRACE_PERIOD_SEC) { - // this happens when the peer has switched forks between the last inventory message and - // this one, and there weren't any unfetched items in common - // We don't know where in the blockchain the new front() actually falls, all we can - // expect is that it is a block that we knew about because it should be one of the - // blocks we sent in the initial synopsis. - assert(_delegate->has_item(item_id(_sync_item_type, item_hashes_received.front()))); - originating_peer->last_block_delegate_has_seen = item_hashes_received.front(); - originating_peer->last_block_time_delegate_has_seen = _delegate->get_block_time(item_hashes_received.front()); - item_hashes_received.pop_front(); + wlog("Disconnecting from peer ${peer} who offered us an implausible number of blocks, their last block would be in the future (${timestamp})", + ("peer", originating_peer->get_remote_endpoint()) + ("timestamp", minimum_time_of_last_offered_block)); + fc::exception error_for_peer(FC_LOG_MESSAGE(error, "You offered me a list of more sync blocks than could possibly exist. 
Total blocks offered: ${blocks}, Minimum time of the last block you offered: ${minimum_time_of_last_offered_block}, Now: ${now}", + ("blocks", originating_peer->number_of_unfetched_item_ids) + ("minimum_time_of_last_offered_block", minimum_time_of_last_offered_block) + ("now", now))); + disconnect_from_peer(originating_peer, + "You offered me a list of more sync blocks than could possibly exist", + true, error_for_peer); + return; + } + + uint32_t new_number_of_unfetched_items = calculate_unsynced_block_count_from_all_peers(); + if (new_number_of_unfetched_items != _total_number_of_unfetched_items) + _delegate->sync_status(blockchain_item_ids_inventory_message_received.item_type, + new_number_of_unfetched_items); + _total_number_of_unfetched_items = new_number_of_unfetched_items; + + if (blockchain_item_ids_inventory_message_received.total_remaining_item_count != 0) + { + // the peer hasn't sent us all the items it knows about. + if (originating_peer->ids_of_items_to_get.size() > GRAPHENE_NET_MIN_BLOCK_IDS_TO_PREFETCH) + { + // we have a good number of item ids from this peer, start fetching blocks from it; + // we'll switch back later to finish the job. + trigger_fetch_sync_items_loop(); + } + else + { + // keep fetching the peer's list of sync items until we get enough to switch into block- + // fetchimg mode + fetch_next_batch_of_item_ids_from_peer(originating_peer); + } } else { - // the common simple case: the new list extends the old. pop off the duplicate element - originating_peer->ids_of_items_to_get.pop_back(); + // the peer has told us about all of the items it knows + if (!originating_peer->ids_of_items_to_get.empty()) + { + // we now know about all of the items the peer knows about, and there are some items on the list + // that we should try to fetch. Kick off the fetch loop. 
+ trigger_fetch_sync_items_loop(); + } + else + { + // If we get here, the peer has sent us a non-empty list of items, but we have already + // received all of the items from other peers. Send a new request to the peer to + // see if we're really in sync + fetch_next_batch_of_item_ids_from_peer(originating_peer); + } } } - - if (!item_hashes_received.empty() && !originating_peer->ids_of_items_to_get.empty()) - assert(item_hashes_received.front() != originating_peer->ids_of_items_to_get.back()); - - // append the remaining items to the peer's list - boost::push_back(originating_peer->ids_of_items_to_get, item_hashes_received); - - originating_peer->number_of_unfetched_item_ids = blockchain_item_ids_inventory_message_received.total_remaining_item_count; - - // at any given time, there's a maximum number of blocks that can possibly be out there - // [(now - genesis time) / block interval]. If they offer us more blocks than that, - // they must be an attacker or have a buggy client. - fc::time_point_sec minimum_time_of_last_offered_block = - originating_peer->last_block_time_delegate_has_seen + // timestamp of the block immediately before the first unfetched block - originating_peer->number_of_unfetched_item_ids * GRAPHENE_MIN_BLOCK_INTERVAL; - fc::time_point_sec now = fc::time_point::now(); - if (minimum_time_of_last_offered_block > now + GRAPHENE_NET_FUTURE_SYNC_BLOCKS_GRACE_PERIOD_SEC) + catch (const fc::canceled_exception&) { - wlog("Disconnecting from peer ${peer} who offered us an implausible number of blocks, their last block would be in the future (${timestamp})", - ("peer", originating_peer->get_remote_endpoint()) - ("timestamp", minimum_time_of_last_offered_block)); - fc::exception error_for_peer(FC_LOG_MESSAGE(error, "You offered me a list of more sync blocks than could possibly exist. 
Total blocks offered: ${blocks}, Minimum time of the last block you offered: ${minimum_time_of_last_offered_block}, Now: ${now}", - ("blocks", originating_peer->number_of_unfetched_item_ids) - ("minimum_time_of_last_offered_block", minimum_time_of_last_offered_block) - ("now", now))); - disconnect_from_peer(originating_peer, - "You offered me a list of more sync blocks than could possibly exist", - true, error_for_peer); - return; + throw; } - - uint32_t new_number_of_unfetched_items = calculate_unsynced_block_count_from_all_peers(); - if (new_number_of_unfetched_items != _total_number_of_unfetched_items) - _delegate->sync_status(blockchain_item_ids_inventory_message_received.item_type, - new_number_of_unfetched_items); - _total_number_of_unfetched_items = new_number_of_unfetched_items; - - if (blockchain_item_ids_inventory_message_received.total_remaining_item_count != 0) + catch (const fc::exception& e) { - // the peer hasn't sent us all the items it knows about. - if (originating_peer->ids_of_items_to_get.size() > GRAPHENE_NET_MIN_BLOCK_IDS_TO_PREFETCH) - { - // we have a good number of item ids from this peer, start fetching blocks from it; - // we'll switch back later to finish the job. - trigger_fetch_sync_items_loop(); - } - else - { - // keep fetching the peer's list of sync items until we get enough to switch into block- - // fetchimg mode - fetch_next_batch_of_item_ids_from_peer(originating_peer); - } + elog("Caught unexpected exception: ${e}", ("e", e)); + assert(false && "exceptions not expected here"); } - else + catch (const std::exception& e) { - // the peer has told us about all of the items it knows - if (!originating_peer->ids_of_items_to_get.empty()) - { - // we now know about all of the items the peer knows about, and there are some items on the list - // that we should try to fetch. Kick off the fetch loop. 
- trigger_fetch_sync_items_loop(); - } - else - { - // If we get here, the peer has sent us a non-empty list of items, but we have already - // received all of the items from other peers. Send a new request to the peer to - // see if we're really in sync - fetch_next_batch_of_item_ids_from_peer(originating_peer); - } + elog("Caught unexpected exception: ${e}", ("e", e.what())); + assert(false && "exceptions not expected here"); + } + catch (...) + { + elog("Caught unexpected exception, could break sync operation"); } } else @@ -3267,7 +3304,30 @@ namespace graphene { namespace net { namespace detail { block_processed_this_iteration = true; } else + { dlog("Already received and accepted this block (presumably through normal inventory mechanism), treating it as accepted"); + for (const peer_connection_ptr& peer : _active_connections) + { + auto items_being_processed_iter = peer->ids_of_items_being_processed.find(received_block_iter->block_id); + if (items_being_processed_iter != peer->ids_of_items_being_processed.end()) + { + peer->ids_of_items_being_processed.erase(items_being_processed_iter); + dlog("Removed item from ${endpoint}'s list of items being processed, still processing ${len} blocks", + ("endpoint", peer->get_remote_endpoint())("len", peer->ids_of_items_being_processed.size())); + + // if we just processed the last item in our list from this peer, we will want to + // send another request to find out if we are now in sync (this is normally handled in + // send_sync_block_to_node_delegate) + if (peer->ids_of_items_to_get.empty() && + peer->number_of_unfetched_item_ids == 0 && + peer->ids_of_items_being_processed.empty()) + { + dlog("We received last item in our list for peer ${endpoint}, setup to do a sync check", ("endpoint", peer->get_remote_endpoint())); + fetch_next_batch_of_item_ids_from_peer(peer.get()); + } + } + } + } break; // start iterating _received_sync_items from the beginning } // end if potential_first_block @@ -3483,19 +3543,43 @@ 
namespace graphene { namespace net { namespace detail { if (sync_item_iter != originating_peer->sync_items_requested_from_peer.end()) { originating_peer->sync_items_requested_from_peer.erase(sync_item_iter); - _active_sync_requests.erase(block_message_to_process.block_id); - process_block_during_sync(originating_peer, block_message_to_process, message_hash); - if (originating_peer->idle()) + // if exceptions are thrown here after removing the sync item from the list (above), + // it could leave our sync in a stalled state. Wrap a try/catch around the rest + // of the function so we can log if this ever happens. + try { - // we have finished fetching a batch of items, so we either need to grab another batch of items - // or we need to get another list of item ids. - if (originating_peer->number_of_unfetched_item_ids > 0 && - originating_peer->ids_of_items_to_get.size() < GRAPHENE_NET_MIN_BLOCK_IDS_TO_PREFETCH) - fetch_next_batch_of_item_ids_from_peer(originating_peer); - else - trigger_fetch_sync_items_loop(); + _active_sync_requests.erase(block_message_to_process.block_id); + process_block_during_sync(originating_peer, block_message_to_process, message_hash); + if (originating_peer->idle()) + { + // we have finished fetching a batch of items, so we either need to grab another batch of items + // or we need to get another list of item ids.
+ if (originating_peer->number_of_unfetched_item_ids > 0 && + originating_peer->ids_of_items_to_get.size() < GRAPHENE_NET_MIN_BLOCK_IDS_TO_PREFETCH) + fetch_next_batch_of_item_ids_from_peer(originating_peer); + else + trigger_fetch_sync_items_loop(); + } + return; + } + catch (const fc::canceled_exception& e) + { + throw; + } + catch (const fc::exception& e) + { + elog("Caught unexpected exception: ${e}", ("e", e)); + assert(false && "exceptions not expected here"); + } + catch (const std::exception& e) + { + elog("Caught unexpected exception: ${e}", ("e", e.what())); + assert(false && "exceptions not expected here"); + } + catch (...) + { + elog("Caught unexpected exception, could break sync operation"); } - return; } } diff --git a/libraries/net/peer_connection.cpp b/libraries/net/peer_connection.cpp index 4dfcec3b..f1f20d3f 100644 --- a/libraries/net/peer_connection.cpp +++ b/libraries/net/peer_connection.cpp @@ -29,6 +29,8 @@ #include +#include + #ifdef DEFAULT_LOGGER # undef DEFAULT_LOGGER #endif @@ -86,11 +88,12 @@ namespace graphene { namespace net inhibit_fetching_sync_blocks(false), transaction_fetching_inhibited_until(fc::time_point::min()), last_known_fork_block_number(0), - firewall_check_state(nullptr) + firewall_check_state(nullptr), #ifndef NDEBUG - ,_thread(&fc::thread::current()), - _send_message_queue_tasks_running(0) + _thread(&fc::thread::current()), + _send_message_queue_tasks_running(0), #endif + _currently_handling_message(false) { } @@ -265,6 +268,10 @@ namespace graphene { namespace net void peer_connection::on_message( message_oriented_connection* originating_connection, const message& received_message ) { VERIFY_CORRECT_THREAD(); + _currently_handling_message = true; + BOOST_SCOPE_EXIT(this_) { + this_->_currently_handling_message = false; + } BOOST_SCOPE_EXIT_END _node->on_message( this, received_message ); } @@ -438,18 +445,24 @@ namespace graphene { namespace net _remote_endpoint = new_remote_endpoint; } - bool peer_connection::busy() 
+ bool peer_connection::busy() const { VERIFY_CORRECT_THREAD(); return !items_requested_from_peer.empty() || !sync_items_requested_from_peer.empty() || item_ids_requested_from_peer; } - bool peer_connection::idle() + bool peer_connection::idle() const { VERIFY_CORRECT_THREAD(); return !busy(); } + bool peer_connection::is_currently_handling_message() const + { + VERIFY_CORRECT_THREAD(); + return _currently_handling_message; + } + bool peer_connection::is_transaction_fetching_inhibited() const { VERIFY_CORRECT_THREAD();