From 74bbde785b3d9c492007aced389df597b16cf124 Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Tue, 29 Sep 2015 17:37:32 -0400 Subject: [PATCH] In the p2p networking code, request multiple transactions at a time to improve performance during flooding --- libraries/net/include/graphene/net/config.hpp | 18 ++++- libraries/net/node.cpp | 80 ++++++++++++++----- tests/tests/block_tests.cpp | 10 +-- 3 files changed, 83 insertions(+), 25 deletions(-) diff --git a/libraries/net/include/graphene/net/config.hpp b/libraries/net/include/graphene/net/config.hpp index 5273be0f..cf5cacb4 100644 --- a/libraries/net/include/graphene/net/config.hpp +++ b/libraries/net/include/graphene/net/config.hpp @@ -56,8 +56,11 @@ * our peers and save a copy in a cache were we will find it if * a peer requests it. We expire out old items out of the cache * after this number of blocks go by. + * + * Recently lowered from 30 to match the default expiration time + * the web wallet imposes on transactions. */ -#define GRAPHENE_NET_MESSAGE_CACHE_DURATION_IN_BLOCKS 30 +#define GRAPHENE_NET_MESSAGE_CACHE_DURATION_IN_BLOCKS 5 /** * We prevent a peer from offering us a list of blocks which, if we fetched them @@ -74,6 +77,19 @@ #define GRAPHENE_NET_MAX_BLOCKS_PER_PEER_DURING_SYNCING 100 +/** + * During normal operation, how many items will be fetched from each + * peer at a time. This will only come into play when the network + * is being flooded -- typically transactions will be fetched as soon + * as we find out about them, so only one item will be requested + * at a time. + * + * No tests have been done to find the optimal value for this + * parameter, so consider increasing or decreasing it if performance + * during flooding is lacking. + */ +#define GRAPHENE_NET_MAX_ITEMS_PER_PEER_DURING_NORMAL_OPERATION 50 + /** * Instead of fetching all item IDs from a peer, then fetching all blocks * from a peer, we will interleave them. Fetch at least this many block IDs, diff --git a/libraries/net/node.cpp b/libraries/net/node.cpp index fc49f0f3..a06584b3 100644 --- a/libraries/net/node.cpp +++ b/libraries/net/node.cpp @@ -1108,39 +1108,81 @@ namespace graphene { namespace net { namespace detail { fc::time_point next_peer_unblocked_time = fc::time_point::maximum(); - std::forward_list > fetch_messages_to_send; - std::vector > write_ops; - for (auto iter = _items_to_fetch.begin(); iter != _items_to_fetch.end();) + // we need to construct a list of items to request from each peer first, + // then send the messages (in two steps, to avoid yielding while iterating) + // we want to evenly distribute our requests among our peers. + struct requested_item_count_index {}; + struct peer_and_items_to_fetch { + peer_connection_ptr peer; + std::vector item_ids; + peer_and_items_to_fetch(const peer_connection_ptr& peer) : peer(peer) {} + bool operator<(const peer_and_items_to_fetch& rhs) const { return peer < rhs.peer; } + size_t number_of_items() const { return item_ids.size(); } + }; + typedef boost::multi_index_container >, + boost::multi_index::ordered_non_unique, + boost::multi_index::const_mem_fun > > > fetch_messages_to_send_set; + fetch_messages_to_send_set items_by_peer; + + // initialize the fetch_messages_to_send with an empty set of items for all idle peers + for (const peer_connection_ptr& peer : _active_connections) + if (peer->idle()) + items_by_peer.insert(peer_and_items_to_fetch(peer)); + + // now loop over all items we want to fetch + for (auto item_iter = _items_to_fetch.begin(); item_iter != _items_to_fetch.end();) + { + // and find a peer that has it, we'll use the one who has the least requests going to it to load balance bool item_fetched = false; - for (const peer_connection_ptr& peer : _active_connections) + for (auto peer_iter = items_by_peer.get().begin(); peer_iter != items_by_peer.get().end(); ++peer_iter) { - if (peer->idle() && - peer->inventory_peer_advertised_to_us.find(iter->item) != peer->inventory_peer_advertised_to_us.end()) + const peer_connection_ptr& peer = peer_iter->peer; + // if they have the item and we haven't already decided to ask them for too many other items + if (peer_iter->item_ids.size() < GRAPHENE_NET_MAX_ITEMS_PER_PEER_DURING_NORMAL_OPERATION && + peer->inventory_peer_advertised_to_us.find(item_iter->item) != peer->inventory_peer_advertised_to_us.end()) { - if (peer->is_transaction_fetching_inhibited() && iter->item.item_type == graphene::net::trx_message_type) + if (item_iter->item.item_type == graphene::net::trx_message_type && peer->is_transaction_fetching_inhibited()) next_peer_unblocked_time = std::min(peer->transaction_fetching_inhibited_until, next_peer_unblocked_time); else { - dlog("requesting item ${hash} from peer ${endpoint}", - ("hash", iter->item.item_hash)("endpoint", peer->get_remote_endpoint())); - peer->items_requested_from_peer.insert(peer_connection::item_to_time_map_type::value_type(iter->item, fc::time_point::now())); - item_id item_id_to_fetch = iter->item; - iter = _items_to_fetch.erase(iter); + //dlog("requesting item ${hash} from peer ${endpoint}", + // ("hash", iter->item.item_hash)("endpoint", peer->get_remote_endpoint())); + item_id item_id_to_fetch = item_iter->item; + peer->items_requested_from_peer.insert(peer_connection::item_to_time_map_type::value_type(item_id_to_fetch, fc::time_point::now())); + item_iter = _items_to_fetch.erase(item_iter); item_fetched = true; - fetch_messages_to_send.emplace_front(std::make_pair(peer, item_id_to_fetch)); + items_by_peer.get().modify(peer_iter, [&item_id_to_fetch](peer_and_items_to_fetch& peer_and_items) { + peer_and_items.item_ids.push_back(item_id_to_fetch); + }); break; } - } + } } if (!item_fetched) - ++iter; + ++item_iter; } - for (const auto& peer_and_item : fetch_messages_to_send) - peer_and_item.first->send_message(fetch_items_message(peer_and_item.second.item_type, - std::vector{peer_and_item.second.item_hash})); - fetch_messages_to_send.clear(); + // we've figured out which peer will be providing each item, now send the messages. + for (const peer_and_items_to_fetch& peer_and_items : items_by_peer) + { + // the item lists are heterogenous and + // the fetch_items_message can only deal with one item type at a time. + std::map > items_to_fetch_by_type; + for (const item_id& item : peer_and_items.item_ids) + items_to_fetch_by_type[item.item_type].push_back(item.item_hash); + for (auto& items_by_type : items_to_fetch_by_type) + { + dlog("requesting ${count} items of type ${type} from peer ${endpoint}: ${hashes}", + ("count", items_by_type.second.size())("type", (uint32_t)items_by_type.first) + ("endpoint", peer_and_items.peer->get_remote_endpoint()) + ("hashes", items_by_type.second)); + peer_and_items.peer->send_message(fetch_items_message(items_by_type.first, + items_by_type.second)); + } + } + items_by_peer.clear(); if (!_items_to_fetch_updated) { diff --git a/tests/tests/block_tests.cpp b/tests/tests/block_tests.cpp index f3755452..0ef0eeaf 100644 --- a/tests/tests/block_tests.cpp +++ b/tests/tests/block_tests.cpp @@ -1035,7 +1035,7 @@ BOOST_FIXTURE_TEST_CASE( transaction_invalidated_in_cache, database_fixture ) { ACTORS( (alice)(bob) ); - auto generate_block = [&]( database& d, uint32_t skip = database::skip_nothing ) -> signed_block + auto generate_block = [&]( database& d, uint32_t skip ) -> signed_block { return d.generate_block(d.get_slot_time(1), d.get_scheduled_witness(1), init_account_priv_key, skip); }; @@ -1058,7 +1058,7 @@ BOOST_FIXTURE_TEST_CASE( transaction_invalidated_in_cache, database_fixture ) BOOST_CHECK( db2.get( alice_id ).name == "alice" ); BOOST_CHECK( db2.get( bob_id ).name == "bob" ); - db2.push_block(generate_block(db)); + db2.push_block(generate_block(db, database::skip_nothing)); transfer( account_id_type(), alice_id, asset( 1000 ) ); transfer( account_id_type(), bob_id, asset( 1000 ) ); // need to skip authority check here as well for same reason as above @@ -1073,7 +1073,7 @@ BOOST_FIXTURE_TEST_CASE( transaction_invalidated_in_cache, database_fixture ) { for( int i=0; i