diff --git a/libraries/net/include/graphene/net/peer_connection.hpp b/libraries/net/include/graphene/net/peer_connection.hpp index 7cfa316a..8c738af1 100644 --- a/libraries/net/include/graphene/net/peer_connection.hpp +++ b/libraries/net/include/graphene/net/peer_connection.hpp @@ -269,6 +269,7 @@ namespace graphene { namespace net fc::thread* _thread; unsigned _send_message_queue_tasks_running; // temporary debugging #endif + bool _currently_handling_message; // true while we're in the middle of handling a message from the remote system private: peer_connection(peer_connection_delegate* delegate); void destroy(); @@ -299,8 +300,9 @@ namespace graphene { namespace net fc::ip::endpoint get_local_endpoint(); void set_remote_endpoint(fc::optional new_remote_endpoint); - bool busy(); - bool idle(); + bool busy() const; + bool idle() const; + bool is_currently_handling_message() const; bool is_transaction_fetching_inhibited() const; fc::sha512 get_shared_secret() const; diff --git a/libraries/net/node.cpp b/libraries/net/node.cpp index 7d639878..fce8b326 100644 --- a/libraries/net/node.cpp +++ b/libraries/net/node.cpp @@ -1421,6 +1421,19 @@ namespace graphene { namespace net { namespace detail { wlog( "Sending a keepalive message to peer ${peer} who hasn't sent us any messages in the last ${timeout} seconds", ( "peer", active_peer->get_remote_endpoint() )("timeout", active_send_keepalive_timeout ) ); peers_to_send_keep_alive.push_back(active_peer); + } + else if (active_peer->we_need_sync_items_from_peer && + !active_peer->is_currently_handling_message() && + !active_peer->item_ids_requested_from_peer && + active_peer->ids_of_items_to_get.empty()) + { + // This is a state we should never get into in the first place, but if we do, we should disconnect the peer + // to re-establish the connection. + fc_wlog(fc::logger::get("sync"), "Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.", + ("peer", active_peer->get_remote_endpoint())); + wlog("Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.", + ("peer", active_peer->get_remote_endpoint())); + peers_to_disconnect_forcibly.push_back(active_peer); } } } @@ -2452,24 +2465,24 @@ namespace graphene { namespace net { namespace detail { uint32_t expected_num = first_block_number_in_reponse + i; if (actual_num != expected_num) { - wlog("Invalid response from peer ${peer_endpoint}. The list of blocks they provided is not sequential, " - "the ${position}th block in their reply was block number ${actual_num}, " - "but it should have been number ${expected_num}", - ("peer_endpoint", originating_peer->get_remote_endpoint()) - ("position", i) - ("actual_num", actual_num) - ("expected_num", expected_num)); - fc::exception error_for_peer(FC_LOG_MESSAGE(error, - "You gave an invalid response to my request for sync blocks. The list of blocks you provided is not sequential, " - "the ${position}th block in their reply was block number ${actual_num}, " - "but it should have been number ${expected_num}", - ("position", i) - ("actual_num", actual_num) - ("expected_num", expected_num))); - disconnect_from_peer(originating_peer, - "You gave an invalid response to my request for sync blocks", - true, error_for_peer); - return; + wlog("Invalid response from peer ${peer_endpoint}. The list of blocks they provided is not sequential, " + "the ${position}th block in their reply was block number ${actual_num}, " + "but it should have been number ${expected_num}", + ("peer_endpoint", originating_peer->get_remote_endpoint()) + ("position", i) + ("actual_num", actual_num) + ("expected_num", expected_num)); + fc::exception error_for_peer(FC_LOG_MESSAGE(error, + "You gave an invalid response to my request for sync blocks. The list of blocks you provided is not sequential, " + "the ${position}th block in their reply was block number ${actual_num}, " + "but it should have been number ${expected_num}", + ("position", i) + ("actual_num", actual_num) + ("expected_num", expected_num))); + disconnect_from_peer(originating_peer, + "You gave an invalid response to my request for sync blocks", + true, error_for_peer); + return; } } @@ -2516,181 +2529,205 @@ namespace graphene { namespace net { namespace detail { } originating_peer->item_ids_requested_from_peer.reset(); - dlog( "sync: received a list of ${count} available items from ${peer_endpoint}", - ( "count", blockchain_item_ids_inventory_message_received.item_hashes_available.size() ) - ( "peer_endpoint", originating_peer->get_remote_endpoint() ) ); - //for( const item_hash_t& item_hash : blockchain_item_ids_inventory_message_received.item_hashes_available ) - //{ - // dlog( "sync: ${hash}", ("hash", item_hash ) ); - //} - - // if the peer doesn't have any items after the one we asked for - if( blockchain_item_ids_inventory_message_received.total_remaining_item_count == 0 && - ( blockchain_item_ids_inventory_message_received.item_hashes_available.empty() || // there are no items in the peer's blockchain. this should only happen if our blockchain was empty when we requested, might want to verify that. - ( blockchain_item_ids_inventory_message_received.item_hashes_available.size() == 1 && - _delegate->has_item( item_id(blockchain_item_ids_inventory_message_received.item_type, - blockchain_item_ids_inventory_message_received.item_hashes_available.front() ) ) ) ) && // we've already seen the last item in the peer's blockchain - originating_peer->ids_of_items_to_get.empty() && - originating_peer->number_of_unfetched_item_ids == 0 ) // <-- is the last check necessary? + // if exceptions are throw after clearing the item_ids_requested_from_peer (above), + // it could leave our sync in a stalled state. Wrap a try/catch around the rest + // of the function so we can log if this ever happens. + try { - dlog( "sync: peer said we're up-to-date, entering normal operation with this peer" ); - originating_peer->we_need_sync_items_from_peer = false; + dlog( "sync: received a list of ${count} available items from ${peer_endpoint}", + ( "count", blockchain_item_ids_inventory_message_received.item_hashes_available.size() ) + ( "peer_endpoint", originating_peer->get_remote_endpoint() ) ); + //for( const item_hash_t& item_hash : blockchain_item_ids_inventory_message_received.item_hashes_available ) + //{ + // dlog( "sync: ${hash}", ("hash", item_hash ) ); + //} - uint32_t new_number_of_unfetched_items = calculate_unsynced_block_count_from_all_peers(); - _total_number_of_unfetched_items = new_number_of_unfetched_items; - if( new_number_of_unfetched_items == 0 ) - _delegate->sync_status( blockchain_item_ids_inventory_message_received.item_type, 0 ); - - return; - } - - std::deque item_hashes_received( blockchain_item_ids_inventory_message_received.item_hashes_available.begin(), - blockchain_item_ids_inventory_message_received.item_hashes_available.end() ); - originating_peer->number_of_unfetched_item_ids = blockchain_item_ids_inventory_message_received.total_remaining_item_count; - // flush any items this peer sent us that we've already received and processed from another peer - if (!item_hashes_received.empty() && - originating_peer->ids_of_items_to_get.empty()) - { - bool is_first_item_for_other_peer = false; - for (const peer_connection_ptr& peer : _active_connections) - if (peer != originating_peer->shared_from_this() && - !peer->ids_of_items_to_get.empty() && - peer->ids_of_items_to_get.front() == blockchain_item_ids_inventory_message_received.item_hashes_available.front()) - { - dlog("The item ${newitem} is the first item for peer ${peer}", - ("newitem", blockchain_item_ids_inventory_message_received.item_hashes_available.front()) - ("peer", peer->get_remote_endpoint())); - is_first_item_for_other_peer = true; - break; - } - dlog("is_first_item_for_other_peer: ${is_first}. item_hashes_received.size() = ${size}", - ("is_first", is_first_item_for_other_peer)("size", item_hashes_received.size())); - if (!is_first_item_for_other_peer) + // if the peer doesn't have any items after the one we asked for + if( blockchain_item_ids_inventory_message_received.total_remaining_item_count == 0 && + ( blockchain_item_ids_inventory_message_received.item_hashes_available.empty() || // there are no items in the peer's blockchain. this should only happen if our blockchain was empty when we requested, might want to verify that. + ( blockchain_item_ids_inventory_message_received.item_hashes_available.size() == 1 && + _delegate->has_item( item_id(blockchain_item_ids_inventory_message_received.item_type, + blockchain_item_ids_inventory_message_received.item_hashes_available.front() ) ) ) ) && // we've already seen the last item in the peer's blockchain + originating_peer->ids_of_items_to_get.empty() && + originating_peer->number_of_unfetched_item_ids == 0 ) // <-- is the last check necessary? { - while (!item_hashes_received.empty() && - _delegate->has_item(item_id(blockchain_item_ids_inventory_message_received.item_type, - item_hashes_received.front()))) + dlog( "sync: peer said we're up-to-date, entering normal operation with this peer" ); + originating_peer->we_need_sync_items_from_peer = false; + + uint32_t new_number_of_unfetched_items = calculate_unsynced_block_count_from_all_peers(); + _total_number_of_unfetched_items = new_number_of_unfetched_items; + if( new_number_of_unfetched_items == 0 ) + _delegate->sync_status( blockchain_item_ids_inventory_message_received.item_type, 0 ); + + return; + } + + std::deque item_hashes_received( blockchain_item_ids_inventory_message_received.item_hashes_available.begin(), + blockchain_item_ids_inventory_message_received.item_hashes_available.end() ); + originating_peer->number_of_unfetched_item_ids = blockchain_item_ids_inventory_message_received.total_remaining_item_count; + // flush any items this peer sent us that we've already received and processed from another peer + if (!item_hashes_received.empty() && + originating_peer->ids_of_items_to_get.empty()) + { + bool is_first_item_for_other_peer = false; + for (const peer_connection_ptr& peer : _active_connections) + if (peer != originating_peer->shared_from_this() && + !peer->ids_of_items_to_get.empty() && + peer->ids_of_items_to_get.front() == blockchain_item_ids_inventory_message_received.item_hashes_available.front()) + { + dlog("The item ${newitem} is the first item for peer ${peer}", + ("newitem", blockchain_item_ids_inventory_message_received.item_hashes_available.front()) + ("peer", peer->get_remote_endpoint())); + is_first_item_for_other_peer = true; + break; + } + dlog("is_first_item_for_other_peer: ${is_first}. item_hashes_received.size() = ${size}", + ("is_first", is_first_item_for_other_peer)("size", item_hashes_received.size())); + if (!is_first_item_for_other_peer) { - assert(item_hashes_received.front() != item_hash_t()); + while (!item_hashes_received.empty() && + _delegate->has_item(item_id(blockchain_item_ids_inventory_message_received.item_type, + item_hashes_received.front()))) + { + assert(item_hashes_received.front() != item_hash_t()); + originating_peer->last_block_delegate_has_seen = item_hashes_received.front(); + originating_peer->last_block_time_delegate_has_seen = _delegate->get_block_time(item_hashes_received.front()); + dlog("popping item because delegate has already seen it. peer ${peer}'s last block the delegate has seen is now ${block_id} (actual block #${actual_block_num})", + ("peer", originating_peer->get_remote_endpoint()) + ("block_id", originating_peer->last_block_delegate_has_seen) + ("actual_block_num", _delegate->get_block_number(item_hashes_received.front()))); + + item_hashes_received.pop_front(); + } + dlog("after removing all items we have already seen, item_hashes_received.size() = ${size}", ("size", item_hashes_received.size())); + } + } + else if (!item_hashes_received.empty()) + { + // we received a list of items and we already have a list of items to fetch from this peer. + // In the normal case, this list will immediately follow the existing list, meaning the + // last hash of our existing list will match the first hash of the new list. + + // In the much less likely case, we've received a partial list of items from the peer, then + // the peer switched forks before sending us the remaining list. In this case, the first + // hash in the new list may not be the last hash in the existing list (it may be earlier, or + // it may not exist at all. + + // In either case, pop items off the back of our existing list until we find our first + // item, then append our list. + while (!originating_peer->ids_of_items_to_get.empty()) + { + if (item_hashes_received.front() != originating_peer->ids_of_items_to_get.back()) + originating_peer->ids_of_items_to_get.pop_back(); + else + break; + } + if (originating_peer->ids_of_items_to_get.empty()) + { + // this happens when the peer has switched forks between the last inventory message and + // this one, and there weren't any unfetched items in common + // We don't know where in the blockchain the new front() actually falls, all we can + // expect is that it is a block that we knew about because it should be one of the + // blocks we sent in the initial synopsis. + assert(_delegate->has_item(item_id(_sync_item_type, item_hashes_received.front()))); originating_peer->last_block_delegate_has_seen = item_hashes_received.front(); originating_peer->last_block_time_delegate_has_seen = _delegate->get_block_time(item_hashes_received.front()); - dlog("popping item because delegate has already seen it. peer ${peer}'s last block the delegate has seen is now ${block_id} (actual block #${actual_block_num})", - ("peer", originating_peer->get_remote_endpoint()) - ("block_id", originating_peer->last_block_delegate_has_seen) - ("actual_block_num", _delegate->get_block_number(item_hashes_received.front()))); - item_hashes_received.pop_front(); } - dlog("after removing all items we have already seen, item_hashes_received.size() = ${size}", ("size", item_hashes_received.size())); - } - } - else if (!item_hashes_received.empty()) - { - // we received a list of items and we already have a list of items to fetch from this peer. - // In the normal case, this list will immediately follow the existing list, meaning the - // last hash of our existing list will match the first hash of the new list. - - // In the much less likely case, we've received a partial list of items from the peer, then - // the peer switched forks before sending us the remaining list. In this case, the first - // hash in the new list may not be the last hash in the existing list (it may be earlier, or - // it may not exist at all. - - // In either case, pop items off the back of our existing list until we find our first - // item, then append our list. - while (!originating_peer->ids_of_items_to_get.empty()) - { - if (item_hashes_received.front() != originating_peer->ids_of_items_to_get.back()) - originating_peer->ids_of_items_to_get.pop_back(); else - break; + { + // the common simple case: the new list extends the old. pop off the duplicate element + originating_peer->ids_of_items_to_get.pop_back(); + } } - if (originating_peer->ids_of_items_to_get.empty()) + + if (!item_hashes_received.empty() && !originating_peer->ids_of_items_to_get.empty()) + assert(item_hashes_received.front() != originating_peer->ids_of_items_to_get.back()); + + // append the remaining items to the peer's list + boost::push_back(originating_peer->ids_of_items_to_get, item_hashes_received); + + originating_peer->number_of_unfetched_item_ids = blockchain_item_ids_inventory_message_received.total_remaining_item_count; + + // at any given time, there's a maximum number of blocks that can possibly be out there + // [(now - genesis time) / block interval]. If they offer us more blocks than that, + // they must be an attacker or have a buggy client. + fc::time_point_sec minimum_time_of_last_offered_block = + originating_peer->last_block_time_delegate_has_seen + // timestamp of the block immediately before the first unfetched block + originating_peer->number_of_unfetched_item_ids * GRAPHENE_MIN_BLOCK_INTERVAL; + fc::time_point_sec now = fc::time_point::now(); + if (minimum_time_of_last_offered_block > now + GRAPHENE_NET_FUTURE_SYNC_BLOCKS_GRACE_PERIOD_SEC) { - // this happens when the peer has switched forks between the last inventory message and - // this one, and there weren't any unfetched items in common - // We don't know where in the blockchain the new front() actually falls, all we can - // expect is that it is a block that we knew about because it should be one of the - // blocks we sent in the initial synopsis. - assert(_delegate->has_item(item_id(_sync_item_type, item_hashes_received.front()))); - originating_peer->last_block_delegate_has_seen = item_hashes_received.front(); - originating_peer->last_block_time_delegate_has_seen = _delegate->get_block_time(item_hashes_received.front()); - item_hashes_received.pop_front(); + wlog("Disconnecting from peer ${peer} who offered us an implausible number of blocks, their last block would be in the future (${timestamp})", + ("peer", originating_peer->get_remote_endpoint()) + ("timestamp", minimum_time_of_last_offered_block)); + fc::exception error_for_peer(FC_LOG_MESSAGE(error, "You offered me a list of more sync blocks than could possibly exist. Total blocks offered: ${blocks}, Minimum time of the last block you offered: ${minimum_time_of_last_offered_block}, Now: ${now}", + ("blocks", originating_peer->number_of_unfetched_item_ids) + ("minimum_time_of_last_offered_block", minimum_time_of_last_offered_block) + ("now", now))); + disconnect_from_peer(originating_peer, + "You offered me a list of more sync blocks than could possibly exist", + true, error_for_peer); + return; + } + + uint32_t new_number_of_unfetched_items = calculate_unsynced_block_count_from_all_peers(); + if (new_number_of_unfetched_items != _total_number_of_unfetched_items) + _delegate->sync_status(blockchain_item_ids_inventory_message_received.item_type, + new_number_of_unfetched_items); + _total_number_of_unfetched_items = new_number_of_unfetched_items; + + if (blockchain_item_ids_inventory_message_received.total_remaining_item_count != 0) + { + // the peer hasn't sent us all the items it knows about. + if (originating_peer->ids_of_items_to_get.size() > GRAPHENE_NET_MIN_BLOCK_IDS_TO_PREFETCH) + { + // we have a good number of item ids from this peer, start fetching blocks from it; + // we'll switch back later to finish the job. + trigger_fetch_sync_items_loop(); + } + else + { + // keep fetching the peer's list of sync items until we get enough to switch into block- + // fetchimg mode + fetch_next_batch_of_item_ids_from_peer(originating_peer); + } } else { - // the common simple case: the new list extends the old. pop off the duplicate element - originating_peer->ids_of_items_to_get.pop_back(); + // the peer has told us about all of the items it knows + if (!originating_peer->ids_of_items_to_get.empty()) + { + // we now know about all of the items the peer knows about, and there are some items on the list + // that we should try to fetch. Kick off the fetch loop. + trigger_fetch_sync_items_loop(); + } + else + { + // If we get here, the peer has sent us a non-empty list of items, but we have already + // received all of the items from other peers. Send a new request to the peer to + // see if we're really in sync + fetch_next_batch_of_item_ids_from_peer(originating_peer); + } } } - - if (!item_hashes_received.empty() && !originating_peer->ids_of_items_to_get.empty()) - assert(item_hashes_received.front() != originating_peer->ids_of_items_to_get.back()); - - // append the remaining items to the peer's list - boost::push_back(originating_peer->ids_of_items_to_get, item_hashes_received); - - originating_peer->number_of_unfetched_item_ids = blockchain_item_ids_inventory_message_received.total_remaining_item_count; - - // at any given time, there's a maximum number of blocks that can possibly be out there - // [(now - genesis time) / block interval]. If they offer us more blocks than that, - // they must be an attacker or have a buggy client. - fc::time_point_sec minimum_time_of_last_offered_block = - originating_peer->last_block_time_delegate_has_seen + // timestamp of the block immediately before the first unfetched block - originating_peer->number_of_unfetched_item_ids * GRAPHENE_MIN_BLOCK_INTERVAL; - fc::time_point_sec now = fc::time_point::now(); - if (minimum_time_of_last_offered_block > now + GRAPHENE_NET_FUTURE_SYNC_BLOCKS_GRACE_PERIOD_SEC) + catch (const fc::canceled_exception&) { - wlog("Disconnecting from peer ${peer} who offered us an implausible number of blocks, their last block would be in the future (${timestamp})", - ("peer", originating_peer->get_remote_endpoint()) - ("timestamp", minimum_time_of_last_offered_block)); - fc::exception error_for_peer(FC_LOG_MESSAGE(error, "You offered me a list of more sync blocks than could possibly exist. Total blocks offered: ${blocks}, Minimum time of the last block you offered: ${minimum_time_of_last_offered_block}, Now: ${now}", - ("blocks", originating_peer->number_of_unfetched_item_ids) - ("minimum_time_of_last_offered_block", minimum_time_of_last_offered_block) - ("now", now))); - disconnect_from_peer(originating_peer, - "You offered me a list of more sync blocks than could possibly exist", - true, error_for_peer); - return; + throw; } - - uint32_t new_number_of_unfetched_items = calculate_unsynced_block_count_from_all_peers(); - if (new_number_of_unfetched_items != _total_number_of_unfetched_items) - _delegate->sync_status(blockchain_item_ids_inventory_message_received.item_type, - new_number_of_unfetched_items); - _total_number_of_unfetched_items = new_number_of_unfetched_items; - - if (blockchain_item_ids_inventory_message_received.total_remaining_item_count != 0) + catch (const fc::exception& e) { - // the peer hasn't sent us all the items it knows about. - if (originating_peer->ids_of_items_to_get.size() > GRAPHENE_NET_MIN_BLOCK_IDS_TO_PREFETCH) - { - // we have a good number of item ids from this peer, start fetching blocks from it; - // we'll switch back later to finish the job. - trigger_fetch_sync_items_loop(); - } - else - { - // keep fetching the peer's list of sync items until we get enough to switch into block- - // fetchimg mode - fetch_next_batch_of_item_ids_from_peer(originating_peer); - } + elog("Caught unexpected exception: ${e}", ("e", e)); + assert(false && "exceptions not expected here"); } - else + catch (const std::exception& e) { - // the peer has told us about all of the items it knows - if (!originating_peer->ids_of_items_to_get.empty()) - { - // we now know about all of the items the peer knows about, and there are some items on the list - // that we should try to fetch. Kick off the fetch loop. - trigger_fetch_sync_items_loop(); - } - else - { - // If we get here, the peer has sent us a non-empty list of items, but we have already - // received all of the items from other peers. Send a new request to the peer to - // see if we're really in sync - fetch_next_batch_of_item_ids_from_peer(originating_peer); - } + elog("Caught unexpected exception: ${e}", ("e", e.what())); + assert(false && "exceptions not expected here"); + } + catch (...) + { + elog("Caught unexpected exception, could break sync operation"); } } else @@ -3267,7 +3304,30 @@ namespace graphene { namespace net { namespace detail { block_processed_this_iteration = true; } else + { dlog("Already received and accepted this block (presumably through normal inventory mechanism), treating it as accepted"); + for (const peer_connection_ptr& peer : _active_connections) + { + auto items_being_processed_iter = peer->ids_of_items_being_processed.find(received_block_iter->block_id); + if (items_being_processed_iter != peer->ids_of_items_being_processed.end()) + { + peer->ids_of_items_being_processed.erase(items_being_processed_iter); + dlog("Removed item from ${endpoint}'s list of items being processed, still processing ${len} blocks", + ("endpoint", peer->get_remote_endpoint())("len", peer->ids_of_items_being_processed.size())); + + // if we just processed the last item in our list from this peer, we will want to + // send another request to find out if we are now in sync (this is normally handled in + // send_sync_block_to_node_delegate) + if (peer->ids_of_items_to_get.empty() && + peer->number_of_unfetched_item_ids == 0 && + peer->ids_of_items_being_processed.empty()) + { + dlog("We received last item in our list for peer ${endpoint}, setup to do a sync check", ("endpoint", peer->get_remote_endpoint())); + fetch_next_batch_of_item_ids_from_peer(peer.get()); + } + } + } + } break; // start iterating _received_sync_items from the beginning } // end if potential_first_block @@ -3483,19 +3543,43 @@ namespace graphene { namespace net { namespace detail { if (sync_item_iter != originating_peer->sync_items_requested_from_peer.end()) { originating_peer->sync_items_requested_from_peer.erase(sync_item_iter); - _active_sync_requests.erase(block_message_to_process.block_id); - process_block_during_sync(originating_peer, block_message_to_process, message_hash); - if (originating_peer->idle()) + // if exceptions are throw here after removing the sync item from the list (above), + // it could leave our sync in a stalled state. Wrap a try/catch around the rest + // of the function so we can log if this ever happens. + try { - // we have finished fetching a batch of items, so we either need to grab another batch of items - // or we need to get another list of item ids. - if (originating_peer->number_of_unfetched_item_ids > 0 && - originating_peer->ids_of_items_to_get.size() < GRAPHENE_NET_MIN_BLOCK_IDS_TO_PREFETCH) - fetch_next_batch_of_item_ids_from_peer(originating_peer); - else - trigger_fetch_sync_items_loop(); + _active_sync_requests.erase(block_message_to_process.block_id); + process_block_during_sync(originating_peer, block_message_to_process, message_hash); + if (originating_peer->idle()) + { + // we have finished fetching a batch of items, so we either need to grab another batch of items + // or we need to get another list of item ids. + if (originating_peer->number_of_unfetched_item_ids > 0 && + originating_peer->ids_of_items_to_get.size() < GRAPHENE_NET_MIN_BLOCK_IDS_TO_PREFETCH) + fetch_next_batch_of_item_ids_from_peer(originating_peer); + else + trigger_fetch_sync_items_loop(); + } + return; + } + catch (const fc::canceled_exception& e) + { + throw; + } + catch (const fc::exception& e) + { + elog("Caught unexpected exception: ${e}", ("e", e)); + assert(false && "exceptions not expected here"); + } + catch (const std::exception& e) + { + elog("Caught unexpected exception: ${e}", ("e", e.what())); + assert(false && "exceptions not expected here"); + } + catch (...) + { + elog("Caught unexpected exception, could break sync operation"); } - return; } } diff --git a/libraries/net/peer_connection.cpp b/libraries/net/peer_connection.cpp index 4dfcec3b..f1f20d3f 100644 --- a/libraries/net/peer_connection.cpp +++ b/libraries/net/peer_connection.cpp @@ -29,6 +29,8 @@ #include +#include + #ifdef DEFAULT_LOGGER # undef DEFAULT_LOGGER #endif @@ -86,11 +88,12 @@ namespace graphene { namespace net inhibit_fetching_sync_blocks(false), transaction_fetching_inhibited_until(fc::time_point::min()), last_known_fork_block_number(0), - firewall_check_state(nullptr) + firewall_check_state(nullptr), #ifndef NDEBUG - ,_thread(&fc::thread::current()), - _send_message_queue_tasks_running(0) + _thread(&fc::thread::current()), + _send_message_queue_tasks_running(0), #endif + _currently_handling_message(false) { } @@ -265,6 +268,10 @@ namespace graphene { namespace net void peer_connection::on_message( message_oriented_connection* originating_connection, const message& received_message ) { VERIFY_CORRECT_THREAD(); + _currently_handling_message = true; + BOOST_SCOPE_EXIT(this_) { + this_->_currently_handling_message = false; + } BOOST_SCOPE_EXIT_END _node->on_message( this, received_message ); } @@ -438,18 +445,24 @@ namespace graphene { namespace net _remote_endpoint = new_remote_endpoint; } - bool peer_connection::busy() + bool peer_connection::busy() const { VERIFY_CORRECT_THREAD(); return !items_requested_from_peer.empty() || !sync_items_requested_from_peer.empty() || item_ids_requested_from_peer; } - bool peer_connection::idle() + bool peer_connection::idle() const { VERIFY_CORRECT_THREAD(); return !busy(); } + bool peer_connection::is_currently_handling_message() const + { + VERIFY_CORRECT_THREAD(); + return _currently_handling_message; + } + bool peer_connection::is_transaction_fetching_inhibited() const { VERIFY_CORRECT_THREAD();