When the p2p code pushes a transaction that fails to apply, remember that transaction so we don't fetch it again as soon as a different peer tells us about it.
This commit is contained in:
parent
d92736617e
commit
405a9e71b2
1 changed files with 28 additions and 6 deletions
|
|
@ -462,6 +462,7 @@ namespace graphene { namespace net { namespace detail {
|
||||||
> items_to_fetch_set_type;
|
> items_to_fetch_set_type;
|
||||||
unsigned _items_to_fetch_sequence_counter;
|
unsigned _items_to_fetch_sequence_counter;
|
||||||
items_to_fetch_set_type _items_to_fetch; /// list of items we know another peer has and we want
|
items_to_fetch_set_type _items_to_fetch; /// list of items we know another peer has and we want
|
||||||
|
peer_connection::timestamped_items_set_type _recently_failed_items; /// list of transactions we've recently pushed and had rejected by the delegate
|
||||||
// @}
|
// @}
|
||||||
|
|
||||||
/// used by the task that advertises inventory during normal operation
|
/// used by the task that advertises inventory during normal operation
|
||||||
|
|
@ -1454,6 +1455,13 @@ namespace graphene { namespace net { namespace detail {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// this has nothing to do with updating the peer list, but we need to prune this list
|
||||||
|
// at regular intervals, this is a fine place to do it.
|
||||||
|
fc::time_point_sec oldest_failed_ids_to_keep(fc::time_point::now() - fc::minutes(15));
|
||||||
|
auto oldest_failed_ids_to_keep_iter = _recently_failed_items.get<peer_connection::timestamp_index>().lower_bound(oldest_failed_ids_to_keep);
|
||||||
|
auto begin_iter = _recently_failed_items.get<peer_connection::timestamp_index>().begin();
|
||||||
|
_recently_failed_items.get<peer_connection::timestamp_index>().erase(begin_iter, oldest_failed_ids_to_keep_iter);
|
||||||
|
|
||||||
if (!_node_is_shutting_down && !_fetch_updated_peer_lists_loop_done.canceled() )
|
if (!_node_is_shutting_down && !_fetch_updated_peer_lists_loop_done.canceled() )
|
||||||
_fetch_updated_peer_lists_loop_done = fc::schedule( [this](){ fetch_updated_peer_lists_loop(); },
|
_fetch_updated_peer_lists_loop_done = fc::schedule( [this](){ fetch_updated_peer_lists_loop(); },
|
||||||
fc::time_point::now() + fc::minutes(15),
|
fc::time_point::now() + fc::minutes(15),
|
||||||
|
|
@ -2786,12 +2794,20 @@ namespace graphene { namespace net { namespace detail {
|
||||||
originating_peer->inventory_peer_advertised_to_us.insert(peer_connection::timestamped_item_id(advertised_item_id, fc::time_point::now()));
|
originating_peer->inventory_peer_advertised_to_us.insert(peer_connection::timestamped_item_id(advertised_item_id, fc::time_point::now()));
|
||||||
if (!we_requested_this_item_from_a_peer)
|
if (!we_requested_this_item_from_a_peer)
|
||||||
{
|
{
|
||||||
auto insert_result = _items_to_fetch.insert(prioritized_item_id(advertised_item_id, _items_to_fetch_sequence_counter++));
|
if (_recently_failed_items.find(item_id(item_ids_inventory_message_received.item_type, item_hash)) != _recently_failed_items.end())
|
||||||
if (insert_result.second)
|
|
||||||
{
|
{
|
||||||
dlog("adding item ${item_hash} from inventory message to our list of items to fetch",
|
dlog("not adding ${item_hash} to our list of items to fetch because we've recently fetched a copy and it failed to push",
|
||||||
("item_hash", item_hash));
|
("item_hash", item_hash));
|
||||||
trigger_fetch_items_loop();
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
auto insert_result = _items_to_fetch.insert(prioritized_item_id(advertised_item_id, _items_to_fetch_sequence_counter++));
|
||||||
|
if (insert_result.second)
|
||||||
|
{
|
||||||
|
dlog("adding item ${item_hash} from inventory message to our list of items to fetch",
|
||||||
|
("item_hash", item_hash));
|
||||||
|
trigger_fetch_items_loop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -3696,7 +3712,7 @@ namespace graphene { namespace net { namespace detail {
|
||||||
fc::time_point message_receive_time = fc::time_point::now();
|
fc::time_point message_receive_time = fc::time_point::now();
|
||||||
|
|
||||||
// only process it if we asked for it
|
// only process it if we asked for it
|
||||||
auto iter = originating_peer->items_requested_from_peer.find( item_id(message_to_process.msg_type, message_hash ) );
|
auto iter = originating_peer->items_requested_from_peer.find( item_id(message_to_process.msg_type, message_hash) );
|
||||||
if( iter == originating_peer->items_requested_from_peer.end() )
|
if( iter == originating_peer->items_requested_from_peer.end() )
|
||||||
{
|
{
|
||||||
wlog( "received a message I didn't ask for from peer ${endpoint}, disconnecting from peer",
|
wlog( "received a message I didn't ask for from peer ${endpoint}, disconnecting from peer",
|
||||||
|
|
@ -3717,7 +3733,11 @@ namespace graphene { namespace net { namespace detail {
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (message_to_process.msg_type == trx_message_type)
|
if (message_to_process.msg_type == trx_message_type)
|
||||||
_delegate->handle_transaction( message_to_process.as<trx_message>() );
|
{
|
||||||
|
trx_message transaction_message_to_process = message_to_process.as<trx_message>();
|
||||||
|
dlog("passing message containing transaction ${trx} to client", ("trx", transaction_message_to_process.trx.id()));
|
||||||
|
_delegate->handle_transaction(transaction_message_to_process);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
_delegate->handle_message( message_to_process );
|
_delegate->handle_message( message_to_process );
|
||||||
message_validated_time = fc::time_point::now();
|
message_validated_time = fc::time_point::now();
|
||||||
|
|
@ -3737,6 +3757,8 @@ namespace graphene { namespace net { namespace detail {
|
||||||
catch ( const fc::exception& e )
|
catch ( const fc::exception& e )
|
||||||
{
|
{
|
||||||
wlog( "client rejected message sent by peer ${peer}, ${e}", ("peer", originating_peer->get_remote_endpoint() )("e", e) );
|
wlog( "client rejected message sent by peer ${peer}, ${e}", ("peer", originating_peer->get_remote_endpoint() )("e", e) );
|
||||||
|
// record it so we don't try to fetch this item again
|
||||||
|
_recently_failed_items.insert(peer_connection::timestamped_item_id(item_id(message_to_process.msg_type, message_hash ), fc::time_point::now()));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue