In the p2p networking code, request multiple transactions at a time to improve performance during flooding
This commit is contained in:
parent
5885c243fb
commit
74bbde785b
3 changed files with 83 additions and 25 deletions
|
|
@ -56,8 +56,11 @@
|
||||||
* our peers and save a copy in a cache where we will find it if
|
* our peers and save a copy in a cache where we will find it if
|
||||||
* a peer requests it. We expire old items out of the cache
|
* a peer requests it. We expire old items out of the cache
|
||||||
* after this number of blocks go by.
|
* after this number of blocks go by.
|
||||||
|
*
|
||||||
|
* Recently lowered from 30 to match the default expiration time
|
||||||
|
* the web wallet imposes on transactions.
|
||||||
*/
|
*/
|
||||||
#define GRAPHENE_NET_MESSAGE_CACHE_DURATION_IN_BLOCKS 30
|
#define GRAPHENE_NET_MESSAGE_CACHE_DURATION_IN_BLOCKS 5
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* We prevent a peer from offering us a list of blocks which, if we fetched them
|
* We prevent a peer from offering us a list of blocks which, if we fetched them
|
||||||
|
|
@ -74,6 +77,19 @@
|
||||||
|
|
||||||
#define GRAPHENE_NET_MAX_BLOCKS_PER_PEER_DURING_SYNCING 100
|
#define GRAPHENE_NET_MAX_BLOCKS_PER_PEER_DURING_SYNCING 100
|
||||||
|
|
||||||
|
/**
|
||||||
|
* During normal operation, how many items will be fetched from each
|
||||||
|
* peer at a time. This will only come into play when the network
|
||||||
|
* is being flooded -- typically transactions will be fetched as soon
|
||||||
|
* as we find out about them, so only one item will be requested
|
||||||
|
* at a time.
|
||||||
|
*
|
||||||
|
* No tests have been done to find the optimal value for this
|
||||||
|
* parameter, so consider increasing or decreasing it if performance
|
||||||
|
* during flooding is lacking.
|
||||||
|
*/
|
||||||
|
#define GRAPHENE_NET_MAX_ITEMS_PER_PEER_DURING_NORMAL_OPERATION 50
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Instead of fetching all item IDs from a peer, then fetching all blocks
|
* Instead of fetching all item IDs from a peer, then fetching all blocks
|
||||||
* from a peer, we will interleave them. Fetch at least this many block IDs,
|
* from a peer, we will interleave them. Fetch at least this many block IDs,
|
||||||
|
|
|
||||||
|
|
@ -1108,39 +1108,81 @@ namespace graphene { namespace net { namespace detail {
|
||||||
|
|
||||||
fc::time_point next_peer_unblocked_time = fc::time_point::maximum();
|
fc::time_point next_peer_unblocked_time = fc::time_point::maximum();
|
||||||
|
|
||||||
std::forward_list<std::pair<peer_connection_ptr, item_id> > fetch_messages_to_send;
|
// we need to construct a list of items to request from each peer first,
|
||||||
std::vector<fc::future<void> > write_ops;
|
// then send the messages (in two steps, to avoid yielding while iterating)
|
||||||
for (auto iter = _items_to_fetch.begin(); iter != _items_to_fetch.end();)
|
// we want to evenly distribute our requests among our peers.
|
||||||
|
struct requested_item_count_index {};
|
||||||
|
struct peer_and_items_to_fetch
|
||||||
{
|
{
|
||||||
|
peer_connection_ptr peer;
|
||||||
|
std::vector<item_id> item_ids;
|
||||||
|
peer_and_items_to_fetch(const peer_connection_ptr& peer) : peer(peer) {}
|
||||||
|
bool operator<(const peer_and_items_to_fetch& rhs) const { return peer < rhs.peer; }
|
||||||
|
size_t number_of_items() const { return item_ids.size(); }
|
||||||
|
};
|
||||||
|
typedef boost::multi_index_container<peer_and_items_to_fetch,
|
||||||
|
boost::multi_index::indexed_by<boost::multi_index::ordered_unique<boost::multi_index::member<peer_and_items_to_fetch, peer_connection_ptr, &peer_and_items_to_fetch::peer> >,
|
||||||
|
boost::multi_index::ordered_non_unique<boost::multi_index::tag<requested_item_count_index>,
|
||||||
|
boost::multi_index::const_mem_fun<peer_and_items_to_fetch, size_t, &peer_and_items_to_fetch::number_of_items> > > > fetch_messages_to_send_set;
|
||||||
|
fetch_messages_to_send_set items_by_peer;
|
||||||
|
|
||||||
|
// initialize the fetch_messages_to_send with an empty set of items for all idle peers
|
||||||
|
for (const peer_connection_ptr& peer : _active_connections)
|
||||||
|
if (peer->idle())
|
||||||
|
items_by_peer.insert(peer_and_items_to_fetch(peer));
|
||||||
|
|
||||||
|
// now loop over all items we want to fetch
|
||||||
|
for (auto item_iter = _items_to_fetch.begin(); item_iter != _items_to_fetch.end();)
|
||||||
|
{
|
||||||
|
// and find a peer that has it, we'll use the one who has the least requests going to it to load balance
|
||||||
bool item_fetched = false;
|
bool item_fetched = false;
|
||||||
for (const peer_connection_ptr& peer : _active_connections)
|
for (auto peer_iter = items_by_peer.get<requested_item_count_index>().begin(); peer_iter != items_by_peer.get<requested_item_count_index>().end(); ++peer_iter)
|
||||||
{
|
{
|
||||||
if (peer->idle() &&
|
const peer_connection_ptr& peer = peer_iter->peer;
|
||||||
peer->inventory_peer_advertised_to_us.find(iter->item) != peer->inventory_peer_advertised_to_us.end())
|
// if they have the item and we haven't already decided to ask them for too many other items
|
||||||
|
if (peer_iter->item_ids.size() < GRAPHENE_NET_MAX_ITEMS_PER_PEER_DURING_NORMAL_OPERATION &&
|
||||||
|
peer->inventory_peer_advertised_to_us.find(item_iter->item) != peer->inventory_peer_advertised_to_us.end())
|
||||||
{
|
{
|
||||||
if (peer->is_transaction_fetching_inhibited() && iter->item.item_type == graphene::net::trx_message_type)
|
if (item_iter->item.item_type == graphene::net::trx_message_type && peer->is_transaction_fetching_inhibited())
|
||||||
next_peer_unblocked_time = std::min(peer->transaction_fetching_inhibited_until, next_peer_unblocked_time);
|
next_peer_unblocked_time = std::min(peer->transaction_fetching_inhibited_until, next_peer_unblocked_time);
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
dlog("requesting item ${hash} from peer ${endpoint}",
|
//dlog("requesting item ${hash} from peer ${endpoint}",
|
||||||
("hash", iter->item.item_hash)("endpoint", peer->get_remote_endpoint()));
|
// ("hash", iter->item.item_hash)("endpoint", peer->get_remote_endpoint()));
|
||||||
peer->items_requested_from_peer.insert(peer_connection::item_to_time_map_type::value_type(iter->item, fc::time_point::now()));
|
item_id item_id_to_fetch = item_iter->item;
|
||||||
item_id item_id_to_fetch = iter->item;
|
peer->items_requested_from_peer.insert(peer_connection::item_to_time_map_type::value_type(item_id_to_fetch, fc::time_point::now()));
|
||||||
iter = _items_to_fetch.erase(iter);
|
item_iter = _items_to_fetch.erase(item_iter);
|
||||||
item_fetched = true;
|
item_fetched = true;
|
||||||
fetch_messages_to_send.emplace_front(std::make_pair(peer, item_id_to_fetch));
|
items_by_peer.get<requested_item_count_index>().modify(peer_iter, [&item_id_to_fetch](peer_and_items_to_fetch& peer_and_items) {
|
||||||
|
peer_and_items.item_ids.push_back(item_id_to_fetch);
|
||||||
|
});
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!item_fetched)
|
if (!item_fetched)
|
||||||
++iter;
|
++item_iter;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const auto& peer_and_item : fetch_messages_to_send)
|
// we've figured out which peer will be providing each item, now send the messages.
|
||||||
peer_and_item.first->send_message(fetch_items_message(peer_and_item.second.item_type,
|
for (const peer_and_items_to_fetch& peer_and_items : items_by_peer)
|
||||||
std::vector<item_hash_t>{peer_and_item.second.item_hash}));
|
{
|
||||||
fetch_messages_to_send.clear();
|
// the item lists are heterogeneous and
|
||||||
|
// the fetch_items_message can only deal with one item type at a time.
|
||||||
|
std::map<uint32_t, std::vector<item_hash_t> > items_to_fetch_by_type;
|
||||||
|
for (const item_id& item : peer_and_items.item_ids)
|
||||||
|
items_to_fetch_by_type[item.item_type].push_back(item.item_hash);
|
||||||
|
for (auto& items_by_type : items_to_fetch_by_type)
|
||||||
|
{
|
||||||
|
dlog("requesting ${count} items of type ${type} from peer ${endpoint}: ${hashes}",
|
||||||
|
("count", items_by_type.second.size())("type", (uint32_t)items_by_type.first)
|
||||||
|
("endpoint", peer_and_items.peer->get_remote_endpoint())
|
||||||
|
("hashes", items_by_type.second));
|
||||||
|
peer_and_items.peer->send_message(fetch_items_message(items_by_type.first,
|
||||||
|
items_by_type.second));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
items_by_peer.clear();
|
||||||
|
|
||||||
if (!_items_to_fetch_updated)
|
if (!_items_to_fetch_updated)
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -1035,7 +1035,7 @@ BOOST_FIXTURE_TEST_CASE( transaction_invalidated_in_cache, database_fixture )
|
||||||
{
|
{
|
||||||
ACTORS( (alice)(bob) );
|
ACTORS( (alice)(bob) );
|
||||||
|
|
||||||
auto generate_block = [&]( database& d, uint32_t skip = database::skip_nothing ) -> signed_block
|
auto generate_block = [&]( database& d, uint32_t skip ) -> signed_block
|
||||||
{
|
{
|
||||||
return d.generate_block(d.get_slot_time(1), d.get_scheduled_witness(1), init_account_priv_key, skip);
|
return d.generate_block(d.get_slot_time(1), d.get_scheduled_witness(1), init_account_priv_key, skip);
|
||||||
};
|
};
|
||||||
|
|
@ -1058,7 +1058,7 @@ BOOST_FIXTURE_TEST_CASE( transaction_invalidated_in_cache, database_fixture )
|
||||||
BOOST_CHECK( db2.get( alice_id ).name == "alice" );
|
BOOST_CHECK( db2.get( alice_id ).name == "alice" );
|
||||||
BOOST_CHECK( db2.get( bob_id ).name == "bob" );
|
BOOST_CHECK( db2.get( bob_id ).name == "bob" );
|
||||||
|
|
||||||
db2.push_block(generate_block(db));
|
db2.push_block(generate_block(db, database::skip_nothing));
|
||||||
transfer( account_id_type(), alice_id, asset( 1000 ) );
|
transfer( account_id_type(), alice_id, asset( 1000 ) );
|
||||||
transfer( account_id_type(), bob_id, asset( 1000 ) );
|
transfer( account_id_type(), bob_id, asset( 1000 ) );
|
||||||
// need to skip authority check here as well for same reason as above
|
// need to skip authority check here as well for same reason as above
|
||||||
|
|
@ -1073,7 +1073,7 @@ BOOST_FIXTURE_TEST_CASE( transaction_invalidated_in_cache, database_fixture )
|
||||||
{
|
{
|
||||||
for( int i=0; i<n; i++ )
|
for( int i=0; i<n; i++ )
|
||||||
{
|
{
|
||||||
signed_block b = generate_block(db2);
|
signed_block b = generate_block(db2, database::skip_nothing);
|
||||||
PUSH_BLOCK( db, b );
|
PUSH_BLOCK( db, b );
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
@ -1112,7 +1112,7 @@ BOOST_FIXTURE_TEST_CASE( transaction_invalidated_in_cache, database_fixture )
|
||||||
BOOST_CHECK_EQUAL(db.get_balance( bob_id, asset_id_type()).amount.value, 1000);
|
BOOST_CHECK_EQUAL(db.get_balance( bob_id, asset_id_type()).amount.value, 1000);
|
||||||
|
|
||||||
// generate a block with db and ensure we don't somehow apply it
|
// generate a block with db and ensure we don't somehow apply it
|
||||||
PUSH_BLOCK(db2, generate_block(db));
|
PUSH_BLOCK(db2, generate_block(db, database::skip_nothing));
|
||||||
BOOST_CHECK_EQUAL(db.get_balance(alice_id, asset_id_type()).amount.value, 1000);
|
BOOST_CHECK_EQUAL(db.get_balance(alice_id, asset_id_type()).amount.value, 1000);
|
||||||
BOOST_CHECK_EQUAL(db.get_balance( bob_id, asset_id_type()).amount.value, 1000);
|
BOOST_CHECK_EQUAL(db.get_balance( bob_id, asset_id_type()).amount.value, 1000);
|
||||||
|
|
||||||
|
|
@ -1133,7 +1133,7 @@ BOOST_FIXTURE_TEST_CASE( transaction_invalidated_in_cache, database_fixture )
|
||||||
signed_transaction tx_b = generate_xfer_tx( alice_id, bob_id, 2000, 10 );
|
signed_transaction tx_b = generate_xfer_tx( alice_id, bob_id, 2000, 10 );
|
||||||
signed_transaction tx_c = generate_xfer_tx( alice_id, bob_id, 500, 10 );
|
signed_transaction tx_c = generate_xfer_tx( alice_id, bob_id, 500, 10 );
|
||||||
|
|
||||||
generate_block( db );
|
generate_block( db, database::skip_nothing );
|
||||||
|
|
||||||
PUSH_TX( db, tx_a );
|
PUSH_TX( db, tx_a );
|
||||||
BOOST_CHECK_EQUAL(db.get_balance(alice_id, asset_id_type()).amount.value, 2000);
|
BOOST_CHECK_EQUAL(db.get_balance(alice_id, asset_id_type()).amount.value, 2000);
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue