Merge branch 'bug/501-connection-pool' into 'develop'

#501 - concurrent_unordered_set for connection

See merge request PBSA/peerplays!212
This commit is contained in:
Vlad Dobromyslov 2023-02-27 13:34:41 +00:00
commit 5867a8ae27

View file

@ -128,6 +128,124 @@ namespace graphene { namespace net {
namespace detail
{
namespace bmi = boost::multi_index;
/*******
* A class to wrap std::unordered_set for multithreading
*/
template <class Key, class Hash = std::hash<Key>, class Pred = std::equal_to<Key> >
class concurrent_unordered_set : private std::unordered_set<Key, Hash, Pred>
{
private:
mutable fc::mutex mux;
public:
/// Iterations require a lock. This exposes the mutex. Use with care (i.e. lock_guard)
fc::mutex& get_mutex()const { return mux; }
/// Insertion
/// @{
std::pair< typename std::unordered_set<Key, Hash, Pred>::iterator, bool> emplace( Key key)
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::emplace( key );
}
std::pair< typename std::unordered_set<Key, Hash, Pred>::iterator, bool> insert (const Key& val)
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::insert( val );
}
/// @}
/// Size
/// @{
size_t size() const
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::size();
}
bool empty() const noexcept
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::empty();
}
/// @}
/// Removal
/// @{
void clear() noexcept
{
fc::scoped_lock<fc::mutex> lock(mux);
std::unordered_set<Key, Hash, Pred>::clear();
}
typename std::unordered_set<Key, Hash, Pred>::iterator erase(
typename std::unordered_set<Key, Hash, Pred>::const_iterator itr)
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::erase( itr);
}
size_t erase( const Key& key)
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::erase( key );
}
/// @}
/// Swap
/// @{
void swap( typename std::unordered_set<Key, Hash, Pred>& other ) noexcept
{
fc::scoped_lock<fc::mutex> lock(mux);
std::unordered_set<Key, Hash, Pred>::swap( other );
}
/// @}
/// Iteration
/// @{
typename std::unordered_set<Key, Hash, Pred>::iterator begin() noexcept
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::begin();
}
typename std::unordered_set<Key, Hash, Pred>::const_iterator begin() const noexcept
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::begin();
}
typename std::unordered_set<Key, Hash, Pred>::local_iterator begin(size_t n)
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::begin(n);
}
typename std::unordered_set<Key, Hash, Pred>::const_local_iterator begin(size_t n) const
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::begin(n);
}
typename std::unordered_set<Key, Hash, Pred>::iterator end() noexcept
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::end();
}
typename std::unordered_set<Key, Hash, Pred>::const_iterator end() const noexcept
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::end();
}
typename std::unordered_set<Key, Hash, Pred>::local_iterator end(size_t n)
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::end(n);
}
typename std::unordered_set<Key, Hash, Pred>::const_local_iterator end(size_t n) const
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::end(n);
}
/// @}
/// Search
typename std::unordered_set<Key, Hash, Pred>::const_iterator find(Key key)
{
fc::scoped_lock<fc::mutex> lock(mux);
return std::unordered_set<Key, Hash, Pred>::find(key);
}
};
class blockchain_tied_message_cache
{
private:
@ -483,7 +601,7 @@ namespace graphene { namespace net { namespace detail {
// @{
fc::promise<void>::ptr _retrigger_advertise_inventory_loop_promise;
fc::future<void> _advertise_inventory_loop_done;
std::unordered_set<item_id> _new_inventory; /// list of items we have received but not yet advertised to our peers
concurrent_unordered_set<item_id> _new_inventory; /// list of items we have received but not yet advertised to our peers
// @}
fc::future<void> _terminate_inactive_connections_loop_done;
@ -519,13 +637,13 @@ namespace graphene { namespace net { namespace detail {
/** Stores all connections which have not yet finished key exchange or are still sending initial handshaking messages
* back and forth (not yet ready to initiate syncing) */
std::unordered_set<peer_connection_ptr> _handshaking_connections;
concurrent_unordered_set<peer_connection_ptr> _handshaking_connections;
/** stores fully established connections we're either syncing with or in normal operation with */
std::unordered_set<peer_connection_ptr> _active_connections;
concurrent_unordered_set<peer_connection_ptr> _active_connections;
/** stores connections we've closed (sent closing message, not actually closed), but are still waiting for the remote end to close before we delete them */
std::unordered_set<peer_connection_ptr> _closing_connections;
concurrent_unordered_set<peer_connection_ptr> _closing_connections;
/** stores connections we've closed, but are still waiting for the OS to notify us that the socket is really closed */
std::unordered_set<peer_connection_ptr> _terminating_connections;
concurrent_unordered_set<peer_connection_ptr> _terminating_connections;
boost::circular_buffer<item_hash_t> _most_recent_blocks_accepted; // the /n/ most recent blocks we've accepted (currently tuned to the max number of connections)
@ -854,6 +972,8 @@ namespace graphene { namespace net { namespace detail {
ilog( "cleaning up node" );
_node_is_shutting_down.store(true);
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& active_peer : _active_connections)
{
fc::optional<fc::ip::endpoint> inbound_endpoint = active_peer->get_endpoint_for_connecting();
@ -867,6 +987,7 @@ namespace graphene { namespace net { namespace detail {
}
}
}
}
try
{
@ -1061,6 +1182,7 @@ namespace graphene { namespace net { namespace detail {
std::set<item_hash_t> sync_items_to_request;
// for each idle peer that we're syncing with
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for( const peer_connection_ptr& peer : _active_connections )
{
if( peer->we_need_sync_items_from_peer &&
@ -1119,6 +1241,7 @@ namespace graphene { namespace net { namespace detail {
bool node_impl::is_item_in_any_peers_inventory(const item_id& item) const
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for( const peer_connection_ptr& peer : _active_connections )
{
if (peer->inventory_peer_advertised_to_us.find(item) != peer->inventory_peer_advertised_to_us.end() )
@ -1158,9 +1281,13 @@ namespace graphene { namespace net { namespace detail {
fetch_messages_to_send_set items_by_peer;
// initialize the fetch_messages_to_send with an empty set of items for all idle peers
for (const peer_connection_ptr& peer : _active_connections)
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& peer : _active_connections) {
if (peer->idle())
items_by_peer.insert(peer_and_items_to_fetch(peer));
}
}
// now loop over all items we want to fetch
for (auto item_iter = _items_to_fetch.begin(); item_iter != _items_to_fetch.end();)
@ -1262,13 +1389,15 @@ namespace graphene { namespace net { namespace detail {
dlog("beginning an iteration of advertise inventory");
// swap inventory into local variable, clearing the node's copy
std::unordered_set<item_id> inventory_to_advertise;
inventory_to_advertise.swap(_new_inventory);
_new_inventory.swap(inventory_to_advertise);
// process all inventory to advertise and construct the inventory messages we'll send
// first, then send them all in a batch (to avoid any fiber interruption points while
// we're computing the messages)
std::list<std::pair<peer_connection_ptr, item_ids_inventory_message> > inventory_messages_to_send;
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& peer : _active_connections)
{
// only advertise to peers who are in sync with us
@ -1313,6 +1442,7 @@ namespace graphene { namespace net { namespace detail {
}
peer->clear_old_inventory();
}
}
for (auto iter = inventory_messages_to_send.begin(); iter != inventory_messages_to_send.end(); ++iter)
iter->first->send_message(iter->second);
@ -1360,7 +1490,10 @@ namespace graphene { namespace net { namespace detail {
uint32_t handshaking_timeout = _peer_inactivity_timeout;
fc::time_point handshaking_disconnect_threshold = fc::time_point::now() - fc::seconds(handshaking_timeout);
{
fc::scoped_lock<fc::mutex> lock(_handshaking_connections.get_mutex());
for( const peer_connection_ptr handshaking_peer : _handshaking_connections )
{
if( handshaking_peer->connection_initiation_time < handshaking_disconnect_threshold &&
handshaking_peer->get_last_message_received_time() < handshaking_disconnect_threshold &&
handshaking_peer->get_last_message_sent_time() < handshaking_disconnect_threshold )
@ -1379,6 +1512,8 @@ namespace graphene { namespace net { namespace detail {
("received", handshaking_peer->get_total_bytes_received())));
peers_to_disconnect_forcibly.push_back( handshaking_peer );
}
}
}
// timeout for any active peers is two block intervals
uint32_t active_disconnect_timeout = 10 * _recent_block_interval_in_seconds;
@ -1398,6 +1533,8 @@ namespace graphene { namespace net { namespace detail {
fc::time_point active_disconnect_threshold = fc::time_point::now() - fc::seconds(active_disconnect_timeout);
fc::time_point active_send_keepalive_threshold = fc::time_point::now() - fc::seconds(active_send_keepalive_timeout);
fc::time_point active_ignored_request_threshold = fc::time_point::now() - active_ignored_request_timeout;
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for( const peer_connection_ptr& active_peer : _active_connections )
{
if( active_peer->connection_initiation_time < active_disconnect_threshold &&
@ -1465,27 +1602,34 @@ namespace graphene { namespace net { namespace detail {
}
}
}
}
fc::time_point closing_disconnect_threshold = fc::time_point::now() - fc::seconds(GRAPHENE_NET_PEER_DISCONNECT_TIMEOUT);
for( const peer_connection_ptr& closing_peer : _closing_connections )
if( closing_peer->connection_closed_time < closing_disconnect_threshold )
{
fc::scoped_lock<fc::mutex> lock(_closing_connections.get_mutex());
for( const peer_connection_ptr& closing_peer : _closing_connections ) {
if (closing_peer->connection_closed_time < closing_disconnect_threshold) {
// we asked this peer to close their connectoin to us at least GRAPHENE_NET_PEER_DISCONNECT_TIMEOUT
// seconds ago, but they haven't done it yet. Terminate the connection now
wlog("Forcibly disconnecting peer ${peer} who failed to close their connection in a timely manner",
("peer", closing_peer->get_remote_endpoint()));
peers_to_disconnect_forcibly.push_back(closing_peer);
}
}
}
uint32_t failed_terminate_timeout_seconds = 120;
fc::time_point failed_terminate_threshold = fc::time_point::now() - fc::seconds(failed_terminate_timeout_seconds);
for (const peer_connection_ptr& peer : _terminating_connections )
if (peer->get_connection_terminated_time() != fc::time_point::min() &&
peer->get_connection_terminated_time() < failed_terminate_threshold)
{
fc::scoped_lock<fc::mutex> lock(_terminating_connections.get_mutex());
for (const peer_connection_ptr& peer : _terminating_connections ) {
if (peer->get_connection_terminated_time() != fc::time_point::min() &&
peer->get_connection_terminated_time() < failed_terminate_threshold) {
wlog("Terminating connection with peer ${peer}, closing the connection didn't work", ("peer", peer->get_remote_endpoint()));
peers_to_terminate.push_back(peer);
}
}
}
// That's the end of the sorting step; now all peers that require further processing are now in one of the
// lists peers_to_disconnect_gently, peers_to_disconnect_forcibly, peers_to_send_keep_alive, or peers_to_terminate
@ -1493,12 +1637,15 @@ namespace graphene { namespace net { namespace detail {
// if we've decided to delete any peers, do it now; in its current implementation this doesn't yield,
// and once we start yielding, we may find that we've moved that peer to another list (closed or active)
// and that triggers assertions, maybe even errors
{
fc::scoped_lock<fc::mutex> lock(_terminating_connections.get_mutex());
for (const peer_connection_ptr& peer : peers_to_terminate )
{
assert(_terminating_connections.find(peer) != _terminating_connections.end());
_terminating_connections.erase(peer);
schedule_peer_for_deletion(peer);
}
}
peers_to_terminate.clear();
// if we're going to abruptly disconnect anyone, do it here
@ -1516,6 +1663,7 @@ namespace graphene { namespace net { namespace detail {
// disconnect reason, so it may yield)
for( const peer_connection_ptr& peer : peers_to_disconnect_gently )
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
fc::exception detailed_error( FC_LOG_MESSAGE(warn, "Disconnecting due to inactivity",
( "last_message_received_seconds_ago", (peer->get_last_message_received_time() - fc::time_point::now() ).count() / fc::seconds(1 ).count() )
( "last_message_sent_seconds_ago", (peer->get_last_message_sent_time() - fc::time_point::now() ).count() / fc::seconds(1 ).count() )
@ -1539,6 +1687,7 @@ namespace graphene { namespace net { namespace detail {
{
VERIFY_CORRECT_THREAD();
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
std::list<peer_connection_ptr> original_active_peers(_active_connections.begin(), _active_connections.end());
for( const peer_connection_ptr& active_peer : original_active_peers )
{
@ -1710,12 +1859,19 @@ namespace graphene { namespace net { namespace detail {
peer_connection_ptr node_impl::get_peer_by_node_id(const node_id_t& node_id)
{
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& active_peer : _active_connections)
if (node_id == active_peer->node_id)
return active_peer;
}
{
fc::scoped_lock<fc::mutex> lock(_handshaking_connections.get_mutex());
for (const peer_connection_ptr& handshaking_peer : _handshaking_connections)
if (node_id == handshaking_peer->node_id)
return handshaking_peer;
}
return peer_connection_ptr();
}
@ -1727,18 +1883,25 @@ namespace graphene { namespace net { namespace detail {
dlog("is_already_connected_to_id returning true because the peer is us");
return true;
}
for (const peer_connection_ptr active_peer : _active_connections)
if (node_id == active_peer->node_id)
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr active_peer : _active_connections) {
if (node_id == active_peer->node_id) {
dlog("is_already_connected_to_id returning true because the peer is already in our active list");
return true;
}
for (const peer_connection_ptr handshaking_peer : _handshaking_connections)
if (node_id == handshaking_peer->node_id)
}
}
{
fc::scoped_lock<fc::mutex> lock(_handshaking_connections.get_mutex());
for (const peer_connection_ptr handshaking_peer : _handshaking_connections) {
if (node_id == handshaking_peer->node_id) {
dlog("is_already_connected_to_id returning true because the peer is already in our handshaking list");
return true;
}
}
}
return false;
}
@ -1770,6 +1933,8 @@ namespace graphene { namespace net { namespace detail {
("max", _maximum_number_of_connections));
dlog(" my id is ${id}", ("id", _node_id));
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& active_connection : _active_connections)
{
dlog(" active: ${endpoint} with ${id} [${direction}]",
@ -1777,6 +1942,9 @@ namespace graphene { namespace net { namespace detail {
("id", active_connection->node_id)
("direction", active_connection->direction));
}
}
{
fc::scoped_lock<fc::mutex> lock(_handshaking_connections.get_mutex());
for (const peer_connection_ptr& handshaking_connection : _handshaking_connections)
{
dlog(" handshaking: ${endpoint} with ${id} [${direction}]",
@ -1785,6 +1953,7 @@ namespace graphene { namespace net { namespace detail {
("direction", handshaking_connection->direction));
}
}
}
void node_impl::on_message( peer_connection* originating_peer, const message& received_message )
{
@ -2229,6 +2398,7 @@ namespace graphene { namespace net { namespace detail {
if (!_peer_advertising_disabled)
{
reply.addresses.reserve(_active_connections.size());
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& active_peer : _active_connections)
{
fc::optional<potential_peer_record> updated_peer_record = _potential_peer_db.lookup_entry_for_endpoint(*active_peer->get_remote_endpoint());
@ -2414,12 +2584,15 @@ namespace graphene { namespace net { namespace detail {
{
VERIFY_CORRECT_THREAD();
uint32_t max_number_of_unfetched_items = 0;
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for( const peer_connection_ptr& peer : _active_connections )
{
uint32_t this_peer_number_of_unfetched_items = (uint32_t)peer->ids_of_items_to_get.size() + peer->number_of_unfetched_item_ids;
max_number_of_unfetched_items = std::max(max_number_of_unfetched_items,
this_peer_number_of_unfetched_items);
}
}
return max_number_of_unfetched_items;
}
@ -2633,17 +2806,19 @@ namespace graphene { namespace net { namespace detail {
originating_peer->ids_of_items_to_get.empty())
{
bool is_first_item_for_other_peer = false;
for (const peer_connection_ptr& peer : _active_connections)
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& peer : _active_connections) {
if (peer != originating_peer->shared_from_this() &&
!peer->ids_of_items_to_get.empty() &&
peer->ids_of_items_to_get.front() == blockchain_item_ids_inventory_message_received.item_hashes_available.front())
{
peer->ids_of_items_to_get.front() == blockchain_item_ids_inventory_message_received.item_hashes_available.front()) {
dlog("The item ${newitem} is the first item for peer ${peer}",
("newitem", blockchain_item_ids_inventory_message_received.item_hashes_available.front())
("peer", peer->get_remote_endpoint()));
("newitem", blockchain_item_ids_inventory_message_received.item_hashes_available.front())("peer", peer->get_remote_endpoint()));
is_first_item_for_other_peer = true;
break;
}
}
}
dlog("is_first_item_for_other_peer: ${is_first}. item_hashes_received.size() = ${size}",
("is_first", is_first_item_for_other_peer)("size", item_hashes_received.size()));
if (!is_first_item_for_other_peer)
@ -2933,6 +3108,8 @@ namespace graphene { namespace net { namespace detail {
item_id advertised_item_id(item_ids_inventory_message_received.item_type, item_hash);
bool we_advertised_this_item_to_a_peer = false;
bool we_requested_this_item_from_a_peer = false;
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr peer : _active_connections)
{
if (peer->inventory_advertised_to_peer.find(advertised_item_id) != peer->inventory_advertised_to_peer.end())
@ -2943,6 +3120,7 @@ namespace graphene { namespace net { namespace detail {
if (peer->items_requested_from_peer.find(advertised_item_id) != peer->items_requested_from_peer.end())
we_requested_this_item_from_a_peer = true;
}
}
// if we have already advertised it to a peer, we must have it, no need to do anything else
if (!we_advertised_this_item_to_a_peer)
@ -3172,6 +3350,7 @@ namespace graphene { namespace net { namespace detail {
};
bool is_fork_block = is_hard_fork_block(block_message_to_send.block.block_num());
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& peer : _active_connections)
{
ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections
@ -3254,6 +3433,7 @@ namespace graphene { namespace net { namespace detail {
else
{
// invalid message received
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& peer : _active_connections)
{
ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections
@ -3356,6 +3536,8 @@ namespace graphene { namespace net { namespace detail {
// find out if this block is the next block on the active chain or one of the forks
bool potential_first_block = false;
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& peer : _active_connections)
{
ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections
@ -3367,6 +3549,7 @@ namespace graphene { namespace net { namespace detail {
peer->ids_of_items_being_processed.insert(received_block_iter->block_id);
}
}
}
// if it is, process it, remove it from all sync peers lists
if (potential_first_block)
@ -3392,6 +3575,7 @@ namespace graphene { namespace net { namespace detail {
{
dlog("Already received and accepted this block (presumably through normal inventory mechanism), treating it as accepted");
std::vector< peer_connection_ptr > peers_needing_next_batch;
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& peer : _active_connections)
{
auto items_being_processed_iter = peer->ids_of_items_being_processed.find(received_block_iter->block_id);
@ -3527,6 +3711,8 @@ namespace graphene { namespace net { namespace detail {
fc::time_point_sec block_time = block_message_to_process.block.timestamp;
bool disconnect_this_peer = false;
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& peer : _active_connections)
{
ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections
@ -3543,10 +3729,14 @@ namespace graphene { namespace net { namespace detail {
}
peer->clear_old_inventory();
}
}
message_propagation_data propagation_data{message_receive_time, message_validated_time, originating_peer->node_id};
broadcast( block_message_to_process, propagation_data );
_message_cache.block_accepted();
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& peer : _active_connections)
{
if (is_hard_fork_block(block_number) )
@ -3578,6 +3768,7 @@ namespace graphene { namespace net { namespace detail {
#endif
}
}
}
if(rejecting_block_due_hf)
{
@ -3614,10 +3805,12 @@ namespace graphene { namespace net { namespace detail {
disconnect_reason = "You offered me a block that I have deemed to be invalid";
peers_to_disconnect.insert( originating_peer->shared_from_this() );
for (const peer_connection_ptr& peer : _active_connections)
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& peer : _active_connections) {
if (!peer->ids_of_items_to_get.empty() && peer->ids_of_items_to_get.front() == block_message_to_process.block_id)
peers_to_disconnect.insert(peer);
}
}
if (restart_sync_exception)
{
@ -3737,6 +3930,8 @@ namespace graphene { namespace net { namespace detail {
void node_impl::forward_firewall_check_to_next_available_peer(firewall_check_state_data* firewall_check_state)
{
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& peer : _active_connections)
{
if (firewall_check_state->expected_node_id != peer->node_id && // it's not the node who is asking us to test
@ -3756,6 +3951,8 @@ namespace graphene { namespace net { namespace detail {
return;
}
}
}
wlog("Unable to forward firewall check for node ${to_check} to any other peers, returning 'unable'",
("to_check", firewall_check_state->endpoint_to_test));
@ -3928,6 +4125,8 @@ namespace graphene { namespace net { namespace detail {
}
fc::time_point now = fc::time_point::now();
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& peer : _active_connections)
{
ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections
@ -3963,6 +4162,8 @@ namespace graphene { namespace net { namespace detail {
data_for_this_peer.user_data = user_data;
reply.current_connections.emplace_back(data_for_this_peer);
}
}
originating_peer->send_message(reply);
}
@ -4047,6 +4248,7 @@ namespace graphene { namespace net { namespace detail {
void node_impl::start_synchronizing()
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for( const peer_connection_ptr& peer : _active_connections )
start_synchronizing_with_peer( peer );
}
@ -4253,9 +4455,19 @@ namespace graphene { namespace net { namespace detail {
// the read loop before it gets an EOF).
// operate off copies of the lists in case they change during iteration
std::list<peer_connection_ptr> all_peers;
boost::push_back(all_peers, _active_connections);
boost::push_back(all_peers, _handshaking_connections);
boost::push_back(all_peers, _closing_connections);
auto p_back = [&all_peers](const peer_connection_ptr& conn) { all_peers.push_back(conn); };
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
std::for_each(_active_connections.begin(), _active_connections.end(), p_back);
}
{
fc::scoped_lock<fc::mutex> lock(_handshaking_connections.get_mutex());
std::for_each(_handshaking_connections.begin(), _handshaking_connections.end(), p_back);
}
{
fc::scoped_lock<fc::mutex> lock(_closing_connections.get_mutex());
std::for_each(_closing_connections.begin(), _closing_connections.end(), p_back);
}
for (const peer_connection_ptr& peer : all_peers)
{
@ -4521,9 +4733,7 @@ namespace graphene { namespace net { namespace detail {
// whether the peer is firewalled, we want to disconnect now.
_handshaking_connections.erase(new_peer);
_terminating_connections.erase(new_peer);
assert(_active_connections.find(new_peer) == _active_connections.end());
_active_connections.erase(new_peer);
assert(_closing_connections.find(new_peer) == _closing_connections.end());
_closing_connections.erase(new_peer);
display_current_connections();
@ -4867,18 +5077,25 @@ namespace graphene { namespace net { namespace detail {
peer_connection_ptr node_impl::get_connection_to_endpoint( const fc::ip::endpoint& remote_endpoint )
{
VERIFY_CORRECT_THREAD();
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for( const peer_connection_ptr& active_peer : _active_connections )
{
fc::optional<fc::ip::endpoint> endpoint_for_this_peer( active_peer->get_remote_endpoint() );
if( endpoint_for_this_peer && *endpoint_for_this_peer == remote_endpoint )
return active_peer;
}
}
{
fc::scoped_lock<fc::mutex> lock(_handshaking_connections.get_mutex());
for( const peer_connection_ptr& handshaking_peer : _handshaking_connections )
{
fc::optional<fc::ip::endpoint> endpoint_for_this_peer( handshaking_peer->get_remote_endpoint() );
if( endpoint_for_this_peer && *endpoint_for_this_peer == remote_endpoint )
return handshaking_peer;
}
}
return peer_connection_ptr();
}
@ -4922,6 +5139,8 @@ namespace graphene { namespace net { namespace detail {
ilog( " number of peers: ${active} active, ${handshaking}, ${closing} closing. attempting to maintain ${desired} - ${maximum} peers",
( "active", _active_connections.size() )("handshaking", _handshaking_connections.size() )("closing",_closing_connections.size() )
( "desired", _desired_number_of_connections )("maximum", _maximum_number_of_connections ) );
{
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for( const peer_connection_ptr& peer : _active_connections )
{
ilog( " active peer ${endpoint} peer_is_in_sync_with_us:${in_sync_with_us} we_are_in_sync_with_peer:${in_sync_with_them}",
@ -4933,11 +5152,15 @@ namespace graphene { namespace net { namespace detail {
ilog( " we are not fetching sync blocks from the above peer (inhibit_fetching_sync_blocks == true)" );
}
}
{
fc::scoped_lock<fc::mutex> lock(_handshaking_connections.get_mutex());
for( const peer_connection_ptr& peer : _handshaking_connections )
{
ilog( " handshaking peer ${endpoint} in state ours(${our_state}) theirs(${their_state})",
( "endpoint", peer->get_remote_endpoint() )("our_state", peer->our_state )("their_state", peer->their_state ) );
}
}
ilog( "--------- MEMORY USAGE ------------" );
ilog( "node._active_sync_requests size: ${size}", ("size", _active_sync_requests.size() ) );
@ -4946,6 +5169,7 @@ namespace graphene { namespace net { namespace detail {
ilog( "node._items_to_fetch size: ${size}", ("size", _items_to_fetch.size() ) );
ilog( "node._new_inventory size: ${size}", ("size", _new_inventory.size() ) );
ilog( "node._message_cache size: ${size}", ("size", _message_cache.size() ) );
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for( const peer_connection_ptr& peer : _active_connections )
{
ilog( " peer ${endpoint}", ("endpoint", peer->get_remote_endpoint() ) );
@ -5043,6 +5267,7 @@ namespace graphene { namespace net { namespace detail {
{
VERIFY_CORRECT_THREAD();
std::vector<peer_status> statuses;
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr& peer : _active_connections)
{
ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections
@ -5233,10 +5458,13 @@ namespace graphene { namespace net { namespace detail {
_allowed_peers.clear();
_allowed_peers.insert(allowed_peers.begin(), allowed_peers.end());
std::list<peer_connection_ptr> peers_to_disconnect;
if (!_allowed_peers.empty())
for (const peer_connection_ptr& peer : _active_connections)
if (!_allowed_peers.empty()) {
fc::scoped_lock<fc::mutex> lock(_active_connections.get_mutex());
for (const peer_connection_ptr &peer : _active_connections) {
if (_allowed_peers.find(peer->node_id) == _allowed_peers.end())
peers_to_disconnect.push_back(peer);
}
}
for (const peer_connection_ptr& peer : peers_to_disconnect)
disconnect_from_peer(peer.get(), "My allowed_peers list has changed, and you're no longer allowed. Bye.");
#endif // ENABLE_P2P_DEBUGGING_API