Fix bug where peers could get stuck in sync mode.

There was a case where we had requested a block through the sync
mechanism and also received it through the normal inventory mechanism
where we would leave the peer in a sync state, but never ask them
for more sync blocks.

This commit fixes the bug that put us into that stuck state, and also
adds code to disconnect peers if we ever manage to get into that stalled
state.
This commit is contained in:
Eric Frias 2017-08-08 12:05:57 -04:00
parent c054a92d5e
commit f66eeeb73b
3 changed files with 289 additions and 190 deletions

View file

@ -269,6 +269,7 @@ namespace graphene { namespace net
fc::thread* _thread; fc::thread* _thread;
unsigned _send_message_queue_tasks_running; // temporary debugging unsigned _send_message_queue_tasks_running; // temporary debugging
#endif #endif
bool _currently_handling_message; // true while we're in the middle of handling a message from the remote system
private: private:
peer_connection(peer_connection_delegate* delegate); peer_connection(peer_connection_delegate* delegate);
void destroy(); void destroy();
@ -299,8 +300,9 @@ namespace graphene { namespace net
fc::ip::endpoint get_local_endpoint(); fc::ip::endpoint get_local_endpoint();
void set_remote_endpoint(fc::optional<fc::ip::endpoint> new_remote_endpoint); void set_remote_endpoint(fc::optional<fc::ip::endpoint> new_remote_endpoint);
bool busy(); bool busy() const;
bool idle(); bool idle() const;
bool is_currently_handling_message() const;
bool is_transaction_fetching_inhibited() const; bool is_transaction_fetching_inhibited() const;
fc::sha512 get_shared_secret() const; fc::sha512 get_shared_secret() const;

View file

@ -1421,6 +1421,19 @@ namespace graphene { namespace net { namespace detail {
wlog( "Sending a keepalive message to peer ${peer} who hasn't sent us any messages in the last ${timeout} seconds", wlog( "Sending a keepalive message to peer ${peer} who hasn't sent us any messages in the last ${timeout} seconds",
( "peer", active_peer->get_remote_endpoint() )("timeout", active_send_keepalive_timeout ) ); ( "peer", active_peer->get_remote_endpoint() )("timeout", active_send_keepalive_timeout ) );
peers_to_send_keep_alive.push_back(active_peer); peers_to_send_keep_alive.push_back(active_peer);
}
else if (active_peer->we_need_sync_items_from_peer &&
!active_peer->is_currently_handling_message() &&
!active_peer->item_ids_requested_from_peer &&
active_peer->ids_of_items_to_get.empty())
{
// This is a state we should never get into in the first place, but if we do, we should disconnect the peer
// to re-establish the connection.
fc_wlog(fc::logger::get("sync"), "Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.",
("peer", active_peer->get_remote_endpoint()));
wlog("Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.",
("peer", active_peer->get_remote_endpoint()));
peers_to_disconnect_forcibly.push_back(active_peer);
} }
} }
} }
@ -2452,24 +2465,24 @@ namespace graphene { namespace net { namespace detail {
uint32_t expected_num = first_block_number_in_reponse + i; uint32_t expected_num = first_block_number_in_reponse + i;
if (actual_num != expected_num) if (actual_num != expected_num)
{ {
wlog("Invalid response from peer ${peer_endpoint}. The list of blocks they provided is not sequential, " wlog("Invalid response from peer ${peer_endpoint}. The list of blocks they provided is not sequential, "
"the ${position}th block in their reply was block number ${actual_num}, " "the ${position}th block in their reply was block number ${actual_num}, "
"but it should have been number ${expected_num}", "but it should have been number ${expected_num}",
("peer_endpoint", originating_peer->get_remote_endpoint()) ("peer_endpoint", originating_peer->get_remote_endpoint())
("position", i) ("position", i)
("actual_num", actual_num) ("actual_num", actual_num)
("expected_num", expected_num)); ("expected_num", expected_num));
fc::exception error_for_peer(FC_LOG_MESSAGE(error, fc::exception error_for_peer(FC_LOG_MESSAGE(error,
"You gave an invalid response to my request for sync blocks. The list of blocks you provided is not sequential, " "You gave an invalid response to my request for sync blocks. The list of blocks you provided is not sequential, "
"the ${position}th block in their reply was block number ${actual_num}, " "the ${position}th block in their reply was block number ${actual_num}, "
"but it should have been number ${expected_num}", "but it should have been number ${expected_num}",
("position", i) ("position", i)
("actual_num", actual_num) ("actual_num", actual_num)
("expected_num", expected_num))); ("expected_num", expected_num)));
disconnect_from_peer(originating_peer, disconnect_from_peer(originating_peer,
"You gave an invalid response to my request for sync blocks", "You gave an invalid response to my request for sync blocks",
true, error_for_peer); true, error_for_peer);
return; return;
} }
} }
@ -2516,181 +2529,205 @@ namespace graphene { namespace net { namespace detail {
} }
originating_peer->item_ids_requested_from_peer.reset(); originating_peer->item_ids_requested_from_peer.reset();
dlog( "sync: received a list of ${count} available items from ${peer_endpoint}", // if exceptions are thrown after clearing the item_ids_requested_from_peer (above),
( "count", blockchain_item_ids_inventory_message_received.item_hashes_available.size() ) // it could leave our sync in a stalled state. Wrap a try/catch around the rest
( "peer_endpoint", originating_peer->get_remote_endpoint() ) ); // of the function so we can log if this ever happens.
//for( const item_hash_t& item_hash : blockchain_item_ids_inventory_message_received.item_hashes_available ) try
//{
// dlog( "sync: ${hash}", ("hash", item_hash ) );
//}
// if the peer doesn't have any items after the one we asked for
if( blockchain_item_ids_inventory_message_received.total_remaining_item_count == 0 &&
( blockchain_item_ids_inventory_message_received.item_hashes_available.empty() || // there are no items in the peer's blockchain. this should only happen if our blockchain was empty when we requested, might want to verify that.
( blockchain_item_ids_inventory_message_received.item_hashes_available.size() == 1 &&
_delegate->has_item( item_id(blockchain_item_ids_inventory_message_received.item_type,
blockchain_item_ids_inventory_message_received.item_hashes_available.front() ) ) ) ) && // we've already seen the last item in the peer's blockchain
originating_peer->ids_of_items_to_get.empty() &&
originating_peer->number_of_unfetched_item_ids == 0 ) // <-- is the last check necessary?
{ {
dlog( "sync: peer said we're up-to-date, entering normal operation with this peer" ); dlog( "sync: received a list of ${count} available items from ${peer_endpoint}",
originating_peer->we_need_sync_items_from_peer = false; ( "count", blockchain_item_ids_inventory_message_received.item_hashes_available.size() )
( "peer_endpoint", originating_peer->get_remote_endpoint() ) );
//for( const item_hash_t& item_hash : blockchain_item_ids_inventory_message_received.item_hashes_available )
//{
// dlog( "sync: ${hash}", ("hash", item_hash ) );
//}
uint32_t new_number_of_unfetched_items = calculate_unsynced_block_count_from_all_peers(); // if the peer doesn't have any items after the one we asked for
_total_number_of_unfetched_items = new_number_of_unfetched_items; if( blockchain_item_ids_inventory_message_received.total_remaining_item_count == 0 &&
if( new_number_of_unfetched_items == 0 ) ( blockchain_item_ids_inventory_message_received.item_hashes_available.empty() || // there are no items in the peer's blockchain. this should only happen if our blockchain was empty when we requested, might want to verify that.
_delegate->sync_status( blockchain_item_ids_inventory_message_received.item_type, 0 ); ( blockchain_item_ids_inventory_message_received.item_hashes_available.size() == 1 &&
_delegate->has_item( item_id(blockchain_item_ids_inventory_message_received.item_type,
return; blockchain_item_ids_inventory_message_received.item_hashes_available.front() ) ) ) ) && // we've already seen the last item in the peer's blockchain
} originating_peer->ids_of_items_to_get.empty() &&
originating_peer->number_of_unfetched_item_ids == 0 ) // <-- is the last check necessary?
std::deque<item_hash_t> item_hashes_received( blockchain_item_ids_inventory_message_received.item_hashes_available.begin(),
blockchain_item_ids_inventory_message_received.item_hashes_available.end() );
originating_peer->number_of_unfetched_item_ids = blockchain_item_ids_inventory_message_received.total_remaining_item_count;
// flush any items this peer sent us that we've already received and processed from another peer
if (!item_hashes_received.empty() &&
originating_peer->ids_of_items_to_get.empty())
{
bool is_first_item_for_other_peer = false;
for (const peer_connection_ptr& peer : _active_connections)
if (peer != originating_peer->shared_from_this() &&
!peer->ids_of_items_to_get.empty() &&
peer->ids_of_items_to_get.front() == blockchain_item_ids_inventory_message_received.item_hashes_available.front())
{
dlog("The item ${newitem} is the first item for peer ${peer}",
("newitem", blockchain_item_ids_inventory_message_received.item_hashes_available.front())
("peer", peer->get_remote_endpoint()));
is_first_item_for_other_peer = true;
break;
}
dlog("is_first_item_for_other_peer: ${is_first}. item_hashes_received.size() = ${size}",
("is_first", is_first_item_for_other_peer)("size", item_hashes_received.size()));
if (!is_first_item_for_other_peer)
{ {
while (!item_hashes_received.empty() && dlog( "sync: peer said we're up-to-date, entering normal operation with this peer" );
_delegate->has_item(item_id(blockchain_item_ids_inventory_message_received.item_type, originating_peer->we_need_sync_items_from_peer = false;
item_hashes_received.front())))
uint32_t new_number_of_unfetched_items = calculate_unsynced_block_count_from_all_peers();
_total_number_of_unfetched_items = new_number_of_unfetched_items;
if( new_number_of_unfetched_items == 0 )
_delegate->sync_status( blockchain_item_ids_inventory_message_received.item_type, 0 );
return;
}
std::deque<item_hash_t> item_hashes_received( blockchain_item_ids_inventory_message_received.item_hashes_available.begin(),
blockchain_item_ids_inventory_message_received.item_hashes_available.end() );
originating_peer->number_of_unfetched_item_ids = blockchain_item_ids_inventory_message_received.total_remaining_item_count;
// flush any items this peer sent us that we've already received and processed from another peer
if (!item_hashes_received.empty() &&
originating_peer->ids_of_items_to_get.empty())
{
bool is_first_item_for_other_peer = false;
for (const peer_connection_ptr& peer : _active_connections)
if (peer != originating_peer->shared_from_this() &&
!peer->ids_of_items_to_get.empty() &&
peer->ids_of_items_to_get.front() == blockchain_item_ids_inventory_message_received.item_hashes_available.front())
{
dlog("The item ${newitem} is the first item for peer ${peer}",
("newitem", blockchain_item_ids_inventory_message_received.item_hashes_available.front())
("peer", peer->get_remote_endpoint()));
is_first_item_for_other_peer = true;
break;
}
dlog("is_first_item_for_other_peer: ${is_first}. item_hashes_received.size() = ${size}",
("is_first", is_first_item_for_other_peer)("size", item_hashes_received.size()));
if (!is_first_item_for_other_peer)
{ {
assert(item_hashes_received.front() != item_hash_t()); while (!item_hashes_received.empty() &&
_delegate->has_item(item_id(blockchain_item_ids_inventory_message_received.item_type,
item_hashes_received.front())))
{
assert(item_hashes_received.front() != item_hash_t());
originating_peer->last_block_delegate_has_seen = item_hashes_received.front();
originating_peer->last_block_time_delegate_has_seen = _delegate->get_block_time(item_hashes_received.front());
dlog("popping item because delegate has already seen it. peer ${peer}'s last block the delegate has seen is now ${block_id} (actual block #${actual_block_num})",
("peer", originating_peer->get_remote_endpoint())
("block_id", originating_peer->last_block_delegate_has_seen)
("actual_block_num", _delegate->get_block_number(item_hashes_received.front())));
item_hashes_received.pop_front();
}
dlog("after removing all items we have already seen, item_hashes_received.size() = ${size}", ("size", item_hashes_received.size()));
}
}
else if (!item_hashes_received.empty())
{
// we received a list of items and we already have a list of items to fetch from this peer.
// In the normal case, this list will immediately follow the existing list, meaning the
// last hash of our existing list will match the first hash of the new list.
// In the much less likely case, we've received a partial list of items from the peer, then
// the peer switched forks before sending us the remaining list. In this case, the first
// hash in the new list may not be the last hash in the existing list (it may be earlier, or
// it may not exist at all).
// In either case, pop items off the back of our existing list until we find our first
// item, then append our list.
while (!originating_peer->ids_of_items_to_get.empty())
{
if (item_hashes_received.front() != originating_peer->ids_of_items_to_get.back())
originating_peer->ids_of_items_to_get.pop_back();
else
break;
}
if (originating_peer->ids_of_items_to_get.empty())
{
// this happens when the peer has switched forks between the last inventory message and
// this one, and there weren't any unfetched items in common
// We don't know where in the blockchain the new front() actually falls, all we can
// expect is that it is a block that we knew about because it should be one of the
// blocks we sent in the initial synopsis.
assert(_delegate->has_item(item_id(_sync_item_type, item_hashes_received.front())));
originating_peer->last_block_delegate_has_seen = item_hashes_received.front(); originating_peer->last_block_delegate_has_seen = item_hashes_received.front();
originating_peer->last_block_time_delegate_has_seen = _delegate->get_block_time(item_hashes_received.front()); originating_peer->last_block_time_delegate_has_seen = _delegate->get_block_time(item_hashes_received.front());
dlog("popping item because delegate has already seen it. peer ${peer}'s last block the delegate has seen is now ${block_id} (actual block #${actual_block_num})",
("peer", originating_peer->get_remote_endpoint())
("block_id", originating_peer->last_block_delegate_has_seen)
("actual_block_num", _delegate->get_block_number(item_hashes_received.front())));
item_hashes_received.pop_front(); item_hashes_received.pop_front();
} }
dlog("after removing all items we have already seen, item_hashes_received.size() = ${size}", ("size", item_hashes_received.size()));
}
}
else if (!item_hashes_received.empty())
{
// we received a list of items and we already have a list of items to fetch from this peer.
// In the normal case, this list will immediately follow the existing list, meaning the
// last hash of our existing list will match the first hash of the new list.
// In the much less likely case, we've received a partial list of items from the peer, then
// the peer switched forks before sending us the remaining list. In this case, the first
// hash in the new list may not be the last hash in the existing list (it may be earlier, or
// it may not exist at all).
// In either case, pop items off the back of our existing list until we find our first
// item, then append our list.
while (!originating_peer->ids_of_items_to_get.empty())
{
if (item_hashes_received.front() != originating_peer->ids_of_items_to_get.back())
originating_peer->ids_of_items_to_get.pop_back();
else else
break; {
// the common simple case: the new list extends the old. pop off the duplicate element
originating_peer->ids_of_items_to_get.pop_back();
}
} }
if (originating_peer->ids_of_items_to_get.empty())
if (!item_hashes_received.empty() && !originating_peer->ids_of_items_to_get.empty())
assert(item_hashes_received.front() != originating_peer->ids_of_items_to_get.back());
// append the remaining items to the peer's list
boost::push_back(originating_peer->ids_of_items_to_get, item_hashes_received);
originating_peer->number_of_unfetched_item_ids = blockchain_item_ids_inventory_message_received.total_remaining_item_count;
// at any given time, there's a maximum number of blocks that can possibly be out there
// [(now - genesis time) / block interval]. If they offer us more blocks than that,
// they must be an attacker or have a buggy client.
fc::time_point_sec minimum_time_of_last_offered_block =
originating_peer->last_block_time_delegate_has_seen + // timestamp of the block immediately before the first unfetched block
originating_peer->number_of_unfetched_item_ids * GRAPHENE_MIN_BLOCK_INTERVAL;
fc::time_point_sec now = fc::time_point::now();
if (minimum_time_of_last_offered_block > now + GRAPHENE_NET_FUTURE_SYNC_BLOCKS_GRACE_PERIOD_SEC)
{ {
// this happens when the peer has switched forks between the last inventory message and wlog("Disconnecting from peer ${peer} who offered us an implausible number of blocks, their last block would be in the future (${timestamp})",
// this one, and there weren't any unfetched items in common ("peer", originating_peer->get_remote_endpoint())
// We don't know where in the blockchain the new front() actually falls, all we can ("timestamp", minimum_time_of_last_offered_block));
// expect is that it is a block that we knew about because it should be one of the fc::exception error_for_peer(FC_LOG_MESSAGE(error, "You offered me a list of more sync blocks than could possibly exist. Total blocks offered: ${blocks}, Minimum time of the last block you offered: ${minimum_time_of_last_offered_block}, Now: ${now}",
// blocks we sent in the initial synopsis. ("blocks", originating_peer->number_of_unfetched_item_ids)
assert(_delegate->has_item(item_id(_sync_item_type, item_hashes_received.front()))); ("minimum_time_of_last_offered_block", minimum_time_of_last_offered_block)
originating_peer->last_block_delegate_has_seen = item_hashes_received.front(); ("now", now)));
originating_peer->last_block_time_delegate_has_seen = _delegate->get_block_time(item_hashes_received.front()); disconnect_from_peer(originating_peer,
item_hashes_received.pop_front(); "You offered me a list of more sync blocks than could possibly exist",
true, error_for_peer);
return;
}
uint32_t new_number_of_unfetched_items = calculate_unsynced_block_count_from_all_peers();
if (new_number_of_unfetched_items != _total_number_of_unfetched_items)
_delegate->sync_status(blockchain_item_ids_inventory_message_received.item_type,
new_number_of_unfetched_items);
_total_number_of_unfetched_items = new_number_of_unfetched_items;
if (blockchain_item_ids_inventory_message_received.total_remaining_item_count != 0)
{
// the peer hasn't sent us all the items it knows about.
if (originating_peer->ids_of_items_to_get.size() > GRAPHENE_NET_MIN_BLOCK_IDS_TO_PREFETCH)
{
// we have a good number of item ids from this peer, start fetching blocks from it;
// we'll switch back later to finish the job.
trigger_fetch_sync_items_loop();
}
else
{
// keep fetching the peer's list of sync items until we get enough to switch into block-
// fetching mode
fetch_next_batch_of_item_ids_from_peer(originating_peer);
}
} }
else else
{ {
// the common simple case: the new list extends the old. pop off the duplicate element // the peer has told us about all of the items it knows
originating_peer->ids_of_items_to_get.pop_back(); if (!originating_peer->ids_of_items_to_get.empty())
{
// we now know about all of the items the peer knows about, and there are some items on the list
// that we should try to fetch. Kick off the fetch loop.
trigger_fetch_sync_items_loop();
}
else
{
// If we get here, the peer has sent us a non-empty list of items, but we have already
// received all of the items from other peers. Send a new request to the peer to
// see if we're really in sync
fetch_next_batch_of_item_ids_from_peer(originating_peer);
}
} }
} }
catch (const fc::canceled_exception&)
if (!item_hashes_received.empty() && !originating_peer->ids_of_items_to_get.empty())
assert(item_hashes_received.front() != originating_peer->ids_of_items_to_get.back());
// append the remaining items to the peer's list
boost::push_back(originating_peer->ids_of_items_to_get, item_hashes_received);
originating_peer->number_of_unfetched_item_ids = blockchain_item_ids_inventory_message_received.total_remaining_item_count;
// at any given time, there's a maximum number of blocks that can possibly be out there
// [(now - genesis time) / block interval]. If they offer us more blocks than that,
// they must be an attacker or have a buggy client.
fc::time_point_sec minimum_time_of_last_offered_block =
originating_peer->last_block_time_delegate_has_seen + // timestamp of the block immediately before the first unfetched block
originating_peer->number_of_unfetched_item_ids * GRAPHENE_MIN_BLOCK_INTERVAL;
fc::time_point_sec now = fc::time_point::now();
if (minimum_time_of_last_offered_block > now + GRAPHENE_NET_FUTURE_SYNC_BLOCKS_GRACE_PERIOD_SEC)
{ {
wlog("Disconnecting from peer ${peer} who offered us an implausible number of blocks, their last block would be in the future (${timestamp})", throw;
("peer", originating_peer->get_remote_endpoint())
("timestamp", minimum_time_of_last_offered_block));
fc::exception error_for_peer(FC_LOG_MESSAGE(error, "You offered me a list of more sync blocks than could possibly exist. Total blocks offered: ${blocks}, Minimum time of the last block you offered: ${minimum_time_of_last_offered_block}, Now: ${now}",
("blocks", originating_peer->number_of_unfetched_item_ids)
("minimum_time_of_last_offered_block", minimum_time_of_last_offered_block)
("now", now)));
disconnect_from_peer(originating_peer,
"You offered me a list of more sync blocks than could possibly exist",
true, error_for_peer);
return;
} }
catch (const fc::exception& e)
uint32_t new_number_of_unfetched_items = calculate_unsynced_block_count_from_all_peers();
if (new_number_of_unfetched_items != _total_number_of_unfetched_items)
_delegate->sync_status(blockchain_item_ids_inventory_message_received.item_type,
new_number_of_unfetched_items);
_total_number_of_unfetched_items = new_number_of_unfetched_items;
if (blockchain_item_ids_inventory_message_received.total_remaining_item_count != 0)
{ {
// the peer hasn't sent us all the items it knows about. elog("Caught unexpected exception: ${e}", ("e", e));
if (originating_peer->ids_of_items_to_get.size() > GRAPHENE_NET_MIN_BLOCK_IDS_TO_PREFETCH) assert(false && "exceptions not expected here");
{
// we have a good number of item ids from this peer, start fetching blocks from it;
// we'll switch back later to finish the job.
trigger_fetch_sync_items_loop();
}
else
{
// keep fetching the peer's list of sync items until we get enough to switch into block-
// fetching mode
fetch_next_batch_of_item_ids_from_peer(originating_peer);
}
} }
else catch (const std::exception& e)
{ {
// the peer has told us about all of the items it knows elog("Caught unexpected exception: ${e}", ("e", e.what()));
if (!originating_peer->ids_of_items_to_get.empty()) assert(false && "exceptions not expected here");
{ }
// we now know about all of the items the peer knows about, and there are some items on the list catch (...)
// that we should try to fetch. Kick off the fetch loop. {
trigger_fetch_sync_items_loop(); elog("Caught unexpected exception, could break sync operation");
}
else
{
// If we get here, the peer has sent us a non-empty list of items, but we have already
// received all of the items from other peers. Send a new request to the peer to
// see if we're really in sync
fetch_next_batch_of_item_ids_from_peer(originating_peer);
}
} }
} }
else else
@ -3267,7 +3304,30 @@ namespace graphene { namespace net { namespace detail {
block_processed_this_iteration = true; block_processed_this_iteration = true;
} }
else else
{
dlog("Already received and accepted this block (presumably through normal inventory mechanism), treating it as accepted"); dlog("Already received and accepted this block (presumably through normal inventory mechanism), treating it as accepted");
for (const peer_connection_ptr& peer : _active_connections)
{
auto items_being_processed_iter = peer->ids_of_items_being_processed.find(received_block_iter->block_id);
if (items_being_processed_iter != peer->ids_of_items_being_processed.end())
{
peer->ids_of_items_being_processed.erase(items_being_processed_iter);
dlog("Removed item from ${endpoint}'s list of items being processed, still processing ${len} blocks",
("endpoint", peer->get_remote_endpoint())("len", peer->ids_of_items_being_processed.size()));
// if we just processed the last item in our list from this peer, we will want to
// send another request to find out if we are now in sync (this is normally handled in
// send_sync_block_to_node_delegate)
if (peer->ids_of_items_to_get.empty() &&
peer->number_of_unfetched_item_ids == 0 &&
peer->ids_of_items_being_processed.empty())
{
dlog("We received last item in our list for peer ${endpoint}, setup to do a sync check", ("endpoint", peer->get_remote_endpoint()));
fetch_next_batch_of_item_ids_from_peer(peer.get());
}
}
}
}
break; // start iterating _received_sync_items from the beginning break; // start iterating _received_sync_items from the beginning
} // end if potential_first_block } // end if potential_first_block
@ -3483,19 +3543,43 @@ namespace graphene { namespace net { namespace detail {
if (sync_item_iter != originating_peer->sync_items_requested_from_peer.end()) if (sync_item_iter != originating_peer->sync_items_requested_from_peer.end())
{ {
originating_peer->sync_items_requested_from_peer.erase(sync_item_iter); originating_peer->sync_items_requested_from_peer.erase(sync_item_iter);
_active_sync_requests.erase(block_message_to_process.block_id); // if exceptions are thrown here after removing the sync item from the list (above),
process_block_during_sync(originating_peer, block_message_to_process, message_hash); // it could leave our sync in a stalled state. Wrap a try/catch around the rest
if (originating_peer->idle()) // of the function so we can log if this ever happens.
try
{ {
// we have finished fetching a batch of items, so we either need to grab another batch of items _active_sync_requests.erase(block_message_to_process.block_id);
// or we need to get another list of item ids. process_block_during_sync(originating_peer, block_message_to_process, message_hash);
if (originating_peer->number_of_unfetched_item_ids > 0 && if (originating_peer->idle())
originating_peer->ids_of_items_to_get.size() < GRAPHENE_NET_MIN_BLOCK_IDS_TO_PREFETCH) {
fetch_next_batch_of_item_ids_from_peer(originating_peer); // we have finished fetching a batch of items, so we either need to grab another batch of items
else // or we need to get another list of item ids.
trigger_fetch_sync_items_loop(); if (originating_peer->number_of_unfetched_item_ids > 0 &&
originating_peer->ids_of_items_to_get.size() < GRAPHENE_NET_MIN_BLOCK_IDS_TO_PREFETCH)
fetch_next_batch_of_item_ids_from_peer(originating_peer);
else
trigger_fetch_sync_items_loop();
}
return;
}
catch (const fc::canceled_exception& e)
{
throw;
}
catch (const fc::exception& e)
{
elog("Caught unexpected exception: ${e}", ("e", e));
assert(false && "exceptions not expected here");
}
catch (const std::exception& e)
{
elog("Caught unexpected exception: ${e}", ("e", e.what()));
assert(false && "exceptions not expected here");
}
catch (...)
{
elog("Caught unexpected exception, could break sync operation");
} }
return;
} }
} }

View file

@ -29,6 +29,8 @@
#include <fc/thread/thread.hpp> #include <fc/thread/thread.hpp>
#include <boost/scope_exit.hpp>
#ifdef DEFAULT_LOGGER #ifdef DEFAULT_LOGGER
# undef DEFAULT_LOGGER # undef DEFAULT_LOGGER
#endif #endif
@ -86,11 +88,12 @@ namespace graphene { namespace net
inhibit_fetching_sync_blocks(false), inhibit_fetching_sync_blocks(false),
transaction_fetching_inhibited_until(fc::time_point::min()), transaction_fetching_inhibited_until(fc::time_point::min()),
last_known_fork_block_number(0), last_known_fork_block_number(0),
firewall_check_state(nullptr) firewall_check_state(nullptr),
#ifndef NDEBUG #ifndef NDEBUG
,_thread(&fc::thread::current()), _thread(&fc::thread::current()),
_send_message_queue_tasks_running(0) _send_message_queue_tasks_running(0),
#endif #endif
_currently_handling_message(false)
{ {
} }
@ -265,6 +268,10 @@ namespace graphene { namespace net
void peer_connection::on_message( message_oriented_connection* originating_connection, const message& received_message ) void peer_connection::on_message( message_oriented_connection* originating_connection, const message& received_message )
{ {
VERIFY_CORRECT_THREAD(); VERIFY_CORRECT_THREAD();
_currently_handling_message = true;
BOOST_SCOPE_EXIT(this_) {
this_->_currently_handling_message = false;
} BOOST_SCOPE_EXIT_END
_node->on_message( this, received_message ); _node->on_message( this, received_message );
} }
@ -438,18 +445,24 @@ namespace graphene { namespace net
_remote_endpoint = new_remote_endpoint; _remote_endpoint = new_remote_endpoint;
} }
bool peer_connection::busy() bool peer_connection::busy() const
{ {
VERIFY_CORRECT_THREAD(); VERIFY_CORRECT_THREAD();
return !items_requested_from_peer.empty() || !sync_items_requested_from_peer.empty() || item_ids_requested_from_peer; return !items_requested_from_peer.empty() || !sync_items_requested_from_peer.empty() || item_ids_requested_from_peer;
} }
bool peer_connection::idle() bool peer_connection::idle() const
{ {
VERIFY_CORRECT_THREAD(); VERIFY_CORRECT_THREAD();
return !busy(); return !busy();
} }
bool peer_connection::is_currently_handling_message() const
{
VERIFY_CORRECT_THREAD();
return _currently_handling_message;
}
bool peer_connection::is_transaction_fetching_inhibited() const bool peer_connection::is_transaction_fetching_inhibited() const
{ {
VERIFY_CORRECT_THREAD(); VERIFY_CORRECT_THREAD();