Remove items from p2p's list of items to fetch when we expect them to become unfetchable
parent 70fefe4f5f
commit 7bc7004ef9

1 changed file with 57 additions and 31 deletions
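
In short: each entry in the fetch queue now records when a peer last advertised it, and the fetch loop drops entries older than roughly one message-cache lifetime, since no peer is likely to still be able to serve them. A minimal sketch of that idea, using simplified stand-in types rather than the real graphene::net structures:

    #include <chrono>
    #include <list>

    // Hypothetical, simplified stand-ins for the real graphene::net types --
    // a sketch of the commit's idea, not the actual node.cpp code.
    struct item_id { int item_type; long item_hash; };

    struct prioritized_item_id
    {
      item_id item;
      unsigned sequence_number;
      std::chrono::steady_clock::time_point timestamp; // last time a peer advertised this item to us
    };

    // Drop every queued item whose last advertisement is older than the cutoff;
    // such items have probably already expired from every peer's message cache.
    void prune_unfetchable(std::list<prioritized_item_id>& items_to_fetch,
                           std::chrono::seconds cache_lifetime)
    {
      const auto oldest_timestamp_to_fetch = std::chrono::steady_clock::now() - cache_lifetime;
      for (auto it = items_to_fetch.begin(); it != items_to_fetch.end();)
      {
        if (it->timestamp < oldest_timestamp_to_fetch)
          it = items_to_fetch.erase(it); // expected to be unfetchable, stop asking for it
        else
          ++it;
      }
    }
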
@@ -250,9 +250,12 @@ namespace graphene { namespace net { namespace detail {
       {
         item_id item;
         unsigned sequence_number;
+        fc::time_point timestamp; // the time we last heard about this item in an inventory message
+
         prioritized_item_id(const item_id& item, unsigned sequence_number) :
           item(item),
-          sequence_number(sequence_number)
+          sequence_number(sequence_number),
+          timestamp(fc::time_point::now())
         {}
         bool operator<(const prioritized_item_id& rhs) const
         {
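
Reviewer note: the final hunk below looks items up by id via _items_to_fetch.get<item_id_index>(), which implies _items_to_fetch is a Boost.MultiIndex container with an id-keyed index alongside its priority ordering. The actual declaration is not part of this diff; the following is only an assumed, simplified shape (the index choices and the typedef name are guesses, and it reuses the prioritized_item_id/item_id types from the hunk above, assuming item_id defines operator<):

    #include <boost/multi_index_container.hpp>
    #include <boost/multi_index/ordered_index.hpp>
    #include <boost/multi_index/identity.hpp>
    #include <boost/multi_index/member.hpp>
    #include <boost/multi_index/tag.hpp>

    // Assumed sketch only; the real container is declared elsewhere in node.cpp.
    struct item_id_index {}; // index tag used by _items_to_fetch.get<item_id_index>()

    typedef boost::multi_index_container<
      prioritized_item_id,
      boost::multi_index::indexed_by<
        // primary ordering by fetch priority, via prioritized_item_id::operator<
        boost::multi_index::ordered_unique<boost::multi_index::identity<prioritized_item_id> >,
        // secondary lookup by item id, used in the last hunk to find and refresh duplicates
        boost::multi_index::ordered_unique<
          boost::multi_index::tag<item_id_index>,
          boost::multi_index::member<prioritized_item_id, item_id, &prioritized_item_id::item> > >
    > items_to_fetch_container;
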
@@ -473,6 +476,7 @@ namespace graphene { namespace net { namespace detail {
       // @}

       fc::future<void> _terminate_inactive_connections_loop_done;
+      uint8_t _recent_block_interval_in_seconds; // a cached copy of the block interval, to avoid a thread hop to the blockchain to get the current value

       std::string _user_agent_string;
       /** _node_public_key is a key automatically generated when the client is first run, stored in
@@ -793,6 +797,7 @@ namespace graphene { namespace net { namespace detail {
       _suspend_fetching_sync_blocks(false),
       _items_to_fetch_updated(false),
       _items_to_fetch_sequence_counter(0),
+      _recent_block_interval_in_seconds(GRAPHENE_MAX_BLOCK_INTERVAL),
       _user_agent_string(user_agent),
       _desired_number_of_connections(GRAPHENE_NET_DEFAULT_DESIRED_CONNECTIONS),
       _maximum_number_of_connections(GRAPHENE_NET_DEFAULT_MAX_CONNECTIONS),
@@ -1106,6 +1111,7 @@ namespace graphene { namespace net { namespace detail {
       dlog("beginning an iteration of fetch items (${count} items to fetch)",
            ("count", _items_to_fetch.size()));

+      fc::time_point oldest_timestamp_to_fetch = fc::time_point::now() - fc::seconds(_recent_block_interval_in_seconds * GRAPHENE_NET_MESSAGE_CACHE_DURATION_IN_BLOCKS);
       fc::time_point next_peer_unblocked_time = fc::time_point::maximum();

       // we need to construct a list of items to request from each peer first,
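
A note on the new cutoff: oldest_timestamp_to_fetch sits one full message-cache lifetime in the past. As an illustration, with a 3-second block interval and a cache duration of 20 blocks (an assumed value; the real GRAPHENE_NET_MESSAGE_CACHE_DURATION_IN_BLOCKS is defined elsewhere in the graphene headers), the cutoff is now - (3 * 20) seconds = now - 60 seconds, so any queued item last advertised more than a minute ago is treated as already expired from the peers' caches and is removed by the loop in the next hunk.
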
@@ -1134,34 +1140,45 @@ namespace graphene { namespace net { namespace detail {
       // now loop over all items we want to fetch
       for (auto item_iter = _items_to_fetch.begin(); item_iter != _items_to_fetch.end();)
       {
-        // and find a peer that has it, we'll use the one who has the least requests going to it to load balance
-        bool item_fetched = false;
-        for (auto peer_iter = items_by_peer.get<requested_item_count_index>().begin(); peer_iter != items_by_peer.get<requested_item_count_index>().end(); ++peer_iter)
+        if (item_iter->timestamp < oldest_timestamp_to_fetch)
         {
-          const peer_connection_ptr& peer = peer_iter->peer;
-          // if they have the item and we haven't already decided to ask them for too many other items
-          if (peer_iter->item_ids.size() < GRAPHENE_NET_MAX_ITEMS_PER_PEER_DURING_NORMAL_OPERATION &&
-              peer->inventory_peer_advertised_to_us.find(item_iter->item) != peer->inventory_peer_advertised_to_us.end())
-          {
-            if (item_iter->item.item_type == graphene::net::trx_message_type && peer->is_transaction_fetching_inhibited())
-              next_peer_unblocked_time = std::min(peer->transaction_fetching_inhibited_until, next_peer_unblocked_time);
-            else
-            {
-              //dlog("requesting item ${hash} from peer ${endpoint}",
-              //     ("hash", iter->item.item_hash)("endpoint", peer->get_remote_endpoint()));
-              item_id item_id_to_fetch = item_iter->item;
-              peer->items_requested_from_peer.insert(peer_connection::item_to_time_map_type::value_type(item_id_to_fetch, fc::time_point::now()));
-              item_iter = _items_to_fetch.erase(item_iter);
-              item_fetched = true;
-              items_by_peer.get<requested_item_count_index>().modify(peer_iter, [&item_id_to_fetch](peer_and_items_to_fetch& peer_and_items) {
-                peer_and_items.item_ids.push_back(item_id_to_fetch);
-              });
-              break;
-            }
-          }
+          // this item has probably already fallen out of our peers' caches, we'll just ignore it.
+          // this can happen during flooding, and the _items_to_fetch could otherwise get clogged
+          // with a bunch of items that we'll never be able to request from any peer
+          wlog("Unable to fetch item ${item} before its likely expiration time, removing it from our list of items to fetch", ("item", item_iter->item));
+          item_iter = _items_to_fetch.erase(item_iter);
         }
+        else
+        {
+          // find a peer that has it, we'll use the one who has the least requests going to it to load balance
+          bool item_fetched = false;
+          for (auto peer_iter = items_by_peer.get<requested_item_count_index>().begin(); peer_iter != items_by_peer.get<requested_item_count_index>().end(); ++peer_iter)
+          {
+            const peer_connection_ptr& peer = peer_iter->peer;
+            // if they have the item and we haven't already decided to ask them for too many other items
+            if (peer_iter->item_ids.size() < GRAPHENE_NET_MAX_ITEMS_PER_PEER_DURING_NORMAL_OPERATION &&
+                peer->inventory_peer_advertised_to_us.find(item_iter->item) != peer->inventory_peer_advertised_to_us.end())
+            {
+              if (item_iter->item.item_type == graphene::net::trx_message_type && peer->is_transaction_fetching_inhibited())
+                next_peer_unblocked_time = std::min(peer->transaction_fetching_inhibited_until, next_peer_unblocked_time);
+              else
+              {
+                //dlog("requesting item ${hash} from peer ${endpoint}",
+                //     ("hash", iter->item.item_hash)("endpoint", peer->get_remote_endpoint()));
+                item_id item_id_to_fetch = item_iter->item;
+                peer->items_requested_from_peer.insert(peer_connection::item_to_time_map_type::value_type(item_id_to_fetch, fc::time_point::now()));
+                item_iter = _items_to_fetch.erase(item_iter);
+                item_fetched = true;
+                items_by_peer.get<requested_item_count_index>().modify(peer_iter, [&item_id_to_fetch](peer_and_items_to_fetch& peer_and_items) {
+                  peer_and_items.item_ids.push_back(item_id_to_fetch);
+                });
+                break;
+              }
+            }
+          }
+          if (!item_fetched)
+            ++item_iter;
+        }
-        if (!item_fetched)
-          ++item_iter;
       }

       // we've figured out which peer will be providing each item, now send the messages.
@@ -1295,7 +1312,7 @@ namespace graphene { namespace net { namespace detail {
       std::list<peer_connection_ptr> peers_to_send_keep_alive;
       std::list<peer_connection_ptr> peers_to_terminate;

-      uint8_t current_block_interval_in_seconds = _delegate->get_current_block_interval_in_seconds();
+      _recent_block_interval_in_seconds = _delegate->get_current_block_interval_in_seconds();

       // Disconnect peers that haven't sent us any data recently
       // These numbers are just guesses and we need to think through how this works better.
@@ -1334,7 +1351,7 @@ namespace graphene { namespace net { namespace detail {
       }

       // timeout for any active peers is two block intervals
-      uint32_t active_disconnect_timeout = 10 * current_block_interval_in_seconds;
+      uint32_t active_disconnect_timeout = 10 * _recent_block_interval_in_seconds;
       uint32_t active_send_keepalive_timeout = active_disconnect_timeout / 2;

       // set the ignored request time out to 1 second. When we request a block
@@ -2843,13 +2860,22 @@ namespace graphene { namespace net { namespace detail {
         }
         else
         {
-          auto insert_result = _items_to_fetch.insert(prioritized_item_id(advertised_item_id, _items_to_fetch_sequence_counter++));
-          if (insert_result.second)
+          auto items_to_fetch_iter = _items_to_fetch.get<item_id_index>().find(advertised_item_id);
+          if (items_to_fetch_iter == _items_to_fetch.get<item_id_index>().end())
           {
+            // it's new to us
+            _items_to_fetch.insert(prioritized_item_id(advertised_item_id, _items_to_fetch_sequence_counter++));
             dlog("adding item ${item_hash} from inventory message to our list of items to fetch",
                  ("item_hash", item_hash));
             trigger_fetch_items_loop();
           }
+          else
+          {
+            // another peer has told us about this item already, but this peer just told us it has the item
+            // too, we can expect it to be around in this peer's cache for longer, so update its timestamp
+            _items_to_fetch.get<item_id_index>().modify(items_to_fetch_iter,
+                                                        [](prioritized_item_id& item) { item.timestamp = fc::time_point::now(); });
+          }
         }
       }
     }