From 7bc7004ef92381d9c6ab5c11185fde2738aa03f0 Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Thu, 8 Oct 2015 15:13:05 -0400 Subject: [PATCH] Remove items from p2p's list of items to fetch when we expect them to become unfetchable --- libraries/net/node.cpp | 88 +++++++++++++++++++++++++++--------------- 1 file changed, 57 insertions(+), 31 deletions(-) diff --git a/libraries/net/node.cpp b/libraries/net/node.cpp index 1eda204d..fead4042 100644 --- a/libraries/net/node.cpp +++ b/libraries/net/node.cpp @@ -250,9 +250,12 @@ namespace graphene { namespace net { namespace detail { { item_id item; unsigned sequence_number; + fc::time_point timestamp; // the time we last heard about this item in an inventory message + prioritized_item_id(const item_id& item, unsigned sequence_number) : item(item), - sequence_number(sequence_number) + sequence_number(sequence_number), + timestamp(fc::time_point::now()) {} bool operator<(const prioritized_item_id& rhs) const { @@ -473,6 +476,7 @@ namespace graphene { namespace net { namespace detail { // @} fc::future _terminate_inactive_connections_loop_done; + uint8_t _recent_block_interval_in_seconds; // a cached copy of the block interval, to avoid a thread hop to the blockchain to get the current value std::string _user_agent_string; /** _node_public_key is a key automatically generated when the client is first run, stored in @@ -793,6 +797,7 @@ namespace graphene { namespace net { namespace detail { _suspend_fetching_sync_blocks(false), _items_to_fetch_updated(false), _items_to_fetch_sequence_counter(0), + _recent_block_interval_in_seconds(GRAPHENE_MAX_BLOCK_INTERVAL), _user_agent_string(user_agent), _desired_number_of_connections(GRAPHENE_NET_DEFAULT_DESIRED_CONNECTIONS), _maximum_number_of_connections(GRAPHENE_NET_DEFAULT_MAX_CONNECTIONS), @@ -1106,6 +1111,7 @@ namespace graphene { namespace net { namespace detail { dlog("beginning an iteration of fetch items (${count} items to fetch)", ("count", _items_to_fetch.size())); + fc::time_point oldest_timestamp_to_fetch = fc::time_point::now() - fc::seconds(_recent_block_interval_in_seconds * GRAPHENE_NET_MESSAGE_CACHE_DURATION_IN_BLOCKS); fc::time_point next_peer_unblocked_time = fc::time_point::maximum(); // we need to construct a list of items to request from each peer first, @@ -1134,34 +1140,45 @@ namespace graphene { namespace net { namespace detail { // now loop over all items we want to fetch for (auto item_iter = _items_to_fetch.begin(); item_iter != _items_to_fetch.end();) { - // and find a peer that has it, we'll use the one who has the least requests going to it to load balance - bool item_fetched = false; - for (auto peer_iter = items_by_peer.get().begin(); peer_iter != items_by_peer.get().end(); ++peer_iter) + if (item_iter->timestamp < oldest_timestamp_to_fetch) { - const peer_connection_ptr& peer = peer_iter->peer; - // if they have the item and we haven't already decided to ask them for too many other items - if (peer_iter->item_ids.size() < GRAPHENE_NET_MAX_ITEMS_PER_PEER_DURING_NORMAL_OPERATION && - peer->inventory_peer_advertised_to_us.find(item_iter->item) != peer->inventory_peer_advertised_to_us.end()) - { - if (item_iter->item.item_type == graphene::net::trx_message_type && peer->is_transaction_fetching_inhibited()) - next_peer_unblocked_time = std::min(peer->transaction_fetching_inhibited_until, next_peer_unblocked_time); - else - { - //dlog("requesting item ${hash} from peer ${endpoint}", - // ("hash", iter->item.item_hash)("endpoint", peer->get_remote_endpoint())); - item_id item_id_to_fetch = item_iter->item; - peer->items_requested_from_peer.insert(peer_connection::item_to_time_map_type::value_type(item_id_to_fetch, fc::time_point::now())); - item_iter = _items_to_fetch.erase(item_iter); - item_fetched = true; - items_by_peer.get().modify(peer_iter, [&item_id_to_fetch](peer_and_items_to_fetch& peer_and_items) { - peer_and_items.item_ids.push_back(item_id_to_fetch); - }); - break; - } - } + // this item has probably already fallen out of our peers' caches, we'll just ignore it. + // this can happen during flooding, and the _items_to_fetch could otherwise get clogged + // with a bunch of items that we'll never be able to request from any peer + wlog("Unable to fetch item ${item} before its likely expiration time, removing it from our list of items to fetch", ("item", item_iter->item)); + item_iter = _items_to_fetch.erase(item_iter); + } + else + { + // find a peer that has it, we'll use the one who has the least requests going to it to load balance + bool item_fetched = false; + for (auto peer_iter = items_by_peer.get().begin(); peer_iter != items_by_peer.get().end(); ++peer_iter) + { + const peer_connection_ptr& peer = peer_iter->peer; + // if they have the item and we haven't already decided to ask them for too many other items + if (peer_iter->item_ids.size() < GRAPHENE_NET_MAX_ITEMS_PER_PEER_DURING_NORMAL_OPERATION && + peer->inventory_peer_advertised_to_us.find(item_iter->item) != peer->inventory_peer_advertised_to_us.end()) + { + if (item_iter->item.item_type == graphene::net::trx_message_type && peer->is_transaction_fetching_inhibited()) + next_peer_unblocked_time = std::min(peer->transaction_fetching_inhibited_until, next_peer_unblocked_time); + else + { + //dlog("requesting item ${hash} from peer ${endpoint}", + // ("hash", iter->item.item_hash)("endpoint", peer->get_remote_endpoint())); + item_id item_id_to_fetch = item_iter->item; + peer->items_requested_from_peer.insert(peer_connection::item_to_time_map_type::value_type(item_id_to_fetch, fc::time_point::now())); + item_iter = _items_to_fetch.erase(item_iter); + item_fetched = true; + items_by_peer.get().modify(peer_iter, [&item_id_to_fetch](peer_and_items_to_fetch& peer_and_items) { + peer_and_items.item_ids.push_back(item_id_to_fetch); + }); + break; + } + } + } + if (!item_fetched) + ++item_iter; } - if (!item_fetched) - ++item_iter; } // we've figured out which peer will be providing each item, now send the messages. @@ -1295,7 +1312,7 @@ namespace graphene { namespace net { namespace detail { std::list peers_to_send_keep_alive; std::list peers_to_terminate; - uint8_t current_block_interval_in_seconds = _delegate->get_current_block_interval_in_seconds(); + _recent_block_interval_in_seconds = _delegate->get_current_block_interval_in_seconds(); // Disconnect peers that haven't sent us any data recently // These numbers are just guesses and we need to think through how this works better. @@ -1334,7 +1351,7 @@ namespace graphene { namespace net { namespace detail { } // timeout for any active peers is two block intervals - uint32_t active_disconnect_timeout = 10 * current_block_interval_in_seconds; + uint32_t active_disconnect_timeout = 10 * _recent_block_interval_in_seconds; uint32_t active_send_keepalive_timeout = active_disconnect_timeout / 2; // set the ignored request time out to 1 second. When we request a block @@ -2843,13 +2860,22 @@ namespace graphene { namespace net { namespace detail { } else { - auto insert_result = _items_to_fetch.insert(prioritized_item_id(advertised_item_id, _items_to_fetch_sequence_counter++)); - if (insert_result.second) + auto items_to_fetch_iter = _items_to_fetch.get().find(advertised_item_id); + if (items_to_fetch_iter == _items_to_fetch.get().end()) { + // it's new to us + _items_to_fetch.insert(prioritized_item_id(advertised_item_id, _items_to_fetch_sequence_counter++)); dlog("adding item ${item_hash} from inventory message to our list of items to fetch", ("item_hash", item_hash)); trigger_fetch_items_loop(); } + else + { + // another peer has told us about this item already, but this peer just told us it has the item + // too, we can expect it to be around in this peer's cache for longer, so update its timestamp + _items_to_fetch.get().modify(items_to_fetch_iter, + [](prioritized_item_id& item) { item.timestamp = fc::time_point::now(); }); + } } } }